diff --git a/.changeset/tender-jobs-collect.md b/.changeset/tender-jobs-collect.md new file mode 100644 index 0000000000..829c628b6d --- /dev/null +++ b/.changeset/tender-jobs-collect.md @@ -0,0 +1,5 @@ +--- +"trigger.dev": patch +--- + +TriggerApiError 4xx errors will no longer cause tasks to be retried diff --git a/.vscode/launch.json b/.vscode/launch.json index e2659d148a..b82a2d5228 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -25,8 +25,8 @@ { "type": "node-terminal", "request": "launch", - "name": "Debug fairDequeuingStrategy.test.ts", - "command": "pnpm run test -t FairDequeuingStrategy", + "name": "Debug triggerTask.test.ts", + "command": "pnpm run test --run ./test/engine/triggerTask.test.ts", "envFile": "${workspaceFolder}/.env", "cwd": "${workspaceFolder}/apps/webapp", "sourceMaps": true diff --git a/apps/webapp/app/components/runs/v3/SpanEvents.tsx b/apps/webapp/app/components/runs/v3/SpanEvents.tsx index 679cf579ae..df99dc0ec1 100644 --- a/apps/webapp/app/components/runs/v3/SpanEvents.tsx +++ b/apps/webapp/app/components/runs/v3/SpanEvents.tsx @@ -83,7 +83,13 @@ export function SpanEventError({ time={spanEvent.time} titleClassName="text-rose-500" /> - {enhancedException.message && {enhancedException.message}} + {enhancedException.message && ( + + + {enhancedException.message} + + + )} {enhancedException.link && (enhancedException.link.magic === "CONTACT_FORM" ? ( {name} - {enhancedError.message && {enhancedError.message}} + {enhancedError.message && ( + + + {enhancedError.message} + + + )} {enhancedError.link && (enhancedError.link.magic === "CONTACT_FORM" ? ( { + const idempotencyKey = request.options?.idempotencyKey ?? request.body.options?.idempotencyKey; + const idempotencyKeyExpiresAt = + request.options?.idempotencyKeyExpiresAt ?? + resolveIdempotencyKeyTTL(request.body.options?.idempotencyKeyTTL) ?? + new Date(Date.now() + 24 * 60 * 60 * 1000 * 30); // 30 days + + if (!idempotencyKey) { + return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt }; + } + + const existingRun = idempotencyKey + ? await this.prisma.taskRun.findFirst({ + where: { + runtimeEnvironmentId: request.environment.id, + idempotencyKey, + taskIdentifier: request.taskId, + }, + include: { + associatedWaitpoint: true, + }, + }) + : undefined; + + if (existingRun) { + if (existingRun.idempotencyKeyExpiresAt && existingRun.idempotencyKeyExpiresAt < new Date()) { + logger.debug("[TriggerTaskService][call] Idempotency key has expired", { + idempotencyKey: request.options?.idempotencyKey, + run: existingRun, + }); + + // Update the existing run to remove the idempotency key + await this.prisma.taskRun.updateMany({ + where: { id: existingRun.id, idempotencyKey }, + data: { idempotencyKey: null, idempotencyKeyExpiresAt: null }, + }); + } else { + const associatedWaitpoint = existingRun.associatedWaitpoint; + const parentRunId = request.body.options?.parentRunId; + const resumeParentOnCompletion = request.body.options?.resumeParentOnCompletion; + //We're using `andWait` so we need to block the parent run with a waitpoint + if (associatedWaitpoint && resumeParentOnCompletion && parentRunId) { + await this.traceEventConcern.traceIdempotentRun( + request, + { + existingRun, + idempotencyKey, + incomplete: associatedWaitpoint.status === "PENDING", + isError: associatedWaitpoint.outputIsError, + }, + async (event) => { + //block run with waitpoint + await this.engine.blockRunWithWaitpoint({ + runId: RunId.fromFriendlyId(parentRunId), + waitpoints: associatedWaitpoint.id, + spanIdToComplete: event.spanId, + batch: request.options?.batchId + ? { + id: request.options.batchId, + index: request.options.batchIndex ?? 0, + } + : undefined, + projectId: request.environment.projectId, + organizationId: request.environment.organizationId, + tx: this.prisma, + releaseConcurrency: request.body.options?.releaseConcurrency, + }); + } + ); + } + + return { isCached: true, run: existingRun }; + } + } + + return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt }; + } +} diff --git a/apps/webapp/app/runEngine/concerns/payloads.server.ts b/apps/webapp/app/runEngine/concerns/payloads.server.ts new file mode 100644 index 0000000000..637a54d584 --- /dev/null +++ b/apps/webapp/app/runEngine/concerns/payloads.server.ts @@ -0,0 +1,63 @@ +import { IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3"; +import { PayloadProcessor, TriggerTaskRequest } from "../types"; +import { env } from "~/env.server"; +import { startActiveSpan } from "~/v3/tracer.server"; +import { uploadPacketToObjectStore } from "~/v3/r2.server"; +import { EngineServiceValidationError } from "./errors"; + +export class DefaultPayloadProcessor implements PayloadProcessor { + async process(request: TriggerTaskRequest): Promise { + return await startActiveSpan("handlePayloadPacket()", async (span) => { + const payload = request.body.payload; + const payloadType = request.body.options?.payloadType ?? "application/json"; + + const packet = this.#createPayloadPacket(payload, payloadType); + + if (!packet.data) { + return packet; + } + + const { needsOffloading, size } = packetRequiresOffloading( + packet, + env.TASK_PAYLOAD_OFFLOAD_THRESHOLD + ); + + span.setAttribute("needsOffloading", needsOffloading); + span.setAttribute("size", size); + + if (!needsOffloading) { + return packet; + } + + const filename = `${request.friendlyId}/payload.json`; + + const [uploadError] = await tryCatch( + uploadPacketToObjectStore(filename, packet.data, packet.dataType, request.environment) + ); + + if (uploadError) { + throw new EngineServiceValidationError( + "Failed to upload large payload to object store", + 500 + ); // This is retryable + } + + return { + data: filename, + dataType: "application/store", + }; + }); + } + + #createPayloadPacket(payload: any, payloadType: string): IOPacket { + if (payloadType === "application/json") { + return { data: JSON.stringify(payload), dataType: "application/json" }; + } + + if (typeof payload === "string") { + return { data: payload, dataType: payloadType }; + } + + return { dataType: payloadType }; + } +} diff --git a/apps/webapp/app/runEngine/concerns/queues.server.ts b/apps/webapp/app/runEngine/concerns/queues.server.ts new file mode 100644 index 0000000000..7701135516 --- /dev/null +++ b/apps/webapp/app/runEngine/concerns/queues.server.ts @@ -0,0 +1,248 @@ +import { sanitizeQueueName } from "@trigger.dev/core/v3/isomorphic"; +import { PrismaClientOrTransaction } from "@trigger.dev/database"; +import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { logger } from "~/services/logger.server"; +import { findCurrentWorkerFromEnvironment } from "~/v3/models/workerDeployment.server"; +import { + LockedBackgroundWorker, + QueueManager, + QueueProperties, + QueueValidationResult, + TriggerTaskRequest, +} from "../types"; +import { WorkerGroupService } from "~/v3/services/worker/workerGroupService.server"; +import type { RunEngine } from "~/v3/runEngine.server"; +import { env } from "~/env.server"; +import { EngineServiceValidationError } from "./errors"; + +export class DefaultQueueManager implements QueueManager { + constructor( + private readonly prisma: PrismaClientOrTransaction, + private readonly engine: RunEngine + ) {} + + async resolveQueueProperties( + request: TriggerTaskRequest, + lockedBackgroundWorker?: LockedBackgroundWorker + ): Promise { + let queueName: string; + let lockedQueueId: string | undefined; + + // Determine queue name based on lockToVersion and provided options + if (lockedBackgroundWorker) { + // Task is locked to a specific worker version + if (request.body.options?.queue?.name) { + const specifiedQueueName = request.body.options.queue.name; + // A specific queue name is provided + const specifiedQueue = await this.prisma.taskQueue.findFirst({ + // Validate it exists for the locked worker + where: { + name: specifiedQueueName, + runtimeEnvironmentId: request.environment.id, + workers: { some: { id: lockedBackgroundWorker.id } }, // Ensure the queue is associated with any task of the locked worker + }, + }); + + if (!specifiedQueue) { + throw new EngineServiceValidationError( + `Specified queue '${specifiedQueueName}' not found or not associated with locked version '${ + lockedBackgroundWorker.version ?? "" + }'.` + ); + } + // Use the validated queue name directly + queueName = specifiedQueue.name; + lockedQueueId = specifiedQueue.id; + } else { + // No specific queue name provided, use the default queue for the task on the locked worker + const lockedTask = await this.prisma.backgroundWorkerTask.findFirst({ + where: { + workerId: lockedBackgroundWorker.id, + runtimeEnvironmentId: request.environment.id, + slug: request.taskId, + }, + include: { + queue: true, + }, + }); + + if (!lockedTask) { + throw new EngineServiceValidationError( + `Task '${request.taskId}' not found on locked version '${ + lockedBackgroundWorker.version ?? "" + }'.` + ); + } + + if (!lockedTask.queue) { + // This case should ideally be prevented by earlier checks or schema constraints, + // but handle it defensively. + logger.error("Task found on locked version, but has no associated queue record", { + taskId: request.taskId, + workerId: lockedBackgroundWorker.id, + version: lockedBackgroundWorker.version, + }); + throw new EngineServiceValidationError( + `Default queue configuration for task '${request.taskId}' missing on locked version '${ + lockedBackgroundWorker.version ?? "" + }'.` + ); + } + // Use the task's default queue name + queueName = lockedTask.queue.name; + lockedQueueId = lockedTask.queue.id; + } + } else { + // Task is not locked to a specific version, use regular logic + if (request.body.options?.lockToVersion) { + // This should only happen if the findFirst failed, indicating the version doesn't exist + throw new EngineServiceValidationError( + `Task locked to version '${request.body.options.lockToVersion}', but no worker found with that version.` + ); + } + + // Get queue name using the helper for non-locked case (handles provided name or finds default) + queueName = await this.getQueueName(request); + } + + // Sanitize the final determined queue name once + const sanitizedQueueName = sanitizeQueueName(queueName); + + // Check that the queuename is not an empty string + if (!sanitizedQueueName) { + queueName = sanitizeQueueName(`task/${request.taskId}`); // Fallback if sanitization results in empty + } else { + queueName = sanitizedQueueName; + } + + return { + queueName, + lockedQueueId, + }; + } + + async getQueueName(request: TriggerTaskRequest): Promise { + const { taskId, environment, body } = request; + const { queue } = body.options ?? {}; + + if (queue?.name) { + return queue.name; + } + + const defaultQueueName = `task/${taskId}`; + + // Find the current worker for the environment + const worker = await findCurrentWorkerFromEnvironment(environment, this.prisma); + + if (!worker) { + logger.debug("Failed to get queue name: No worker found", { + taskId, + environmentId: environment.id, + }); + + return defaultQueueName; + } + + const task = await this.prisma.backgroundWorkerTask.findFirst({ + where: { + workerId: worker.id, + runtimeEnvironmentId: environment.id, + slug: taskId, + }, + include: { + queue: true, + }, + }); + + if (!task) { + console.log("Failed to get queue name: No task found", { + taskId, + environmentId: environment.id, + }); + + return defaultQueueName; + } + + if (!task.queue) { + console.log("Failed to get queue name: No queue found", { + taskId, + environmentId: environment.id, + queueConfig: task.queueConfig, + }); + + return defaultQueueName; + } + + return task.queue.name ?? defaultQueueName; + } + + async validateQueueLimits(environment: AuthenticatedEnvironment): Promise { + const queueSizeGuard = await guardQueueSizeLimitsForEnv(this.engine, environment); + + logger.debug("Queue size guard result", { + queueSizeGuard, + environment: { + id: environment.id, + type: environment.type, + organization: environment.organization, + project: environment.project, + }, + }); + + return { + ok: queueSizeGuard.isWithinLimits, + maximumSize: queueSizeGuard.maximumSize ?? 0, + queueSize: queueSizeGuard.queueSize ?? 0, + }; + } + + async getMasterQueue(environment: AuthenticatedEnvironment): Promise { + if (environment.type === "DEVELOPMENT") { + return; + } + + const workerGroupService = new WorkerGroupService({ + prisma: this.prisma, + engine: this.engine, + }); + + const workerGroup = await workerGroupService.getDefaultWorkerGroupForProject({ + projectId: environment.projectId, + }); + + if (!workerGroup) { + throw new EngineServiceValidationError("No worker group found"); + } + + return workerGroup.masterQueue; + } +} + +function getMaximumSizeForEnvironment(environment: AuthenticatedEnvironment): number | undefined { + if (environment.type === "DEVELOPMENT") { + return environment.organization.maximumDevQueueSize ?? env.MAXIMUM_DEV_QUEUE_SIZE; + } else { + return environment.organization.maximumDeployedQueueSize ?? env.MAXIMUM_DEPLOYED_QUEUE_SIZE; + } +} + +async function guardQueueSizeLimitsForEnv( + engine: RunEngine, + environment: AuthenticatedEnvironment, + itemsToAdd: number = 1 +) { + const maximumSize = getMaximumSizeForEnvironment(environment); + + if (typeof maximumSize === "undefined") { + return { isWithinLimits: true }; + } + + const queueSize = await engine.lengthOfEnvQueue(environment); + const projectedSize = queueSize + itemsToAdd; + + return { + isWithinLimits: projectedSize <= maximumSize, + maximumSize, + queueSize, + }; +} diff --git a/apps/webapp/app/runEngine/concerns/runChainStates.server.ts b/apps/webapp/app/runEngine/concerns/runChainStates.server.ts new file mode 100644 index 0000000000..b0e9bf2064 --- /dev/null +++ b/apps/webapp/app/runEngine/concerns/runChainStates.server.ts @@ -0,0 +1,276 @@ +import { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database"; +import { RunChainStateManager, TriggerTaskRequest } from "../types"; +import { RunChainState } from "@trigger.dev/core/v3/schemas"; +import { logger } from "~/services/logger.server"; +import { EngineServiceValidationError } from "./errors"; + +export class DefaultRunChainStateManager implements RunChainStateManager { + private readonly prisma: PrismaClientOrTransaction; + private readonly isReleaseConcurrencyEnabled: boolean; + + constructor(prisma: PrismaClientOrTransaction, isReleaseConcurrencyEnabled: boolean) { + this.prisma = prisma; + this.isReleaseConcurrencyEnabled = isReleaseConcurrencyEnabled; + } + + async validateRunChain( + request: TriggerTaskRequest, + { + parentRun, + queueName, + lockedQueueId, + }: { parentRun?: TaskRun; queueName: string; lockedQueueId?: string } + ): Promise { + // if there is no parent run, the chain resets + if (!parentRun) { + return {}; + } + + const parsedParentRunChainState = RunChainState.safeParse(parentRun.runChainState ?? {}); + + if (!parsedParentRunChainState.success) { + logger.error("Invalid run chain state for parent run", { + runId: parentRun.id, + runState: parentRun.runChainState, + error: parsedParentRunChainState.error, + }); + + return {}; + } + + const parentRunChainState = parsedParentRunChainState.data; + + if ( + typeof request.body.options?.resumeParentOnCompletion === "boolean" && + !request.body.options.resumeParentOnCompletion + ) { + return parentRunChainState; + } + + // Now we need to check if the parent run will hold concurrency, or if it will release it + // if it will hold concurrency, we need to account for the parent run's concurrency + // Then, along with the new run's concurrency, + // we need to determine if the new run will ever be able to run, or are we in a deadlock situation + // We need to check the concurrency limit against the concurrency limit of the environment, and the queue of the new run + // We'll also need the queue of the parent run, to determine if the parent run will release and which queue to add to + // Since the parent run is already running, it will definitely have a locked queue associated with it + const { concurrency } = parentRunChainState; + + const parentLockedQueueId = parentRun.lockedQueueId; + + if (!parentLockedQueueId) { + logger.error("Parent run has no locked queue, cannot determine run chain state", { + runId: parentRun.id, + runState: parentRun.runChainState, + }); + + return {}; + } + + const parentQueueState = await this.#getParentQueueState( + parentRunChainState, + parentLockedQueueId + ); + + // We first need to check if the release concurrency system is enabled, + // If it is not enabled, then we can assume the parent run will hold the concurrency, + // for the env and the queue + // If it is enabled, we never hold the concurrency for the env, just for the queue + if (!this.isReleaseConcurrencyEnabled) { + parentQueueState.holding += 1; + + const newRunChainState = { + ...parentRunChainState, + concurrency: { + queues: [ + ...(concurrency?.queues ?? []).filter((queue) => queue.id !== parentLockedQueueId), + parentQueueState, + ], + environment: (concurrency?.environment ?? 0) + 1, + }, + }; + + return await this.#validateNewRunChainState(request, newRunChainState, { + parentRun, + queueName, + lockedQueueId, + }); + } + + // Now we need to determine if the parent run will release the concurrency + // if it does, we will add to the holding count for the queue + const willReleaseConcurrency = await this.#determineIfParentRunWillReleaseConcurrency( + request, + parentLockedQueueId + ); + + if (!willReleaseConcurrency) { + parentQueueState.holding += 1; + } + + const newRunChainState = { + ...parentRunChainState, + concurrency: { + queues: [ + ...(concurrency?.queues ?? []).filter((queue) => queue.id !== parentLockedQueueId), + parentQueueState, + ], + environment: concurrency?.environment ?? 0, + }, + }; + + return await this.#validateNewRunChainState(request, newRunChainState, { + parentRun, + queueName, + lockedQueueId, + }); + } + + // Performs the deadlock detection logic once the new run chain state is determined + // Needs to account for everything held, plus the new run's concurrency + async #validateNewRunChainState( + request: TriggerTaskRequest, + runChainState: RunChainState, + { + parentRun, + queueName, + lockedQueueId, + }: { parentRun?: TaskRun; queueName: string; lockedQueueId?: string } + ) { + logger.debug("Validating new run chain state", { + runChainState, + }); + + const environmentConcurrency = (runChainState.concurrency?.environment ?? 0) + 1; + + if ( + request.environment.maximumConcurrencyLimit > 0 && + environmentConcurrency > request.environment.maximumConcurrencyLimit + ) { + const environmentDetails = `The environment has a concurrency limit of ${request.environment.maximumConcurrencyLimit}, and the chain would require ${environmentConcurrency}`; + throw new EngineServiceValidationError(this.createDeadlockErrorMessage(environmentDetails)); + } + + if (!lockedQueueId) { + return runChainState; + } + + const queueConcurrencyState = runChainState.concurrency?.queues.find( + (queue) => queue.id === lockedQueueId + ); + + if (!queueConcurrencyState) { + return runChainState; + } + + const queueConcurrency = queueConcurrencyState.holding + 1; + + const queue = await this.prisma.taskQueue.findFirst({ + where: { + id: lockedQueueId, + }, + select: { + concurrencyLimit: true, + }, + }); + + if (!queue) { + return runChainState; + } + + const queueConcurrencyLimit = queue.concurrencyLimit; + + if ( + typeof queueConcurrencyLimit === "number" && + queueConcurrencyLimit !== 0 && + queueConcurrency > queueConcurrencyLimit + ) { + const queueDetails = `The queue '${queueName}' has a concurrency limit of ${queueConcurrencyLimit}, and the chain would require ${queueConcurrency}`; + throw new EngineServiceValidationError(this.createDeadlockErrorMessage(queueDetails)); + } + + return runChainState; + } + + async #determineIfParentRunWillReleaseConcurrency( + request: TriggerTaskRequest, + parentLockedQueueId: string + ) { + if (typeof request.body.options?.releaseConcurrency === "boolean") { + return request.body.options.releaseConcurrency; + } + + const parentQueue = await this.prisma.taskQueue.findFirst({ + where: { + id: parentLockedQueueId, + }, + select: { + releaseConcurrencyOnWaitpoint: true, + concurrencyLimit: true, + }, + }); + + logger.debug("Determining if parent run will release concurrency", { + parentQueue, + }); + + if ( + typeof parentQueue?.concurrencyLimit === "undefined" || + parentQueue.concurrencyLimit === null + ) { + return true; + } + + if (typeof parentQueue?.releaseConcurrencyOnWaitpoint === "boolean") { + return parentQueue.releaseConcurrencyOnWaitpoint; + } + + return false; + } + + async #getParentQueueState(runChainState: RunChainState, parentLockedQueueId: string) { + const newQueueState = runChainState.concurrency?.queues.find( + (queue) => queue.id === parentLockedQueueId + ); + + if (newQueueState) { + return newQueueState; + } + + const parentQueue = await this.prisma.taskQueue.findFirst({ + where: { + id: parentLockedQueueId, + }, + }); + + if (!parentQueue) { + throw new Error("Deadlock detection failed, parent queue not found"); + } + + return { + id: parentQueue.id, + name: parentQueue.name, + holding: 0, + }; + } + + private createDeadlockErrorMessage(details: string) { + return `Deadlock detected: This task run cannot be triggered because it would create a concurrency deadlock. + +A deadlock occurs when a chain of task runs (parent -> child) would collectively hold more concurrency than is available, making it impossible for the child run to ever execute. + +Current situation: +${details} + +This usually happens when: +1. A parent task triggers a child task using triggerAndWait() +2. Both tasks use the same queue +3. The parent task doesn't release its concurrency while waiting (releaseConcurrency: false) + +To fix this, you can: +1. Enable releaseConcurrencyOnWaitpoint on the queue +2. Use a different queue for the child task +3. Increase the concurrency limits +4. Use trigger() instead of triggerAndWait() if you don't need to wait`; + } +} diff --git a/apps/webapp/app/runEngine/concerns/runNumbers.server.ts b/apps/webapp/app/runEngine/concerns/runNumbers.server.ts new file mode 100644 index 0000000000..39033f2623 --- /dev/null +++ b/apps/webapp/app/runEngine/concerns/runNumbers.server.ts @@ -0,0 +1,14 @@ +import { autoIncrementCounter } from "~/services/autoIncrementCounter.server"; +import { RunNumberIncrementer, TriggerTaskRequest } from "../types"; + +export class DefaultRunNumberIncrementer implements RunNumberIncrementer { + async incrementRunNumber( + request: TriggerTaskRequest, + callback: (num: number) => Promise + ): Promise { + return await autoIncrementCounter.incrementInTransaction( + `v3-run:${request.environment.id}:${request.taskId}`, + callback + ); + } +} diff --git a/apps/webapp/app/runEngine/concerns/traceEvents.server.ts b/apps/webapp/app/runEngine/concerns/traceEvents.server.ts new file mode 100644 index 0000000000..f8193ff112 --- /dev/null +++ b/apps/webapp/app/runEngine/concerns/traceEvents.server.ts @@ -0,0 +1,122 @@ +import { EventRepository } from "~/v3/eventRepository.server"; +import { TracedEventSpan, TraceEventConcern, TriggerTaskRequest } from "../types"; +import { SemanticInternalAttributes } from "@trigger.dev/core/v3/semanticInternalAttributes"; +import { BatchId } from "@trigger.dev/core/v3/isomorphic"; +import { TaskRun } from "@trigger.dev/database"; + +export class DefaultTraceEventsConcern implements TraceEventConcern { + private readonly eventRepository: EventRepository; + + constructor(eventRepository: EventRepository) { + this.eventRepository = eventRepository; + } + + async traceRun( + request: TriggerTaskRequest, + callback: (span: TracedEventSpan) => Promise + ): Promise { + return await this.eventRepository.traceEvent( + request.taskId, + { + context: request.options?.traceContext, + spanParentAsLink: request.options?.spanParentAsLink, + parentAsLinkType: request.options?.parentAsLinkType, + kind: "SERVER", + environment: request.environment, + taskSlug: request.taskId, + attributes: { + properties: { + [SemanticInternalAttributes.SHOW_ACTIONS]: true, + }, + style: { + icon: request.options?.customIcon ?? "task", + }, + runIsTest: request.body.options?.test ?? false, + batchId: request.options?.batchId + ? BatchId.toFriendlyId(request.options.batchId) + : undefined, + idempotencyKey: request.options?.idempotencyKey, + }, + incomplete: true, + immediate: true, + }, + async (event, traceContext, traceparent) => { + return await callback({ + traceId: event.traceId, + spanId: event.spanId, + traceContext, + traceparent, + setAttribute: (key, value) => event.setAttribute(key as any, value), + failWithError: event.failWithError.bind(event), + }); + } + ); + } + + async traceIdempotentRun( + request: TriggerTaskRequest, + options: { + existingRun: TaskRun; + idempotencyKey: string; + incomplete: boolean; + isError: boolean; + }, + callback: (span: TracedEventSpan) => Promise + ): Promise { + const { existingRun, idempotencyKey, incomplete, isError } = options; + + return await this.eventRepository.traceEvent( + `${request.taskId} (cached)`, + { + context: request.options?.traceContext, + spanParentAsLink: request.options?.spanParentAsLink, + parentAsLinkType: request.options?.parentAsLinkType, + kind: "SERVER", + environment: request.environment, + taskSlug: request.taskId, + attributes: { + properties: { + [SemanticInternalAttributes.SHOW_ACTIONS]: true, + [SemanticInternalAttributes.ORIGINAL_RUN_ID]: existingRun.friendlyId, + }, + style: { + icon: "task-cached", + }, + runIsTest: request.body.options?.test ?? false, + batchId: request.options?.batchId + ? BatchId.toFriendlyId(request.options.batchId) + : undefined, + idempotencyKey, + runId: existingRun.friendlyId, + }, + incomplete, + isError, + immediate: true, + }, + async (event, traceContext, traceparent) => { + //log a message + await this.eventRepository.recordEvent( + `There's an existing run for idempotencyKey: ${idempotencyKey}`, + { + taskSlug: request.taskId, + environment: request.environment, + attributes: { + runId: existingRun.friendlyId, + }, + context: request.options?.traceContext, + parentId: event.spanId, + } + ); + + return await callback({ + traceId: event.traceId, + spanId: event.spanId, + traceContext, + traceparent, + setAttribute: (key, value) => event.setAttribute(key as any, value), + failWithError: event.failWithError.bind(event), + }); + } + ); + } +} diff --git a/apps/webapp/app/runEngine/services/batchTrigger.server.ts b/apps/webapp/app/runEngine/services/batchTrigger.server.ts index 5e8c48cb7b..74ebf307f5 100644 --- a/apps/webapp/app/runEngine/services/batchTrigger.server.ts +++ b/apps/webapp/app/runEngine/services/batchTrigger.server.ts @@ -16,9 +16,9 @@ import { logger } from "~/services/logger.server"; import { getEntitlement } from "~/services/platform.v3.server"; import { workerQueue } from "~/services/worker.server"; import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../../v3/r2.server"; -import { startActiveSpan } from "../../v3/tracer.server"; import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server"; import { OutOfEntitlementError, TriggerTaskService } from "../../v3/services/triggerTask.server"; +import { startActiveSpan } from "../../v3/tracer.server"; const PROCESSING_BATCH_SIZE = 50; const ASYNC_BATCH_PROCESS_SIZE_THRESHOLD = 20; @@ -112,7 +112,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine { } catch (error) { // Detect a prisma transaction Unique constraint violation if (error instanceof Prisma.PrismaClientKnownRequestError) { - logger.debug("BatchTriggerV3: Prisma transaction error", { + logger.debug("RunEngineBatchTrigger: Prisma transaction error", { code: error.code, message: error.message, meta: error.meta, @@ -188,7 +188,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine { switch (result.status) { case "COMPLETE": { - logger.debug("[BatchTriggerV3][call] Batch inline processing complete", { + logger.debug("[RunEngineBatchTrigger][call] Batch inline processing complete", { batchId: batch.friendlyId, currentIndex: 0, }); @@ -196,7 +196,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine { return batch; } case "INCOMPLETE": { - logger.debug("[BatchTriggerV3][call] Batch inline processing incomplete", { + logger.debug("[RunEngineBatchTrigger][call] Batch inline processing incomplete", { batchId: batch.friendlyId, currentIndex: result.workingIndex, }); @@ -218,7 +218,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine { return batch; } case "ERROR": { - logger.error("[BatchTriggerV3][call] Batch inline processing error", { + logger.error("[RunEngineBatchTrigger][call] Batch inline processing error", { batchId: batch.friendlyId, currentIndex: result.workingIndex, error: result.error, @@ -317,7 +317,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine { } async processBatchTaskRun(options: BatchProcessingOptions) { - logger.debug("[BatchTriggerV3][processBatchTaskRun] Processing batch", { + logger.debug("[RunEngineBatchTrigger][processBatchTaskRun] Processing batch", { options, }); @@ -325,7 +325,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine { // Add early return if max attempts reached if ($attemptCount > MAX_ATTEMPTS) { - logger.error("[BatchTriggerV3][processBatchTaskRun] Max attempts reached", { + logger.error("[RunEngineBatchTrigger][processBatchTaskRun] Max attempts reached", { options, attemptCount: $attemptCount, }); @@ -351,12 +351,15 @@ export class RunEngineBatchTriggerService extends WithRunEngine { // Check to make sure the currentIndex is not greater than the runCount if (options.range.start >= batch.runCount) { - logger.debug("[BatchTriggerV3][processBatchTaskRun] currentIndex is greater than runCount", { - options, - batchId: batch.friendlyId, - runCount: batch.runCount, - attemptCount: $attemptCount, - }); + logger.debug( + "[RunEngineBatchTrigger][processBatchTaskRun] currentIndex is greater than runCount", + { + options, + batchId: batch.friendlyId, + runCount: batch.runCount, + attemptCount: $attemptCount, + } + ); return; } @@ -373,7 +376,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine { const payload = await parsePacket(payloadPacket); if (!payload) { - logger.debug("[BatchTriggerV3][processBatchTaskRun] Failed to parse payload", { + logger.debug("[RunEngineBatchTrigger][processBatchTaskRun] Failed to parse payload", { options, batchId: batch.friendlyId, attemptCount: $attemptCount, @@ -399,7 +402,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine { switch (result.status) { case "COMPLETE": { - logger.debug("[BatchTriggerV3][processBatchTaskRun] Batch processing complete", { + logger.debug("[RunEngineBatchTrigger][processBatchTaskRun] Batch processing complete", { options, batchId: batch.friendlyId, attemptCount: $attemptCount, @@ -408,7 +411,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine { return; } case "INCOMPLETE": { - logger.debug("[BatchTriggerV3][processBatchTaskRun] Batch processing incomplete", { + logger.debug("[RunEngineBatchTrigger][processBatchTaskRun] Batch processing incomplete", { batchId: batch.friendlyId, currentIndex: result.workingIndex, attemptCount: $attemptCount, @@ -434,7 +437,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine { return; } case "ERROR": { - logger.error("[BatchTriggerV3][processBatchTaskRun] Batch processing error", { + logger.error("[RunEngineBatchTrigger][processBatchTaskRun] Batch processing error", { batchId: batch.friendlyId, currentIndex: result.workingIndex, error: result.error, @@ -505,7 +508,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine { // Grab the next PROCESSING_BATCH_SIZE items const itemsToProcess = items.slice(currentIndex, currentIndex + batchSize); - logger.debug("[BatchTriggerV3][processBatchTaskRun] Processing batch items", { + logger.debug("[RunEngineBatchTrigger][processBatchTaskRun] Processing batch items", { batchId: batch.friendlyId, currentIndex, runCount: batch.runCount, @@ -528,19 +531,19 @@ export class RunEngineBatchTriggerService extends WithRunEngine { }); if (!run) { - logger.error("[BatchTriggerV3][processBatchTaskRun] Failed to process item", { + logger.error("[RunEngineBatchTrigger][processBatchTaskRun] Failed to process item", { batchId: batch.friendlyId, currentIndex: workingIndex, }); - throw new Error("[BatchTriggerV3][processBatchTaskRun] Failed to process item"); + throw new Error("[RunEngineBatchTrigger][processBatchTaskRun] Failed to process item"); } runIds.push(run.friendlyId); workingIndex++; } catch (error) { - logger.error("[BatchTriggerV3][processBatchTaskRun] Failed to process item", { + logger.error("[RunEngineBatchTrigger][processBatchTaskRun] Failed to process item", { batchId: batch.friendlyId, currentIndex: workingIndex, error, @@ -595,7 +598,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine { parentRunId: string | undefined; resumeParentOnCompletion: boolean | undefined; }) { - logger.debug("[BatchTriggerV3][processBatchTaskRunItem] Processing item", { + logger.debug("[RunEngineBatchTrigger][processBatchTaskRunItem] Processing item", { batchId: batch.friendlyId, currentIndex, }); @@ -634,7 +637,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine { async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) { await workerQueue.enqueue("runengine.processBatchTaskRun", options, { tx, - jobKey: `BatchTriggerV3Service.process:${options.batchId}:${options.processingId}`, + jobKey: `RunEngineBatchTriggerService.process:${options.batchId}:${options.processingId}`, }); } diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 74d230b444..b077daac7f 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -1,46 +1,78 @@ -import { RunDuplicateIdempotencyKeyError, RunEngine } from "@internal/run-engine"; import { - IOPacket, - packetRequiresOffloading, - SemanticInternalAttributes, + RunDuplicateIdempotencyKeyError, + RunEngine, + RunOneTimeUseTokenError, +} from "@internal/run-engine"; +import { Tracer } from "@opentelemetry/api"; +import { tryCatch } from "@trigger.dev/core/utils"; +import { TaskRunError, taskRunErrorEnhancer, taskRunErrorToString, TriggerTaskRequestBody, } from "@trigger.dev/core/v3"; -import { - BatchId, - RunId, - sanitizeQueueName, - stringifyDuration, -} from "@trigger.dev/core/v3/isomorphic"; -import { Prisma } from "@trigger.dev/database"; -import { env } from "~/env.server"; -import { createTags, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server"; -import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; -import { autoIncrementCounter } from "~/services/autoIncrementCounter.server"; +import { RunId, stringifyDuration } from "@trigger.dev/core/v3/isomorphic"; +import type { PrismaClientOrTransaction } from "@trigger.dev/database"; +import { createTags } from "~/models/taskRunTag.server"; +import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; -import { getEntitlement } from "~/services/platform.v3.server"; import { parseDelay } from "~/utils/delays"; -import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; import { handleMetadataPacket } from "~/utils/packets"; -import { eventRepository } from "../../v3/eventRepository.server"; -import { findCurrentWorkerFromEnvironment } from "../../v3/models/workerDeployment.server"; -import { uploadPacketToObjectStore } from "../../v3/r2.server"; -import { getTaskEventStore } from "../../v3/taskEventStore.server"; -import { isFinalRunStatus } from "../../v3/taskStatus"; -import { startActiveSpan } from "../../v3/tracer.server"; -import { clampMaxDuration } from "../../v3/utils/maxDuration"; -import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server"; -import { - MAX_ATTEMPTS, - OutOfEntitlementError, +import { startSpan } from "~/v3/tracing.server"; +import type { TriggerTaskServiceOptions, TriggerTaskServiceResult, } from "../../v3/services/triggerTask.server"; -import { WorkerGroupService } from "../../v3/services/worker/workerGroupService.server"; +import { getTaskEventStore } from "../../v3/taskEventStore.server"; +import { clampMaxDuration } from "../../v3/utils/maxDuration"; +import { EngineServiceValidationError } from "../concerns/errors"; +import { IdempotencyKeyConcern } from "../concerns/idempotencyKeys.server"; +import type { + PayloadProcessor, + QueueManager, + RunChainStateManager, + RunNumberIncrementer, + TraceEventConcern, + TriggerTaskRequest, + TriggerTaskValidator, +} from "../types"; + +export class RunEngineTriggerTaskService { + private readonly queueConcern: QueueManager; + private readonly validator: TriggerTaskValidator; + private readonly payloadProcessor: PayloadProcessor; + private readonly idempotencyKeyConcern: IdempotencyKeyConcern; + private readonly runNumberIncrementer: RunNumberIncrementer; + private readonly prisma: PrismaClientOrTransaction; + private readonly engine: RunEngine; + private readonly tracer: Tracer; + private readonly traceEventConcern: TraceEventConcern; + private readonly runChainStateManager: RunChainStateManager; + + constructor(opts: { + prisma: PrismaClientOrTransaction; + engine: RunEngine; + queueConcern: QueueManager; + validator: TriggerTaskValidator; + payloadProcessor: PayloadProcessor; + idempotencyKeyConcern: IdempotencyKeyConcern; + runNumberIncrementer: RunNumberIncrementer; + traceEventConcern: TraceEventConcern; + runChainStateManager: RunChainStateManager; + tracer: Tracer; + }) { + this.prisma = opts.prisma; + this.engine = opts.engine; + this.queueConcern = opts.queueConcern; + this.validator = opts.validator; + this.payloadProcessor = opts.payloadProcessor; + this.idempotencyKeyConcern = opts.idempotencyKeyConcern; + this.runNumberIncrementer = opts.runNumberIncrementer; + this.tracer = opts.tracer; + this.traceEventConcern = opts.traceEventConcern; + this.runChainStateManager = opts.runChainStateManager; + } -export class RunEngineTriggerTaskService extends WithRunEngine { public async call({ taskId, environment, @@ -54,139 +86,91 @@ export class RunEngineTriggerTaskService extends WithRunEngine { options?: TriggerTaskServiceOptions; attempt?: number; }): Promise { - return await this.traceWithEnv("call()", environment, async (span) => { + return await startSpan(this.tracer, "RunEngineTriggerTaskService.call()", async (span) => { span.setAttribute("taskId", taskId); span.setAttribute("attempt", attempt); - if (attempt > MAX_ATTEMPTS) { - throw new ServiceValidationError( - `Failed to trigger ${taskId} after ${MAX_ATTEMPTS} attempts.` - ); + const runFriendlyId = options?.runFriendlyId ?? RunId.generate().friendlyId; + const triggerRequest = { + taskId, + friendlyId: runFriendlyId, + environment, + body, + options, + } satisfies TriggerTaskRequest; + + // Validate max attempts + const maxAttemptsValidation = this.validator.validateMaxAttempts({ + taskId, + attempt, + }); + + if (!maxAttemptsValidation.ok) { + throw maxAttemptsValidation.error; } - const idempotencyKey = options.idempotencyKey ?? body.options?.idempotencyKey; - const idempotencyKeyExpiresAt = - options.idempotencyKeyExpiresAt ?? - resolveIdempotencyKeyTTL(body.options?.idempotencyKeyTTL) ?? - new Date(Date.now() + 24 * 60 * 60 * 1000 * 30); // 30 days + // Validate tags + const tagValidation = this.validator.validateTags({ + tags: body.options?.tags, + }); - const delayUntil = await parseDelay(body.options?.delay); + if (!tagValidation.ok) { + throw tagValidation.error; + } + + // Validate entitlement + const entitlementValidation = await this.validator.validateEntitlement({ + environment, + }); + + if (!entitlementValidation.ok) { + throw entitlementValidation.error; + } + + const [parseDelayError, delayUntil] = await tryCatch(parseDelay(body.options?.delay)); + + if (parseDelayError) { + throw new EngineServiceValidationError(`Invalid delay ${body.options?.delay}`); + } const ttl = typeof body.options?.ttl === "number" ? stringifyDuration(body.options?.ttl) : body.options?.ttl ?? (environment.type === "DEVELOPMENT" ? "10m" : undefined); - const existingRun = idempotencyKey - ? await this._prisma.taskRun.findFirst({ + // Get parent run if specified + const parentRun = body.options?.parentRunId + ? await this.prisma.taskRun.findFirst({ where: { + id: RunId.fromFriendlyId(body.options.parentRunId), runtimeEnvironmentId: environment.id, - idempotencyKey, - taskIdentifier: taskId, - }, - include: { - associatedWaitpoint: true, }, }) : undefined; - if (existingRun) { - if ( - existingRun.idempotencyKeyExpiresAt && - existingRun.idempotencyKeyExpiresAt < new Date() - ) { - logger.debug("[TriggerTaskService][call] Idempotency key has expired", { - idempotencyKey: options.idempotencyKey, - run: existingRun, - }); - - // Update the existing run to remove the idempotency key - await this._prisma.taskRun.update({ - where: { id: existingRun.id }, - data: { idempotencyKey: null }, - }); - } else { - span.setAttribute("runId", existingRun.friendlyId); - - //We're using `andWait` so we need to block the parent run with a waitpoint - if ( - existingRun.associatedWaitpoint && - body.options?.resumeParentOnCompletion && - body.options?.parentRunId - ) { - await eventRepository.traceEvent( - `${taskId} (cached)`, - { - context: options.traceContext, - spanParentAsLink: options.spanParentAsLink, - parentAsLinkType: options.parentAsLinkType, - kind: "SERVER", - environment, - taskSlug: taskId, - attributes: { - properties: { - [SemanticInternalAttributes.SHOW_ACTIONS]: true, - [SemanticInternalAttributes.ORIGINAL_RUN_ID]: existingRun.friendlyId, - }, - style: { - icon: "task-cached", - }, - runIsTest: body.options?.test ?? false, - batchId: options.batchId ? BatchId.toFriendlyId(options.batchId) : undefined, - idempotencyKey, - runId: existingRun.friendlyId, - }, - incomplete: existingRun.associatedWaitpoint.status === "PENDING", - isError: existingRun.associatedWaitpoint.outputIsError, - immediate: true, - }, - async (event) => { - //log a message - await eventRepository.recordEvent( - `There's an existing run for idempotencyKey: ${idempotencyKey}`, - { - taskSlug: taskId, - environment, - attributes: { - runId: existingRun.friendlyId, - }, - context: options.traceContext, - parentId: event.spanId, - } - ); - //block run with waitpoint - await this._engine.blockRunWithWaitpoint({ - runId: RunId.fromFriendlyId(body.options!.parentRunId!), - waitpoints: existingRun.associatedWaitpoint!.id, - spanIdToComplete: event.spanId, - batch: options?.batchId - ? { - id: options.batchId, - index: options.batchIndex ?? 0, - } - : undefined, - projectId: environment.projectId, - organizationId: environment.organizationId, - tx: this._prisma, - releaseConcurrency: body.options?.releaseConcurrency, - }); - } - ); - } + // Validate parent run + const parentRunValidation = this.validator.validateParentRun({ + taskId, + parentRun: parentRun ?? undefined, + resumeParentOnCompletion: body.options?.resumeParentOnCompletion, + }); - return { run: existingRun, isCached: true }; - } + if (!parentRunValidation.ok) { + throw parentRunValidation.error; } - if (environment.type !== "DEVELOPMENT") { - const result = await getEntitlement(environment.organizationId); - if (result && result.hasAccess === false) { - throw new OutOfEntitlementError(); - } + const idempotencyKeyConcernResult = await this.idempotencyKeyConcern.handleTriggerRequest( + triggerRequest + ); + + if (idempotencyKeyConcernResult.isCached) { + return idempotencyKeyConcernResult; } + const { idempotencyKey, idempotencyKeyExpiresAt } = idempotencyKeyConcernResult; + if (!options.skipChecks) { - const queueSizeGuard = await guardQueueSizeLimitsForEnv(this._engine, environment); + const queueSizeGuard = await this.queueConcern.validateQueueLimits(environment); logger.debug("Queue size guard result", { queueSizeGuard, @@ -198,32 +182,13 @@ export class RunEngineTriggerTaskService extends WithRunEngine { }, }); - if (!queueSizeGuard.isWithinLimits) { - throw new ServiceValidationError( + if (!queueSizeGuard.ok) { + throw new EngineServiceValidationError( `Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}` ); } } - if ( - body.options?.tags && - typeof body.options.tags !== "string" && - body.options.tags.length > MAX_TAGS_PER_RUN - ) { - throw new ServiceValidationError( - `Runs can only have ${MAX_TAGS_PER_RUN} tags, you're trying to set ${body.options.tags.length}.` - ); - } - - const runFriendlyId = options?.runFriendlyId ?? RunId.generate().friendlyId; - - const payloadPacket = await this.#handlePayloadPacket( - body.payload, - body.options?.payloadType ?? "application/json", - runFriendlyId, - environment - ); - const metadataPacket = body.options?.metadata ? handleMetadataPacket( body.options?.metadata, @@ -231,369 +196,151 @@ export class RunEngineTriggerTaskService extends WithRunEngine { ) : undefined; - const parentRun = body.options?.parentRunId - ? await this._prisma.taskRun.findFirst({ - where: { id: RunId.fromFriendlyId(body.options.parentRunId) }, + const lockedToBackgroundWorker = body.options?.lockToVersion + ? await this.prisma.backgroundWorker.findFirst({ + where: { + projectId: environment.projectId, + runtimeEnvironmentId: environment.id, + version: body.options?.lockToVersion, + }, + select: { + id: true, + version: true, + sdkVersion: true, + cliVersion: true, + }, }) : undefined; - if ( - parentRun && - isFinalRunStatus(parentRun.status) && - body.options?.resumeParentOnCompletion - ) { - logger.debug("Parent run is in a terminal state", { - parentRun, - }); + const { queueName, lockedQueueId } = await this.queueConcern.resolveQueueProperties( + triggerRequest, + lockedToBackgroundWorker ?? undefined + ); - throw new ServiceValidationError( - `Cannot trigger ${taskId} as the parent run has a status of ${parentRun.status}` - ); - } + //upsert tags + const tags = await createTags( + { + tags: body.options?.tags, + projectId: environment.projectId, + }, + this.prisma + ); + + const depth = parentRun ? parentRun.depth + 1 : 0; + + const runChainState = await this.runChainStateManager.validateRunChain(triggerRequest, { + parentRun: parentRun ?? undefined, + queueName, + lockedQueueId, + }); + + const masterQueue = await this.queueConcern.getMasterQueue(environment); try { - return await eventRepository.traceEvent( - taskId, - { - context: options.traceContext, - spanParentAsLink: options.spanParentAsLink, - parentAsLinkType: options.parentAsLinkType, - kind: "SERVER", - environment, - taskSlug: taskId, - attributes: { - properties: { - [SemanticInternalAttributes.SHOW_ACTIONS]: true, - }, - style: { - icon: options.customIcon ?? "task", - }, - runIsTest: body.options?.test ?? false, - batchId: options.batchId ? BatchId.toFriendlyId(options.batchId) : undefined, - idempotencyKey, - }, - incomplete: true, - immediate: true, - }, - async (event, traceContext, traceparent) => { - const result = await autoIncrementCounter.incrementInTransaction( - `v3-run:${environment.id}:${taskId}`, - async (num, tx) => { - const lockedToBackgroundWorker = body.options?.lockToVersion - ? await tx.backgroundWorker.findFirst({ - where: { - projectId: environment.projectId, - runtimeEnvironmentId: environment.id, - version: body.options?.lockToVersion, - }, - select: { - id: true, - version: true, - sdkVersion: true, - cliVersion: true, - }, - }) - : undefined; - - let queueName = sanitizeQueueName( - await this.#getQueueName(taskId, environment, body.options?.queue?.name) - ); - - // Check that the queuename is not an empty string - if (!queueName) { - queueName = sanitizeQueueName(`task/${taskId}`); - } - - event.setAttribute("queueName", queueName); - span.setAttribute("queueName", queueName); - - //upsert tags - const tags = await createTags( - { - tags: body.options?.tags, - projectId: environment.projectId, - }, - this._prisma - ); - - const depth = parentRun ? parentRun.depth + 1 : 0; - - event.setAttribute("runId", runFriendlyId); - span.setAttribute("runId", runFriendlyId); - - const masterQueue = await this.#getMasterQueueForEnvironment(environment); - - const taskRun = await this._engine.trigger( - { - number: num, - friendlyId: runFriendlyId, - environment: environment, - idempotencyKey, - idempotencyKeyExpiresAt: idempotencyKey ? idempotencyKeyExpiresAt : undefined, - taskIdentifier: taskId, - payload: payloadPacket.data ?? "", - payloadType: payloadPacket.dataType, - context: body.context, - traceContext: traceContext, - traceId: event.traceId, - spanId: event.spanId, - parentSpanId: - options.parentAsLinkType === "replay" ? undefined : traceparent?.spanId, - lockedToVersionId: lockedToBackgroundWorker?.id, - taskVersion: lockedToBackgroundWorker?.version, - sdkVersion: lockedToBackgroundWorker?.sdkVersion, - cliVersion: lockedToBackgroundWorker?.cliVersion, - concurrencyKey: body.options?.concurrencyKey, - queue: queueName, - masterQueue: masterQueue, - isTest: body.options?.test ?? false, - delayUntil, - queuedAt: delayUntil ? undefined : new Date(), - maxAttempts: body.options?.maxAttempts, - taskEventStore: getTaskEventStore(), - ttl, - tags, - oneTimeUseToken: options.oneTimeUseToken, - parentTaskRunId: parentRun?.id, - rootTaskRunId: parentRun?.rootTaskRunId ?? parentRun?.id, - batch: options?.batchId - ? { - id: options.batchId, - index: options.batchIndex ?? 0, - } - : undefined, - resumeParentOnCompletion: body.options?.resumeParentOnCompletion, - depth, - metadata: metadataPacket?.data, - metadataType: metadataPacket?.dataType, - seedMetadata: metadataPacket?.data, - seedMetadataType: metadataPacket?.dataType, - maxDurationInSeconds: body.options?.maxDuration - ? clampMaxDuration(body.options.maxDuration) + return await this.traceEventConcern.traceRun(triggerRequest, async (event) => { + const result = await this.runNumberIncrementer.incrementRunNumber( + triggerRequest, + async (num) => { + event.setAttribute("queueName", queueName); + span.setAttribute("queueName", queueName); + event.setAttribute("runId", runFriendlyId); + span.setAttribute("runId", runFriendlyId); + + const payloadPacket = await this.payloadProcessor.process(triggerRequest); + + const taskRun = await this.engine.trigger( + { + number: num, + friendlyId: runFriendlyId, + environment: environment, + idempotencyKey, + idempotencyKeyExpiresAt: idempotencyKey ? idempotencyKeyExpiresAt : undefined, + taskIdentifier: taskId, + payload: payloadPacket.data ?? "", + payloadType: payloadPacket.dataType, + context: body.context, + traceContext: event.traceContext, + traceId: event.traceId, + spanId: event.spanId, + parentSpanId: + options.parentAsLinkType === "replay" ? undefined : event.traceparent?.spanId, + lockedToVersionId: lockedToBackgroundWorker?.id, + taskVersion: lockedToBackgroundWorker?.version, + sdkVersion: lockedToBackgroundWorker?.sdkVersion, + cliVersion: lockedToBackgroundWorker?.cliVersion, + concurrencyKey: body.options?.concurrencyKey, + queue: queueName, + lockedQueueId, + masterQueue: masterQueue, + isTest: body.options?.test ?? false, + delayUntil, + queuedAt: delayUntil ? undefined : new Date(), + maxAttempts: body.options?.maxAttempts, + taskEventStore: getTaskEventStore(), + ttl, + tags, + oneTimeUseToken: options.oneTimeUseToken, + parentTaskRunId: parentRun?.id, + rootTaskRunId: parentRun?.rootTaskRunId ?? parentRun?.id, + batch: options?.batchId + ? { + id: options.batchId, + index: options.batchIndex ?? 0, + } + : undefined, + resumeParentOnCompletion: body.options?.resumeParentOnCompletion, + depth, + metadata: metadataPacket?.data, + metadataType: metadataPacket?.dataType, + seedMetadata: metadataPacket?.data, + seedMetadataType: metadataPacket?.dataType, + maxDurationInSeconds: body.options?.maxDuration + ? clampMaxDuration(body.options.maxDuration) + : undefined, + machine: body.options?.machine, + priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined, + releaseConcurrency: body.options?.releaseConcurrency, + queueTimestamp: + parentRun && body.options?.resumeParentOnCompletion + ? parentRun.queueTimestamp ?? undefined : undefined, - machine: body.options?.machine, - priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined, - releaseConcurrency: body.options?.releaseConcurrency, - queueTimestamp: - parentRun && body.options?.resumeParentOnCompletion - ? parentRun.queueTimestamp ?? undefined - : undefined, - }, - this._prisma - ); - - const error = taskRun.error ? TaskRunError.parse(taskRun.error) : undefined; - - if (error) { - event.failWithError(error); - } - - return { run: taskRun, error, isCached: false }; - }, - async (_, tx) => { - const counter = await tx.taskRunNumberCounter.findFirst({ - where: { - taskIdentifier: taskId, - environmentId: environment.id, - }, - select: { lastNumber: true }, - }); - - return counter?.lastNumber; - }, - this._prisma - ); - - if (result?.error) { - throw new ServiceValidationError( - taskRunErrorToString(taskRunErrorEnhancer(result.error)) + runChainState, + }, + this.prisma ); + + const error = taskRun.error ? TaskRunError.parse(taskRun.error) : undefined; + + if (error) { + event.failWithError(error); + } + + return { run: taskRun, error, isCached: false }; } + ); - return result; + if (result?.error) { + throw new EngineServiceValidationError( + taskRunErrorToString(taskRunErrorEnhancer(result.error)) + ); } - ); + + return result; + }); } catch (error) { if (error instanceof RunDuplicateIdempotencyKeyError) { //retry calling this function, because this time it will return the idempotent run return await this.call({ taskId, environment, body, options, attempt: attempt + 1 }); } - // Detect a prisma transaction Unique constraint violation - if (error instanceof Prisma.PrismaClientKnownRequestError) { - logger.debug("TriggerTask: Prisma transaction error", { - code: error.code, - message: error.message, - meta: error.meta, - }); - - if (error.code === "P2002") { - const target = error.meta?.target; - - if ( - Array.isArray(target) && - target.length > 0 && - typeof target[0] === "string" && - target[0].includes("oneTimeUseToken") - ) { - throw new ServiceValidationError( - `Cannot trigger ${taskId} with a one-time use token as it has already been used.` - ); - } else { - throw new ServiceValidationError( - `Cannot trigger ${taskId} as it has already been triggered with the same idempotency key.` - ); - } - } + if (error instanceof RunOneTimeUseTokenError) { + throw new EngineServiceValidationError( + `Cannot trigger ${taskId} with a one-time use token as it has already been used.` + ); } throw error; } }); } - - async #getMasterQueueForEnvironment(environment: AuthenticatedEnvironment) { - if (environment.type === "DEVELOPMENT") { - return; - } - - const workerGroupService = new WorkerGroupService({ - prisma: this._prisma, - engine: this._engine, - }); - - const workerGroup = await workerGroupService.getDefaultWorkerGroupForProject({ - projectId: environment.projectId, - }); - - if (!workerGroup) { - throw new ServiceValidationError("No worker group found"); - } - - return workerGroup.masterQueue; - } - - async #getQueueName(taskId: string, environment: AuthenticatedEnvironment, queueName?: string) { - if (queueName) { - return queueName; - } - - const defaultQueueName = `task/${taskId}`; - - const worker = await findCurrentWorkerFromEnvironment(environment); - - if (!worker) { - logger.debug("Failed to get queue name: No worker found", { - taskId, - environmentId: environment.id, - }); - - return defaultQueueName; - } - - const task = await this._prisma.backgroundWorkerTask.findFirst({ - where: { - workerId: worker.id, - slug: taskId, - }, - include: { - queue: true, - }, - }); - - if (!task) { - console.log("Failed to get queue name: No task found", { - taskId, - environmentId: environment.id, - }); - - return defaultQueueName; - } - - if (!task.queue) { - console.log("Failed to get queue name: No queue found", { - taskId, - environmentId: environment.id, - queueConfig: task.queueConfig, - }); - - return defaultQueueName; - } - - return task.queue.name ?? defaultQueueName; - } - - async #handlePayloadPacket( - payload: any, - payloadType: string, - pathPrefix: string, - environment: AuthenticatedEnvironment - ) { - return await startActiveSpan("handlePayloadPacket()", async (span) => { - const packet = this.#createPayloadPacket(payload, payloadType); - - if (!packet.data) { - return packet; - } - - const { needsOffloading, size } = packetRequiresOffloading( - packet, - env.TASK_PAYLOAD_OFFLOAD_THRESHOLD - ); - - if (!needsOffloading) { - return packet; - } - - const filename = `${pathPrefix}/payload.json`; - - await uploadPacketToObjectStore(filename, packet.data, packet.dataType, environment); - - return { - data: filename, - dataType: "application/store", - }; - }); - } - - #createPayloadPacket(payload: any, payloadType: string): IOPacket { - if (payloadType === "application/json") { - return { data: JSON.stringify(payload), dataType: "application/json" }; - } - - if (typeof payload === "string") { - return { data: payload, dataType: payloadType }; - } - - return { dataType: payloadType }; - } -} - -function getMaximumSizeForEnvironment(environment: AuthenticatedEnvironment): number | undefined { - if (environment.type === "DEVELOPMENT") { - return environment.organization.maximumDevQueueSize ?? env.MAXIMUM_DEV_QUEUE_SIZE; - } else { - return environment.organization.maximumDeployedQueueSize ?? env.MAXIMUM_DEPLOYED_QUEUE_SIZE; - } -} - -export async function guardQueueSizeLimitsForEnv( - engine: RunEngine, - environment: AuthenticatedEnvironment, - itemsToAdd: number = 1 -) { - const maximumSize = getMaximumSizeForEnvironment(environment); - - if (typeof maximumSize === "undefined") { - return { isWithinLimits: true }; - } - - const queueSize = await engine.lengthOfEnvQueue(environment); - const projectedSize = queueSize + itemsToAdd; - - return { - isWithinLimits: projectedSize <= maximumSize, - maximumSize, - queueSize, - }; } diff --git a/apps/webapp/app/runEngine/types.ts b/apps/webapp/app/runEngine/types.ts new file mode 100644 index 0000000000..c6a4fe1868 --- /dev/null +++ b/apps/webapp/app/runEngine/types.ts @@ -0,0 +1,153 @@ +import { BackgroundWorker, TaskRun } from "@trigger.dev/database"; + +import { + IOPacket, + RunChainState, + TaskRunError, + TriggerTaskRequestBody, +} from "@trigger.dev/core/v3"; +import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { z } from "zod"; + +export type TriggerTaskServiceOptions = { + idempotencyKey?: string; + idempotencyKeyExpiresAt?: Date; + triggerVersion?: string; + traceContext?: Record; + spanParentAsLink?: boolean; + parentAsLinkType?: "replay" | "trigger"; + batchId?: string; + batchIndex?: number; + customIcon?: string; + runFriendlyId?: string; + skipChecks?: boolean; + oneTimeUseToken?: string; +}; + +// domain/triggerTask.ts +export type TriggerTaskRequest = { + taskId: string; + friendlyId: string; + environment: AuthenticatedEnvironment; + body: TriggerTaskRequestBody; + options?: TriggerTaskServiceOptions; +}; + +export type TriggerTaskResult = { + run: TaskRun; + isCached: boolean; + error?: TaskRunError; +}; + +export type QueueValidationResult = + | { + ok: true; + } + | { + ok: false; + maximumSize: number; + queueSize: number; + }; + +export type QueueProperties = { + queueName: string; + lockedQueueId?: string; +}; + +export type LockedBackgroundWorker = Pick< + BackgroundWorker, + "id" | "version" | "sdkVersion" | "cliVersion" +>; + +// Core domain interfaces +export interface QueueManager { + resolveQueueProperties( + request: TriggerTaskRequest, + lockedBackgroundWorker?: LockedBackgroundWorker + ): Promise; + getQueueName(request: TriggerTaskRequest): Promise; + validateQueueLimits(env: AuthenticatedEnvironment): Promise; + getMasterQueue(env: AuthenticatedEnvironment): Promise; +} + +export interface PayloadProcessor { + process(request: TriggerTaskRequest): Promise; +} + +export interface RunNumberIncrementer { + incrementRunNumber( + request: TriggerTaskRequest, + callback: (num: number) => Promise + ): Promise; +} + +export interface TagValidationParams { + tags?: string[] | string; +} + +export interface EntitlementValidationParams { + environment: AuthenticatedEnvironment; +} + +export interface MaxAttemptsValidationParams { + taskId: string; + attempt: number; +} + +export interface ParentRunValidationParams { + taskId: string; + parentRun?: TaskRun; + resumeParentOnCompletion?: boolean; +} + +export type ValidationResult = + | { + ok: true; + } + | { + ok: false; + error: Error; + }; + +export interface TriggerTaskValidator { + validateTags(params: TagValidationParams): ValidationResult; + validateEntitlement(params: EntitlementValidationParams): Promise; + validateMaxAttempts(params: MaxAttemptsValidationParams): ValidationResult; + validateParentRun(params: ParentRunValidationParams): ValidationResult; +} + +export type TracedEventSpan = { + traceId: string; + spanId: string; + traceContext: Record; + traceparent?: { + traceId: string; + spanId: string; + }; + setAttribute: (key: string, value: string) => void; + failWithError: (error: TaskRunError) => void; +}; + +export interface TraceEventConcern { + traceRun( + request: TriggerTaskRequest, + callback: (span: TracedEventSpan) => Promise + ): Promise; + traceIdempotentRun( + request: TriggerTaskRequest, + options: { + existingRun: TaskRun; + idempotencyKey: string; + incomplete: boolean; + isError: boolean; + }, + callback: (span: TracedEventSpan) => Promise + ): Promise; +} + +export interface RunChainStateManager { + validateRunChain( + request: TriggerTaskRequest, + options: { parentRun?: TaskRun; queueName: string; lockedQueueId?: string } + ): Promise; +} diff --git a/apps/webapp/app/runEngine/validators/triggerTaskValidator.ts b/apps/webapp/app/runEngine/validators/triggerTaskValidator.ts new file mode 100644 index 0000000000..cd903d7763 --- /dev/null +++ b/apps/webapp/app/runEngine/validators/triggerTaskValidator.ts @@ -0,0 +1,103 @@ +import { MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server"; +import { logger } from "~/services/logger.server"; +import { getEntitlement } from "~/services/platform.v3.server"; +import { MAX_ATTEMPTS, OutOfEntitlementError } from "~/v3/services/triggerTask.server"; +import { isFinalRunStatus } from "~/v3/taskStatus"; +import { EngineServiceValidationError } from "../concerns/errors"; +import { + EntitlementValidationParams, + MaxAttemptsValidationParams, + ParentRunValidationParams, + TagValidationParams, + TriggerTaskValidator, + ValidationResult, +} from "../types"; + +export class DefaultTriggerTaskValidator implements TriggerTaskValidator { + validateTags(params: TagValidationParams): ValidationResult { + const { tags } = params; + + if (!tags) { + return { ok: true }; + } + + if (typeof tags === "string") { + return { ok: true }; + } + + if (tags.length > MAX_TAGS_PER_RUN) { + return { + ok: false, + error: new EngineServiceValidationError( + `Runs can only have ${MAX_TAGS_PER_RUN} tags, you're trying to set ${tags.length}.` + ), + }; + } + + return { ok: true }; + } + + async validateEntitlement(params: EntitlementValidationParams): Promise { + const { environment } = params; + + if (environment.type === "DEVELOPMENT") { + return { ok: true }; + } + + const result = await getEntitlement(environment.organizationId); + + if (!result || result.hasAccess === false) { + return { + ok: false, + error: new OutOfEntitlementError(), + }; + } + + return { ok: true }; + } + + validateMaxAttempts(params: MaxAttemptsValidationParams): ValidationResult { + const { taskId, attempt } = params; + + if (attempt > MAX_ATTEMPTS) { + return { + ok: false, + error: new EngineServiceValidationError( + `Failed to trigger ${taskId} after ${MAX_ATTEMPTS} attempts.` + ), + }; + } + + return { ok: true }; + } + + validateParentRun(params: ParentRunValidationParams): ValidationResult { + const { taskId, parentRun, resumeParentOnCompletion } = params; + + // If there's no parent run specified, that's fine + if (!parentRun) { + return { ok: true }; + } + + // If we're not resuming the parent, we don't need to validate its status + if (!resumeParentOnCompletion) { + return { ok: true }; + } + + // Check if the parent run is in a final state + if (isFinalRunStatus(parentRun.status)) { + logger.debug("Parent run is in a terminal state", { + parentRun, + }); + + return { + ok: false, + error: new EngineServiceValidationError( + `Cannot trigger ${taskId} as the parent run has a status of ${parentRun.status}` + ), + }; + } + + return { ok: true }; + } +} diff --git a/apps/webapp/app/v3/models/workerDeployment.server.ts b/apps/webapp/app/v3/models/workerDeployment.server.ts index 12629b274d..cdec37d1b7 100644 --- a/apps/webapp/app/v3/models/workerDeployment.server.ts +++ b/apps/webapp/app/v3/models/workerDeployment.server.ts @@ -1,5 +1,10 @@ import type { Prettify } from "@trigger.dev/core"; -import { BackgroundWorker, RunEngineVersion, WorkerDeploymentType } from "@trigger.dev/database"; +import { + BackgroundWorker, + PrismaClientOrTransaction, + RunEngineVersion, + WorkerDeploymentType, +} from "@trigger.dev/database"; import { CURRENT_DEPLOYMENT_LABEL, CURRENT_UNMANAGED_DEPLOYMENT_LABEL, @@ -68,12 +73,16 @@ export async function findCurrentWorkerDeployment({ environmentId, label = CURRENT_DEPLOYMENT_LABEL, type, + prismaClient, }: { environmentId: string; label?: string; type?: WorkerDeploymentType; + prismaClient?: PrismaClientOrTransaction; }): Promise { - const promotion = await prisma.workerDeploymentPromotion.findFirst({ + const $prisma = prismaClient ?? prisma; + + const promotion = await $prisma.workerDeploymentPromotion.findFirst({ where: { environmentId, label, @@ -187,13 +196,14 @@ export async function findCurrentUnmanagedWorkerDeployment( export async function findCurrentWorkerFromEnvironment( environment: Pick, + prismaClient: PrismaClientOrTransaction = prisma, label = CURRENT_DEPLOYMENT_LABEL ): Promise | null> { if (environment.type === "DEVELOPMENT") { - const latestDevWorker = await prisma.backgroundWorker.findFirst({ + const latestDevWorker = await prismaClient.backgroundWorker.findFirst({ where: { runtimeEnvironmentId: environment.id, }, @@ -206,13 +216,15 @@ export async function findCurrentWorkerFromEnvironment( const deployment = await findCurrentWorkerDeployment({ environmentId: environment.id, label, + prismaClient, }); return deployment?.worker ?? null; } } export async function findCurrentUnmanagedWorkerFromEnvironment( - environment: Pick + environment: Pick, + prismaClient: PrismaClientOrTransaction = prisma ): Promise { + const traceEventConcern = new DefaultTraceEventsConcern(eventRepository); + const service = new RunEngineTriggerTaskService({ prisma: this._prisma, engine: this._engine, + queueConcern: new DefaultQueueManager(this._prisma, this._engine), + validator: new DefaultTriggerTaskValidator(), + payloadProcessor: new DefaultPayloadProcessor(), + idempotencyKeyConcern: new IdempotencyKeyConcern( + this._prisma, + this._engine, + traceEventConcern + ), + runNumberIncrementer: new DefaultRunNumberIncrementer(), + traceEventConcern, + runChainStateManager: new DefaultRunChainStateManager( + this._prisma, + env.RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED === "1" + ), + tracer: tracer, }); return await service.call({ taskId, diff --git a/apps/webapp/test/engine/triggerTask.test.ts b/apps/webapp/test/engine/triggerTask.test.ts new file mode 100644 index 0000000000..9e6a75b01d --- /dev/null +++ b/apps/webapp/test/engine/triggerTask.test.ts @@ -0,0 +1,1256 @@ +import { describe, expect, vi } from "vitest"; + +// Mock the db prisma client +vi.mock("~/db.server", () => ({ + prisma: {}, +})); + +vi.mock("~/services/platform.v3.server", () => ({ + getEntitlement: vi.fn(), +})); + +import { RunEngine } from "@internal/run-engine"; +import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "@internal/run-engine/tests"; +import { containerTest } from "@internal/testcontainers"; +import { trace } from "@opentelemetry/api"; +import { IOPacket } from "@trigger.dev/core/v3"; +import { TaskRun } from "@trigger.dev/database"; +import { IdempotencyKeyConcern } from "~/runEngine/concerns/idempotencyKeys.server"; +import { DefaultQueueManager } from "~/runEngine/concerns/queues.server"; +import { DefaultRunChainStateManager } from "~/runEngine/concerns/runChainStates.server"; +import { + EntitlementValidationParams, + MaxAttemptsValidationParams, + ParentRunValidationParams, + PayloadProcessor, + RunNumberIncrementer, + TagValidationParams, + TracedEventSpan, + TraceEventConcern, + TriggerTaskRequest, + TriggerTaskValidator, + ValidationResult, +} from "~/runEngine/types"; +import { RunEngineTriggerTaskService } from "../../app/runEngine/services/triggerTask.server"; +import { getEntitlement } from "~/services/platform.v3.server"; + +vi.setConfig({ testTimeout: 30_000 }); // 30 seconds timeout + +class MockRunNumberIncrementer implements RunNumberIncrementer { + async incrementRunNumber( + request: TriggerTaskRequest, + callback: (num: number) => Promise + ): Promise { + return await callback(1); + } +} + +class MockPayloadProcessor implements PayloadProcessor { + async process(request: TriggerTaskRequest): Promise { + return { + data: JSON.stringify(request.body.payload), + dataType: "application/json", + }; + } +} + +class MockTriggerTaskValidator implements TriggerTaskValidator { + validateTags(params: TagValidationParams): ValidationResult { + return { ok: true }; + } + validateEntitlement(params: EntitlementValidationParams): Promise { + return Promise.resolve({ ok: true }); + } + validateMaxAttempts(params: MaxAttemptsValidationParams): ValidationResult { + return { ok: true }; + } + validateParentRun(params: ParentRunValidationParams): ValidationResult { + return { ok: true }; + } +} + +class MockTraceEventConcern implements TraceEventConcern { + async traceRun( + request: TriggerTaskRequest, + callback: (span: TracedEventSpan) => Promise + ): Promise { + return await callback({ + traceId: "test", + spanId: "test", + traceContext: {}, + traceparent: undefined, + setAttribute: () => {}, + failWithError: () => {}, + }); + } + + async traceIdempotentRun( + request: TriggerTaskRequest, + options: { + existingRun: TaskRun; + idempotencyKey: string; + incomplete: boolean; + isError: boolean; + }, + callback: (span: TracedEventSpan) => Promise + ): Promise { + return await callback({ + traceId: "test", + spanId: "test", + traceContext: {}, + traceparent: undefined, + setAttribute: () => {}, + failWithError: () => {}, + }); + } +} + +describe("RunEngineTriggerTaskService", () => { + containerTest("should trigger a task with minimal options", async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const taskIdentifier = "test-task"; + + //create background worker + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const queuesManager = new DefaultQueueManager(prisma, engine); + + const idempotencyKeyConcern = new IdempotencyKeyConcern( + prisma, + engine, + new MockTraceEventConcern() + ); + + const runChainStateManager = new DefaultRunChainStateManager(prisma, true); + + const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + runNumberIncrementer: new MockRunNumberIncrementer(), + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern, + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + runChainStateManager, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const result = await triggerTaskService.call({ + taskId: taskIdentifier, + environment: authenticatedEnvironment, + body: { payload: { test: "test" } }, + }); + + expect(result).toBeDefined(); + expect(result?.run.friendlyId).toBeDefined(); + expect(result?.run.status).toBe("PENDING"); + expect(result?.isCached).toBe(false); + + const run = await prisma.taskRun.findUnique({ + where: { + id: result?.run.id, + }, + }); + + expect(run).toBeDefined(); + expect(run?.friendlyId).toBe(result?.run.friendlyId); + expect(run?.engine).toBe("V2"); + expect(run?.queuedAt).toBeDefined(); + expect(run?.queue).toBe(`task/${taskIdentifier}`); + + // Lets make sure the task is in the queue + const queueLength = await engine.runQueue.lengthOfQueue( + authenticatedEnvironment, + `task/${taskIdentifier}` + ); + expect(queueLength).toBe(1); + + await engine.quit(); + }); + + containerTest("should handle idempotency keys correctly", async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const taskIdentifier = "test-task"; + + //create background worker + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const queuesManager = new DefaultQueueManager(prisma, engine); + + const idempotencyKeyConcern = new IdempotencyKeyConcern( + prisma, + engine, + new MockTraceEventConcern() + ); + + const runChainStateManager = new DefaultRunChainStateManager(prisma, true); + + const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + runNumberIncrementer: new MockRunNumberIncrementer(), + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern, + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + runChainStateManager, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const result = await triggerTaskService.call({ + taskId: taskIdentifier, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + idempotencyKey: "test-idempotency-key", + }, + }, + }); + + expect(result).toBeDefined(); + expect(result?.run.friendlyId).toBeDefined(); + expect(result?.run.status).toBe("PENDING"); + expect(result?.isCached).toBe(false); + + const run = await prisma.taskRun.findUnique({ + where: { + id: result?.run.id, + }, + }); + + expect(run).toBeDefined(); + expect(run?.friendlyId).toBe(result?.run.friendlyId); + expect(run?.engine).toBe("V2"); + expect(run?.queuedAt).toBeDefined(); + expect(run?.queue).toBe(`task/${taskIdentifier}`); + + // Lets make sure the task is in the queue + const queueLength = await engine.runQueue.lengthOfQueue( + authenticatedEnvironment, + `task/${taskIdentifier}` + ); + expect(queueLength).toBe(1); + + // Now lets try to trigger the same task with the same idempotency key + const cachedResult = await triggerTaskService.call({ + taskId: taskIdentifier, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + idempotencyKey: "test-idempotency-key", + }, + }, + }); + + expect(cachedResult).toBeDefined(); + expect(cachedResult?.run.friendlyId).toBe(result?.run.friendlyId); + expect(cachedResult?.isCached).toBe(true); + + await engine.quit(); + }); + + containerTest( + "should resolve queue names correctly when locked to version", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const taskIdentifier = "test-task"; + + // Create a background worker with a specific version + const worker = await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier, { + preset: "small-1x", + }); + + // Create a specific queue for this worker + const specificQueue = await prisma.taskQueue.create({ + data: { + name: "specific-queue", + friendlyId: "specific-queue", + projectId: authenticatedEnvironment.projectId, + runtimeEnvironmentId: authenticatedEnvironment.id, + workers: { + connect: { + id: worker.worker.id, + }, + }, + }, + }); + + // Associate the task with the queue + await prisma.backgroundWorkerTask.update({ + where: { + workerId_slug: { + workerId: worker.worker.id, + slug: taskIdentifier, + }, + }, + data: { + queueId: specificQueue.id, + }, + }); + + const queuesManager = new DefaultQueueManager(prisma, engine); + const idempotencyKeyConcern = new IdempotencyKeyConcern( + prisma, + engine, + new MockTraceEventConcern() + ); + + const runChainStateManager = new DefaultRunChainStateManager(prisma, true); + + const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + runNumberIncrementer: new MockRunNumberIncrementer(), + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern, + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + runChainStateManager, + tracer: trace.getTracer("test", "0.0.0"), + }); + + // Test case 1: Trigger with lockToVersion but no specific queue + const result1 = await triggerTaskService.call({ + taskId: taskIdentifier, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + lockToVersion: worker.worker.version, + }, + }, + }); + + expect(result1).toBeDefined(); + expect(result1?.run.queue).toBe("specific-queue"); + + // Test case 2: Trigger with lockToVersion and specific queue + const result2 = await triggerTaskService.call({ + taskId: taskIdentifier, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + lockToVersion: worker.worker.version, + queue: { + name: "specific-queue", + }, + }, + }, + }); + + expect(result2).toBeDefined(); + expect(result2?.run.queue).toBe("specific-queue"); + expect(result2?.run.lockedQueueId).toBe(specificQueue.id); + + // Test case 3: Try to use non-existent queue with locked version (should throw) + await expect( + triggerTaskService.call({ + taskId: taskIdentifier, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + lockToVersion: worker.worker.version, + queue: { + name: "non-existent-queue", + }, + }, + }, + }) + ).rejects.toThrow( + `Specified queue 'non-existent-queue' not found or not associated with locked version '${worker.worker.version}'` + ); + + // Test case 4: Trigger with a non-existent queue without a locked version + const result4 = await triggerTaskService.call({ + taskId: taskIdentifier, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + queue: { + name: "non-existent-queue", + }, + }, + }, + }); + + expect(result4).toBeDefined(); + expect(result4?.run.queue).toBe("non-existent-queue"); + expect(result4?.run.status).toBe("PENDING"); + + await engine.quit(); + } + ); + + containerTest( + "should handle run chains correctly when release concurrency is enabled", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const taskIdentifier = "test-task"; + + //create background worker + const { worker } = await setupBackgroundWorker( + engine, + authenticatedEnvironment, + taskIdentifier, + undefined, + undefined, + { + releaseConcurrencyOnWaitpoint: false, + concurrencyLimit: 2, + } + ); + + const queuesManager = new DefaultQueueManager(prisma, engine); + + const idempotencyKeyConcern = new IdempotencyKeyConcern( + prisma, + engine, + new MockTraceEventConcern() + ); + + const runChainStateManager = new DefaultRunChainStateManager(prisma, true); + + const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + runNumberIncrementer: new MockRunNumberIncrementer(), + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern, + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + runChainStateManager, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const result = await triggerTaskService.call({ + taskId: taskIdentifier, + environment: authenticatedEnvironment, + body: { payload: { test: "test" } }, + }); + + console.log(result); + + expect(result).toBeDefined(); + expect(result?.run.friendlyId).toBeDefined(); + expect(result?.run.status).toBe("PENDING"); + expect(result?.isCached).toBe(false); + + // Lets make sure the task is in the queue + const queueLength = await engine.runQueue.lengthOfQueue( + authenticatedEnvironment, + `task/${taskIdentifier}` + ); + expect(queueLength).toBe(1); + + // Now we need to dequeue the run so so we can trigger a subtask + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: result?.run.masterQueue!, + maxRunCount: 1, + }); + + expect(dequeued.length).toBe(1); + expect(dequeued[0].run.id).toBe(result?.run.id); + + // Now, lets trigger a subtask, with the same task identifier and queue + const subtaskResult = await triggerTaskService.call({ + taskId: taskIdentifier, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + parentRunId: result?.run.friendlyId, + resumeParentOnCompletion: true, + lockToVersion: worker.version, + }, + }, + }); + + expect(subtaskResult).toBeDefined(); + expect(subtaskResult?.run.status).toBe("PENDING"); + expect(subtaskResult?.run.parentTaskRunId).toBe(result?.run.id); + expect(subtaskResult?.run.lockedQueueId).toBeDefined(); + expect(subtaskResult?.run.runChainState).toEqual({ + concurrency: { + queues: [ + { id: subtaskResult?.run.lockedQueueId, name: subtaskResult?.run.queue, holding: 1 }, + ], + environment: 0, + }, + }); + + // Okay, now lets dequeue the subtask + const dequeuedSubtask = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: subtaskResult?.run.masterQueue!, + maxRunCount: 1, + }); + + expect(dequeuedSubtask.length).toBe(1); + expect(dequeuedSubtask[0].run.id).toBe(subtaskResult?.run.id); + + // Now, when we trigger the subtask, it should raise a deadlock error + await expect( + triggerTaskService.call({ + taskId: taskIdentifier, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + parentRunId: subtaskResult?.run.friendlyId, + resumeParentOnCompletion: true, + lockToVersion: worker.version, + }, + }, + }) + ).rejects.toThrow("Deadlock detected"); + + await engine.quit(); + } + ); + + containerTest( + "should handle run chains with multiple queues correctly", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const taskIdentifier1 = "test-task-1"; + const taskIdentifier2 = "test-task-2"; + + // Create a background worker + const { worker } = await setupBackgroundWorker( + engine, + authenticatedEnvironment, + [taskIdentifier1, taskIdentifier2], + undefined, + undefined, + { + releaseConcurrencyOnWaitpoint: false, + concurrencyLimit: 2, + } + ); + + const queuesManager = new DefaultQueueManager(prisma, engine); + const idempotencyKeyConcern = new IdempotencyKeyConcern( + prisma, + engine, + new MockTraceEventConcern() + ); + const runChainStateManager = new DefaultRunChainStateManager(prisma, true); + + const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + runNumberIncrementer: new MockRunNumberIncrementer(), + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern, + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + runChainStateManager, + tracer: trace.getTracer("test", "0.0.0"), + }); + + // Trigger parent run on queue1 + const parentResult = await triggerTaskService.call({ + taskId: taskIdentifier1, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + lockToVersion: worker.version, + }, + }, + }); + + expect(parentResult).toBeDefined(); + expect(parentResult?.run.queue).toBe(`task/${taskIdentifier1}`); + expect(parentResult?.run.lockedQueueId).toBeDefined(); + + // Dequeue the parent run to simulate it running + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: parentResult?.run.masterQueue!, + maxRunCount: 1, + }); + + expect(dequeued.length).toBe(1); + expect(dequeued[0].run.id).toBe(parentResult?.run.id); + + // Now trigger a child run on queue2 + const childResult = await triggerTaskService.call({ + taskId: taskIdentifier2, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + parentRunId: parentResult?.run.friendlyId, + resumeParentOnCompletion: true, + lockToVersion: worker.version, + }, + }, + }); + + expect(childResult).toBeDefined(); + expect(childResult?.run.queue).toBe(`task/${taskIdentifier2}`); + expect(childResult?.run.lockedQueueId).toBeDefined(); + expect(childResult?.run.parentTaskRunId).toBe(parentResult?.run.id); + + // Verify the run chain state + expect(childResult?.run.runChainState).toEqual({ + concurrency: { + queues: [ + { id: parentResult?.run.lockedQueueId, name: parentResult?.run.queue, holding: 1 }, + ], + environment: 0, + }, + }); + + // Now lets trigger task 1 again, and it should be able to run + const childResult2 = await triggerTaskService.call({ + taskId: taskIdentifier1, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + parentRunId: childResult?.run.friendlyId, + resumeParentOnCompletion: true, + lockToVersion: worker.version, + }, + }, + }); + + expect(childResult2).toBeDefined(); + expect(childResult2?.run.status).toBe("PENDING"); + expect(childResult2?.run.parentTaskRunId).toBe(childResult?.run.id); + expect(childResult2?.run.lockedQueueId).toBeDefined(); + expect(childResult2?.run.runChainState).toMatchObject({ + concurrency: { + queues: [ + { id: parentResult?.run.lockedQueueId, name: parentResult?.run.queue, holding: 1 }, + { id: childResult?.run.lockedQueueId, name: childResult?.run.queue, holding: 1 }, + ], + environment: 0, + }, + }); + + // Now lets trigger task 2 again, and it should be able to run + const childResult3 = await triggerTaskService.call({ + taskId: taskIdentifier2, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + parentRunId: childResult2?.run.friendlyId, + resumeParentOnCompletion: true, + lockToVersion: worker.version, + }, + }, + }); + + expect(childResult3).toBeDefined(); + expect(childResult3?.run.status).toBe("PENDING"); + expect(childResult3?.run.parentTaskRunId).toBe(childResult2?.run.id); + expect(childResult3?.run.lockedQueueId).toBeDefined(); + expect(childResult3?.run.runChainState).toMatchObject({ + concurrency: { + queues: [ + { id: childResult?.run.lockedQueueId, name: childResult?.run.queue, holding: 1 }, + { id: parentResult?.run.lockedQueueId, name: parentResult?.run.queue, holding: 2 }, + ], + environment: 0, + }, + }); + + // Now lets trigger task 1 again, and it should deadlock + await expect( + triggerTaskService.call({ + taskId: taskIdentifier1, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + parentRunId: childResult3?.run.friendlyId, + resumeParentOnCompletion: true, + lockToVersion: worker.version, + }, + }, + }) + ).rejects.toThrow("Deadlock detected"); + + await engine.quit(); + } + ); + + containerTest( + "should handle run chains with explicit releaseConcurrency option", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const taskIdentifier1 = "test-task-1"; + const taskIdentifier2 = "test-task-2"; + + // Create a background worker + const { worker } = await setupBackgroundWorker( + engine, + authenticatedEnvironment, + [taskIdentifier1, taskIdentifier2], + undefined, + undefined, + { + releaseConcurrencyOnWaitpoint: false, + concurrencyLimit: 2, + } + ); + + const queuesManager = new DefaultQueueManager(prisma, engine); + const idempotencyKeyConcern = new IdempotencyKeyConcern( + prisma, + engine, + new MockTraceEventConcern() + ); + const runChainStateManager = new DefaultRunChainStateManager(prisma, true); + + const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + runNumberIncrementer: new MockRunNumberIncrementer(), + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern, + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + runChainStateManager, + tracer: trace.getTracer("test", "0.0.0"), + }); + + // Trigger parent run on queue1 + const parentResult = await triggerTaskService.call({ + taskId: taskIdentifier1, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + lockToVersion: worker.version, + }, + }, + }); + + expect(parentResult).toBeDefined(); + expect(parentResult?.run.queue).toBe(`task/${taskIdentifier1}`); + expect(parentResult?.run.lockedQueueId).toBeDefined(); + + // Dequeue the parent run to simulate it running + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: parentResult?.run.masterQueue!, + maxRunCount: 1, + }); + + expect(dequeued.length).toBe(1); + expect(dequeued[0].run.id).toBe(parentResult?.run.id); + + // Now trigger a child run on queue2 + const childResult = await triggerTaskService.call({ + taskId: taskIdentifier2, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + parentRunId: parentResult?.run.friendlyId, + resumeParentOnCompletion: true, + lockToVersion: worker.version, + releaseConcurrency: true, + }, + }, + }); + + expect(childResult).toBeDefined(); + expect(childResult?.run.queue).toBe(`task/${taskIdentifier2}`); + expect(childResult?.run.lockedQueueId).toBeDefined(); + expect(childResult?.run.parentTaskRunId).toBe(parentResult?.run.id); + + // Verify the run chain state + expect(childResult?.run.runChainState).toEqual({ + concurrency: { + queues: [ + { id: parentResult?.run.lockedQueueId, name: parentResult?.run.queue, holding: 0 }, + ], + environment: 0, + }, + }); + + await engine.quit(); + } + ); + + containerTest( + "should handle run chains when release concurrency is disabled", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const taskIdentifier1 = "test-task-1"; + const taskIdentifier2 = "test-task-2"; + + // Create a background worker + const { worker } = await setupBackgroundWorker( + engine, + authenticatedEnvironment, + [taskIdentifier1, taskIdentifier2], + undefined, + undefined, + { + releaseConcurrencyOnWaitpoint: true, + concurrencyLimit: 2, + } + ); + + const queuesManager = new DefaultQueueManager(prisma, engine); + const idempotencyKeyConcern = new IdempotencyKeyConcern( + prisma, + engine, + new MockTraceEventConcern() + ); + const runChainStateManager = new DefaultRunChainStateManager(prisma, false); + + const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + runNumberIncrementer: new MockRunNumberIncrementer(), + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern, + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + runChainStateManager, + tracer: trace.getTracer("test", "0.0.0"), + }); + + // Trigger parent run on queue1 + const parentResult = await triggerTaskService.call({ + taskId: taskIdentifier1, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + lockToVersion: worker.version, + }, + }, + }); + + expect(parentResult).toBeDefined(); + expect(parentResult?.run.queue).toBe(`task/${taskIdentifier1}`); + expect(parentResult?.run.lockedQueueId).toBeDefined(); + + // Dequeue the parent run to simulate it running + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: parentResult?.run.masterQueue!, + maxRunCount: 1, + }); + + expect(dequeued.length).toBe(1); + expect(dequeued[0].run.id).toBe(parentResult?.run.id); + + // Now trigger a child run on queue2 + const childResult = await triggerTaskService.call({ + taskId: taskIdentifier2, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + parentRunId: parentResult?.run.friendlyId, + resumeParentOnCompletion: true, + lockToVersion: worker.version, + }, + }, + }); + + expect(childResult).toBeDefined(); + expect(childResult?.run.queue).toBe(`task/${taskIdentifier2}`); + expect(childResult?.run.lockedQueueId).toBeDefined(); + expect(childResult?.run.parentTaskRunId).toBe(parentResult?.run.id); + + // Verify the run chain state + expect(childResult?.run.runChainState).toEqual({ + concurrency: { + queues: [ + { id: parentResult?.run.lockedQueueId, name: parentResult?.run.queue, holding: 1 }, + ], + environment: 1, + }, + }); + + await engine.quit(); + } + ); + + containerTest( + "should handle run chains correctly when the parent run queue doesn't have a concurrency limit", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const taskIdentifier = "test-task"; + + //create background worker + const { worker } = await setupBackgroundWorker( + engine, + authenticatedEnvironment, + taskIdentifier, + undefined, + undefined, + { + releaseConcurrencyOnWaitpoint: false, + concurrencyLimit: null, + } + ); + + const queuesManager = new DefaultQueueManager(prisma, engine); + + const idempotencyKeyConcern = new IdempotencyKeyConcern( + prisma, + engine, + new MockTraceEventConcern() + ); + + const runChainStateManager = new DefaultRunChainStateManager(prisma, true); + + const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + runNumberIncrementer: new MockRunNumberIncrementer(), + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern, + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + runChainStateManager, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const result = await triggerTaskService.call({ + taskId: taskIdentifier, + environment: authenticatedEnvironment, + body: { payload: { test: "test" } }, + }); + + expect(result).toBeDefined(); + expect(result?.run.friendlyId).toBeDefined(); + expect(result?.run.status).toBe("PENDING"); + expect(result?.isCached).toBe(false); + + // Lets make sure the task is in the queue + const queueLength = await engine.runQueue.lengthOfQueue( + authenticatedEnvironment, + `task/${taskIdentifier}` + ); + expect(queueLength).toBe(1); + + // Now we need to dequeue the run so so we can trigger a subtask + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: result?.run.masterQueue!, + maxRunCount: 1, + }); + + expect(dequeued.length).toBe(1); + expect(dequeued[0].run.id).toBe(result?.run.id); + + // Now, lets trigger a subtask, with the same task identifier and queue + const subtaskResult = await triggerTaskService.call({ + taskId: taskIdentifier, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + parentRunId: result?.run.friendlyId, + resumeParentOnCompletion: true, + lockToVersion: worker.version, + }, + }, + }); + + expect(subtaskResult).toBeDefined(); + expect(subtaskResult?.run.status).toBe("PENDING"); + expect(subtaskResult?.run.parentTaskRunId).toBe(result?.run.id); + expect(subtaskResult?.run.lockedQueueId).toBeDefined(); + expect(subtaskResult?.run.runChainState).toEqual({ + concurrency: { + queues: [ + { id: subtaskResult?.run.lockedQueueId, name: subtaskResult?.run.queue, holding: 0 }, + ], + environment: 0, + }, + }); + + // Okay, now lets dequeue the subtask + const dequeuedSubtask = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: subtaskResult?.run.masterQueue!, + maxRunCount: 1, + }); + + expect(dequeuedSubtask.length).toBe(1); + expect(dequeuedSubtask[0].run.id).toBe(subtaskResult?.run.id); + + // Now, when we trigger the subtask, it should NOT raise a deadlock error + await expect( + triggerTaskService.call({ + taskId: taskIdentifier, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + parentRunId: subtaskResult?.run.friendlyId, + resumeParentOnCompletion: true, + lockToVersion: worker.version, + }, + }, + }) + ).resolves.toBeDefined(); + + await engine.quit(); + } + ); +}); diff --git a/internal-packages/database/prisma/migrations/20250422152423_add_run_chain_state_to_task_run/migration.sql b/internal-packages/database/prisma/migrations/20250422152423_add_run_chain_state_to_task_run/migration.sql new file mode 100644 index 0000000000..48b79c2b48 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20250422152423_add_run_chain_state_to_task_run/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "TaskRun" ADD COLUMN "runChainState" JSONB; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index ef29fcec39..80ad979d4b 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -1853,6 +1853,9 @@ model TaskRun { /// The span ID of the "trigger" span in the parent task run parentSpanId String? + /// Holds the state of the run chain for deadlock detection + runChainState Json? + /// seed run metadata seedMetadata String? seedMetadataType String @default("application/json") diff --git a/internal-packages/run-engine/package.json b/internal-packages/run-engine/package.json index 30c9cff7a8..f760b9f065 100644 --- a/internal-packages/run-engine/package.json +++ b/internal-packages/run-engine/package.json @@ -11,6 +11,12 @@ "import": "./dist/src/index.js", "types": "./dist/src/index.d.ts", "default": "./dist/src/index.js" + }, + "./tests": { + "@triggerdotdev/source": "./src/engine/tests/setup.ts", + "import": "./dist/src/engine/tests/setup.js", + "types": "./dist/src/engine/tests/setup.d.ts", + "default": "./dist/src/engine/tests/setup.js" } }, "dependencies": { diff --git a/internal-packages/run-engine/src/engine/db/worker.ts b/internal-packages/run-engine/src/engine/db/worker.ts index e61e9e8d43..3e4ce60b61 100644 --- a/internal-packages/run-engine/src/engine/db/worker.ts +++ b/internal-packages/run-engine/src/engine/db/worker.ts @@ -161,7 +161,9 @@ export async function getRunWithBackgroundWorkerTasks( } } - const queue = workerWithTasks.queues.find((queue) => queue.name === run.queue); + const queue = workerWithTasks.queues.find((queue) => + run.lockedQueueId ? queue.id === run.lockedQueueId : queue.name === run.queue + ); if (!queue) { return { diff --git a/internal-packages/run-engine/src/engine/errors.ts b/internal-packages/run-engine/src/engine/errors.ts index 3f26e8033f..9e7c4c40c8 100644 --- a/internal-packages/run-engine/src/engine/errors.ts +++ b/internal-packages/run-engine/src/engine/errors.ts @@ -79,3 +79,10 @@ export class RunDuplicateIdempotencyKeyError extends Error { this.name = "RunDuplicateIdempotencyKeyError"; } } + +export class RunOneTimeUseTokenError extends Error { + constructor(message: string) { + super(message); + this.name = "RunOneTimeUseTokenError"; + } +} diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index f5480520ba..2bb9ece95e 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -341,6 +341,7 @@ export class RunEngine { concurrencyKey, masterQueue, queue, + lockedQueueId, isTest, delayUntil, queuedAt, @@ -365,6 +366,7 @@ export class RunEngine { workerId, runnerId, releaseConcurrency, + runChainState, }: TriggerParams, tx?: PrismaClientOrTransaction ): Promise { @@ -420,6 +422,7 @@ export class RunEngine { cliVersion, concurrencyKey, queue, + lockedQueueId, masterQueue, secondaryMasterQueue, isTest, @@ -449,6 +452,7 @@ export class RunEngine { seedMetadataType, maxDurationInSeconds, machinePreset: machine, + runChainState, executionSnapshots: { create: { engine: "V2", diff --git a/internal-packages/run-engine/src/engine/tests/setup.ts b/internal-packages/run-engine/src/engine/tests/setup.ts index a666107f7c..116eb043c8 100644 --- a/internal-packages/run-engine/src/engine/tests/setup.ts +++ b/internal-packages/run-engine/src/engine/tests/setup.ts @@ -11,7 +11,7 @@ import { RunEngineVersion, RuntimeEnvironmentType, } from "@trigger.dev/database"; -import { RunEngine } from "../index.js"; +import type { RunEngine } from "../index.js"; export type AuthenticatedEnvironment = Prisma.RuntimeEnvironmentGetPayload<{ include: { project: true; organization: true; orgMember: true }; @@ -30,6 +30,19 @@ export async function setupAuthenticatedEnvironment( }, }); + const workerGroup = await prisma.workerInstanceGroup.create({ + data: { + name: "default", + masterQueue: "default", + type: "MANAGED", + token: { + create: { + tokenHash: "token_hash", + }, + }, + }, + }); + const project = await prisma.project.create({ data: { name: "Test Project", @@ -37,6 +50,7 @@ export async function setupAuthenticatedEnvironment( externalRef: "proj_1234", organizationId: org.id, engine, + defaultWorkerGroupId: workerGroup.id, }, }); @@ -156,6 +170,11 @@ export async function setupBackgroundWorker( typeof queueOptions?.releaseConcurrencyOnWaitpoint === "boolean" ? queueOptions.releaseConcurrencyOnWaitpoint : undefined, + tasks: { + connect: { + id: task.id, + }, + }, }, update: { concurrencyLimit: @@ -167,6 +186,11 @@ export async function setupBackgroundWorker( id: worker.id, }, }, + tasks: { + connect: { + id: task.id, + }, + }, }, }); diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index ccf06434ae..54d6b668c9 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -1,7 +1,13 @@ import { type RedisOptions } from "@internal/redis"; import { Worker, type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker"; import { Tracer } from "@internal/tracing"; -import { MachinePreset, MachinePresetName, QueueOptions, RetryOptions } from "@trigger.dev/core/v3"; +import { + MachinePreset, + MachinePresetName, + QueueOptions, + RetryOptions, + RunChainState, +} from "@trigger.dev/core/v3"; import { PrismaClient } from "@trigger.dev/database"; import { FairQueueSelectionStrategyOptions } from "../run-queue/fairQueueSelectionStrategy.js"; import { MinimalAuthenticatedEnvironment } from "../shared/index.js"; @@ -84,6 +90,7 @@ export type TriggerParams = { concurrencyKey?: string; masterQueue?: string; queue: string; + lockedQueueId?: string; isTest: boolean; delayUntil?: Date; queuedAt?: Date; @@ -111,6 +118,7 @@ export type TriggerParams = { workerId?: string; runnerId?: string; releaseConcurrency?: boolean; + runChainState?: RunChainState; }; export type EngineWorker = Worker; diff --git a/internal-packages/run-engine/src/index.ts b/internal-packages/run-engine/src/index.ts index ff08ab0a51..bdb8379d87 100644 --- a/internal-packages/run-engine/src/index.ts +++ b/internal-packages/run-engine/src/index.ts @@ -1,4 +1,4 @@ export { RunEngine } from "./engine/index.js"; -export { RunDuplicateIdempotencyKeyError } from "./engine/errors.js"; +export { RunDuplicateIdempotencyKeyError, RunOneTimeUseTokenError } from "./engine/errors.js"; export type { EventBusEventArgs } from "./engine/eventBus.js"; export type { AuthenticatedEnvironment } from "./shared/index.js"; diff --git a/internal-packages/testcontainers/package.json b/internal-packages/testcontainers/package.json index 5fa73d40cd..33a4e43870 100644 --- a/internal-packages/testcontainers/package.json +++ b/internal-packages/testcontainers/package.json @@ -20,4 +20,4 @@ "scripts": { "typecheck": "tsc --noEmit" } -} +} \ No newline at end of file diff --git a/internal-packages/testcontainers/src/index.ts b/internal-packages/testcontainers/src/index.ts index 3d99beb709..7253470998 100644 --- a/internal-packages/testcontainers/src/index.ts +++ b/internal-packages/testcontainers/src/index.ts @@ -61,10 +61,14 @@ const prisma = async ( { postgresContainer }: { postgresContainer: StartedPostgreSqlContainer }, use: Use ) => { + const url = postgresContainer.getConnectionUri(); + + console.log("Initializing Prisma with URL:", url); + const prisma = new PrismaClient({ datasources: { db: { - url: postgresContainer.getConnectionUri(), + url, }, }, }); diff --git a/packages/core/src/v3/apiClient/errors.ts b/packages/core/src/v3/apiClient/errors.ts index 4f84298dc5..5be13b91e7 100644 --- a/packages/core/src/v3/apiClient/errors.ts +++ b/packages/core/src/v3/apiClient/errors.ts @@ -28,23 +28,32 @@ export class ApiError extends Error { } private static makeMessage(status: number | undefined, error: any, message: string | undefined) { - const msg = error?.message + const errorMessage = error?.message ? typeof error.message === "string" ? error.message : JSON.stringify(error.message) + : typeof error === "string" + ? error : error ? JSON.stringify(error) - : message; + : undefined; - if (status && msg) { - return `${status} ${msg}`; + if (errorMessage) { + return errorMessage; } + + if (status && message) { + return `${status} ${message}`; + } + if (status) { return `${status} status code (no body)`; } - if (msg) { - return msg; + + if (message) { + return message; } + return "(no status code or body)"; } diff --git a/packages/core/src/v3/errors.ts b/packages/core/src/v3/errors.ts index 04fe3902e7..b585b938c8 100644 --- a/packages/core/src/v3/errors.ts +++ b/packages/core/src/v3/errors.ts @@ -690,6 +690,21 @@ export function taskRunErrorEnhancer(error: TaskRunError): EnhanceError; + +export const RunChainState = z.object({ + concurrency: z + .object({ + queues: z.array(z.object({ id: z.string(), name: z.string(), holding: z.number() })), + environment: z.number().optional(), + }) + .optional(), +}); + +export type RunChainState = z.infer; diff --git a/packages/core/src/v3/workers/taskExecutor.ts b/packages/core/src/v3/workers/taskExecutor.ts index d11827d4ca..8a13ab3bc0 100644 --- a/packages/core/src/v3/workers/taskExecutor.ts +++ b/packages/core/src/v3/workers/taskExecutor.ts @@ -1022,6 +1022,19 @@ export class TaskExecutor { return { status: "skipped" }; } + // Check for unretryable API errors (client errors except 408 and 429) + if ( + error instanceof Error && + error.name === "TriggerApiError" && + "status" in error && + typeof error.status === "number" + ) { + const status = error.status; + if (status && status >= 400 && status < 500 && status !== 408 && status !== 429) { + return { status: "skipped", error }; + } + } + // Calculate default retry delay if retry config exists let defaultDelay: number | undefined; if (retry) { @@ -1039,7 +1052,10 @@ export class TaskExecutor { (error as ApiError).status === 429 ) { const rateLimitError = error as RateLimitError; - defaultDelay = rateLimitError.millisecondsUntilReset; + const rateLimitDelay = rateLimitError.millisecondsUntilReset; + if (rateLimitDelay) { + defaultDelay = rateLimitDelay; + } } } diff --git a/packages/core/test/taskExecutor.test.ts b/packages/core/test/taskExecutor.test.ts index 62541645b2..c8e8e10bfe 100644 --- a/packages/core/test/taskExecutor.test.ts +++ b/packages/core/test/taskExecutor.test.ts @@ -1,6 +1,8 @@ import { describe, expect, test } from "vitest"; +import { ApiError } from "../src/v3/apiClient/errors.js"; import { ConsoleInterceptor } from "../src/v3/consoleInterceptor.js"; import { + lifecycleHooks, RetryOptions, RunFnParams, ServerBackgroundWorker, @@ -8,11 +10,10 @@ import { TaskRunErrorCodes, TaskRunExecution, } from "../src/v3/index.js"; +import { StandardLifecycleHooksManager } from "../src/v3/lifecycleHooks/manager.js"; import { TracingSDK } from "../src/v3/otel/tracingSDK.js"; import { TriggerTracer } from "../src/v3/tracer.js"; import { TaskExecutor } from "../src/v3/workers/taskExecutor.js"; -import { StandardLifecycleHooksManager } from "../src/v3/lifecycleHooks/manager.js"; -import { lifecycleHooks } from "../src/v3/index.js"; describe("TaskExecutor", () => { beforeEach(() => { @@ -1664,6 +1665,150 @@ describe("TaskExecutor", () => { }, }); }); + + test("should skip retrying for unretryable API errors", async () => { + const unretryableStatusCodes = [400, 401, 403, 404, 422]; + const retryableStatusCodes = [408, 429, 500, 502, 503, 504]; + + // Register global init hook + lifecycleHooks.registerGlobalInitHook({ + id: "test-init", + fn: async () => { + return { + foo: "bar", + }; + }, + }); + + // Test each unretryable status code + for (const status of unretryableStatusCodes) { + const apiError = ApiError.generate( + status, + { error: { message: "API Error" } }, + "API Error", + {} + ); + + const task = { + id: "test-task", + fns: { + run: async () => { + throw apiError; + }, + }, + retry: { + maxAttempts: 3, + minDelay: 1000, + maxDelay: 5000, + factor: 2, + }, + }; + + const result = await executeTask(task, { test: "data" }, undefined); + + // Verify that retrying is skipped for these status codes + expect(result.result).toMatchObject({ + ok: false, + id: "test-run-id", + error: { + type: "BUILT_IN_ERROR", + message: "API Error", + name: "TriggerApiError", + stackTrace: expect.any(String), + }, + skippedRetrying: true, + }); + } + + // Test each retryable status code + for (const status of retryableStatusCodes) { + const apiError = ApiError.generate( + status, + { error: { message: "API Error" } }, + "API Error", + {} + ); + + const task = { + id: "test-task", + fns: { + run: async () => { + throw apiError; + }, + }, + retry: { + maxAttempts: 3, + minDelay: 1000, + maxDelay: 5000, + factor: 2, + }, + }; + + const result = await executeTask(task, { test: "data" }, undefined); + + // Verify that retrying is NOT skipped for these status codes + expect(result.result.ok).toBe(false); + expect(result.result).toMatchObject({ + ok: false, + skippedRetrying: false, + retry: expect.objectContaining({ + delay: expect.any(Number), + timestamp: expect.any(Number), + }), + }); + + if (status === 429) { + // Rate limit errors should use the rate limit retry delay + expect((result.result as any).retry.delay).toBeGreaterThan(0); + } else { + // Other retryable errors should use the exponential backoff + expect((result.result as any).retry.delay).toBeGreaterThan(1000); + expect((result.result as any).retry.delay).toBeLessThan(5000); + } + } + }); + + test("should respect rate limit headers for 429 errors", async () => { + const resetTime = Date.now() + 30000; // 30 seconds from now + const apiError = ApiError.generate( + 429, + { error: { message: "Rate limit exceeded" } }, + "Rate limit exceeded", + { "x-ratelimit-reset": resetTime.toString() } + ); + + const task = { + id: "test-task", + fns: { + run: async () => { + throw apiError; + }, + }, + retry: { + maxAttempts: 3, + minDelay: 1000, + maxDelay: 5000, + factor: 2, + }, + }; + + const result = await executeTask(task, { test: "data" }, undefined); + + // Verify that the retry delay matches the rate limit reset time (with some jitter) + expect(result.result.ok).toBe(false); + expect(result.result).toMatchObject({ + ok: false, + skippedRetrying: false, + retry: expect.objectContaining({ + delay: expect.any(Number), + timestamp: expect.any(Number), + }), + }); + + const delay = (result.result as any).retry.delay; + expect(delay).toBeGreaterThan(29900); // Allow for some time passing during test + expect(delay).toBeLessThan(32000); // Account for max 2000ms jitter + }); }); function executeTask( diff --git a/references/hello-world/src/trigger/deadlocks.ts b/references/hello-world/src/trigger/deadlocks.ts new file mode 100644 index 0000000000..beb838e5b4 --- /dev/null +++ b/references/hello-world/src/trigger/deadlocks.ts @@ -0,0 +1,61 @@ +import { task, queue } from "@trigger.dev/sdk"; + +const deadlockQueue = queue({ + name: "deadlock-queue", + concurrencyLimit: 1, + releaseConcurrencyOnWaitpoint: false, +}); + +export const deadlockReleasingQueue = queue({ + name: "deadlock-releasing-queue", + releaseConcurrencyOnWaitpoint: true, +}); + +export const deadlockTester = task({ + id: "deadlock-tester", + run: async (payload: any, { ctx }) => { + // await deadlockNestedTask.triggerAndWait({ + // message: "Hello, world!", + // }); + + await deadlockNestedTask.batchTriggerAndWait([ + { + payload: { + message: "Hello, world!", + }, + }, + { + payload: { + message: "Hello, world!", + }, + }, + ]); + }, +}); + +export const deadlockNestedTask = task({ + id: "deadlock-nested-task", + queue: deadlockQueue, + run: async (payload: any, { ctx }) => { + // await deadlockTester.triggerAndWait({ + // message: "Hello, world!", + // }); + + await deadlockTester.batchTriggerAndWait([ + { + payload: { + message: "Hello, world!", + }, + }, + { + payload: { + message: "Hello, world!", + }, + }, + ]); + + return { + message: "Hello, world!", + }; + }, +}); diff --git a/references/hello-world/src/trigger/queues.ts b/references/hello-world/src/trigger/queues.ts index 9cc3074009..7eb05c99b4 100644 --- a/references/hello-world/src/trigger/queues.ts +++ b/references/hello-world/src/trigger/queues.ts @@ -1,4 +1,4 @@ -import { batch, logger, queue, queues, task } from "@trigger.dev/sdk/v3"; +import { batch, logger, queue, queues, task, tasks } from "@trigger.dev/sdk/v3"; export const queuesTester = task({ id: "queues-tester", @@ -235,3 +235,30 @@ export const sharedQueueTestTask = task({ }; }, }); + +export const lockedQueueTask = task({ + id: "locked-queue-task", + run: async (payload: any) => { + logger.log("Locked queue task", { payload }); + + await lockedQueueChildTask.trigger({}, { queue: "queue_does_not_exist" }); + await lockedQueueChildTask.triggerAndWait({}, { queue: "queue_does_not_exist" }); + }, +}); + +export const lockedQueueChildTask = task({ + id: "locked-queue-child-task", + run: async (payload: any) => { + logger.log("Locked queue child task", { payload }); + }, +}); + +export const lockedTaskIdentifierTask = task({ + id: "locked-task-identifier-task", + run: async (payload: any) => { + logger.log("Locked task identifier task", { payload }); + + await tasks.trigger("task_does_not_exist", {}); + await tasks.triggerAndWait("task_does_not_exist", {}); + }, +});
+ {enhancedException.message} +
+ {enhancedError.message} +