From 5d6be21b6239ce1489d6b00abae8a67020207104 Mon Sep 17 00:00:00 2001 From: Guillermo Date: Wed, 17 Dec 2025 22:46:31 +0100 Subject: [PATCH 1/5] feat(rate-limit): added infrastructure for rate-limit logic --- apps/webapp/app/env.server.ts | 6 + .../runEngine/services/triggerTask.server.ts | 1 + apps/webapp/app/v3/runQueue.server.ts | 30 +++++ .../services/createBackgroundWorker.server.ts | 72 ++++++++++- .../app/v3/services/enqueueRun.server.ts | 4 + apps/webapp/app/v3/utils/durations.ts | 32 +++++ apps/webapp/test/parseDurationToMs.test.ts | 47 +++++++ .../database/prisma/schema.prisma | 1 + .../run-engine/src/engine/index.ts | 4 + .../src/engine/systems/enqueueSystem.ts | 9 ++ .../run-engine/src/engine/types.ts | 1 + .../run-engine/src/run-queue/index.ts | 38 ++++++ .../run-engine/src/run-queue/keyProducer.ts | 56 +++++++++ .../run-queue/tests/enqueueMessage.test.ts | 65 ++++++++++ .../src/run-queue/tests/keyProducer.test.ts | 72 +++++++++++ .../run-engine/src/run-queue/types.ts | 11 ++ .../v3/schemas/__tests__/rateLimit.test.ts | 115 ++++++++++++++++++ packages/core/src/v3/schemas/api.ts | 10 ++ packages/core/src/v3/schemas/schemas.ts | 42 +++++++ packages/core/src/v3/types/queues.ts | 36 ++++++ packages/core/src/v3/types/tasks.ts | 68 +++++++++++ packages/trigger-sdk/package.json | 2 +- packages/trigger-sdk/src/v3/shared.ts | 107 ++++++++++++++-- 23 files changed, 815 insertions(+), 14 deletions(-) create mode 100644 apps/webapp/app/v3/utils/durations.ts create mode 100644 apps/webapp/test/parseDurationToMs.test.ts create mode 100644 packages/core/src/v3/schemas/__tests__/rateLimit.test.ts diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index c5dcbe0520..e4794c50af 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -759,6 +759,12 @@ const EnvironmentSchema = z RUN_ENGINE_RATE_LIMIT_REJECTION_LOGS_ENABLED: z.string().default("1"), RUN_ENGINE_RATE_LIMIT_LIMITER_LOGS_ENABLED: z.string().default("0"), + 
/** + * Disable queue rate limiting (useful for development and testing). + * When set to "1", rate limit checks on queues will be bypassed. + */ + TRIGGER_DISABLE_QUEUE_RATE_LIMITS: z.string().default("0"), + RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED: z.string().default("0"), RUN_ENGINE_RELEASE_CONCURRENCY_DISABLE_CONSUMERS: z.string().default("0"), RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO: z.coerce.number().default(1), diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index f2e9ed5502..2dec336690 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -299,6 +299,7 @@ export class RunEngineTriggerTaskService { sdkVersion: lockedToBackgroundWorker?.sdkVersion, cliVersion: lockedToBackgroundWorker?.cliVersion, concurrencyKey: body.options?.concurrencyKey, + rateLimitKey: body.options?.rateLimitKey, queue: queueName, lockedQueueId, workerQueue, diff --git a/apps/webapp/app/v3/runQueue.server.ts b/apps/webapp/app/v3/runQueue.server.ts index e7aa13c5c5..b532d49340 100644 --- a/apps/webapp/app/v3/runQueue.server.ts +++ b/apps/webapp/app/v3/runQueue.server.ts @@ -2,8 +2,21 @@ import { type AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { marqs } from "./marqs/index.server"; import { engine } from "./runEngine.server"; +// Re-export pure utility function from durations.ts (testable without env deps) +export { parseDurationToMs } from "./utils/durations"; + //This allows us to update MARQS and the RunQueue +/** Rate limit configuration for a queue */ +export type QueueRateLimitConfig = { + /** Maximum number of requests allowed in the period */ + limit: number; + /** Time window in milliseconds */ + periodMs: number; + /** Optional burst allowance (defaults to limit) */ + burst?: number; +}; + /** Updates MARQS and the RunQueue limits */ export async function updateEnvConcurrencyLimits( 
environment: AuthenticatedEnvironment, @@ -42,3 +55,20 @@ export async function removeQueueConcurrencyLimits( engine.runQueue.removeQueueConcurrencyLimits(environment, queueName), ]); } + +/** Updates the rate limit configuration for a queue in Redis */ +export async function updateQueueRateLimitConfig( + environment: AuthenticatedEnvironment, + queueName: string, + config: QueueRateLimitConfig +) { + await engine.runQueue.setQueueRateLimitConfig(environment, queueName, config); +} + +/** Removes the rate limit configuration for a queue from Redis */ +export async function removeQueueRateLimitConfig( + environment: AuthenticatedEnvironment, + queueName: string +) { + await engine.runQueue.removeQueueRateLimitConfig(environment, queueName); +} diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index 41fbf2afe2..48073f92f1 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -9,14 +9,19 @@ import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic"; import type { BackgroundWorker, TaskQueue, TaskQueueType } from "@trigger.dev/database"; import cronstrue from "cronstrue"; import { Prisma, PrismaClientOrTransaction } from "~/db.server"; +import { env } from "~/env.server"; import { sanitizeQueueName } from "~/models/taskQueue.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; import { + parseDurationToMs, removeQueueConcurrencyLimits, + removeQueueRateLimitConfig, updateEnvConcurrencyLimits, updateQueueConcurrencyLimits, + updateQueueRateLimitConfig, + type QueueRateLimitConfig, } from "../runQueue.server"; import { calculateNextBuildVersion } from "../utils/calculateNextBuildVersion"; import { clampMaxDuration } from "../utils/maxDuration"; @@ 
-250,11 +255,12 @@ async function createWorkerTask( let queue = queues.find((queue) => queue.name === task.queue?.name); if (!queue) { - // Create a TaskQueue + // Create a TaskQueue with rate limit config if provided queue = await createWorkerQueue( { name: task.queue?.name ?? `task/${task.id}`, concurrencyLimit: task.queue?.concurrencyLimit, + rateLimit: task.queue?.rateLimit, }, task.id, task.queue?.name ? "NAMED" : "VIRTUAL", @@ -364,9 +370,28 @@ async function createWorkerQueue( ? Math.max(Math.min(queue.concurrencyLimit, environment.maximumConcurrencyLimit), 0) : queue.concurrencyLimit; + // Parse rate limit config if provided + let rateLimitConfig: QueueRateLimitConfig | null = null; + if (queue.rateLimit) { + try { + rateLimitConfig = { + limit: queue.rateLimit.limit, + periodMs: parseDurationToMs(queue.rateLimit.period), + burst: queue.rateLimit.burst, + }; + } catch (error) { + logger.error("createWorkerQueue: invalid rate limit period format", { + queueName, + rateLimit: queue.rateLimit, + error, + }); + } + } + const taskQueue = await upsertWorkerQueueRecord( queueName, baseConcurrencyLimit ?? 
null, + rateLimitConfig, orderableName, queueType, worker, @@ -376,6 +401,7 @@ async function createWorkerQueue( const newConcurrencyLimit = taskQueue.concurrencyLimit; if (!taskQueue.paused) { + // Handle concurrency limit sync if (typeof newConcurrencyLimit === "number") { logger.debug("createWorkerQueue: updating concurrency limit", { workerId: worker.id, @@ -397,8 +423,36 @@ async function createWorkerQueue( }); await removeQueueConcurrencyLimits(environment, taskQueue.name); } + + // Handle rate limit config sync to Redis + if (env.TRIGGER_DISABLE_QUEUE_RATE_LIMITS === "1") { + // Rate limiting disabled: remove any existing config from Redis + // This ensures clean state when toggling the flag + logger.debug("createWorkerQueue: rate limiting disabled by env flag, removing config", { + workerId: worker.id, + taskQueue: taskQueue.name, + orgId: environment.organizationId, + projectId: environment.projectId, + environmentId: environment.id, + }); + await removeQueueRateLimitConfig(environment, taskQueue.name); + } else if (rateLimitConfig) { + // Rate limiting enabled and config exists: sync to Redis + logger.debug("createWorkerQueue: updating rate limit config", { + workerId: worker.id, + taskQueue: taskQueue.name, + orgId: environment.organizationId, + projectId: environment.projectId, + environmentId: environment.id, + rateLimitConfig, + }); + await updateQueueRateLimitConfig(environment, taskQueue.name, rateLimitConfig); + } else { + // Rate limiting enabled but no config: remove any stale config + await removeQueueRateLimitConfig(environment, taskQueue.name); + } } else { - logger.debug("createWorkerQueue: queue is paused, not updating concurrency limit", { + logger.debug("createWorkerQueue: queue is paused, not updating limits", { workerId: worker.id, taskQueue, orgId: environment.organizationId, @@ -413,6 +467,7 @@ async function createWorkerQueue( async function upsertWorkerQueueRecord( queueName: string, concurrencyLimit: number | null, + 
rateLimitConfig: QueueRateLimitConfig | null, orderableName: string, queueType: TaskQueueType, worker: BackgroundWorker, @@ -431,6 +486,15 @@ async function upsertWorkerQueueRecord( }, }); + // Serialize rate limit config for storage (or null to clear) + const rateLimitData = rateLimitConfig + ? { + limit: rateLimitConfig.limit, + periodMs: rateLimitConfig.periodMs, + burst: rateLimitConfig.burst, + } + : Prisma.JsonNull; + if (!taskQueue) { taskQueue = await prisma.taskQueue.create({ data: { @@ -439,6 +503,7 @@ async function upsertWorkerQueueRecord( name: queueName, orderableName, concurrencyLimit, + rateLimit: rateLimitData, runtimeEnvironmentId: worker.runtimeEnvironmentId, projectId: worker.projectId, type: queueType, @@ -463,6 +528,8 @@ async function upsertWorkerQueueRecord( // If overridden, keep current limit and update base; otherwise update limit normally concurrencyLimit: hasOverride ? undefined : concurrencyLimit, concurrencyLimitBase: hasOverride ? concurrencyLimit : undefined, + // Always update rate limit config (not overrideable for now) + rateLimit: rateLimitData, }, }); } @@ -474,6 +541,7 @@ async function upsertWorkerQueueRecord( return await upsertWorkerQueueRecord( queueName, concurrencyLimit, + rateLimitConfig, orderableName, queueType, worker, diff --git a/apps/webapp/app/v3/services/enqueueRun.server.ts b/apps/webapp/app/v3/services/enqueueRun.server.ts index cb091d70d9..9b3114c88a 100644 --- a/apps/webapp/app/v3/services/enqueueRun.server.ts +++ b/apps/webapp/app/v3/services/enqueueRun.server.ts @@ -7,6 +7,7 @@ export type EnqueueRunOptions = { env: AuthenticatedEnvironment; run: TaskRun; dependentRun?: { queue: string; id: string }; + rateLimitKey?: string; }; export type EnqueueRunResult = @@ -22,6 +23,7 @@ export async function enqueueRun({ env, run, dependentRun, + rateLimitKey, }: EnqueueRunOptions): Promise { // If this is a triggerAndWait or batchTriggerAndWait, // we need to add the parent run to the reserve concurrency set @@ 
-39,6 +41,8 @@ export async function enqueueRun({ projectId: env.projectId, environmentId: env.id, environmentType: env.type, + // Include rateLimitKey in message payload for dequeue-time checks + rateLimitKey, }, run.concurrencyKey ?? undefined, run.queueTimestamp ?? undefined, diff --git a/apps/webapp/app/v3/utils/durations.ts b/apps/webapp/app/v3/utils/durations.ts new file mode 100644 index 0000000000..ff7a1745d0 --- /dev/null +++ b/apps/webapp/app/v3/utils/durations.ts @@ -0,0 +1,32 @@ +/** + * Parse a duration string (e.g., "1s", "100ms", "5m", "1h", "1d") to milliseconds. + * @throws Error if the duration string is invalid + */ +export function parseDurationToMs(duration: string): number { + const match = duration.match(/^(\d+(?:\.\d+)?)(ms|s|m|h|d)$/); + + if (!match) { + throw new Error( + `Invalid duration string: "${duration}". Expected format: number + unit (ms, s, m, h, d)` + ); + } + + const [, value, unit] = match; + const numValue = parseFloat(value); + + switch (unit) { + case "ms": + return Math.round(numValue); + case "s": + return Math.round(numValue * 1000); + case "m": + return Math.round(numValue * 60 * 1000); + case "h": + return Math.round(numValue * 60 * 60 * 1000); + case "d": + return Math.round(numValue * 24 * 60 * 60 * 1000); + default: + throw new Error(`Unknown duration unit: ${unit}`); + } +} + diff --git a/apps/webapp/test/parseDurationToMs.test.ts b/apps/webapp/test/parseDurationToMs.test.ts new file mode 100644 index 0000000000..6dbe1476b4 --- /dev/null +++ b/apps/webapp/test/parseDurationToMs.test.ts @@ -0,0 +1,47 @@ +import { describe, it, expect } from "vitest"; +import { parseDurationToMs } from "~/v3/utils/durations"; + +describe("parseDurationToMs", () => { + it("parses milliseconds", () => { + expect(parseDurationToMs("100ms")).toBe(100); + expect(parseDurationToMs("1500ms")).toBe(1500); + expect(parseDurationToMs("0ms")).toBe(0); + }); + + it("parses seconds", () => { + expect(parseDurationToMs("1s")).toBe(1000); + 
expect(parseDurationToMs("30s")).toBe(30000); + expect(parseDurationToMs("1.5s")).toBe(1500); + expect(parseDurationToMs("0.5s")).toBe(500); + }); + + it("parses minutes", () => { + expect(parseDurationToMs("1m")).toBe(60000); + expect(parseDurationToMs("5m")).toBe(300000); + expect(parseDurationToMs("0.5m")).toBe(30000); + }); + + it("parses hours", () => { + expect(parseDurationToMs("1h")).toBe(3600000); + expect(parseDurationToMs("24h")).toBe(86400000); + expect(parseDurationToMs("0.5h")).toBe(1800000); + }); + + it("parses days", () => { + expect(parseDurationToMs("1d")).toBe(86400000); + expect(parseDurationToMs("7d")).toBe(604800000); + }); + + it("throws on invalid format", () => { + expect(() => parseDurationToMs("invalid")).toThrow(); + expect(() => parseDurationToMs("1x")).toThrow(); + expect(() => parseDurationToMs("")).toThrow(); + expect(() => parseDurationToMs("ms")).toThrow(); + expect(() => parseDurationToMs("10")).toThrow(); + }); + + it("throws on negative values (invalid regex)", () => { + expect(() => parseDurationToMs("-1s")).toThrow(); + }); +}); + diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 5207ada480..7e6cafdac7 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -656,6 +656,7 @@ model TaskRun { priorityMs Int @default(0) concurrencyKey String? + rateLimitKey String? delayUntil DateTime? queuedAt DateTime? 
diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 9bd495f327..3465eedbd9 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -395,6 +395,7 @@ export class RunEngine { sdkVersion, cliVersion, concurrencyKey, + rateLimitKey, workerQueue, queue, lockedQueueId, @@ -471,6 +472,7 @@ export class RunEngine { sdkVersion, cliVersion, concurrencyKey, + rateLimitKey, queue, lockedQueueId, workerQueue, @@ -578,6 +580,7 @@ export class RunEngine { if (taskRun.delayUntil) { // Schedule the run to be enqueued at the delayUntil time + // Note: rateLimitKey is not passed for delayed runs - EnqueueSystem falls back to run.rateLimitKey when the run is enqueued later await this.delayedRunSystem.scheduleDelayedRunEnqueuing({ runId: taskRun.id, delayUntil: taskRun.delayUntil, @@ -594,6 +597,7 @@ export class RunEngine { runnerId, tx: prisma, skipRunLock: true, + rateLimitKey, }); } diff --git a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts index 395e44727c..e5eccc7674 100644 --- a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts @@ -34,6 +34,7 @@ export class EnqueueSystem { workerId, runnerId, skipRunLock, + rateLimitKey, }: { run: TaskRun; env: MinimalAuthenticatedEnvironment; @@ -53,6 +54,7 @@ export class EnqueueSystem { workerId?: string; runnerId?: string; skipRunLock?: boolean; + rateLimitKey?: string; }) { const prisma = tx ?? this.$.prisma; @@ -81,6 +83,12 @@ export class EnqueueSystem { const timestamp = (run.queueTimestamp ?? run.createdAt).getTime() - run.priorityMs; + // IMPORTANT: Use provided rateLimitKey or fall back to the one stored on the run. + // This ensures re-enqueued runs (checkpoint, delay, waitpoint, pendingVersion) + // maintain their original rate limit bucket. 
Future callers should rely on this + // fallback rather than passing rateLimitKey explicitly for re-enqueue scenarios. + const effectiveRateLimitKey = rateLimitKey ?? run.rateLimitKey ?? undefined; + await this.$.runQueue.enqueueMessage({ env, workerQueue, @@ -93,6 +101,7 @@ export class EnqueueSystem { environmentType: env.type, queue: run.queue, concurrencyKey: run.concurrencyKey ?? undefined, + rateLimitKey: effectiveRateLimitKey, timestamp, attempt: 0, }, diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index bdc6da4152..c10c64e0c3 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -128,6 +128,7 @@ export type TriggerParams = { sdkVersion?: string; cliVersion?: string; concurrencyKey?: string; + rateLimitKey?: string; workerQueue?: string; queue: string; lockedQueueId?: string; diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index 5127ec3c75..cbfa3c689e 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -323,6 +323,44 @@ export class RunQueue { return this.redis.del(this.keys.queueConcurrencyLimitKey(env, queue)); } + /** + * Set rate limit configuration for a queue in Redis. + * Config is stored as JSON with 7-day TTL (refreshed on deploy). + */ + public async setQueueRateLimitConfig( + env: MinimalAuthenticatedEnvironment, + queue: string, + config: { limit: number; periodMs: number; burst?: number } + ) { + const key = this.keys.queueRateLimitConfigKey(env, queue); + // Store with 7-day TTL, refreshed on each deploy + return this.redis.set(key, JSON.stringify(config), "EX", 86400 * 7); + } + + /** + * Remove rate limit configuration for a queue from Redis. 
+ */ + public async removeQueueRateLimitConfig(env: MinimalAuthenticatedEnvironment, queue: string) { + return this.redis.del(this.keys.queueRateLimitConfigKey(env, queue)); + } + + /** + * Get rate limit configuration for a queue. + */ + public async getQueueRateLimitConfig( + env: MinimalAuthenticatedEnvironment, + queue: string + ): Promise<{ limit: number; periodMs: number; burst?: number } | undefined> { + const result = await this.redis.get(this.keys.queueRateLimitConfigKey(env, queue)); + if (!result) return undefined; + + try { + return JSON.parse(result); + } catch { + return undefined; + } + } + public async getQueueConcurrencyLimit(env: MinimalAuthenticatedEnvironment, queue: string) { const result = await this.redis.get(this.keys.queueConcurrencyLimitKey(env, queue)); diff --git a/internal-packages/run-engine/src/run-queue/keyProducer.ts b/internal-packages/run-engine/src/run-queue/keyProducer.ts index cff3b78af7..afd8bcdbc1 100644 --- a/internal-packages/run-engine/src/run-queue/keyProducer.ts +++ b/internal-packages/run-engine/src/run-queue/keyProducer.ts @@ -17,6 +17,8 @@ const constants = { DEAD_LETTER_QUEUE_PART: "deadLetter", MASTER_QUEUE_PART: "masterQueue", WORKER_QUEUE_PART: "workerQueue", + RATE_LIMIT_PART: "rl", + RATE_LIMIT_CONFIG_PART: "rl:config", } as const; export class RunQueueFullKeyProducer implements RunQueueKeyProducer { @@ -301,6 +303,60 @@ export class RunQueueFullKeyProducer implements RunQueueKeyProducer { return `*:${constants.ENV_PART}:*:queue:*:${constants.CURRENT_CONCURRENCY_PART}`; } + /** + * Key for storing rate limit configuration for a queue. + * Pattern: {org:X}:proj:Y:env:Z:queue:Q:rl:config + */ + queueRateLimitConfigKey(env: RunQueueKeyProducerEnvironment, queue: string): string { + return [this.queueKeyBase(env, queue), constants.RATE_LIMIT_CONFIG_PART].join(":"); + } + + /** + * Key for the GCRA rate limit bucket for a queue. + * If rateLimitKey is provided, creates a separate bucket per key (per-tenant). 
+ * Pattern: {org:X}:proj:Y:env:Z:queue:Q:rl[:key] + */ + queueRateLimitBucketKey( + env: RunQueueKeyProducerEnvironment, + queue: string, + rateLimitKey?: string + ): string { + const base = [this.queueKeyBase(env, queue), constants.RATE_LIMIT_PART].join(":"); + return rateLimitKey ? `${base}:${rateLimitKey}` : base; + } + + /** + * Get rate limit config key from a queue key. + * Strips concurrency key suffix if present. + */ + queueRateLimitConfigKeyFromQueue(queue: string): string { + // Remove concurrency key suffix to get base queue + const baseQueue = queue.replace(/:ck:.+$/, ""); + return `${baseQueue}:${constants.RATE_LIMIT_CONFIG_PART}`; + } + + /** + * Get rate limit bucket key from a queue key. + */ + queueRateLimitBucketKeyFromQueue(queue: string, rateLimitKey?: string): string { + // Remove concurrency key suffix to get base queue + const baseQueue = queue.replace(/:ck:.+$/, ""); + const base = `${baseQueue}:${constants.RATE_LIMIT_PART}`; + return rateLimitKey ? `${base}:${rateLimitKey}` : base; + } + + /** + * Helper to get the base queue key (without concurrency key). 
+ */ + private queueKeyBase(env: RunQueueKeyProducerEnvironment, queue: string): string { + return [ + this.orgKeySection(env.organization.id), + this.projKeySection(env.project.id), + this.envKeySection(env.id), + this.queueSection(queue), + ].join(":"); + } + descriptorFromQueue(queue: string): QueueDescriptor { const parts = queue.split(":"); return { diff --git a/internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts b/internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts index bf4ed87f29..fb937568b3 100644 --- a/internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts +++ b/internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts @@ -126,4 +126,69 @@ describe("RunQueue.enqueueMessage", () => { await queue.quit(); } }); + + redisTest("should enqueue message with rateLimitKey", async ({ redisContainer }) => { + const queue = new RunQueue({ + ...testOptions, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + const messageWithRateLimit: InputPayload = { + runId: "r-ratelimit-test", + taskIdentifier: "task/my-task", + orgId: "o1234", + projectId: "p1234", + environmentId: "e4321", + environmentType: "DEVELOPMENT", + queue: "task/my-task", + timestamp: Date.now(), + attempt: 0, + rateLimitKey: "tenant-123", + }; + + // Initial queue length + const initialLength = await queue.lengthOfQueue( + authenticatedEnvDev, + messageWithRateLimit.queue + ); + expect(initialLength).toBe(0); + + // Enqueue message with rateLimitKey + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: messageWithRateLimit, + workerQueue: authenticatedEnvDev.id, + }); + + // Verify queue length increased + const newLength = await 
queue.lengthOfQueue(authenticatedEnvDev, messageWithRateLimit.queue); + expect(newLength).toBe(1); + + await setTimeout(1000); + + // Dequeue and verify rateLimitKey is preserved + const dequeued = await queue.dequeueMessageFromWorkerQueue( + "test_ratelimit", + authenticatedEnvDev.id + ); + assertNonNullable(dequeued); + expect(dequeued.messageId).toEqual(messageWithRateLimit.runId); + expect(dequeued.message.rateLimitKey).toEqual("tenant-123"); + } finally { + await queue.quit(); + } + }); }); diff --git a/internal-packages/run-engine/src/run-queue/tests/keyProducer.test.ts b/internal-packages/run-engine/src/run-queue/tests/keyProducer.test.ts index 8b980749ea..90cda1509d 100644 --- a/internal-packages/run-engine/src/run-queue/tests/keyProducer.test.ts +++ b/internal-packages/run-engine/src/run-queue/tests/keyProducer.test.ts @@ -359,4 +359,76 @@ describe("KeyProducer", () => { concurrencyKey: "c1234", }); }); + + // Rate Limit Key Tests + it("queueRateLimitConfigKey", () => { + const keyProducer = new RunQueueFullKeyProducer(); + const key = keyProducer.queueRateLimitConfigKey( + { + id: "e1234", + type: "PRODUCTION", + project: { id: "p1234" }, + organization: { id: "o1234" }, + }, + "task/task-name" + ); + expect(key).toBe("{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:rl:config"); + }); + + it("queueRateLimitBucketKey (without rateLimitKey)", () => { + const keyProducer = new RunQueueFullKeyProducer(); + const key = keyProducer.queueRateLimitBucketKey( + { + id: "e1234", + type: "PRODUCTION", + project: { id: "p1234" }, + organization: { id: "o1234" }, + }, + "task/task-name" + ); + expect(key).toBe("{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:rl"); + }); + + it("queueRateLimitBucketKey (with rateLimitKey)", () => { + const keyProducer = new RunQueueFullKeyProducer(); + const key = keyProducer.queueRateLimitBucketKey( + { + id: "e1234", + type: "PRODUCTION", + project: { id: "p1234" }, + organization: { id: "o1234" }, + }, + 
"task/task-name", + "tenant-123" + ); + expect(key).toBe("{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:rl:tenant-123"); + }); + + it("queueRateLimitConfigKeyFromQueue (strips concurrency key)", () => { + const keyProducer = new RunQueueFullKeyProducer(); + const queueKey = "{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:ck:c1234"; + const key = keyProducer.queueRateLimitConfigKeyFromQueue(queueKey); + expect(key).toBe("{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:rl:config"); + }); + + it("queueRateLimitConfigKeyFromQueue (no concurrency key)", () => { + const keyProducer = new RunQueueFullKeyProducer(); + const queueKey = "{org:o1234}:proj:p1234:env:e1234:queue:task/task-name"; + const key = keyProducer.queueRateLimitConfigKeyFromQueue(queueKey); + expect(key).toBe("{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:rl:config"); + }); + + it("queueRateLimitBucketKeyFromQueue (with rateLimitKey)", () => { + const keyProducer = new RunQueueFullKeyProducer(); + const queueKey = "{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:ck:c1234"; + const key = keyProducer.queueRateLimitBucketKeyFromQueue(queueKey, "tenant-456"); + expect(key).toBe("{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:rl:tenant-456"); + }); + + it("queueRateLimitBucketKeyFromQueue (without rateLimitKey)", () => { + const keyProducer = new RunQueueFullKeyProducer(); + const queueKey = "{org:o1234}:proj:p1234:env:e1234:queue:task/task-name"; + const key = keyProducer.queueRateLimitBucketKeyFromQueue(queueKey); + expect(key).toBe("{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:rl"); + }); }); diff --git a/internal-packages/run-engine/src/run-queue/types.ts b/internal-packages/run-engine/src/run-queue/types.ts index ee1ce41b79..bde5266eea 100644 --- a/internal-packages/run-engine/src/run-queue/types.ts +++ b/internal-packages/run-engine/src/run-queue/types.ts @@ -11,6 +11,7 @@ export const InputPayload = z.object({ environmentType: 
z.nativeEnum(RuntimeEnvironmentType), queue: z.string(), concurrencyKey: z.string().optional(), + rateLimitKey: z.string().optional(), timestamp: z.number(), attempt: z.number(), }); @@ -120,6 +121,16 @@ export interface RunQueueKeyProducer { // Concurrency sweeper methods markedForAckKey(): string; currentConcurrencySetKeyScanPattern(): string; + + // Rate limiting keys + queueRateLimitConfigKey(env: RunQueueKeyProducerEnvironment, queue: string): string; + queueRateLimitBucketKey( + env: RunQueueKeyProducerEnvironment, + queue: string, + rateLimitKey?: string + ): string; + queueRateLimitConfigKeyFromQueue(queue: string): string; + queueRateLimitBucketKeyFromQueue(queue: string, rateLimitKey?: string): string; } export type EnvQueues = { diff --git a/packages/core/src/v3/schemas/__tests__/rateLimit.test.ts b/packages/core/src/v3/schemas/__tests__/rateLimit.test.ts new file mode 100644 index 0000000000..5889116e31 --- /dev/null +++ b/packages/core/src/v3/schemas/__tests__/rateLimit.test.ts @@ -0,0 +1,115 @@ +import { describe, it, expect } from "vitest"; +import { QueueRateLimitConfigSchema, DurationStringSchema } from "../schemas.js"; + +describe("DurationStringSchema", () => { + it("validates milliseconds", () => { + expect(DurationStringSchema.safeParse("100ms").success).toBe(true); + expect(DurationStringSchema.safeParse("1500ms").success).toBe(true); + }); + + it("validates seconds", () => { + expect(DurationStringSchema.safeParse("1s").success).toBe(true); + expect(DurationStringSchema.safeParse("30s").success).toBe(true); + expect(DurationStringSchema.safeParse("1.5s").success).toBe(true); + }); + + it("validates minutes", () => { + expect(DurationStringSchema.safeParse("1m").success).toBe(true); + expect(DurationStringSchema.safeParse("60m").success).toBe(true); + }); + + it("validates hours", () => { + expect(DurationStringSchema.safeParse("1h").success).toBe(true); + expect(DurationStringSchema.safeParse("24h").success).toBe(true); + }); + + 
it("validates days", () => { + expect(DurationStringSchema.safeParse("1d").success).toBe(true); + expect(DurationStringSchema.safeParse("7d").success).toBe(true); + }); + + it("rejects invalid formats", () => { + expect(DurationStringSchema.safeParse("invalid").success).toBe(false); + expect(DurationStringSchema.safeParse("1x").success).toBe(false); + expect(DurationStringSchema.safeParse("").success).toBe(false); + expect(DurationStringSchema.safeParse("ms").success).toBe(false); + expect(DurationStringSchema.safeParse("10").success).toBe(false); + expect(DurationStringSchema.safeParse("-1s").success).toBe(false); + }); +}); + +describe("QueueRateLimitConfigSchema", () => { + it("parses valid config with all fields", () => { + const result = QueueRateLimitConfigSchema.safeParse({ + limit: 100, + period: "1m", + burst: 20, + }); + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.limit).toBe(100); + expect(result.data.period).toBe("1m"); + expect(result.data.burst).toBe(20); + } + }); + + it("parses config without optional burst", () => { + const result = QueueRateLimitConfigSchema.safeParse({ + limit: 10, + period: "1s", + }); + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.limit).toBe(10); + expect(result.data.period).toBe("1s"); + expect(result.data.burst).toBeUndefined(); + } + }); + + it("accepts various period formats", () => { + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "100ms" }).success).toBe(true); + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "30s" }).success).toBe(true); + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "5m" }).success).toBe(true); + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "1h" }).success).toBe(true); + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "1d" }).success).toBe(true); + }); + + it("rejects invalid period formats", () => { + 
expect(QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "1x" }).success).toBe(false); + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "invalid" }).success).toBe( + false + ); + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "" }).success).toBe(false); + }); + + it("requires positive limit", () => { + expect(QueueRateLimitConfigSchema.safeParse({ limit: 0, period: "1s" }).success).toBe(false); + expect(QueueRateLimitConfigSchema.safeParse({ limit: -1, period: "1s" }).success).toBe(false); + }); + + it("requires limit to be an integer", () => { + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10.5, period: "1s" }).success).toBe(false); + }); + + it("requires burst to be a positive integer when provided", () => { + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "1s", burst: 0 }).success).toBe( + false + ); + expect( + QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "1s", burst: -1 }).success + ).toBe(false); + expect( + QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "1s", burst: 5.5 }).success + ).toBe(false); + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "1s", burst: 5 }).success).toBe( + true + ); + }); + + it("rejects missing required fields", () => { + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10 }).success).toBe(false); + expect(QueueRateLimitConfigSchema.safeParse({ period: "1s" }).success).toBe(false); + expect(QueueRateLimitConfigSchema.safeParse({}).success).toBe(false); + }); +}); + diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index 1782683969..836c61086d 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -188,6 +188,11 @@ export const TriggerTaskRequestBody = z.object({ }) .optional(), concurrencyKey: z.string().optional(), + /** + * Rate limit key for per-tenant/per-user rate limiting. + * Creates a separate rate limit bucket for every unique value. 
+ */ + rateLimitKey: z.string().optional(), delay: z.string().or(z.coerce.date()).optional(), idempotencyKey: z.string().optional(), idempotencyKeyTTL: z.string().optional(), @@ -230,6 +235,11 @@ export const BatchTriggerTaskItem = z.object({ options: z .object({ concurrencyKey: z.string().optional(), + /** + * Rate limit key for per-tenant/per-user rate limiting. + * Creates a separate rate limit bucket for every unique value. + */ + rateLimitKey: z.string().optional(), delay: z.string().or(z.coerce.date()).optional(), idempotencyKey: z.string().optional(), idempotencyKeyTTL: z.string().optional(), diff --git a/packages/core/src/v3/schemas/schemas.ts b/packages/core/src/v3/schemas/schemas.ts index 233068c0b7..5b6ac61663 100644 --- a/packages/core/src/v3/schemas/schemas.ts +++ b/packages/core/src/v3/schemas/schemas.ts @@ -102,6 +102,19 @@ export const RateLimitOptions = z.discriminatedUnion("type", [ export type RateLimitOptions = z.infer; +// Duration string format: number + unit (ms, s, m, h, d) +export const DurationStringSchema = z.string().regex(/^\d+(\.\d+)?(ms|s|m|h|d)$/); +export type DurationString = z.infer; + +// Queue Rate limit configuration schema (uses DurationStringSchema for period validation) +export const QueueRateLimitConfigSchema = z.object({ + limit: z.number().int().positive(), + period: DurationStringSchema, + burst: z.number().int().positive().optional(), +}); + +export type QueueRateLimitConfig = z.infer; + export const RetryOptions = z.object({ /** The number of attempts before giving up */ maxAttempts: z.number().int().optional(), @@ -133,6 +146,20 @@ export const RetryOptions = z.object({ export type RetryOptions = z.infer; +/** Rate limit configuration using GCRA (Generic Cell Rate Algorithm) */ +export const RateLimitConfig = z.object({ + /** Maximum number of requests allowed in the period */ + limit: z.number().int().min(1).max(100000), + /** Time window - must be a valid duration string (e.g., "1s", "100ms", "5m", "1h") */ + 
period: z.string().regex(/^\d+(\.\d+)?(ms|s|m|h|d)$/, { + message: 'Period must be a valid duration string (e.g., "1s", "100ms", "5m", "1h")', + }), + /** Optional burst allowance - allows temporary exceeding of rate limit (defaults to limit) */ + burst: z.number().int().min(1).optional(), +}); + +export type RateLimitConfig = z.infer; + export const QueueManifest = z.object({ /** You can define a shared queue and then pass the name in to your task. * @@ -170,6 +197,21 @@ export const QueueManifest = z.object({ * * If this property is omitted, the task can potentially use up the full concurrency of an environment */ concurrencyLimit: z.number().int().min(0).max(100000).optional().nullable(), + /** Optional rate limit configuration for controlling request frequency. + * + * Unlike concurrencyLimit (which controls how many tasks run at once), + * rateLimit controls how frequently tasks can be dequeued. + * + * @example + * ```ts + * // Limit to 10 requests per second + * rateLimit: { limit: 10, period: "1s" } + * + * // Limit to 100 requests per minute with burst allowance + * rateLimit: { limit: 100, period: "1m", burst: 150 } + * ``` + */ + rateLimit: RateLimitConfig.optional(), }); export type QueueManifest = z.infer; diff --git a/packages/core/src/v3/types/queues.ts b/packages/core/src/v3/types/queues.ts index 9e87f136e2..d6f2772c78 100644 --- a/packages/core/src/v3/types/queues.ts +++ b/packages/core/src/v3/types/queues.ts @@ -35,4 +35,40 @@ export type QueueOptions = { * * If this property is omitted, the task can potentially use up the full concurrency of an environment */ concurrencyLimit?: number; + /** Rate limit configuration for controlling request frequency. + * + * Unlike concurrencyLimit (which controls how many tasks run at once), + * rateLimit controls how frequently tasks can be dequeued. 
+ * + * @example + * ```ts + * const rateLimitedQueue = queue({ + * name: "api-calls", + * rateLimit: { + * limit: 10, + * period: "1s", + * }, + * }); + * + * // Per-tenant rate limiting + * const perTenantQueue = queue({ + * name: "per-tenant-api", + * rateLimit: { + * limit: 100, + * period: "1m", + * key: (payload) => payload.tenantId, + * }, + * }); + * ``` + */ + rateLimit?: { + /** Maximum number of requests allowed in the period */ + limit: number; + /** Time window as a duration string (e.g., "1s", "100ms", "5m", "1h") */ + period: string; + /** Optional burst allowance (defaults to limit) */ + burst?: number; + /** Optional function to derive rate limit key from payload (evaluated at trigger time) */ + key?: (payload: any) => string; + }; }; diff --git a/packages/core/src/v3/types/tasks.ts b/packages/core/src/v3/types/tasks.ts index 857f0cc2f3..a27fe6c98a 100644 --- a/packages/core/src/v3/types/tasks.ts +++ b/packages/core/src/v3/types/tasks.ts @@ -220,10 +220,64 @@ type CommonTaskOptions< }, }); * ``` + * + * @example + * rate limiting to 10 requests per second + * + * ```ts + * export const rateLimitedTask = task({ + id: "rate-limited", + queue: { + rateLimit: { + limit: 10, + period: "1s", + }, + }, + run: async ({ payload, ctx }) => { + //... + }, + }); + * ``` + * + * @example + * per-tenant rate limiting + * + * ```ts + * export const perTenantTask = task({ + id: "per-tenant", + queue: { + rateLimit: { + limit: 100, + period: "1m", + key: (payload) => payload.tenantId, + }, + }, + run: async ({ payload, ctx }) => { + //... + }, + }); + * ``` */ queue?: { name?: string; concurrencyLimit?: number; + /** Rate limit configuration for controlling request frequency. + * + * Unlike concurrencyLimit (which controls how many tasks run at once), + * rateLimit controls how frequently tasks can be dequeued. 
+ */ + rateLimit?: { + /** Maximum number of requests allowed in the period */ + limit: number; + /** Time window as a duration string (e.g., "1s", "100ms", "5m", "1h") */ + period: string; + /** Optional burst allowance (defaults to limit) */ + burst?: number; + /** Optional function to derive rate limit key from payload (evaluated at trigger time). + * This allows per-tenant or per-user rate limiting. + */ + key?: (payload: TPayload) => string; + }; }; /** Configure the spec of the [machine](https://trigger.dev/docs/machines) you want your task to run on. * @@ -783,6 +837,20 @@ export type TriggerOptions = { */ concurrencyKey?: string; + /** + * The `rateLimitKey` creates a separate rate limit bucket for every unique value of the key. + * This allows per-tenant or per-user rate limiting. + * + * If the task has a rate limit key function defined, it will be evaluated at trigger time + * and the result will be used as the key. This option allows you to override that behavior. + * + * @example + * ```ts + * await myTask.trigger(payload, { rateLimitKey: `tenant-${tenantId}` }); + * ``` + */ + rateLimitKey?: string; + /** * The delay before the task is executed. This can be a string like "1h" or a Date object. 
* diff --git a/packages/trigger-sdk/package.json b/packages/trigger-sdk/package.json index 86a0a8a5f2..1311344d50 100644 --- a/packages/trigger-sdk/package.json +++ b/packages/trigger-sdk/package.json @@ -128,4 +128,4 @@ "main": "./dist/commonjs/v3/index.js", "types": "./dist/commonjs/v3/index.d.ts", "module": "./dist/esm/v3/index.js" -} \ No newline at end of file +} diff --git a/packages/trigger-sdk/src/v3/shared.ts b/packages/trigger-sdk/src/v3/shared.ts index 1c5426ed06..959bfd03f6 100644 --- a/packages/trigger-sdk/src/v3/shared.ts +++ b/packages/trigger-sdk/src/v3/shared.ts @@ -129,7 +129,18 @@ export type Context = TaskRunContext; export { BatchTriggerError }; export function queue(options: QueueOptions): Queue { - resourceCatalog.registerQueueMetadata(options); + // Register with serializable metadata (strip key function from rateLimit) + resourceCatalog.registerQueueMetadata({ + name: options.name, + concurrencyLimit: options.concurrencyLimit, + rateLimit: options.rateLimit + ? { + limit: options.rateLimit.limit, + period: options.rateLimit.period, + burst: options.rateLimit.burst, + } + : undefined, + }); // @ts-expect-error options[Symbol.for("trigger.dev/queue")] = true; @@ -171,6 +182,11 @@ export function createTask< description: params.description, jsonSchema: params.jsonSchema, trigger: async (payload, options) => { + // Evaluate rate limit key function if defined + const rateLimitKey = + options?.rateLimitKey ?? + (params.queue?.rateLimit?.key ? 
params.queue.rateLimit.key(payload) : undefined); + return await trigger_internal>( "trigger()", params.id, @@ -179,6 +195,7 @@ export function createTask< { queue: params.queue?.name, ...options, + rateLimitKey, } ); }, @@ -190,10 +207,16 @@ export function createTask< options, undefined, undefined, - params.queue?.name + params.queue?.name, + params.queue?.rateLimit?.key ); }, triggerAndWait: (payload, options, requestOptions) => { + // Evaluate rate limit key function if defined + const rateLimitKey = + options?.rateLimitKey ?? + (params.queue?.rateLimit?.key ? params.queue.rateLimit.key(payload) : undefined); + return new TaskRunPromise((resolve, reject) => { triggerAndWait_internal( "triggerAndWait()", @@ -203,6 +226,7 @@ export function createTask< { queue: params.queue?.name, ...options, + rateLimitKey, }, requestOptions ) @@ -222,7 +246,8 @@ export function createTask< undefined, options, undefined, - params.queue?.name + params.queue?.name, + params.queue?.rateLimit?.key ); }, }; @@ -248,6 +273,14 @@ export function createTask< resourceCatalog.registerQueueMetadata({ name: queue.name, concurrencyLimit: queue.concurrencyLimit, + // Only include serializable rateLimit config (without key function) + rateLimit: queue.rateLimit + ? { + limit: queue.rateLimit.limit, + period: queue.rateLimit.period, + burst: queue.rateLimit.burst, + } + : undefined, }); } @@ -302,6 +335,11 @@ export function createSchemaTask< description: params.description, schema: params.schema, trigger: async (payload, options, requestOptions) => { + // Evaluate rate limit key function if defined + const rateLimitKey = + options?.rateLimitKey ?? + (params.queue?.rateLimit?.key ? 
params.queue.rateLimit.key(payload) : undefined); + return await trigger_internal, TOutput>>( "trigger()", params.id, @@ -310,6 +348,7 @@ export function createSchemaTask< { queue: params.queue?.name, ...options, + rateLimitKey, }, requestOptions ); @@ -322,10 +361,16 @@ export function createSchemaTask< options, parsePayload, requestOptions, - params.queue?.name + params.queue?.name, + params.queue?.rateLimit?.key ); }, triggerAndWait: (payload, options) => { + // Evaluate rate limit key function if defined + const rateLimitKey = + options?.rateLimitKey ?? + (params.queue?.rateLimit?.key ? params.queue.rateLimit.key(payload) : undefined); + return new TaskRunPromise((resolve, reject) => { triggerAndWait_internal, TOutput>( "triggerAndWait()", @@ -335,6 +380,7 @@ export function createSchemaTask< { queue: params.queue?.name, ...options, + rateLimitKey, } ) .then((result) => { @@ -353,7 +399,8 @@ export function createSchemaTask< parsePayload, options, undefined, - params.queue?.name + params.queue?.name, + params.queue?.rateLimit?.key ); }, }; @@ -380,6 +427,14 @@ export function createSchemaTask< resourceCatalog.registerQueueMetadata({ name: queue.name, concurrencyLimit: queue.concurrencyLimit, + // Only include serializable rateLimit config (without key function) + rateLimit: queue.rateLimit + ? 
{ + limit: queue.rateLimit.limit, + period: queue.rateLimit.period, + burst: queue.rateLimit.burst, + } + : undefined, }); } @@ -1924,7 +1979,8 @@ async function* transformSingleTaskBatchItemsStream( items: AsyncIterable>, parsePayload: SchemaParseFn | undefined, options: BatchTriggerOptions | undefined, - queue: string | undefined + queue: string | undefined, + rateLimitKeyFn?: (payload: TPayload) => string | undefined ): AsyncIterable { let index = 0; for await (const item of items) { @@ -1935,6 +1991,10 @@ async function* transformSingleTaskBatchItemsStream( flattenIdempotencyKey([options?.idempotencyKey, `${index}`]) ); + // Evaluate rate limit key for this item + const rateLimitKey = + item.options?.rateLimitKey ?? (rateLimitKeyFn ? rateLimitKeyFn(parsedPayload) : undefined); + yield { index: index++, task: taskIdentifier, @@ -1946,6 +2006,7 @@ async function* transformSingleTaskBatchItemsStream( ? { name: queue } : undefined, concurrencyKey: item.options?.concurrencyKey, + rateLimitKey, test: taskContext.ctx?.run.isTest, payloadType: payloadPacket.dataType, delay: item.options?.delay, @@ -1976,7 +2037,8 @@ async function* transformSingleTaskBatchItemsStreamForWait( items: AsyncIterable>, parsePayload: SchemaParseFn | undefined, options: BatchTriggerAndWaitOptions | undefined, - queue: string | undefined + queue: string | undefined, + rateLimitKeyFn?: (payload: TPayload) => string | undefined ): AsyncIterable { let index = 0; for await (const item of items) { @@ -1987,6 +2049,10 @@ async function* transformSingleTaskBatchItemsStreamForWait( flattenIdempotencyKey([options?.idempotencyKey, `${index}`]) ); + // Evaluate rate limit key for this item + const rateLimitKey = + item.options?.rateLimitKey ?? (rateLimitKeyFn ? rateLimitKeyFn(parsedPayload) : undefined); + yield { index: index++, task: taskIdentifier, @@ -1999,6 +2065,7 @@ async function* transformSingleTaskBatchItemsStreamForWait( ? 
{ name: queue } : undefined, concurrencyKey: item.options?.concurrencyKey, + rateLimitKey, test: taskContext.ctx?.run.isTest, payloadType: payloadPacket.dataType, delay: item.options?.delay, @@ -2039,6 +2106,7 @@ async function trigger_internal( options: { queue: options?.queue ? { name: options.queue } : undefined, concurrencyKey: options?.concurrencyKey, + rateLimitKey: options?.rateLimitKey, test: taskContext.ctx?.run.isTest, payloadType: payloadPacket.dataType, idempotencyKey: await makeIdempotencyKey(options?.idempotencyKey), @@ -2087,7 +2155,8 @@ async function batchTrigger_internal( options?: BatchTriggerOptions, parsePayload?: SchemaParseFn, requestOptions?: TriggerApiRequestOptions, - queue?: string + queue?: string, + rateLimitKeyFn?: (payload: TRunTypes["payload"]) => string | undefined ): Promise> { const apiClient = apiClientManager.clientOrThrow(requestOptions?.clientConfig); @@ -2106,6 +2175,11 @@ async function batchTrigger_internal( flattenIdempotencyKey([options?.idempotencyKey, `${index}`]) ); + // Evaluate rate limit key for this item + const rateLimitKey = + item.options?.rateLimitKey ?? + (rateLimitKeyFn ? rateLimitKeyFn(parsedPayload) : undefined); + return { index, task: taskIdentifier, @@ -2117,6 +2191,7 @@ async function batchTrigger_internal( ? 
{ name: queue } : undefined, concurrencyKey: item.options?.concurrencyKey, + rateLimitKey, test: taskContext.ctx?.run.isTest, payloadType: payloadPacket.dataType, delay: item.options?.delay, @@ -2189,7 +2264,8 @@ async function batchTrigger_internal( asyncItems, parsePayload, options, - queue + queue, + rateLimitKeyFn ); // Execute streaming 2-phase batch @@ -2270,6 +2346,7 @@ async function triggerAndWait_internal, options?: BatchTriggerAndWaitOptions, requestOptions?: TriggerApiRequestOptions, - queue?: string + queue?: string, + rateLimitKeyFn?: (payload: TPayload) => string | undefined ): Promise> { const ctx = taskContext.ctx; @@ -2351,6 +2429,11 @@ async function batchTriggerAndWait_internal Date: Thu, 18 Dec 2025 01:17:54 +0100 Subject: [PATCH 2/5] feat(rate-limit): gcra and queues logic --- apps/webapp/app/v3/runEngine.server.ts | 1 + .../services/createBackgroundWorker.server.ts | 4 +- .../run-engine/src/engine/index.ts | 1 + .../run-engine/src/engine/types.ts | 1 + internal-packages/run-engine/src/index.ts | 10 + .../run-engine/src/rate-limiter/gcra.ts | 250 +++++++++++ .../run-engine/src/rate-limiter/index.ts | 10 + .../run-engine/src/run-queue/index.ts | 116 ++++- .../src/run-queue/tests/rateLimit.test.ts | 403 ++++++++++++++++++ 9 files changed, 778 insertions(+), 18 deletions(-) create mode 100644 internal-packages/run-engine/src/rate-limiter/gcra.ts create mode 100644 internal-packages/run-engine/src/rate-limiter/index.ts create mode 100644 internal-packages/run-engine/src/run-queue/tests/rateLimit.test.ts diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index db1760755c..79d2974e63 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -80,6 +80,7 @@ function createRunEngine() { scanJitterInMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS, processMarkedJitterInMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER_IN_MS, }, + disableRateLimits: 
env.TRIGGER_DISABLE_QUEUE_RATE_LIMITS === "1", }, runLock: { redis: { diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index 48073f92f1..7e49b9d656 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -255,7 +255,7 @@ async function createWorkerTask( let queue = queues.find((queue) => queue.name === task.queue?.name); if (!queue) { - // Create a TaskQueue with rate limit config if provided + // Create a TaskQueue queue = await createWorkerQueue( { name: task.queue?.name ?? `task/${task.id}`, @@ -370,7 +370,6 @@ async function createWorkerQueue( ? Math.max(Math.min(queue.concurrencyLimit, environment.maximumConcurrencyLimit), 0) : queue.concurrencyLimit; - // Parse rate limit config if provided let rateLimitConfig: QueueRateLimitConfig | null = null; if (queue.rateLimit) { try { @@ -401,7 +400,6 @@ async function createWorkerQueue( const newConcurrencyLimit = taskQueue.concurrencyLimit; if (!taskQueue.paused) { - // Handle concurrency limit sync if (typeof newConcurrencyLimit === "number") { logger.debug("createWorkerQueue: updating concurrency limit", { workerId: worker.id, diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 3465eedbd9..8bb265b0c5 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -179,6 +179,7 @@ export class RunEngine { masterQueueConsumersIntervalMs: options.queue?.masterQueueConsumersIntervalMs, processWorkerQueueDebounceMs: options.queue?.processWorkerQueueDebounceMs, dequeueBlockingTimeoutSeconds: options.queue?.dequeueBlockingTimeoutSeconds, + disableRateLimits: options.queue?.disableRateLimits, meter: options.meter, }); diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts 
index c10c64e0c3..62696d611a 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -63,6 +63,7 @@ export type RunEngineOptions = { scanJitterInMs?: number; processMarkedJitterInMs?: number; }; + disableRateLimits?: boolean; }; runLock: { redis: RedisOptions; diff --git a/internal-packages/run-engine/src/index.ts b/internal-packages/run-engine/src/index.ts index 3f96045c13..be82af0c55 100644 --- a/internal-packages/run-engine/src/index.ts +++ b/internal-packages/run-engine/src/index.ts @@ -20,3 +20,13 @@ export type { ProcessBatchItemCallback, BatchCompletionCallback, } from "./batch-queue/types.js"; + +// Rate limiter exports +export { GCRARateLimiter, configToGCRAParams, parseDurationToMs } from "./rate-limiter/index.js"; +export type { + GCRARateLimiterOptions, + GCRAParams, + QueueRateLimitConfig, + StoredQueueRateLimitConfig, + RateLimitResult, +} from "./rate-limiter/index.js"; diff --git a/internal-packages/run-engine/src/rate-limiter/gcra.ts b/internal-packages/run-engine/src/rate-limiter/gcra.ts new file mode 100644 index 0000000000..f506b03842 --- /dev/null +++ b/internal-packages/run-engine/src/rate-limiter/gcra.ts @@ -0,0 +1,250 @@ +import type { Redis } from "@internal/redis"; + +/** + * Configuration for queue rate limiting (input format). + */ +export interface QueueRateLimitConfig { + /** Maximum number of requests allowed within the period */ + limit: number; + /** Time period in milliseconds */ + periodMs: number; + /** Optional burst capacity (defaults to 1) */ + burst?: number; +} + +/** + * Stored configuration for queue rate limiting (includes pre-calculated GCRA params). + * This is what gets stored in Redis and read by the Lua dequeue script. + */ +export interface StoredQueueRateLimitConfig extends QueueRateLimitConfig, GCRAParams {} + +/** + * GCRA parameters calculated from QueueRateLimitConfig. + * These are stored in Redis for use by the Lua dequeue script. 
+ */ +export interface GCRAParams { + /** The minimum interval between requests in milliseconds */ + emissionInterval: number; + /** The burst tolerance in milliseconds */ + burstTolerance: number; + /** Key expiration in milliseconds */ + keyExpiration: number; +} + +/** + * Options for configuring the RateLimiter. + */ +export interface GCRARateLimiterOptions { + /** An instance of Redis. */ + redis: Redis; + /** + * A string prefix to namespace keys in Redis. + * Defaults to "gcra:ratelimit:". + */ + keyPrefix?: string; + /** + * The minimum interval between requests (the emission interval) in milliseconds. + * For example, 1000 ms for one request per second. + */ + emissionInterval: number; + /** + * The burst tolerance in milliseconds. This represents how much "credit" can be + * accumulated to allow short bursts beyond the average rate. + * For example, if you want to allow 3 requests in a burst with an emission interval of 1000 ms, + * you might set this to 3000. + */ + burstTolerance: number; + /** + * Expiration for the Redis key in milliseconds. + * Defaults to the larger of 60 seconds or (emissionInterval + burstTolerance). + */ + keyExpiration?: number; +} + +/** + * The result of a rate limit check. + */ +export interface RateLimitResult { + /** Whether the request is allowed. */ + allowed: boolean; + /** + * If not allowed, this is the number of milliseconds the caller should wait + * before retrying. + */ + retryAfter?: number; +} + +/** + * Parse a duration string (e.g., "1s", "100ms", "5m", "1h", "1d") to milliseconds. + * @throws Error if the duration string is invalid + */ +export function parseDurationToMs(duration: string): number { + const match = duration.match(/^(\d+(?:\.\d+)?)(ms|s|m|h|d)$/); + + if (!match) { + throw new Error( + `Invalid duration string: "${duration}". 
Expected format: number + unit (ms, s, m, h, d)` + ); + } + + const [, value, unit] = match; + const numValue = parseFloat(value); + + switch (unit) { + case "ms": + return Math.round(numValue); + case "s": + return Math.round(numValue * 1000); + case "m": + return Math.round(numValue * 60 * 1000); + case "h": + return Math.round(numValue * 60 * 60 * 1000); + case "d": + return Math.round(numValue * 24 * 60 * 60 * 1000); + default: + throw new Error(`Unknown duration unit: ${unit}`); + } +} + +/** + * Convert a QueueRateLimitConfig to GCRA parameters. + * + * @example + * // 10 requests per minute with burst of 3 + * configToGCRAParams({ limit: 10, periodMs: 60000, burst: 3 }) + * // => { emissionInterval: 6000, burstTolerance: 12000, keyExpiration: 60000 } + */ +export function configToGCRAParams(config: QueueRateLimitConfig): GCRAParams { + const emissionInterval = Math.ceil(config.periodMs / config.limit); + const burst = config.burst ?? 1; + // burst-1 because GCRA allows 1 request immediately, then burst-1 more within tolerance + const burstTolerance = (burst - 1) * emissionInterval; + const keyExpiration = Math.max(60_000, emissionInterval + burstTolerance); + return { emissionInterval, burstTolerance, keyExpiration }; +} + +/** + * A rate limiter using Redis and the Generic Cell Rate Algorithm (GCRA). + * + * The GCRA is implemented using a Lua script that runs atomically in Redis. + * + * When a request comes in, the algorithm: + * - Retrieves the current "Theoretical Arrival Time" (TAT) from Redis (or initializes it if missing). + * - If the current time is greater than or equal to the TAT, the request is allowed and the TAT is updated to now + emissionInterval. + * - Otherwise, if the current time plus the burst tolerance is at least the TAT, the request is allowed and the TAT is incremented. + * - If neither condition is met, the request is rejected and a Retry-After value is returned. 
+ */ +export class GCRARateLimiter { + private redis: Redis; + private keyPrefix: string; + private emissionInterval: number; + private burstTolerance: number; + private keyExpiration: number; + + constructor(options: GCRARateLimiterOptions) { + this.redis = options.redis; + this.keyPrefix = options.keyPrefix || "gcra:ratelimit:"; + this.emissionInterval = options.emissionInterval; + this.burstTolerance = options.burstTolerance; + // Default expiration: at least 60 seconds or the sum of emissionInterval and burstTolerance + this.keyExpiration = + options.keyExpiration || Math.max(60_000, this.emissionInterval + this.burstTolerance); + + // Define a custom Redis command 'gcra' that implements the GCRA algorithm. + // Using defineCommand ensures the Lua script is loaded once and run atomically. + this.redis.defineCommand("gcra", { + numberOfKeys: 1, + lua: ` +--[[ + GCRA Lua script + KEYS[1] - The rate limit key (e.g. "ratelimit:") + ARGV[1] - Current time in ms (number) + ARGV[2] - Emission interval in ms (number) + ARGV[3] - Burst tolerance in ms (number) + ARGV[4] - Key expiration in ms (number) + + Returns: { allowedFlag, value } + allowedFlag: 1 if allowed, 0 if rate-limited. + value: 0 when allowed; if not allowed, the number of ms to wait. +]]-- + +local key = KEYS[1] +local now = tonumber(ARGV[1]) +local emission_interval = tonumber(ARGV[2]) +local burst_tolerance = tonumber(ARGV[3]) +local expire = tonumber(ARGV[4]) + +-- Get the stored Theoretical Arrival Time (TAT) or default to 0. +local tat = tonumber(redis.call("GET", key) or 0) +if tat == 0 then + tat = now +end + +local allowed, new_tat, retry_after + +if now >= tat then + -- No delay: request is on schedule. + new_tat = now + emission_interval + allowed = true +elseif (now + burst_tolerance) >= tat then + -- Within burst capacity: allow request. + new_tat = tat + emission_interval + allowed = true +else + -- Request exceeds the allowed burst; calculate wait time. 
+ allowed = false + retry_after = tat - (now + burst_tolerance) +end + +if allowed then + redis.call("SET", key, new_tat, "PX", expire) + return {1, 0} +else + return {0, retry_after} +end +`, + }); + } + + /** + * Checks whether a request associated with the given identifier is allowed. + * + * @param identifier A unique string identifying the subject of rate limiting (e.g. user ID, IP address, or domain). + * @returns A promise that resolves to a RateLimitResult. + * + * @example + * const result = await rateLimiter.check('user:12345'); + * if (!result.allowed) { + * // Tell the client to retry after result.retryAfter milliseconds. + * } + */ + async check(identifier: string): Promise { + const key = `${this.keyPrefix}${identifier}`; + const now = Date.now(); + + try { + // Call the custom 'gcra' command. + // The script returns an array: [allowedFlag, value] + // - allowedFlag: 1 if allowed; 0 if rejected. + // - value: 0 when allowed; if rejected, the number of ms to wait before retrying. + const result: [number, number] = await (this.redis as any).gcra( + key, + now, + this.emissionInterval, + this.burstTolerance, + this.keyExpiration + ); + const allowed = result[0] === 1; + if (allowed) { + return { allowed: true }; + } else { + return { allowed: false, retryAfter: result[1] }; + } + } catch (error) { + // In a production system you might log the error and either + // allow the request (fail open) or deny it (fail closed). + // Here we choose to propagate the error. 
+ throw error; + } + } +} diff --git a/internal-packages/run-engine/src/rate-limiter/index.ts b/internal-packages/run-engine/src/rate-limiter/index.ts new file mode 100644 index 0000000000..4d0a7e020a --- /dev/null +++ b/internal-packages/run-engine/src/rate-limiter/index.ts @@ -0,0 +1,10 @@ +export { + GCRARateLimiter, + configToGCRAParams, + parseDurationToMs, + type GCRARateLimiterOptions, + type GCRAParams, + type QueueRateLimitConfig, + type StoredQueueRateLimitConfig, + type RateLimitResult, +} from "./gcra.js"; diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index cbfa3c689e..aabe840964 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -35,6 +35,11 @@ import { attributesFromAuthenticatedEnv, MinimalAuthenticatedEnvironment, } from "../shared/index.js"; +import { + configToGCRAParams, + type QueueRateLimitConfig, + type StoredQueueRateLimitConfig, +} from "../rate-limiter/index.js"; import { InputPayload, OutputPayload, @@ -92,6 +97,7 @@ export type RunQueueOptions = { processMarkedJitterInMs?: number; callback: ConcurrencySweeperCallback; }; + disableRateLimits?: boolean; }; export interface ConcurrencySweeperCallback { @@ -326,15 +332,24 @@ export class RunQueue { /** * Set rate limit configuration for a queue in Redis. * Config is stored as JSON with 7-day TTL (refreshed on deploy). + * Pre-calculates GCRA params so the Lua script doesn't need to parse duration strings. 
*/ public async setQueueRateLimitConfig( env: MinimalAuthenticatedEnvironment, queue: string, - config: { limit: number; periodMs: number; burst?: number } + config: QueueRateLimitConfig ) { const key = this.keys.queueRateLimitConfigKey(env, queue); + const gcraParams = configToGCRAParams(config); + + // Store config with pre-calculated GCRA params for efficient Lua processing + const storedConfig = { + ...config, + ...gcraParams, + }; + // Store with 7-day TTL, refreshed on each deploy - return this.redis.set(key, JSON.stringify(config), "EX", 86400 * 7); + return this.redis.set(key, JSON.stringify(storedConfig), "EX", 86400 * 7); } /** @@ -350,12 +365,12 @@ export class RunQueue { public async getQueueRateLimitConfig( env: MinimalAuthenticatedEnvironment, queue: string - ): Promise<{ limit: number; periodMs: number; burst?: number } | undefined> { + ): Promise { const result = await this.redis.get(this.keys.queueRateLimitConfigKey(env, queue)); if (!result) return undefined; try { - return JSON.parse(result); + return JSON.parse(result) as StoredQueueRateLimitConfig; } catch { return undefined; } @@ -1601,7 +1616,8 @@ export class RunQueue { String(this.options.defaultEnvConcurrency), String(this.options.defaultEnvConcurrencyBurstFactor ?? 1), this.options.redis.keyPrefix ?? "", - String(maxCount) + String(maxCount), + this.options.disableRateLimits ? 
"1" : "0" ); if (!result) { @@ -2375,6 +2391,34 @@ local defaultEnvConcurrencyLimit = ARGV[3] local defaultEnvConcurrencyBurstFactor = ARGV[4] local keyPrefix = ARGV[5] local maxCount = tonumber(ARGV[6] or '1') +local rateLimitDisabled = ARGV[7] or '0' + +-- GCRA Rate Limit Check Function +-- Returns: allowed (boolean), retryAfter (number or nil) +local function gcra_check(bucket_key, now, emission_interval, burst_tolerance, expire) + local tat = tonumber(redis.call("GET", bucket_key) or 0) + if tat == 0 then + tat = now + end + + if now >= tat then + -- No delay, request is on schedule + redis.call("SET", bucket_key, now + emission_interval, "PX", expire) + return true, 0 + elseif (now + burst_tolerance) >= tat then + -- Within burst capacity + redis.call("SET", bucket_key, tat + emission_interval, "PX", expire) + return true, 0 + else + -- Exceeded rate limit + return false, tat - (now + burst_tolerance) + end +end + +-- Helper to get base queue key (strip :ck:* suffix for rate limit keys) +local function get_base_queue(queue) + return string.gsub(queue, ":ck:[^:]+$", "") +end -- Check current env concurrency against the limit local envCurrentConcurrency = tonumber(redis.call('SCARD', envCurrentConcurrencyKey) or '0') @@ -2405,6 +2449,18 @@ if actualMaxCount <= 0 then return nil end +-- Pre-fetch rate limit config if rate limiting is enabled +local rateLimitConfig = nil +if rateLimitDisabled ~= '1' then + local baseQueue = get_base_queue(queueName) + -- Use keyPrefix when building rate limit keys (ioredis adds prefix to stored keys) + local rateLimitConfigKey = keyPrefix .. baseQueue .. 
':rl:config' + local configJson = redis.call('GET', rateLimitConfigKey) + if configJson then + rateLimitConfig = cjson.decode(configJson) + end +end + -- Attempt to dequeue messages up to actualMaxCount local messages = redis.call('ZRANGEBYSCORE', queueKey, '-inf', currentTime, 'WITHSCORES', 'LIMIT', 0, actualMaxCount) @@ -2425,18 +2481,47 @@ for i = 1, #messages, 2 do local messagePayload = redis.call('GET', messageKey) if messagePayload then - -- Update concurrency - redis.call('ZREM', queueKey, messageId) - redis.call('ZREM', envQueueKey, messageId) - redis.call('SADD', queueCurrentConcurrencyKey, messageId) - redis.call('SADD', envCurrentConcurrencyKey, messageId) + -- Check rate limit BEFORE updating concurrency + local shouldDequeue = true - -- Add to results - table.insert(results, messageId) - table.insert(results, messageScore) - table.insert(results, messagePayload) + if rateLimitConfig then + local messageData = cjson.decode(messagePayload) + local rateLimitKey = messageData.rateLimitKey or 'global' + local baseQueue = get_base_queue(queueName) + -- Use keyPrefix for bucket key consistency + local bucketKey = keyPrefix .. baseQueue .. ':rl:' .. 
rateLimitKey + + local allowed, retryAfter = gcra_check( + bucketKey, + currentTime, + rateLimitConfig.emissionInterval, + rateLimitConfig.burstTolerance, + rateLimitConfig.keyExpiration + ) + + if not allowed then + -- Rate limited: re-score message with delay (retryAfter + jitter) + local jitter = math.random(0, 100) + local newScore = currentTime + retryAfter + jitter + redis.call('ZADD', queueKey, newScore, messageId) + shouldDequeue = false + end + end - dequeuedCount = dequeuedCount + 1 + if shouldDequeue then + -- Update concurrency + redis.call('ZREM', queueKey, messageId) + redis.call('ZREM', envQueueKey, messageId) + redis.call('SADD', queueCurrentConcurrencyKey, messageId) + redis.call('SADD', envCurrentConcurrencyKey, messageId) + + -- Add to results + table.insert(results, messageId) + table.insert(results, messageScore) + table.insert(results, messagePayload) + + dequeuedCount = dequeuedCount + 1 + end end end @@ -2804,6 +2889,7 @@ declare module "@internal/redis" { defaultEnvConcurrencyBurstFactor: string, keyPrefix: string, maxCount: string, + rateLimitDisabled: string, callback?: Callback ): Result; diff --git a/internal-packages/run-engine/src/run-queue/tests/rateLimit.test.ts b/internal-packages/run-engine/src/run-queue/tests/rateLimit.test.ts new file mode 100644 index 0000000000..09d7635761 --- /dev/null +++ b/internal-packages/run-engine/src/run-queue/tests/rateLimit.test.ts @@ -0,0 +1,403 @@ +import { assertNonNullable, redisTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { Logger } from "@trigger.dev/core/logger"; +import { describe, expect, vi } from "vitest"; +import { setTimeout } from "node:timers/promises"; +import { FairQueueSelectionStrategy } from "../fairQueueSelectionStrategy.js"; +import { RunQueue } from "../index.js"; +import { RunQueueFullKeyProducer } from "../keyProducer.js"; +import { InputPayload } from "../types.js"; +import { Decimal } from "@trigger.dev/database"; + +const 
testOptions = { + name: "rq", + tracer: trace.getTracer("rq"), + workers: 1, + defaultEnvConcurrency: 25, + logger: new Logger("RunQueue", "warn"), + retryOptions: { + maxAttempts: 5, + factor: 1.1, + minTimeoutInMs: 100, + maxTimeoutInMs: 1_000, + randomize: true, + }, + keys: new RunQueueFullKeyProducer(), +}; + +const authenticatedEnvDev = { + id: "e1234", + type: "DEVELOPMENT" as const, + maximumConcurrencyLimit: 10, + concurrencyLimitBurstFactor: new Decimal(2.0), + project: { id: "p1234" }, + organization: { id: "o1234" }, +}; + +function createMessage( + runId: string, + queue: string = "task/my-task", + rateLimitKey?: string +): InputPayload { + return { + runId, + taskIdentifier: "task/my-task", + orgId: "o1234", + projectId: "p1234", + environmentId: "e1234", + environmentType: "DEVELOPMENT", + queue, + timestamp: Date.now(), + attempt: 0, + rateLimitKey, + }; +} + +vi.setConfig({ testTimeout: 60_000 }); + +describe("RunQueue rate limiting", () => { + redisTest( + "basic rate limiting - respects limit when config exists", + async ({ redisContainer }) => { + const queue = new RunQueue({ + ...testOptions, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:rl-test-basic:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:rl-test-basic:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + // Set rate limit: 2 per 10 minutes (emissionInterval = 300000ms = 5 min) + // Using a long period ensures rate limit doesn't recover during test execution + await queue.setQueueRateLimitConfig(authenticatedEnvDev, "task/my-task", { + limit: 2, + periodMs: 600000, // 10 minutes + burst: 2, + }); + + // Enqueue 5 messages + for (let i = 0; i < 5; i++) { + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: createMessage(`run-${i}`), + workerQueue: authenticatedEnvDev.id, + }); + } + + await 
setTimeout(500); + + // Note: We don't verify initial queue length here because the background worker + // may have already started processing. The important test is rate limiting behavior. + + // Dequeue multiple times - with burst=2, only 2 should pass per burst window + const dequeued: (unknown | undefined)[] = []; + for (let i = 0; i < 5; i++) { + const msg = await queue.dequeueMessageFromWorkerQueue( + `test_rl_${i}`, + authenticatedEnvDev.id + ); + dequeued.push(msg); + } + + // Count how many were actually dequeued vs rate-limited + const dequeuedCount = dequeued.filter((d) => d !== undefined).length; + const rateLimitedCount = dequeued.filter((d) => d === undefined).length; + + // With burst=2 and 5 messages, at most 2 should be dequeued immediately + // (the exact count may vary due to background worker timing, but rate limiting should be active) + expect(dequeuedCount).toBeLessThanOrEqual(2); + expect(rateLimitedCount).toBeGreaterThanOrEqual(3); + + // Concurrency should not exceed burst limit + const concurrency = await queue.currentConcurrencyOfQueue( + authenticatedEnvDev, + "task/my-task" + ); + expect(concurrency).toBeLessThanOrEqual(2); + + // Rate-limited messages should still be in queue (rescheduled for later) + const remainingLength = await queue.lengthOfQueue(authenticatedEnvDev, "task/my-task"); + expect(remainingLength).toBeGreaterThan(0); + } finally { + await queue.quit(); + } + } + ); + + redisTest("rate limiting disabled - all messages dequeued", async ({ redisContainer }) => { + const queue = new RunQueue({ + ...testOptions, + disableRateLimits: true, // Rate limiting disabled + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:rl-test-disabled:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:rl-test-disabled:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + // 
Set strict rate limit: 1 per minute + await queue.setQueueRateLimitConfig(authenticatedEnvDev, "task/my-task", { + limit: 1, + periodMs: 60000, + burst: 1, + }); + + // Enqueue 3 messages + for (let i = 0; i < 3; i++) { + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: createMessage(`run-disabled-${i}`), + workerQueue: authenticatedEnvDev.id, + }); + } + + await setTimeout(500); + + // All should be dequeued since rate limiting is disabled + const dequeued1 = await queue.dequeueMessageFromWorkerQueue( + "test_disabled_1", + authenticatedEnvDev.id + ); + const dequeued2 = await queue.dequeueMessageFromWorkerQueue( + "test_disabled_2", + authenticatedEnvDev.id + ); + const dequeued3 = await queue.dequeueMessageFromWorkerQueue( + "test_disabled_3", + authenticatedEnvDev.id + ); + + expect(dequeued1).not.toBeUndefined(); + expect(dequeued2).not.toBeUndefined(); + expect(dequeued3).not.toBeUndefined(); + + // All should be processed (concurrency = 3) + const concurrency = await queue.currentConcurrencyOfQueue( + authenticatedEnvDev, + "task/my-task" + ); + expect(concurrency).toBe(3); + } finally { + await queue.quit(); + } + }); + + redisTest( + "per-key rate limiting - separate buckets per rateLimitKey", + async ({ redisContainer }) => { + const queue = new RunQueue({ + ...testOptions, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:rl-test-perkey:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:rl-test-perkey:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + // Set rate limit: 1 per minute with burst of 1 + await queue.setQueueRateLimitConfig(authenticatedEnvDev, "task/my-task", { + limit: 1, + periodMs: 60000, + burst: 1, + }); + + // Enqueue 2 messages for tenant-A + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: createMessage("run-a1", 
"task/my-task", "tenant-A"), + workerQueue: authenticatedEnvDev.id, + }); + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: createMessage("run-a2", "task/my-task", "tenant-A"), + workerQueue: authenticatedEnvDev.id, + }); + + // Enqueue 2 messages for tenant-B + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: createMessage("run-b1", "task/my-task", "tenant-B"), + workerQueue: authenticatedEnvDev.id, + }); + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: createMessage("run-b2", "task/my-task", "tenant-B"), + workerQueue: authenticatedEnvDev.id, + }); + + await setTimeout(500); + + // Dequeue all available + const dequeued1 = await queue.dequeueMessageFromWorkerQueue( + "test_perkey_1", + authenticatedEnvDev.id + ); + const dequeued2 = await queue.dequeueMessageFromWorkerQueue( + "test_perkey_2", + authenticatedEnvDev.id + ); + const dequeued3 = await queue.dequeueMessageFromWorkerQueue( + "test_perkey_3", + authenticatedEnvDev.id + ); + const dequeued4 = await queue.dequeueMessageFromWorkerQueue( + "test_perkey_4", + authenticatedEnvDev.id + ); + + // Should get 2 messages (1 from each tenant, since each has independent bucket) + const successfulDequeues = [dequeued1, dequeued2, dequeued3, dequeued4].filter( + (d) => d !== undefined + ); + expect(successfulDequeues.length).toBe(2); + + // Verify we got one from each tenant + const rateLimitKeys = successfulDequeues.map((d) => d!.message.rateLimitKey); + expect(rateLimitKeys).toContain("tenant-A"); + expect(rateLimitKeys).toContain("tenant-B"); + } finally { + await queue.quit(); + } + } + ); + + redisTest("no rate limit config - all messages dequeued normally", async ({ redisContainer }) => { + const queue = new RunQueue({ + ...testOptions, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:rl-test-noconfig:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + 
redis: { + keyPrefix: "runqueue:rl-test-noconfig:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + // No rate limit config set + + // Enqueue 5 messages + for (let i = 0; i < 5; i++) { + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: createMessage(`run-noconfig-${i}`), + workerQueue: authenticatedEnvDev.id, + }); + } + + await setTimeout(500); + + // All should be dequeued + const dequeued = []; + for (let i = 0; i < 5; i++) { + const d = await queue.dequeueMessageFromWorkerQueue( + `test_noconfig_${i}`, + authenticatedEnvDev.id + ); + if (d) dequeued.push(d); + } + + expect(dequeued.length).toBe(5); + } finally { + await queue.quit(); + } + }); + + redisTest("rate-limited messages do not increment concurrency", async ({ redisContainer }) => { + const queue = new RunQueue({ + ...testOptions, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:rl-test-concurrency:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:rl-test-concurrency:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + // Set strict rate limit: 1 per minute, burst 1 + await queue.setQueueRateLimitConfig(authenticatedEnvDev, "task/my-task", { + limit: 1, + periodMs: 60000, + burst: 1, + }); + + // Initial concurrency should be 0 + const initialConcurrency = await queue.currentConcurrencyOfQueue( + authenticatedEnvDev, + "task/my-task" + ); + expect(initialConcurrency).toBe(0); + + // Enqueue 3 messages + for (let i = 0; i < 3; i++) { + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: createMessage(`run-conc-${i}`), + workerQueue: authenticatedEnvDev.id, + }); + } + + await setTimeout(500); + + // Try to dequeue multiple times + await queue.dequeueMessageFromWorkerQueue("test_conc_1", authenticatedEnvDev.id); + await 
queue.dequeueMessageFromWorkerQueue("test_conc_2", authenticatedEnvDev.id); + await queue.dequeueMessageFromWorkerQueue("test_conc_3", authenticatedEnvDev.id); + + // Only 1 should have passed rate limit, so concurrency should be 1 + const finalConcurrency = await queue.currentConcurrencyOfQueue( + authenticatedEnvDev, + "task/my-task" + ); + expect(finalConcurrency).toBe(1); + + // Queue should still have 2 messages (rescheduled for later) + const queueLength = await queue.lengthOfQueue(authenticatedEnvDev, "task/my-task"); + expect(queueLength).toBe(2); + } finally { + await queue.quit(); + } + }); +}); From 72c8ce0d445461dab5068cd3be20a12b0151c473 Mon Sep 17 00:00:00 2001 From: Guillermo Date: Thu, 18 Dec 2025 01:32:56 +0100 Subject: [PATCH 3/5] chore: add changeset for rate-limit feature --- .changeset/forty-houses-doubt.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changeset/forty-houses-doubt.md diff --git a/.changeset/forty-houses-doubt.md b/.changeset/forty-houses-doubt.md new file mode 100644 index 0000000000..0f768b1dae --- /dev/null +++ b/.changeset/forty-houses-doubt.md @@ -0,0 +1,6 @@ +--- +"@trigger.dev/sdk": minor +"@trigger.dev/core": minor +--- + +Add GCRA rate limiting algorithm for task queue management From 602806577508527e50968255254ac52b8b939823 Mon Sep 17 00:00:00 2001 From: Guillermo Date: Thu, 18 Dec 2025 18:13:49 +0100 Subject: [PATCH 4/5] fix: remove rateLimit.key function, use rateLimitKey at trigger time --- packages/core/src/v3/types/queues.ts | 18 ++--- packages/core/src/v3/types/tasks.ts | 19 ++--- packages/trigger-sdk/src/v3/shared.ts | 107 ++++---------------------- 3 files changed, 35 insertions(+), 109 deletions(-) diff --git a/packages/core/src/v3/types/queues.ts b/packages/core/src/v3/types/queues.ts index d6f2772c78..26b69593cf 100644 --- a/packages/core/src/v3/types/queues.ts +++ b/packages/core/src/v3/types/queues.ts @@ -50,14 +50,14 @@ export type QueueOptions = { * }, * }); * - * // Per-tenant rate 
limiting - * const perTenantQueue = queue({ - * name: "per-tenant-api", - * rateLimit: { - * limit: 100, - * period: "1m", - * key: (payload) => payload.tenantId, - * }, + * // Per-tenant rate limiting - pass rateLimitKey at trigger time + * await myTask.trigger(payload, { + * rateLimitKey: `tenant-${payload.tenantId}`, + * }); + * + * // Also works with tasks.trigger() + * await tasks.trigger("my-task", payload, { + * rateLimitKey: `tenant-${tenantId}`, * }); * ``` */ @@ -68,7 +68,5 @@ export type QueueOptions = { period: string; /** Optional burst allowance (defaults to limit) */ burst?: number; - /** Optional function to derive rate limit key from payload (evaluated at trigger time) */ - key?: (payload: any) => string; }; }; diff --git a/packages/core/src/v3/types/tasks.ts b/packages/core/src/v3/types/tasks.ts index a27fe6c98a..30c63bbb8b 100644 --- a/packages/core/src/v3/types/tasks.ts +++ b/packages/core/src/v3/types/tasks.ts @@ -240,22 +240,27 @@ type CommonTaskOptions< * ``` * * @example - * per-tenant rate limiting + * per-tenant rate limiting - pass rateLimitKey at trigger time * * ```ts + * // Define task with rate limit * export const perTenantTask = task({ id: "per-tenant", queue: { rateLimit: { limit: 100, period: "1m", - key: (payload) => payload.tenantId, }, }, run: async ({ payload, ctx }) => { //... }, }); + * + * // Trigger with rateLimitKey option + * await perTenantTask.trigger(payload, { + * rateLimitKey: `tenant-${payload.tenantId}`, + * }); * ``` */ queue?: { @@ -273,10 +278,6 @@ type CommonTaskOptions< period: string; /** Optional burst allowance (defaults to limit) */ burst?: number; - /** Optional function to derive rate limit key from payload (evaluated at trigger time). - * This allows per-tenant or per-user rate limiting. - */ - key?: (payload: TPayload) => string; }; }; /** Configure the spec of the [machine](https://trigger.dev/docs/machines) you want your task to run on. 
@@ -841,12 +842,12 @@ export type TriggerOptions = { * The `rateLimitKey` creates a separate rate limit bucket for every unique value of the key. * This allows per-tenant or per-user rate limiting. * - * If the task has a rate limit key function defined, it will be evaluated at trigger time - * and the result will be used as the key. This option allows you to override that behavior. - * * @example * ```ts * await myTask.trigger(payload, { rateLimitKey: `tenant-${tenantId}` }); + * + * // Also works with tasks.trigger() + * await tasks.trigger("my-task", payload, { rateLimitKey: `tenant-${tenantId}` }); * ``` */ rateLimitKey?: string; diff --git a/packages/trigger-sdk/src/v3/shared.ts b/packages/trigger-sdk/src/v3/shared.ts index 959bfd03f6..a586f05660 100644 --- a/packages/trigger-sdk/src/v3/shared.ts +++ b/packages/trigger-sdk/src/v3/shared.ts @@ -129,17 +129,10 @@ export type Context = TaskRunContext; export { BatchTriggerError }; export function queue(options: QueueOptions): Queue { - // Register with serializable metadata (strip key function from rateLimit) resourceCatalog.registerQueueMetadata({ name: options.name, concurrencyLimit: options.concurrencyLimit, - rateLimit: options.rateLimit - ? { - limit: options.rateLimit.limit, - period: options.rateLimit.period, - burst: options.rateLimit.burst, - } - : undefined, + rateLimit: options.rateLimit, }); // @ts-expect-error @@ -182,11 +175,6 @@ export function createTask< description: params.description, jsonSchema: params.jsonSchema, trigger: async (payload, options) => { - // Evaluate rate limit key function if defined - const rateLimitKey = - options?.rateLimitKey ?? - (params.queue?.rateLimit?.key ? 
params.queue.rateLimit.key(payload) : undefined); - return await trigger_internal>( "trigger()", params.id, @@ -195,7 +183,6 @@ export function createTask< { queue: params.queue?.name, ...options, - rateLimitKey, } ); }, @@ -207,16 +194,10 @@ export function createTask< options, undefined, undefined, - params.queue?.name, - params.queue?.rateLimit?.key + params.queue?.name ); }, triggerAndWait: (payload, options, requestOptions) => { - // Evaluate rate limit key function if defined - const rateLimitKey = - options?.rateLimitKey ?? - (params.queue?.rateLimit?.key ? params.queue.rateLimit.key(payload) : undefined); - return new TaskRunPromise((resolve, reject) => { triggerAndWait_internal( "triggerAndWait()", @@ -226,7 +207,6 @@ export function createTask< { queue: params.queue?.name, ...options, - rateLimitKey, }, requestOptions ) @@ -246,8 +226,7 @@ export function createTask< undefined, options, undefined, - params.queue?.name, - params.queue?.rateLimit?.key + params.queue?.name ); }, }; @@ -273,14 +252,7 @@ export function createTask< resourceCatalog.registerQueueMetadata({ name: queue.name, concurrencyLimit: queue.concurrencyLimit, - // Only include serializable rateLimit config (without key function) - rateLimit: queue.rateLimit - ? { - limit: queue.rateLimit.limit, - period: queue.rateLimit.period, - burst: queue.rateLimit.burst, - } - : undefined, + rateLimit: queue.rateLimit, }); } @@ -335,11 +307,6 @@ export function createSchemaTask< description: params.description, schema: params.schema, trigger: async (payload, options, requestOptions) => { - // Evaluate rate limit key function if defined - const rateLimitKey = - options?.rateLimitKey ?? - (params.queue?.rateLimit?.key ? 
params.queue.rateLimit.key(payload) : undefined); - return await trigger_internal, TOutput>>( "trigger()", params.id, @@ -348,7 +315,6 @@ export function createSchemaTask< { queue: params.queue?.name, ...options, - rateLimitKey, }, requestOptions ); @@ -361,16 +327,10 @@ export function createSchemaTask< options, parsePayload, requestOptions, - params.queue?.name, - params.queue?.rateLimit?.key + params.queue?.name ); }, triggerAndWait: (payload, options) => { - // Evaluate rate limit key function if defined - const rateLimitKey = - options?.rateLimitKey ?? - (params.queue?.rateLimit?.key ? params.queue.rateLimit.key(payload) : undefined); - return new TaskRunPromise((resolve, reject) => { triggerAndWait_internal, TOutput>( "triggerAndWait()", @@ -380,7 +340,6 @@ export function createSchemaTask< { queue: params.queue?.name, ...options, - rateLimitKey, } ) .then((result) => { @@ -399,8 +358,7 @@ export function createSchemaTask< parsePayload, options, undefined, - params.queue?.name, - params.queue?.rateLimit?.key + params.queue?.name ); }, }; @@ -427,14 +385,7 @@ export function createSchemaTask< resourceCatalog.registerQueueMetadata({ name: queue.name, concurrencyLimit: queue.concurrencyLimit, - // Only include serializable rateLimit config (without key function) - rateLimit: queue.rateLimit - ? 
{ - limit: queue.rateLimit.limit, - period: queue.rateLimit.period, - burst: queue.rateLimit.burst, - } - : undefined, + rateLimit: queue.rateLimit, }); } @@ -1979,8 +1930,7 @@ async function* transformSingleTaskBatchItemsStream( items: AsyncIterable>, parsePayload: SchemaParseFn | undefined, options: BatchTriggerOptions | undefined, - queue: string | undefined, - rateLimitKeyFn?: (payload: TPayload) => string | undefined + queue: string | undefined ): AsyncIterable { let index = 0; for await (const item of items) { @@ -1991,10 +1941,6 @@ async function* transformSingleTaskBatchItemsStream( flattenIdempotencyKey([options?.idempotencyKey, `${index}`]) ); - // Evaluate rate limit key for this item - const rateLimitKey = - item.options?.rateLimitKey ?? (rateLimitKeyFn ? rateLimitKeyFn(parsedPayload) : undefined); - yield { index: index++, task: taskIdentifier, @@ -2006,7 +1952,7 @@ async function* transformSingleTaskBatchItemsStream( ? { name: queue } : undefined, concurrencyKey: item.options?.concurrencyKey, - rateLimitKey, + rateLimitKey: item.options?.rateLimitKey, test: taskContext.ctx?.run.isTest, payloadType: payloadPacket.dataType, delay: item.options?.delay, @@ -2037,8 +1983,7 @@ async function* transformSingleTaskBatchItemsStreamForWait( items: AsyncIterable>, parsePayload: SchemaParseFn | undefined, options: BatchTriggerAndWaitOptions | undefined, - queue: string | undefined, - rateLimitKeyFn?: (payload: TPayload) => string | undefined + queue: string | undefined ): AsyncIterable { let index = 0; for await (const item of items) { @@ -2049,10 +1994,6 @@ async function* transformSingleTaskBatchItemsStreamForWait( flattenIdempotencyKey([options?.idempotencyKey, `${index}`]) ); - // Evaluate rate limit key for this item - const rateLimitKey = - item.options?.rateLimitKey ?? (rateLimitKeyFn ? 
rateLimitKeyFn(parsedPayload) : undefined); - yield { index: index++, task: taskIdentifier, @@ -2065,7 +2006,7 @@ async function* transformSingleTaskBatchItemsStreamForWait( ? { name: queue } : undefined, concurrencyKey: item.options?.concurrencyKey, - rateLimitKey, + rateLimitKey: item.options?.rateLimitKey, test: taskContext.ctx?.run.isTest, payloadType: payloadPacket.dataType, delay: item.options?.delay, @@ -2155,8 +2096,7 @@ async function batchTrigger_internal( options?: BatchTriggerOptions, parsePayload?: SchemaParseFn, requestOptions?: TriggerApiRequestOptions, - queue?: string, - rateLimitKeyFn?: (payload: TRunTypes["payload"]) => string | undefined + queue?: string ): Promise> { const apiClient = apiClientManager.clientOrThrow(requestOptions?.clientConfig); @@ -2175,11 +2115,6 @@ async function batchTrigger_internal( flattenIdempotencyKey([options?.idempotencyKey, `${index}`]) ); - // Evaluate rate limit key for this item - const rateLimitKey = - item.options?.rateLimitKey ?? - (rateLimitKeyFn ? rateLimitKeyFn(parsedPayload) : undefined); - return { index, task: taskIdentifier, @@ -2191,7 +2126,7 @@ async function batchTrigger_internal( ? 
{ name: queue } : undefined, concurrencyKey: item.options?.concurrencyKey, - rateLimitKey, + rateLimitKey: item.options?.rateLimitKey, test: taskContext.ctx?.run.isTest, payloadType: payloadPacket.dataType, delay: item.options?.delay, @@ -2264,8 +2199,7 @@ async function batchTrigger_internal( asyncItems, parsePayload, options, - queue, - rateLimitKeyFn + queue ); // Execute streaming 2-phase batch @@ -2405,8 +2339,7 @@ async function batchTriggerAndWait_internal, options?: BatchTriggerAndWaitOptions, requestOptions?: TriggerApiRequestOptions, - queue?: string, - rateLimitKeyFn?: (payload: TPayload) => string | undefined + queue?: string ): Promise> { const ctx = taskContext.ctx; @@ -2429,11 +2362,6 @@ async function batchTriggerAndWait_internal Date: Thu, 18 Dec 2025 18:24:41 +0100 Subject: [PATCH 5/5] fix(rate-limit): add rateLimitKey propagation to batch streaming transforms --- packages/trigger-sdk/src/v3/shared.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/trigger-sdk/src/v3/shared.ts b/packages/trigger-sdk/src/v3/shared.ts index c6fcfc94fc..fe4010c433 100644 --- a/packages/trigger-sdk/src/v3/shared.ts +++ b/packages/trigger-sdk/src/v3/shared.ts @@ -1753,6 +1753,7 @@ async function* transformBatchItemsStream( options: { queue: item.options?.queue ? { name: item.options.queue } : undefined, concurrencyKey: item.options?.concurrencyKey, + rateLimitKey: item.options?.rateLimitKey, test: taskContext.ctx?.run.isTest, payloadType: payloadPacket.dataType, delay: item.options?.delay, @@ -1806,6 +1807,7 @@ async function* transformBatchItemsStreamForWait( lockToVersion: taskContext.worker?.version, queue: item.options?.queue ? { name: item.options.queue } : undefined, concurrencyKey: item.options?.concurrencyKey, + rateLimitKey: item.options?.rateLimitKey, test: taskContext.ctx?.run.isTest, payloadType: payloadPacket.dataType, delay: item.options?.delay, @@ -1856,6 +1858,7 @@ async function* transformBatchByTaskItemsStream