Skip to content

Commit daa0b5b

Browse files
committed
handle batch failures more reliably
1 parent ef76ff7 commit daa0b5b

File tree

14 files changed

+638
-304
lines changed

14 files changed

+638
-304
lines changed

apps/webapp/app/env.server.ts

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -941,8 +941,8 @@ const EnvironmentSchema = z
941941
BATCH_TRIGGER_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
942942

943943
// BatchQueue DRR settings (Run Engine v2)
944-
BATCH_QUEUE_DRR_QUANTUM: z.coerce.number().int().optional(),
945-
BATCH_QUEUE_MAX_DEFICIT: z.coerce.number().int().optional(),
944+
BATCH_QUEUE_DRR_QUANTUM: z.coerce.number().int().default(5),
945+
BATCH_QUEUE_MAX_DEFICIT: z.coerce.number().int().default(50),
946946
BATCH_QUEUE_CONSUMER_COUNT: z.coerce.number().int().optional(),
947947
BATCH_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().optional(),
948948
// Global rate limit: max items processed per second across all consumers

apps/webapp/app/runEngine/concerns/batchLimits.server.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { z } from "zod";
44
import { env } from "~/env.server";
55
import { RateLimiterConfig } from "~/services/authorizationRateLimitMiddleware.server";
66
import { createRedisRateLimitClient, Duration, RateLimiter } from "~/services/rateLimiter.server";
7+
import { singleton } from "~/utils/singleton";
78

89
const BatchLimitsConfig = z.object({
910
processingConcurrency: z.number().int().default(env.BATCH_CONCURRENCY_LIMIT_DEFAULT),
@@ -14,7 +15,9 @@ const BatchLimitsConfig = z.object({
1415
*/
1516
export type BatchLimitsConfig = z.infer<typeof BatchLimitsConfig>;
1617

17-
function createOrganizationRateLimiter(organization: Organization): RateLimiter {
18+
const batchLimitsRedisClient = singleton("batchLimitsRedisClient", createBatchLimitsRedisClient);
19+
20+
function createBatchLimitsRedisClient() {
1821
const redisClient = createRedisRateLimitClient({
1922
port: env.RATE_LIMIT_REDIS_PORT,
2023
host: env.RATE_LIMIT_REDIS_HOST,
@@ -24,6 +27,10 @@ function createOrganizationRateLimiter(organization: Organization): RateLimiter
2427
clusterMode: env.RATE_LIMIT_REDIS_CLUSTER_MODE_ENABLED === "1",
2528
});
2629

30+
return redisClient;
31+
}
32+
33+
function createOrganizationRateLimiter(organization: Organization): RateLimiter {
2734
const limiterConfig = resolveBatchRateLimitConfig(organization.batchRateLimitConfig);
2835

2936
const limiter =
@@ -38,7 +45,7 @@ function createOrganizationRateLimiter(organization: Organization): RateLimiter
3845
: Ratelimit.slidingWindow(limiterConfig.tokens, limiterConfig.window);
3946

4047
return new RateLimiter({
41-
redisClient,
48+
redisClient: batchLimitsRedisClient,
4249
keyPrefix: "ratelimit:batch",
4350
limiter,
4451
logSuccess: false,

apps/webapp/app/runEngine/services/streamBatchItems.server.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -273,9 +273,7 @@ export function createNdjsonParserStream(
273273
* Convert a ReadableStream into an AsyncIterable.
274274
* Useful for processing streams with for-await-of loops.
275275
*/
276-
export async function* streamToAsyncIterable<T>(
277-
stream: ReadableStream<T>
278-
): AsyncIterable<T> {
276+
export async function* streamToAsyncIterable<T>(stream: ReadableStream<T>): AsyncIterable<T> {
279277
const reader = stream.getReader();
280278
try {
281279
while (true) {
@@ -287,4 +285,3 @@ export async function* streamToAsyncIterable<T>(
287285
reader.releaseLock();
288286
}
289287
}
290-

apps/webapp/app/v3/runEngineHandlers.server.ts

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { RunId } from "@trigger.dev/core/v3/isomorphic";
66
import { BatchTaskRunStatus, Prisma } from "@trigger.dev/database";
77
import { $replica, prisma } from "~/db.server";
88
import { env } from "~/env.server";
9-
import { findEnvironmentFromRun } from "~/models/runtimeEnvironment.server";
9+
import { findEnvironmentById, findEnvironmentFromRun } from "~/models/runtimeEnvironment.server";
1010
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1111
import { logger } from "~/services/logger.server";
1212
import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server";
@@ -659,21 +659,26 @@ export function setupBatchQueueCallbacks() {
659659
},
660660
async (span) => {
661661
try {
662+
const environment = await findEnvironmentById(meta.environmentId);
663+
664+
if (!environment) {
665+
span.setAttribute("batch.result.error", "Environment not found");
666+
span.end();
667+
return {
668+
success: false as const,
669+
error: "Environment not found",
670+
errorCode: "ENVIRONMENT_NOT_FOUND",
671+
};
672+
}
673+
662674
const triggerTaskService = new TriggerTaskService();
663675

664676
// Normalize payload - for application/store (R2 paths), this passes through as-is
665677
const payload = normalizePayload(item.payload, item.payloadType);
666678

667679
const result = await triggerTaskService.call(
668680
item.task,
669-
{
670-
id: meta.environmentId,
671-
type: meta.environmentType,
672-
organizationId: meta.organizationId,
673-
projectId: meta.projectId,
674-
organization: { id: meta.organizationId },
675-
project: { id: meta.projectId },
676-
} as AuthenticatedEnvironment,
681+
environment,
677682
{
678683
payload,
679684
options: {

internal-packages/BATCH_TRIGGER_LIMITS.md

Lines changed: 0 additions & 220 deletions
This file was deleted.

internal-packages/run-engine/src/batch-queue/index.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,7 @@ export class BatchQueue {
6161
private itemQueueTimeHistogram?: Histogram;
6262

6363
constructor(private options: BatchQueueOptions) {
64-
this.logger = options.logger
65-
? new Logger("BatchQueue", "info")
66-
: new Logger("BatchQueue", "info");
64+
this.logger = options.logger ?? new Logger("BatchQueue", options.logLevel ?? "info");
6765
this.defaultConcurrency = options.defaultConcurrency ?? 10;
6866

6967
// Initialize metrics if meter is provided

internal-packages/run-engine/src/batch-queue/types.ts

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import { z } from "zod";
22
import { RuntimeEnvironmentType } from "@trigger.dev/database";
3+
import { Logger, LogLevel } from "@trigger.dev/core/logger";
4+
import { GlobalRateLimiter } from "@trigger.dev/redis-worker";
5+
import { Meter, Tracer } from "@internal/tracing";
36

47
// ============================================================================
58
// Batch Item Schemas
@@ -208,18 +211,14 @@ export type BatchQueueOptions = {
208211
* Optional global rate limiter to limit processing across all consumers.
209212
* When configured, limits the max items/second processed globally.
210213
*/
211-
globalRateLimiter?: import("@trigger.dev/redis-worker").GlobalRateLimiter;
214+
globalRateLimiter?: GlobalRateLimiter;
212215
/** Logger instance */
213-
logger?: {
214-
debug: (message: string, context?: Record<string, unknown>) => void;
215-
info: (message: string, context?: Record<string, unknown>) => void;
216-
warn: (message: string, context?: Record<string, unknown>) => void;
217-
error: (message: string, context?: Record<string, unknown>) => void;
218-
};
216+
logger?: Logger;
217+
logLevel?: LogLevel;
219218
/** OpenTelemetry tracer for distributed tracing */
220-
tracer?: import("@internal/tracing").Tracer;
219+
tracer?: Tracer;
221220
/** OpenTelemetry meter for metrics */
222-
meter?: import("@internal/tracing").Meter;
221+
meter?: Meter;
223222
};
224223

225224
/**

0 commit comments

Comments
 (0)