Skip to content

Commit 6028065

Browse files
committed
fix: remove rateLimit.key function, use rateLimitKey at trigger time
1 parent 72c8ce0 commit 6028065

File tree

3 files changed

+35
-109
lines changed

3 files changed

+35
-109
lines changed

packages/core/src/v3/types/queues.ts

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,14 @@ export type QueueOptions = {
5050
* },
5151
* });
5252
*
53-
* // Per-tenant rate limiting
54-
* const perTenantQueue = queue({
55-
* name: "per-tenant-api",
56-
* rateLimit: {
57-
* limit: 100,
58-
* period: "1m",
59-
* key: (payload) => payload.tenantId,
60-
* },
53+
* // Per-tenant rate limiting - pass rateLimitKey at trigger time
54+
* await myTask.trigger(payload, {
55+
* rateLimitKey: `tenant-${payload.tenantId}`,
56+
* });
57+
*
58+
* // Also works with tasks.trigger()
59+
* await tasks.trigger("my-task", payload, {
60+
* rateLimitKey: `tenant-${tenantId}`,
6161
* });
6262
* ```
6363
*/
@@ -68,7 +68,5 @@ export type QueueOptions = {
6868
period: string;
6969
/** Optional burst allowance (defaults to limit) */
7070
burst?: number;
71-
/** Optional function to derive rate limit key from payload (evaluated at trigger time) */
72-
key?: (payload: any) => string;
7371
};
7472
};

packages/core/src/v3/types/tasks.ts

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -240,22 +240,27 @@ type CommonTaskOptions<
240240
* ```
241241
*
242242
* @example
243-
* per-tenant rate limiting
243+
* per-tenant rate limiting - pass rateLimitKey at trigger time
244244
*
245245
* ```ts
246+
* // Define task with rate limit
246247
* export const perTenantTask = task({
247248
id: "per-tenant",
248249
queue: {
249250
rateLimit: {
250251
limit: 100,
251252
period: "1m",
252-
key: (payload) => payload.tenantId,
253253
},
254254
},
255255
run: async ({ payload, ctx }) => {
256256
//...
257257
},
258258
});
259+
*
260+
* // Trigger with rateLimitKey option
261+
* await perTenantTask.trigger(payload, {
262+
* rateLimitKey: `tenant-${payload.tenantId}`,
263+
* });
259264
* ```
260265
*/
261266
queue?: {
@@ -273,10 +278,6 @@ type CommonTaskOptions<
273278
period: string;
274279
/** Optional burst allowance (defaults to limit) */
275280
burst?: number;
276-
/** Optional function to derive rate limit key from payload (evaluated at trigger time).
277-
* This allows per-tenant or per-user rate limiting.
278-
*/
279-
key?: (payload: TPayload) => string;
280281
};
281282
};
282283
/** Configure the spec of the [machine](https://trigger.dev/docs/machines) you want your task to run on.
@@ -841,12 +842,12 @@ export type TriggerOptions = {
841842
* The `rateLimitKey` creates a separate rate limit bucket for every unique value of the key.
842843
* This allows per-tenant or per-user rate limiting.
843844
*
844-
* If the task has a rate limit key function defined, it will be evaluated at trigger time
845-
* and the result will be used as the key. This option allows you to override that behavior.
846-
*
847845
* @example
848846
* ```ts
849847
* await myTask.trigger(payload, { rateLimitKey: `tenant-${tenantId}` });
848+
*
849+
* // Also works with tasks.trigger()
850+
* await tasks.trigger("my-task", payload, { rateLimitKey: `tenant-${tenantId}` });
850851
* ```
851852
*/
852853
rateLimitKey?: string;

packages/trigger-sdk/src/v3/shared.ts

Lines changed: 17 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -129,17 +129,10 @@ export type Context = TaskRunContext;
129129
export { BatchTriggerError };
130130

131131
export function queue(options: QueueOptions): Queue {
132-
// Register with serializable metadata (strip key function from rateLimit)
133132
resourceCatalog.registerQueueMetadata({
134133
name: options.name,
135134
concurrencyLimit: options.concurrencyLimit,
136-
rateLimit: options.rateLimit
137-
? {
138-
limit: options.rateLimit.limit,
139-
period: options.rateLimit.period,
140-
burst: options.rateLimit.burst,
141-
}
142-
: undefined,
135+
rateLimit: options.rateLimit,
143136
});
144137

145138
// @ts-expect-error
@@ -182,11 +175,6 @@ export function createTask<
182175
description: params.description,
183176
jsonSchema: params.jsonSchema,
184177
trigger: async (payload, options) => {
185-
// Evaluate rate limit key function if defined
186-
const rateLimitKey =
187-
options?.rateLimitKey ??
188-
(params.queue?.rateLimit?.key ? params.queue.rateLimit.key(payload) : undefined);
189-
190178
return await trigger_internal<RunTypes<TIdentifier, TInput, TOutput>>(
191179
"trigger()",
192180
params.id,
@@ -195,7 +183,6 @@ export function createTask<
195183
{
196184
queue: params.queue?.name,
197185
...options,
198-
rateLimitKey,
199186
}
200187
);
201188
},
@@ -207,16 +194,10 @@ export function createTask<
207194
options,
208195
undefined,
209196
undefined,
210-
params.queue?.name,
211-
params.queue?.rateLimit?.key
197+
params.queue?.name
212198
);
213199
},
214200
triggerAndWait: (payload, options, requestOptions) => {
215-
// Evaluate rate limit key function if defined
216-
const rateLimitKey =
217-
options?.rateLimitKey ??
218-
(params.queue?.rateLimit?.key ? params.queue.rateLimit.key(payload) : undefined);
219-
220201
return new TaskRunPromise<TIdentifier, TOutput>((resolve, reject) => {
221202
triggerAndWait_internal<TIdentifier, TInput, TOutput>(
222203
"triggerAndWait()",
@@ -226,7 +207,6 @@ export function createTask<
226207
{
227208
queue: params.queue?.name,
228209
...options,
229-
rateLimitKey,
230210
},
231211
requestOptions
232212
)
@@ -246,8 +226,7 @@ export function createTask<
246226
undefined,
247227
options,
248228
undefined,
249-
params.queue?.name,
250-
params.queue?.rateLimit?.key
229+
params.queue?.name
251230
);
252231
},
253232
};
@@ -273,14 +252,7 @@ export function createTask<
273252
resourceCatalog.registerQueueMetadata({
274253
name: queue.name,
275254
concurrencyLimit: queue.concurrencyLimit,
276-
// Only include serializable rateLimit config (without key function)
277-
rateLimit: queue.rateLimit
278-
? {
279-
limit: queue.rateLimit.limit,
280-
period: queue.rateLimit.period,
281-
burst: queue.rateLimit.burst,
282-
}
283-
: undefined,
255+
rateLimit: queue.rateLimit,
284256
});
285257
}
286258

@@ -335,11 +307,6 @@ export function createSchemaTask<
335307
description: params.description,
336308
schema: params.schema,
337309
trigger: async (payload, options, requestOptions) => {
338-
// Evaluate rate limit key function if defined
339-
const rateLimitKey =
340-
options?.rateLimitKey ??
341-
(params.queue?.rateLimit?.key ? params.queue.rateLimit.key(payload) : undefined);
342-
343310
return await trigger_internal<RunTypes<TIdentifier, inferSchemaIn<TSchema>, TOutput>>(
344311
"trigger()",
345312
params.id,
@@ -348,7 +315,6 @@ export function createSchemaTask<
348315
{
349316
queue: params.queue?.name,
350317
...options,
351-
rateLimitKey,
352318
},
353319
requestOptions
354320
);
@@ -361,16 +327,10 @@ export function createSchemaTask<
361327
options,
362328
parsePayload,
363329
requestOptions,
364-
params.queue?.name,
365-
params.queue?.rateLimit?.key
330+
params.queue?.name
366331
);
367332
},
368333
triggerAndWait: (payload, options) => {
369-
// Evaluate rate limit key function if defined
370-
const rateLimitKey =
371-
options?.rateLimitKey ??
372-
(params.queue?.rateLimit?.key ? params.queue.rateLimit.key(payload) : undefined);
373-
374334
return new TaskRunPromise<TIdentifier, TOutput>((resolve, reject) => {
375335
triggerAndWait_internal<TIdentifier, inferSchemaIn<TSchema>, TOutput>(
376336
"triggerAndWait()",
@@ -380,7 +340,6 @@ export function createSchemaTask<
380340
{
381341
queue: params.queue?.name,
382342
...options,
383-
rateLimitKey,
384343
}
385344
)
386345
.then((result) => {
@@ -399,8 +358,7 @@ export function createSchemaTask<
399358
parsePayload,
400359
options,
401360
undefined,
402-
params.queue?.name,
403-
params.queue?.rateLimit?.key
361+
params.queue?.name
404362
);
405363
},
406364
};
@@ -427,14 +385,7 @@ export function createSchemaTask<
427385
resourceCatalog.registerQueueMetadata({
428386
name: queue.name,
429387
concurrencyLimit: queue.concurrencyLimit,
430-
// Only include serializable rateLimit config (without key function)
431-
rateLimit: queue.rateLimit
432-
? {
433-
limit: queue.rateLimit.limit,
434-
period: queue.rateLimit.period,
435-
burst: queue.rateLimit.burst,
436-
}
437-
: undefined,
388+
rateLimit: queue.rateLimit,
438389
});
439390
}
440391

@@ -1979,8 +1930,7 @@ async function* transformSingleTaskBatchItemsStream<TPayload>(
19791930
items: AsyncIterable<BatchItem<TPayload>>,
19801931
parsePayload: SchemaParseFn<TPayload> | undefined,
19811932
options: BatchTriggerOptions | undefined,
1982-
queue: string | undefined,
1983-
rateLimitKeyFn?: (payload: TPayload) => string | undefined
1933+
queue: string | undefined
19841934
): AsyncIterable<BatchItemNDJSON> {
19851935
let index = 0;
19861936
for await (const item of items) {
@@ -1991,10 +1941,6 @@ async function* transformSingleTaskBatchItemsStream<TPayload>(
19911941
flattenIdempotencyKey([options?.idempotencyKey, `${index}`])
19921942
);
19931943

1994-
// Evaluate rate limit key for this item
1995-
const rateLimitKey =
1996-
item.options?.rateLimitKey ?? (rateLimitKeyFn ? rateLimitKeyFn(parsedPayload) : undefined);
1997-
19981944
yield {
19991945
index: index++,
20001946
task: taskIdentifier,
@@ -2006,7 +1952,7 @@ async function* transformSingleTaskBatchItemsStream<TPayload>(
20061952
? { name: queue }
20071953
: undefined,
20081954
concurrencyKey: item.options?.concurrencyKey,
2009-
rateLimitKey,
1955+
rateLimitKey: item.options?.rateLimitKey,
20101956
test: taskContext.ctx?.run.isTest,
20111957
payloadType: payloadPacket.dataType,
20121958
delay: item.options?.delay,
@@ -2037,8 +1983,7 @@ async function* transformSingleTaskBatchItemsStreamForWait<TPayload>(
20371983
items: AsyncIterable<BatchTriggerAndWaitItem<TPayload>>,
20381984
parsePayload: SchemaParseFn<TPayload> | undefined,
20391985
options: BatchTriggerAndWaitOptions | undefined,
2040-
queue: string | undefined,
2041-
rateLimitKeyFn?: (payload: TPayload) => string | undefined
1986+
queue: string | undefined
20421987
): AsyncIterable<BatchItemNDJSON> {
20431988
let index = 0;
20441989
for await (const item of items) {
@@ -2049,10 +1994,6 @@ async function* transformSingleTaskBatchItemsStreamForWait<TPayload>(
20491994
flattenIdempotencyKey([options?.idempotencyKey, `${index}`])
20501995
);
20511996

2052-
// Evaluate rate limit key for this item
2053-
const rateLimitKey =
2054-
item.options?.rateLimitKey ?? (rateLimitKeyFn ? rateLimitKeyFn(parsedPayload) : undefined);
2055-
20561997
yield {
20571998
index: index++,
20581999
task: taskIdentifier,
@@ -2065,7 +2006,7 @@ async function* transformSingleTaskBatchItemsStreamForWait<TPayload>(
20652006
? { name: queue }
20662007
: undefined,
20672008
concurrencyKey: item.options?.concurrencyKey,
2068-
rateLimitKey,
2009+
rateLimitKey: item.options?.rateLimitKey,
20692010
test: taskContext.ctx?.run.isTest,
20702011
payloadType: payloadPacket.dataType,
20712012
delay: item.options?.delay,
@@ -2155,8 +2096,7 @@ async function batchTrigger_internal<TRunTypes extends AnyRunTypes>(
21552096
options?: BatchTriggerOptions,
21562097
parsePayload?: SchemaParseFn<TRunTypes["payload"]>,
21572098
requestOptions?: TriggerApiRequestOptions,
2158-
queue?: string,
2159-
rateLimitKeyFn?: (payload: TRunTypes["payload"]) => string | undefined
2099+
queue?: string
21602100
): Promise<BatchRunHandleFromTypes<TRunTypes>> {
21612101
const apiClient = apiClientManager.clientOrThrow(requestOptions?.clientConfig);
21622102

@@ -2175,11 +2115,6 @@ async function batchTrigger_internal<TRunTypes extends AnyRunTypes>(
21752115
flattenIdempotencyKey([options?.idempotencyKey, `${index}`])
21762116
);
21772117

2178-
// Evaluate rate limit key for this item
2179-
const rateLimitKey =
2180-
item.options?.rateLimitKey ??
2181-
(rateLimitKeyFn ? rateLimitKeyFn(parsedPayload) : undefined);
2182-
21832118
return {
21842119
index,
21852120
task: taskIdentifier,
@@ -2191,7 +2126,7 @@ async function batchTrigger_internal<TRunTypes extends AnyRunTypes>(
21912126
? { name: queue }
21922127
: undefined,
21932128
concurrencyKey: item.options?.concurrencyKey,
2194-
rateLimitKey,
2129+
rateLimitKey: item.options?.rateLimitKey,
21952130
test: taskContext.ctx?.run.isTest,
21962131
payloadType: payloadPacket.dataType,
21972132
delay: item.options?.delay,
@@ -2264,8 +2199,7 @@ async function batchTrigger_internal<TRunTypes extends AnyRunTypes>(
22642199
asyncItems,
22652200
parsePayload,
22662201
options,
2267-
queue,
2268-
rateLimitKeyFn
2202+
queue
22692203
);
22702204

22712205
// Execute streaming 2-phase batch
@@ -2405,8 +2339,7 @@ async function batchTriggerAndWait_internal<TIdentifier extends string, TPayload
24052339
parsePayload?: SchemaParseFn<TPayload>,
24062340
options?: BatchTriggerAndWaitOptions,
24072341
requestOptions?: TriggerApiRequestOptions,
2408-
queue?: string,
2409-
rateLimitKeyFn?: (payload: TPayload) => string | undefined
2342+
queue?: string
24102343
): Promise<BatchResult<TIdentifier, TOutput>> {
24112344
const ctx = taskContext.ctx;
24122345

@@ -2429,11 +2362,6 @@ async function batchTriggerAndWait_internal<TIdentifier extends string, TPayload
24292362
flattenIdempotencyKey([options?.idempotencyKey, `${index}`])
24302363
);
24312364

2432-
// Evaluate rate limit key for this item
2433-
const rateLimitKey =
2434-
item.options?.rateLimitKey ??
2435-
(rateLimitKeyFn ? rateLimitKeyFn(parsedPayload) : undefined);
2436-
24372365
return {
24382366
index,
24392367
task: id,
@@ -2446,7 +2374,7 @@ async function batchTriggerAndWait_internal<TIdentifier extends string, TPayload
24462374
? { name: queue }
24472375
: undefined,
24482376
concurrencyKey: item.options?.concurrencyKey,
2449-
rateLimitKey,
2377+
rateLimitKey: item.options?.rateLimitKey,
24502378
test: taskContext.ctx?.run.isTest,
24512379
payloadType: payloadPacket.dataType,
24522380
delay: item.options?.delay,
@@ -2525,8 +2453,7 @@ async function batchTriggerAndWait_internal<TIdentifier extends string, TPayload
25252453
asyncItems,
25262454
parsePayload,
25272455
options,
2528-
queue,
2529-
rateLimitKeyFn
2456+
queue
25302457
);
25312458

25322459
return await tracer.startActiveSpan(

0 commit comments

Comments
 (0)