From 53f715334ac46a37d77bdbdd848d28ac087b332a Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 6 Jan 2026 12:26:22 +0000 Subject: [PATCH] fix(batch): rate limiting by token bucket no longer incorrectly goes negative --- .changeset/sharp-ravens-doubt.md | 5 + apps/webapp/app/routes/api.v3.batches.ts | 2 +- .../runEngine/concerns/batchLimits.server.ts | 4 +- package.json | 3 +- packages/trigger-sdk/src/v3/index.ts | 2 +- packages/trigger-sdk/src/v3/shared.ts | 72 +++++- patches/@upstash__ratelimit.patch | 26 +++ pnpm-lock.yaml | 19 +- references/hello-world/src/trigger/batches.ts | 207 +++++++++++++++++- 9 files changed, 320 insertions(+), 20 deletions(-) create mode 100644 .changeset/sharp-ravens-doubt.md create mode 100644 patches/@upstash__ratelimit.patch diff --git a/.changeset/sharp-ravens-doubt.md b/.changeset/sharp-ravens-doubt.md new file mode 100644 index 0000000000..e23bf2116a --- /dev/null +++ b/.changeset/sharp-ravens-doubt.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/sdk": patch +--- + +Improve batch trigger error messages, especially when rate limited diff --git a/apps/webapp/app/routes/api.v3.batches.ts b/apps/webapp/app/routes/api.v3.batches.ts index a2847c9566..070e6d9f80 100644 --- a/apps/webapp/app/routes/api.v3.batches.ts +++ b/apps/webapp/app/routes/api.v3.batches.ts @@ -158,7 +158,7 @@ const { action, loader } = createActionApiRoute( status: 429, headers: { "X-RateLimit-Limit": error.limit.toString(), - "X-RateLimit-Remaining": error.remaining.toString(), + "X-RateLimit-Remaining": Math.max(0, error.remaining).toString(), "X-RateLimit-Reset": Math.floor(error.resetAt.getTime() / 1000).toString(), "Retry-After": Math.max( 1, diff --git a/apps/webapp/app/runEngine/concerns/batchLimits.server.ts b/apps/webapp/app/runEngine/concerns/batchLimits.server.ts index 0fcbe67a4e..437feadf38 100644 --- a/apps/webapp/app/runEngine/concerns/batchLimits.server.ts +++ b/apps/webapp/app/runEngine/concerns/batchLimits.server.ts @@ -115,9 +115,7 @@ export class BatchRateLimitExceededError extends Error { public readonly resetAt: Date, public readonly itemCount: number ) { - super( - `Batch rate limit exceeded. Attempted to submit ${itemCount} items but only ${remaining} remaining. Limit resets at ${resetAt.toISOString()}` - ); + super(`Batch rate limit exceeded. Limit resets at ${resetAt.toISOString()}`); this.name = "BatchRateLimitExceededError"; } } diff --git a/package.json b/package.json index 9ca9bfb671..a5584c4ea3 100644 --- a/package.json +++ b/package.json @@ -79,7 +79,8 @@ "graphile-worker@0.16.6": "patches/graphile-worker@0.16.6.patch", "redlock@5.0.0-beta.2": "patches/redlock@5.0.0-beta.2.patch", "@kubernetes/client-node@1.0.0": "patches/@kubernetes__client-node@1.0.0.patch", - "@sentry/remix@9.46.0": "patches/@sentry__remix@9.46.0.patch" + "@sentry/remix@9.46.0": "patches/@sentry__remix@9.46.0.patch", + "@upstash/ratelimit@1.1.3": "patches/@upstash__ratelimit.patch" }, "overrides": { "express@^4>body-parser": "1.20.3", diff --git a/packages/trigger-sdk/src/v3/index.ts b/packages/trigger-sdk/src/v3/index.ts index dcc258455b..b2d6247699 100644 --- a/packages/trigger-sdk/src/v3/index.ts +++ b/packages/trigger-sdk/src/v3/index.ts @@ -1,7 +1,7 @@ export * from "./cache.js"; export * from "./config.js"; export { retry, type RetryOptions } from "./retry.js"; -export { queue } from "./shared.js"; +export { queue, BatchTriggerError } from "./shared.js"; export * from "./tasks.js"; export * from "./batch.js"; export * from "./wait.js"; diff --git a/packages/trigger-sdk/src/v3/shared.ts b/packages/trigger-sdk/src/v3/shared.ts index 87025f0816..d8dc511d5b 100644 --- a/packages/trigger-sdk/src/v3/shared.ts +++ b/packages/trigger-sdk/src/v3/shared.ts @@ -2,6 +2,7 @@ import { SpanKind } from "@opentelemetry/api"; import { SerializableJson } from "@trigger.dev/core"; import { accessoryAttributes, + ApiError, apiClientManager, ApiRequestOptions, conditionallyImportPacket, @@ -17,6 +18,7 @@ import { parsePacket, Queue, QueueOptions, + RateLimitError, resourceCatalog, runtime, SemanticInternalAttributes, @@ -125,9 +127,6 @@ export { SubtaskUnwrapError, TaskRunPromise }; export type Context = TaskRunContext; -// Re-export for external use (defined later in file) -export { BatchTriggerError }; - export function queue(options: QueueOptions): Queue { resourceCatalog.registerQueueMetadata(options); @@ -1592,12 +1591,28 @@ async function executeBatchTwoPhase( /** * Error thrown when batch trigger operations fail. * Includes context about which phase failed and the batch details. + * + * When the underlying error is a rate limit (429), additional properties are exposed: + * - `isRateLimited`: true + * - `retryAfterMs`: milliseconds until the rate limit resets */ -class BatchTriggerError extends Error { +export class BatchTriggerError extends Error { readonly phase: "create" | "stream"; readonly batchId?: string; readonly itemCount: number; + /** True if the error was caused by rate limiting (HTTP 429) */ + readonly isRateLimited: boolean; + + /** Milliseconds until the rate limit resets. Only set when `isRateLimited` is true. */ + readonly retryAfterMs?: number; + + /** The underlying API error, if the cause was an ApiError */ + readonly apiError?: ApiError; + + /** The underlying cause of the error */ + override readonly cause?: unknown; + constructor( message: string, options: { @@ -1607,12 +1622,59 @@ class BatchTriggerError extends Error { itemCount: number; } ) { - super(message, { cause: options.cause }); + // Build enhanced message that includes the cause's message + const fullMessage = buildBatchErrorMessage(message, options.cause); + super(fullMessage, { cause: options.cause }); + this.name = "BatchTriggerError"; + this.cause = options.cause; this.phase = options.phase; this.batchId = options.batchId; this.itemCount = options.itemCount; + + // Extract rate limit info from cause + if (options.cause instanceof RateLimitError) { + this.isRateLimited = true; + this.retryAfterMs = options.cause.millisecondsUntilReset; + this.apiError = options.cause; + } else if (options.cause instanceof ApiError) { + this.isRateLimited = options.cause.status === 429; + this.apiError = options.cause; + } else { + this.isRateLimited = false; + } + } +} + +/** + * Build an enhanced error message that includes context from the cause. + */ +function buildBatchErrorMessage(baseMessage: string, cause: unknown): string { + if (!cause) { + return baseMessage; + } + + // Handle RateLimitError specifically for better messaging + if (cause instanceof RateLimitError) { + const retryMs = cause.millisecondsUntilReset; + if (retryMs !== undefined) { + const retrySeconds = Math.ceil(retryMs / 1000); + return `${baseMessage}: Rate limit exceeded - retry after ${retrySeconds}s`; + } + return `${baseMessage}: Rate limit exceeded`; } + + // Handle other ApiErrors + if (cause instanceof ApiError) { + return `${baseMessage}: ${cause.message}`; + } + + // Handle generic errors + if (cause instanceof Error) { + return `${baseMessage}: ${cause.message}`; + } + + return baseMessage; } /** diff --git a/patches/@upstash__ratelimit.patch b/patches/@upstash__ratelimit.patch new file mode 100644 index 0000000000..840984fbaa --- /dev/null +++ b/patches/@upstash__ratelimit.patch @@ -0,0 +1,26 @@ +diff --git a/dist/index.js b/dist/index.js +index 7d1502426320957017988aed0c29974acd70e8da..062769cda055302d737503e5d1ba5e62609c934f 100644 +--- a/dist/index.js ++++ b/dist/index.js +@@ -841,7 +841,7 @@ var tokenBucketLimitScript = ` + refilledAt = refilledAt + numRefills * interval + end + +- if tokens == 0 then ++ if tokens < incrementBy then + return {-1, refilledAt + interval} + end + +diff --git a/dist/index.mjs b/dist/index.mjs +index 25a2c888be27b7c5aff41de63d5df189e0031145..53b4a4b2d2ef55f709f7404cc6a66058b7f3191a 100644 +--- a/dist/index.mjs ++++ b/dist/index.mjs +@@ -813,7 +813,7 @@ var tokenBucketLimitScript = ` + refilledAt = refilledAt + numRefills * interval + end + +- if tokens == 0 then ++ if tokens < incrementBy then + return {-1, refilledAt + interval} + end + diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6fa66945d4..0d83f4981f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -26,6 +26,9 @@ patchedDependencies: '@sentry/remix@9.46.0': hash: 146126b032581925294aaed63ab53ce3f5e0356a755f1763d7a9a76b9846943b path: patches/@sentry__remix@9.46.0.patch + '@upstash/ratelimit@1.1.3': + hash: e5922e50fbefb7b2b24950c4b1c5c9ddc4cd25464439c9548d2298c432debe74 + path: patches/@upstash__ratelimit.patch engine.io-parser@5.2.2: hash: 9011e7e16547017450c9baec0fceff0e070310c20d9e62f8d1383bb1da77104f path: patches/engine.io-parser@5.2.2.patch @@ -500,7 +503,7 @@ importers: version: 0.2.0 '@upstash/ratelimit': specifier: ^1.1.3 - version: 1.1.3 + version: 1.1.3(patch_hash=e5922e50fbefb7b2b24950c4b1c5c9ddc4cd25464439c9548d2298c432debe74) '@whatwg-node/fetch': specifier: ^0.9.14 version: 0.9.14 @@ -1054,7 +1057,7 @@ importers: version: 18.3.1 react-email: specifier: ^2.1.1 - version: 2.1.2(@opentelemetry/api@1.9.0)(@swc/helpers@0.5.15)(bufferutil@4.0.9)(eslint@8.31.0) + version: 2.1.2(@opentelemetry/api@1.9.0)(@swc/helpers@0.5.15)(eslint@8.31.0) resend: specifier: ^3.2.0 version: 3.2.0 @@ -31182,7 +31185,7 @@ snapshots: dependencies: '@upstash/redis': 1.29.0 - '@upstash/ratelimit@1.1.3': + '@upstash/ratelimit@1.1.3(patch_hash=e5922e50fbefb7b2b24950c4b1c5c9ddc4cd25464439c9548d2298c432debe74)': dependencies: '@upstash/core-analytics': 0.0.8 @@ -38872,7 +38875,7 @@ snapshots: react: 19.1.0 scheduler: 0.26.0 - react-email@2.1.2(@opentelemetry/api@1.9.0)(@swc/helpers@0.5.15)(bufferutil@4.0.9)(eslint@8.31.0): + react-email@2.1.2(@opentelemetry/api@1.9.0)(@swc/helpers@0.5.15)(eslint@8.31.0): dependencies: '@babel/parser': 7.24.1 '@radix-ui/colors': 1.0.1 @@ -38909,8 +38912,8 @@ snapshots: react: 18.3.1 react-dom: 18.2.0(react@18.3.1) shelljs: 0.8.5 - socket.io: 4.7.3(bufferutil@4.0.9) - socket.io-client: 4.7.3(bufferutil@4.0.9) + socket.io: 4.7.3 + socket.io-client: 4.7.3 sonner: 1.3.1(react-dom@18.2.0(react@18.3.1))(react@18.3.1) source-map-js: 1.0.2 stacktrace-parser: 0.1.10 @@ -40039,7 +40042,7 @@ snapshots: - supports-color - utf-8-validate - socket.io-client@4.7.3(bufferutil@4.0.9): + socket.io-client@4.7.3: dependencies: '@socket.io/component-emitter': 3.1.0 debug: 4.3.7(supports-color@10.0.0) @@ -40068,7 +40071,7 @@ snapshots: transitivePeerDependencies: - supports-color - socket.io@4.7.3(bufferutil@4.0.9): + socket.io@4.7.3: dependencies: accepts: 1.3.8 base64id: 2.0.0 diff --git a/references/hello-world/src/trigger/batches.ts b/references/hello-world/src/trigger/batches.ts index 1ca134c9d0..594f4032f1 100644 --- a/references/hello-world/src/trigger/batches.ts +++ b/references/hello-world/src/trigger/batches.ts @@ -1,4 +1,4 @@ -import { batch, logger, runs, task, tasks } from "@trigger.dev/sdk/v3"; +import { batch, BatchTriggerError, logger, runs, task, tasks } from "@trigger.dev/sdk/v3"; import { setTimeout } from "timers/promises"; // ============================================================================ @@ -344,6 +344,211 @@ export const batchTriggerAndWait = task({ }, }); +// ============================================================================ +// Rate Limit Error Testing +// ============================================================================ + +/** + * Test: Intentionally trigger a rate limit error to verify BatchTriggerError improvements + * + * This test triggers many batches in rapid succession to exceed the rate limit. + * When a rate limit is hit, it verifies that: + * 1. The error is a BatchTriggerError + * 2. The error has isRateLimited = true + * 3. The error message includes rate limit details + * 4. The retryAfterMs property is set + * + * Run this from backend code, not from inside a task (to avoid worker rate limits). + */ +export const rateLimitErrorTest = task({ + id: "rate-limit-error-test", + maxDuration: 300, + run: async (payload: { batchesPerAttempt?: number; itemsPerBatch?: number }) => { + const batchesPerAttempt = payload.batchesPerAttempt || 50; + const itemsPerBatch = payload.itemsPerBatch || 100; + + logger.info("Starting rate limit error test", { + batchesPerAttempt, + itemsPerBatch, + totalItems: batchesPerAttempt * itemsPerBatch, + }); + + const results: Array<{ + batchIndex: number; + success: boolean; + batchId?: string; + error?: { + message: string; + name: string; + isRateLimited?: boolean; + retryAfterMs?: number; + phase?: string; + }; + }> = []; + + // Try to trigger many batches rapidly + const batchPromises = Array.from({ length: batchesPerAttempt }, async (_, batchIndex) => { + const items = Array.from({ length: itemsPerBatch }, (_, i) => ({ + payload: { index: batchIndex * itemsPerBatch + i, testId: `rate-limit-test-${Date.now()}` }, + })); + + try { + const result = await retryTrackingTask.batchTrigger(items); + return { + batchIndex, + success: true as const, + batchId: result.batchId, + }; + } catch (error) { + // Log the error details for inspection + if (error instanceof BatchTriggerError) { + logger.info(`BatchTriggerError caught for batch ${batchIndex}`, { + message: error.message, + name: error.name, + isRateLimited: error.isRateLimited, + retryAfterMs: error.retryAfterMs, + phase: error.phase, + batchId: error.batchId, + itemCount: error.itemCount, + cause: error.cause instanceof Error ? error.cause.message : String(error.cause), + }); + + return { + batchIndex, + success: false as const, + error: { + message: error.message, + name: error.name, + isRateLimited: error.isRateLimited, + retryAfterMs: error.retryAfterMs, + phase: error.phase, + }, + }; + } + + // Non-BatchTriggerError + const err = error instanceof Error ? error : new Error(String(error)); + logger.warn(`Non-BatchTriggerError caught for batch ${batchIndex}`, { + message: err.message, + name: err.name, + }); + + return { + batchIndex, + success: false as const, + error: { + message: err.message, + name: err.name, + }, + }; + } + }); + + // Wait for all attempts (use allSettled to capture all results) + const settled = await Promise.allSettled(batchPromises); + + for (const result of settled) { + if (result.status === "fulfilled") { + results.push(result.value); + } else { + results.push({ + batchIndex: -1, + success: false, + error: { + message: result.reason?.message || String(result.reason), + name: result.reason?.name || "Error", + }, + }); + } + } + + // Analyze results + const successCount = results.filter((r) => r.success).length; + const rateLimitedCount = results.filter((r) => !r.success && r.error?.isRateLimited).length; + const otherErrorCount = results.filter((r) => !r.success && !r.error?.isRateLimited).length; + + // Get a sample rate limit error for inspection + const sampleRateLimitError = results.find((r) => r.error?.isRateLimited)?.error; + + return { + summary: { + totalBatches: batchesPerAttempt, + successCount, + rateLimitedCount, + otherErrorCount, + }, + sampleRateLimitError: sampleRateLimitError || null, + allResults: results, + testPassed: + rateLimitedCount > 0 && + sampleRateLimitError?.isRateLimited === true && + typeof sampleRateLimitError?.retryAfterMs === "number", + }; + }, +}); + +/** + * Simpler test: Direct batch trigger that's likely to hit rate limits + * + * This test just tries to batch trigger a very large number of items in one call. + * If the organization has rate limits configured, this should trigger them. + */ +export const simpleBatchRateLimitTest = task({ + id: "simple-batch-rate-limit-test", + maxDuration: 120, + run: async (payload: { itemCount?: number }) => { + const itemCount = payload.itemCount || 5000; // Large batch that might hit limits + + logger.info("Starting simple batch rate limit test", { itemCount }); + + const items = Array.from({ length: itemCount }, (_, i) => ({ + payload: { index: i, testId: `simple-rate-test-${Date.now()}` }, + })); + + try { + const result = await retryTrackingTask.batchTrigger(items); + + logger.info("Batch succeeded (no rate limit hit)", { + batchId: result.batchId, + runCount: result.runCount, + }); + + return { + success: true, + batchId: result.batchId, + runCount: result.runCount, + rateLimitHit: false, + }; + } catch (error) { + if (error instanceof BatchTriggerError) { + logger.info("BatchTriggerError caught", { + fullMessage: error.message, + isRateLimited: error.isRateLimited, + retryAfterMs: error.retryAfterMs, + phase: error.phase, + itemCount: error.itemCount, + apiErrorType: error.apiError?.constructor.name, + }); + + return { + success: false, + rateLimitHit: error.isRateLimited, + errorMessage: error.message, + errorDetails: { + phase: error.phase, + itemCount: error.itemCount, + isRateLimited: error.isRateLimited, + retryAfterMs: error.retryAfterMs, + hasApiError: !!error.apiError, + }, + }; + } + + throw error; // Re-throw unexpected errors + } + }, +}); + // ============================================================================ // Streaming Batch Examples // ============================================================================