diff --git a/.vscode/launch.json b/.vscode/launch.json
index f0736642d3..c3e9782ea0 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -22,6 +22,15 @@
       "cwd": "${workspaceFolder}/apps/webapp",
       "sourceMaps": true
     },
+    {
+      "type": "node-terminal",
+      "request": "launch",
+      "name": "Debug fairDequeuingStrategy.test.ts",
+      "command": "pnpm run test -t FairDequeuingStrategy",
+      "envFile": "${workspaceFolder}/.env",
+      "cwd": "${workspaceFolder}/apps/webapp",
+      "sourceMaps": true
+    },
     {
       "type": "chrome",
       "request": "launch",
diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index be9e5a1ef1..198062b245 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -199,6 +199,8 @@ const EnvironmentSchema = z.object({
   INTERNAL_OTEL_TRACE_INSTRUMENT_PRISMA_ENABLED: z.string().default("0"),
   INTERNAL_OTEL_TRACE_DISABLED: z.string().default("0"),
 
+  INTERNAL_OTEL_LOG_EXPORTER_URL: z.string().optional(),
+
   ORG_SLACK_INTEGRATION_CLIENT_ID: z.string().optional(),
   ORG_SLACK_INTEGRATION_CLIENT_SECRET: z.string().optional(),
 
@@ -221,9 +223,12 @@ const EnvironmentSchema = z.object({
     .number()
     .int()
     .default(60 * 1000 * 15),
-  MARQS_SHARED_QUEUE_SELECTION_COUNT: z.coerce.number().int().default(36),
-  MARQS_DEV_QUEUE_SELECTION_COUNT: z.coerce.number().int().default(12),
+  MARQS_SHARED_QUEUE_LIMIT: z.coerce.number().int().default(1000),
+  MARQS_DEV_QUEUE_LIMIT: z.coerce.number().int().default(1000),
   MARQS_MAXIMUM_NACK_COUNT: z.coerce.number().int().default(64),
+  MARQS_CONCURRENCY_LIMIT_BIAS: z.coerce.number().default(0.75),
+  MARQS_AVAILABLE_CAPACITY_BIAS: z.coerce.number().default(0.3),
+  MARQS_QUEUE_AGE_RANDOMIZATION_BIAS: z.coerce.number().default(0.25),
 
   PROD_TASK_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(),
 
diff --git a/apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts b/apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts
new file mode 100644
index 0000000000..4a3cd066eb
--- /dev/null
+++ b/apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts
@@ -0,0 +1,580 @@
+import { flattenAttributes } from "@trigger.dev/core/v3";
+import { createCache, DefaultStatefulContext, Namespace, Cache as UnkeyCache } from "@unkey/cache";
+import { MemoryStore } from "@unkey/cache/stores";
+import { randomUUID } from "crypto";
+import { Redis } from "ioredis";
+import { MarQSFairDequeueStrategy, MarQSKeyProducer } from "./types";
+import seedrandom from "seedrandom";
+import { Tracer } from "@opentelemetry/api";
+import { startSpan } from "../tracing.server";
+
+export type FairDequeuingStrategyBiases = {
+  /**
+   * How much to bias towards environments with higher concurrency limits
+   * 0 = no bias, 1 = full bias based on limit differences
+   */
+  concurrencyLimitBias: number;
+
+  /**
+   * How much to bias towards environments with more available capacity
+   * 0 = no bias, 1 = full bias based on available capacity
+   */
+  availableCapacityBias: number;
+
+  /**
+   * Controls randomization of queue ordering within environments
+   * 0 = strict age-based ordering (oldest first)
+   * 1 = completely random ordering
+   * Values between 0-1 blend between age-based and random ordering
+   */
+  queueAgeRandomization: number;
+};
+
+export type FairDequeuingStrategyOptions = {
+  redis: Redis;
+  keys: MarQSKeyProducer;
+  defaultOrgConcurrency: number;
+  defaultEnvConcurrency: number;
+  parentQueueLimit: number;
+  checkForDisabledOrgs: boolean;
+  tracer: Tracer;
+  seed?: string;
+  /**
+   * Configure biasing for environment shuffling
+   * If not provided, no biasing will
be applied (completely random shuffling) + */ + biases?: FairDequeuingStrategyBiases; +}; + +type FairQueueConcurrency = { + current: number; + limit: number; +}; + +type FairQueue = { id: string; age: number; org: string; env: string }; + +type FairQueueSnapshot = { + id: string; + orgs: Record; + envs: Record; + queues: Array; +}; + +type WeightedEnv = { + envId: string; + weight: number; +}; + +type WeightedQueue = { + queue: FairQueue; + weight: number; +}; + +const emptyFairQueueSnapshot: FairQueueSnapshot = { + id: "empty", + orgs: {}, + envs: {}, + queues: [], +}; + +const defaultBiases: FairDequeuingStrategyBiases = { + concurrencyLimitBias: 0, + availableCapacityBias: 0, + queueAgeRandomization: 0, // Default to completely age-based ordering +}; + +export class FairDequeuingStrategy implements MarQSFairDequeueStrategy { + private _cache: UnkeyCache<{ + concurrencyLimit: number; + disabledConcurrency: boolean; + }>; + + private _rng: seedrandom.PRNG; + + constructor(private options: FairDequeuingStrategyOptions) { + const ctx = new DefaultStatefulContext(); + const memory = new MemoryStore({ persistentMap: new Map() }); + + this._cache = createCache({ + concurrencyLimit: new Namespace(ctx, { + stores: [memory], + fresh: 60_000, // The time in milliseconds that a value is considered fresh. Cache hits within this time will return the cached value. + stale: 180_000, // The time in milliseconds that a value is considered stale. Cache hits within this time will return the cached value and trigger a background refresh. + }), + disabledConcurrency: new Namespace(ctx, { + stores: [memory], + fresh: 30_000, // The time in milliseconds that a value is considered fresh. Cache hits within this time will return the cached value. + stale: 180_000, // The time in milliseconds that a value is considered stale. Cache hits within this time will return the cached value and trigger a background refresh. + }), + }); + + this._rng = seedrandom(options.seed); + } + + async distributeFairQueuesFromParentQueue( + parentQueue: string, + consumerId: string + ): Promise> { + return await startSpan( + this.options.tracer, + "distributeFairQueuesFromParentQueue", + async (span) => { + span.setAttribute("consumer_id", consumerId); + span.setAttribute("parent_queue", parentQueue); + + const snapshot = await this.#createQueueSnapshot(parentQueue, consumerId); + + span.setAttributes({ + snapshot_org_count: Object.keys(snapshot.orgs).length, + snapshot_env_count: Object.keys(snapshot.envs).length, + snapshot_queue_count: snapshot.queues.length, + }); + + const queues = snapshot.queues; + + if (queues.length === 0) { + return []; + } + + const shuffledQueues = this.#shuffleQueuesByEnv(snapshot); + + span.setAttribute("shuffled_queue_count", shuffledQueues.length); + + if (shuffledQueues[0]) { + span.setAttribute("winning_env", this.options.keys.envIdFromQueue(shuffledQueues[0])); + span.setAttribute("winning_org", this.options.keys.orgIdFromQueue(shuffledQueues[0])); + } + + return shuffledQueues; + } + ); + } + + #shuffleQueuesByEnv(snapshot: FairQueueSnapshot): Array { + const envs = Object.keys(snapshot.envs); + const biases = this.options.biases ?? 
defaultBiases; + + if (biases.concurrencyLimitBias === 0 && biases.availableCapacityBias === 0) { + const shuffledEnvs = this.#shuffle(envs); + return this.#orderQueuesByEnvs(shuffledEnvs, snapshot); + } + + // Find the maximum concurrency limit for normalization + const maxLimit = Math.max(...envs.map((envId) => snapshot.envs[envId].concurrency.limit)); + + // Calculate weights for each environment + const weightedEnvs: WeightedEnv[] = envs.map((envId) => { + const env = snapshot.envs[envId]; + + // Start with base weight of 1 + let weight = 1; + + // Add normalized concurrency limit bias if configured + if (biases.concurrencyLimitBias > 0) { + const normalizedLimit = env.concurrency.limit / maxLimit; + // Square or cube the bias to make it more pronounced at higher values + weight *= 1 + Math.pow(normalizedLimit * biases.concurrencyLimitBias, 2); + } + + // Add available capacity bias if configured + if (biases.availableCapacityBias > 0) { + const usedCapacityPercentage = env.concurrency.current / env.concurrency.limit; + const availableCapacityBonus = 1 - usedCapacityPercentage; + // Square or cube the bias to make it more pronounced at higher values + weight *= 1 + Math.pow(availableCapacityBonus * biases.availableCapacityBias, 2); + } + + return { envId, weight }; + }); + + const shuffledEnvs = this.#weightedShuffle(weightedEnvs); + return this.#orderQueuesByEnvs(shuffledEnvs, snapshot); + } + + #weightedShuffle(weightedItems: WeightedEnv[]): string[] { + const totalWeight = weightedItems.reduce((sum, item) => sum + item.weight, 0); + const result: string[] = []; + const items = [...weightedItems]; + + while (items.length > 0) { + let random = this._rng() * totalWeight; + let index = 0; + + // Find item based on weighted random selection + while (random > 0 && index < items.length) { + random -= items[index].weight; + index++; + } + index = Math.max(0, index - 1); + + // Add selected item to result and remove from items + result.push(items[index].envId); + items.splice(index, 1); + } + + return result; + } + + // Helper method to maintain DRY principle + #orderQueuesByEnvs(envs: string[], snapshot: FairQueueSnapshot): Array { + const queuesByEnv = snapshot.queues.reduce((acc, queue) => { + if (!acc[queue.env]) { + acc[queue.env] = []; + } + acc[queue.env].push(queue); + return acc; + }, {} as Record>); + + const queues = envs.reduce((acc, envId) => { + if (queuesByEnv[envId]) { + // Instead of sorting by age, use weighted random selection + acc.push(...this.#weightedRandomQueueOrder(queuesByEnv[envId])); + } + return acc; + }, [] as Array); + + return queues.map((queue) => queue.id); + } + + #weightedRandomQueueOrder(queues: FairQueue[]): FairQueue[] { + if (queues.length <= 1) return queues; + + const biases = this.options.biases ?? 
defaultBiases; + + // When queueAgeRandomization is 0, use strict age-based ordering + if (biases.queueAgeRandomization === 0) { + return [...queues].sort((a, b) => b.age - a.age); + } + + // Find the maximum age for normalization + const maxAge = Math.max(...queues.map((q) => q.age)); + + // Calculate weights for each queue + const weightedQueues: WeightedQueue[] = queues.map((queue) => { + // Normalize age to be between 0 and 1 + const normalizedAge = queue.age / maxAge; + + // Calculate weight: combine base weight with configurable age influence + const baseWeight = 1; + const weight = baseWeight + normalizedAge * biases.queueAgeRandomization; + + return { queue, weight }; + }); + + // Perform weighted random selection for ordering + const result: FairQueue[] = []; + let remainingQueues = [...weightedQueues]; + let totalWeight = remainingQueues.reduce((sum, wq) => sum + wq.weight, 0); + + while (remainingQueues.length > 0) { + let random = this._rng() * totalWeight; + let index = 0; + + // Find queue based on weighted random selection + while (random > 0 && index < remainingQueues.length) { + random -= remainingQueues[index].weight; + index++; + } + index = Math.max(0, index - 1); + + // Add selected queue to result and remove from remaining + result.push(remainingQueues[index].queue); + totalWeight -= remainingQueues[index].weight; + remainingQueues.splice(index, 1); + } + + return result; + } + + #shuffle(array: Array): Array { + let currentIndex = array.length; + let temporaryValue; + let randomIndex; + + const newArray = [...array]; + + while (currentIndex !== 0) { + randomIndex = Math.floor(this._rng() * currentIndex); + currentIndex -= 1; + + temporaryValue = newArray[currentIndex]; + newArray[currentIndex] = newArray[randomIndex]; + newArray[randomIndex] = temporaryValue; + } + + return newArray; + } + + async #createQueueSnapshot(parentQueue: string, consumerId: string): Promise { + return await startSpan(this.options.tracer, "createQueueSnapshot", async (span) => { + span.setAttribute("consumer_id", consumerId); + span.setAttribute("parent_queue", parentQueue); + + const now = Date.now(); + + const queues = await this.#allChildQueuesByScore(parentQueue, consumerId, now); + + span.setAttribute("parent_queue_count", queues.length); + + if (queues.length === 0) { + return emptyFairQueueSnapshot; + } + + const orgIds = new Set(); + const envIds = new Set(); + const envIdToOrgId = new Map(); + + for (const queue of queues) { + orgIds.add(queue.org); + envIds.add(queue.env); + + envIdToOrgId.set(queue.env, queue.org); + } + + const orgs = await Promise.all( + Array.from(orgIds).map(async (orgId) => { + return { id: orgId, concurrency: await this.#getOrgConcurrency(orgId) }; + }) + ); + + const orgsAtFullConcurrency = orgs.filter( + (org) => org.concurrency.current >= org.concurrency.limit + ); + + span.setAttributes({ + ...flattenAttributes(orgsAtFullConcurrency, "orgs_at_full_concurrency"), + }); + + const orgIdsAtFullConcurrency = new Set(orgsAtFullConcurrency.map((org) => org.id)); + + const orgsSnapshot = orgs.reduce((acc, org) => { + if (!orgIdsAtFullConcurrency.has(org.id)) { + acc[org.id] = org; + } + + return acc; + }, {} as Record); + + if (Object.keys(orgsSnapshot).length === 0) { + return emptyFairQueueSnapshot; + } + + const envsWithoutFullOrgs = Array.from(envIds).filter( + (envId) => !orgIdsAtFullConcurrency.has(envIdToOrgId.get(envId)!) 
+ ); + + const envs = await Promise.all( + envsWithoutFullOrgs.map(async (envId) => { + return { + id: envId, + concurrency: await this.#getEnvConcurrency(envId, envIdToOrgId.get(envId)!), + }; + }) + ); + + const envsAtFullConcurrency = envs.filter( + (env) => env.concurrency.current >= env.concurrency.limit + ); + + span.setAttributes({ + ...flattenAttributes(envsAtFullConcurrency, "envs_at_full_concurrency"), + }); + + const envIdsAtFullConcurrency = new Set(envsAtFullConcurrency.map((env) => env.id)); + + const envsSnapshot = envs.reduce((acc, env) => { + if (!envIdsAtFullConcurrency.has(env.id)) { + acc[env.id] = env; + } + + return acc; + }, {} as Record); + + const queuesSnapshot = queues.filter( + (queue) => + !orgIdsAtFullConcurrency.has(queue.org) && !envIdsAtFullConcurrency.has(queue.env) + ); + + const snapshot = { + id: randomUUID(), + orgs: orgsSnapshot, + envs: envsSnapshot, + queues: queuesSnapshot, + }; + + return snapshot; + }); + } + + async #getOrgConcurrency(orgId: string): Promise { + return await startSpan(this.options.tracer, "getOrgConcurrency", async (span) => { + span.setAttribute("org_id", orgId); + + if (this.options.checkForDisabledOrgs) { + const isDisabled = await this.#getConcurrencyDisabled(orgId); + + if (isDisabled) { + span.setAttribute("disabled", true); + + return { current: 0, limit: 0 }; + } + } + + const [currentValue, limitValue] = await Promise.all([ + this.#getOrgCurrentConcurrency(orgId), + this.#getOrgConcurrencyLimit(orgId), + ]); + + span.setAttribute("current_value", currentValue); + span.setAttribute("limit_value", limitValue); + + return { current: currentValue, limit: limitValue }; + }); + } + + async #getEnvConcurrency(envId: string, orgId: string): Promise { + return await startSpan(this.options.tracer, "getEnvConcurrency", async (span) => { + span.setAttribute("org_id", orgId); + span.setAttribute("env_id", envId); + + const [currentValue, limitValue] = await Promise.all([ + this.#getEnvCurrentConcurrency(envId), + this.#getEnvConcurrencyLimit(envId), + ]); + + span.setAttribute("current_value", currentValue); + span.setAttribute("limit_value", limitValue); + + return { current: currentValue, limit: limitValue }; + }); + } + + async #allChildQueuesByScore( + parentQueue: string, + consumerId: string, + now: number + ): Promise> { + return await startSpan(this.options.tracer, "allChildQueuesByScore", async (span) => { + span.setAttribute("consumer_id", consumerId); + span.setAttribute("parent_queue", parentQueue); + + const valuesWithScores = await this.options.redis.zrangebyscore( + parentQueue, + "-inf", + now, + "WITHSCORES", + "LIMIT", + 0, + this.options.parentQueueLimit + ); + + const result: Array = []; + + for (let i = 0; i < valuesWithScores.length; i += 2) { + result.push({ + id: valuesWithScores[i], + age: now - Number(valuesWithScores[i + 1]), + env: this.options.keys.envIdFromQueue(valuesWithScores[i]), + org: this.options.keys.orgIdFromQueue(valuesWithScores[i]), + }); + } + + span.setAttribute("queue_count", result.length); + + return result; + }); + } + + async #getConcurrencyDisabled(orgId: string) { + return await startSpan(this.options.tracer, "getConcurrencyDisabled", async (span) => { + span.setAttribute("org_id", orgId); + + const key = this.options.keys.disabledConcurrencyLimitKey(orgId); + + const result = await this._cache.disabledConcurrency.swr(key, async () => { + const value = await this.options.redis.exists(key); + + return Boolean(value); + }); + + return typeof result.val === "boolean" ? 
result.val : false; + }); + } + + async #getOrgConcurrencyLimit(orgId: string) { + return await startSpan(this.options.tracer, "getOrgConcurrencyLimit", async (span) => { + span.setAttribute("org_id", orgId); + + const key = this.options.keys.orgConcurrencyLimitKey(orgId); + + const result = await this._cache.concurrencyLimit.swr(key, async () => { + const value = await this.options.redis.get(key); + + if (!value) { + return this.options.defaultOrgConcurrency; + } + + return Number(value); + }); + + return result.val ?? this.options.defaultOrgConcurrency; + }); + } + + async #getOrgCurrentConcurrency(orgId: string) { + return await startSpan(this.options.tracer, "getOrgCurrentConcurrency", async (span) => { + span.setAttribute("org_id", orgId); + + const key = this.options.keys.orgCurrentConcurrencyKey(orgId); + + const result = await this.options.redis.scard(key); + + span.setAttribute("current_value", result); + + return result; + }); + } + + async #getEnvConcurrencyLimit(envId: string) { + return await startSpan(this.options.tracer, "getEnvConcurrencyLimit", async (span) => { + span.setAttribute("env_id", envId); + + const key = this.options.keys.envConcurrencyLimitKey(envId); + + const result = await this._cache.concurrencyLimit.swr(key, async () => { + const value = await this.options.redis.get(key); + + if (!value) { + return this.options.defaultEnvConcurrency; + } + + return Number(value); + }); + + return result.val ?? this.options.defaultEnvConcurrency; + }); + } + + async #getEnvCurrentConcurrency(envId: string) { + return await startSpan(this.options.tracer, "getEnvCurrentConcurrency", async (span) => { + span.setAttribute("env_id", envId); + + const key = this.options.keys.envCurrentConcurrencyKey(envId); + + const result = await this.options.redis.scard(key); + + span.setAttribute("current_value", result); + + return result; + }); + } +} + +export class NoopFairDequeuingStrategy implements MarQSFairDequeueStrategy { + async distributeFairQueuesFromParentQueue( + parentQueue: string, + consumerId: string + ): Promise> { + return []; + } +} diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index b7b249bbbb..0fd91fa8f3 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -13,37 +13,27 @@ import { SEMATTRS_MESSAGING_OPERATION, SEMATTRS_MESSAGING_SYSTEM, } from "@opentelemetry/semantic-conventions"; -import { flattenAttributes } from "@trigger.dev/core/v3"; -import Redis, { type Callback, type RedisOptions, type Result } from "ioredis"; +import Redis, { type Callback, type Result } from "ioredis"; import { env } from "~/env.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; import { concurrencyTracker } from "../services/taskRunConcurrencyTracker.server"; -import { attributesFromAuthenticatedEnv } from "../tracer.server"; +import { attributesFromAuthenticatedEnv, tracer } from "../tracer.server"; import { AsyncWorker } from "./asyncWorker.server"; +import { FairDequeuingStrategy } from "./fairDequeuingStrategy.server"; import { MarQSShortKeyProducer } from "./marqsKeyProducer.server"; -import { SimpleWeightedChoiceStrategy } from "./simpleWeightedPriorityStrategy.server"; import { + MarQSFairDequeueStrategy, MarQSKeyProducer, - MarQSQueuePriorityStrategy, MessagePayload, MessageQueueSubscriber, - QueueCapacities, - QueueRange, - QueueWithScores, 
VisibilityTimeoutStrategy, } from "./types"; import { V3VisibilityTimeout } from "./v3VisibilityTimeout.server"; -import { createCache, DefaultStatefulContext, Namespace, Cache as UnkeyCache } from "@unkey/cache"; -import { MemoryStore } from "@unkey/cache/stores"; const KEY_PREFIX = "marqs:"; -const constants = { - MESSAGE_VISIBILITY_TIMEOUT_QUEUE: "msgVisibilityTimeout", -} as const; - const SemanticAttributes = { CONSUMER_ID: "consumer_id", QUEUE: "queue", @@ -55,15 +45,15 @@ const SemanticAttributes = { export type MarQSOptions = { name: string; tracer: Tracer; - redis: RedisOptions; + redis: Redis; defaultEnvConcurrency: number; defaultOrgConcurrency: number; windowSize?: number; visibilityTimeoutInMs?: number; workers: number; keysProducer: MarQSKeyProducer; - queuePriorityStrategy: MarQSQueuePriorityStrategy; - envQueuePriorityStrategy: MarQSQueuePriorityStrategy; + queuePriorityStrategy: MarQSFairDequeueStrategy; + envQueuePriorityStrategy: MarQSFairDequeueStrategy; visibilityTimeoutStrategy: VisibilityTimeoutStrategy; maximumNackCount: number; enableRebalancing?: boolean; @@ -79,48 +69,13 @@ export class MarQS { public keys: MarQSKeyProducer; #rebalanceWorkers: Array = []; - private _cache: UnkeyCache<{ - childQueueSize: number; - queueConcurrencyLimit: number; - envAndOrgConcurrencyLimit: number; - disabledConcurrency: boolean; - }>; - - private _consumerQueues: Map> = new Map(); - constructor(private readonly options: MarQSOptions) { - this.redis = new Redis(options.redis); + this.redis = options.redis; this.keys = options.keysProducer; this.#startRebalanceWorkers(); this.#registerCommands(); - - const ctx = new DefaultStatefulContext(); - const memory = new MemoryStore({ persistentMap: new Map() }); - - this._cache = createCache({ - childQueueSize: new Namespace(ctx, { - stores: [memory], - fresh: 5000, // The time in milliseconds that a value is considered fresh. Cache hits within this time will return the cached value. - stale: 10_000, // The time in milliseconds that a value is considered stale. Cache hits within this time will return the cached value and trigger a background refresh. - }), - queueConcurrencyLimit: new Namespace(ctx, { - stores: [memory], - fresh: 5000, // The time in milliseconds that a value is considered fresh. Cache hits within this time will return the cached value. - stale: 10_000, // The time in milliseconds that a value is considered stale. Cache hits within this time will return the cached value and trigger a background refresh. - }), - envAndOrgConcurrencyLimit: new Namespace(ctx, { - stores: [memory], - fresh: 60_000, // The time in milliseconds that a value is considered fresh. Cache hits within this time will return the cached value. - stale: 180_000, // The time in milliseconds that a value is considered stale. Cache hits within this time will return the cached value and trigger a background refresh. - }), - disabledConcurrency: new Namespace(ctx, { - stores: [memory], - fresh: 30_000, // The time in milliseconds that a value is considered fresh. Cache hits within this time will return the cached value. - stale: 180_000, // The time in milliseconds that a value is considered stale. Cache hits within this time will return the cached value and trigger a background refresh. 
- }), - }); } get name() { @@ -277,22 +232,12 @@ export class MarQS { span.setAttribute(SemanticAttributes.PARENT_QUEUE, parentQueue); span.setAttribute(SemanticAttributes.CONSUMER_ID, env.id); - const childQueues = await this.#allChildQueuesForConsumer(parentQueue, env.id); - - span.setAttribute("parent_queue_length", childQueues.length); - - if (childQueues.length === 0) { - return; - } - // Get prioritized list of queues to try - const queues = await this.#getPrioritizedQueueCandidates( - parentQueue, - childQueues, - this.options.envQueuePriorityStrategy, - (queue) => this.#calculateMessageQueueCapacities(queue, { checkForDisabled: false }), - env.id - ); + const queues = + await this.options.envQueuePriorityStrategy.distributeFairQueuesFromParentQueue( + parentQueue, + env.id + ); span.setAttribute("queue_count", queues.length); @@ -322,6 +267,8 @@ export class MarQS { [SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey, [SemanticAttributes.PARENT_QUEUE]: message.parentQueue, attempted_queues: queues.indexOf(messageQueue) + 1, // How many queues we tried before success + message_timestamp: message.timestamp, + message_age: Date.now() - message.timestamp, }); await this.options.subscriber?.messageDequeued(message); @@ -379,20 +326,9 @@ export class MarQS { span.setAttribute(SemanticAttributes.PARENT_QUEUE, parentQueue); - const childQueues = await this.#allChildQueuesForConsumer(parentQueue, consumerId); - - span.setAttribute("parent_queue_length", childQueues.length); - - if (childQueues.length === 0) { - return; - } - // Get prioritized list of queues to try - const queues = await this.#getPrioritizedQueueCandidates( + const queues = await this.options.queuePriorityStrategy.distributeFairQueuesFromParentQueue( parentQueue, - childQueues, - this.options.queuePriorityStrategy, - (queue) => this.#calculateMessageQueueCapacities(queue, { checkForDisabled: true }), consumerId ); @@ -433,6 +369,8 @@ export class MarQS { [SemanticAttributes.PARENT_QUEUE]: message.parentQueue, age_in_seconds: ageOfMessageInMs / 1000, attempted_queues: queues.indexOf(messageQueue) + 1, // How many queues we tried before success + message_timestamp: message.timestamp, + message_age: Date.now() - message.timestamp, }); await this.options.subscriber?.messageDequeued(message); @@ -829,243 +767,6 @@ export class MarQS { ); } - async #getPrioritizedQueueCandidates( - parentQueue: string, - childQueues: Array<{ value: string; score: number }>, - queuePriorityStrategy: MarQSQueuePriorityStrategy, - calculateCapacities: (queue: string) => Promise, - consumerId: string - ) { - return this.#trace( - "getPrioritizedQueueCandidates", - async (span) => { - span.setAttribute(SemanticAttributes.CONSUMER_ID, consumerId); - span.setAttribute(SemanticAttributes.PARENT_QUEUE, parentQueue); - - span.setAttribute("parent_queue_length", childQueues.length); - - const { range } = await queuePriorityStrategy.nextCandidateSelection( - parentQueue, - consumerId - ); - - span.setAttribute("range_offset", range.offset); - span.setAttribute("range_count", range.count); - - const queues = childQueues.slice(range.offset, range.offset + range.count); - - span.setAttribute("queue_count", queues.length); - - const queuesWithScores = await this.#calculateQueueScores(queues, calculateCapacities); - - span.setAttribute("queues_with_scores_count", queuesWithScores.length); - - // Get weighted distribution of queues instead of a single choice - const weightedQueues = queuePriorityStrategy.distributeQueues(queuesWithScores); - - const 
nextRange = queuePriorityStrategy.moveToNextRange( - parentQueue, - consumerId, - range, - childQueues.length - ); - - span.setAttribute("next_range_offset", nextRange.offset); - - // Next time we dequeue we will re-fetch the queues of the parent for this consumer - if (nextRange.offset === 0) { - this.#evictConsumerQueues(parentQueue, consumerId); - } - - return weightedQueues; - }, - { - kind: SpanKind.CONSUMER, - attributes: { - [SEMATTRS_MESSAGING_OPERATION]: "receive", - [SEMATTRS_MESSAGING_SYSTEM]: "marqs", - [SemanticAttributes.PARENT_QUEUE]: parentQueue, - }, - } - ); - } - - // Calculate the weights of the queues based on the age and the capacity - async #calculateQueueScores( - queues: Array<{ value: string; score: number }>, - calculateCapacities: (queue: string) => Promise - ): Promise> { - return await this.#trace("calculateQueueScores", async (span) => { - const now = Date.now(); - - const values = await Promise.all( - queues.map(async (queue) => { - return { - queue: queue.value, - capacities: await calculateCapacities(queue.value), - age: now - queue.score, - size: await this.#getQueueSize(queue.value), - }; - }) - ); - - return values; - }); - } - - async #getQueueSize(queue: string) { - const result = await this._cache.childQueueSize.swr(queue, async () => { - return await this.redis.zcard(queue); - }); - - return result.val ?? 0; - } - - async #calculateMessageQueueCapacities( - queue: string, - options?: { checkForDisabled?: boolean } - ): Promise { - if (options?.checkForDisabled) { - const isDisabled = await this.#getConcurrencyDisabled(queue); - - if (isDisabled) { - return { - queue: { current: 0, limit: 0 }, - env: { current: 0, limit: 0 }, - org: { current: 0, limit: 0 }, - }; - } - } - - // Now we need to calculate the queue concurrency limits, using a cache - const [queueLimit, envLimit, orgLimit, currentConcurrencies] = await Promise.all([ - this.#getQueueConcurrencyLimit(queue), - this.#getEnvConcurrencyLimit(queue), - this.#getOrgConcurrencyLimit(queue), - this.#callCalculateQueueCurrentConcurrencies({ - currentConcurrencyKey: this.keys.currentConcurrencyKeyFromQueue(queue), - currentEnvConcurrencyKey: this.keys.envCurrentConcurrencyKeyFromQueue(queue), - currentOrgConcurrencyKey: this.keys.orgCurrentConcurrencyKeyFromQueue(queue), - }), - ]); - - return { - queue: { current: currentConcurrencies.queue, limit: queueLimit }, - env: { current: currentConcurrencies.env, limit: envLimit }, - org: { current: currentConcurrencies.org, limit: orgLimit }, - }; - } - - async #getConcurrencyDisabled(queue: string) { - const key = this.keys.disabledConcurrencyLimitKeyFromQueue(queue); - - const result = await this._cache.disabledConcurrency.swr(key, async () => { - const value = await this.redis.exists(key); - - return Boolean(value); - }); - - return typeof result.val === "boolean" ? result.val : false; - } - - async #getOrgConcurrencyLimit(queue: string) { - const key = this.keys.orgConcurrencyLimitKeyFromQueue(queue); - - const result = await this._cache.envAndOrgConcurrencyLimit.swr(key, async () => { - const value = await this.redis.get(key); - - if (!value) { - return this.options.defaultOrgConcurrency; - } - - return Number(value); - }); - - return result.val ?? 
this.options.defaultOrgConcurrency; - } - - async #getEnvConcurrencyLimit(queue: string) { - const key = this.keys.envConcurrencyLimitKeyFromQueue(queue); - - const result = await this._cache.envAndOrgConcurrencyLimit.swr(key, async () => { - const value = await this.redis.get(key); - - if (!value) { - return this.options.defaultEnvConcurrency; - } - - return Number(value); - }); - - return result.val ?? this.options.defaultEnvConcurrency; - } - - async #getQueueConcurrencyLimit(queue: string) { - const key = this.keys.concurrencyLimitKeyFromQueue(queue); - const defaultValue = Math.min( - this.options.defaultEnvConcurrency, - this.options.defaultOrgConcurrency - ); - - const result = await this._cache.queueConcurrencyLimit.swr(key, async () => { - const value = await this.redis.get(key); - - if (!value) { - return defaultValue; - } - - return Number(value); - }); - - return result.val ?? defaultValue; - } - - #evictConsumerQueues(parentQueue: string, consumerId: string) { - this._consumerQueues.delete([parentQueue, consumerId].join(":")); - } - - async #allChildQueuesForConsumer( - key: string, - consumerId: string - ): Promise> { - const cachedQueues = this._consumerQueues.get([key, consumerId].join(":")); - - if (cachedQueues) { - return cachedQueues; - } - - return await this.#trace("allChildQueuesForConsumer", async (span) => { - span.setAttribute(SemanticAttributes.CONSUMER_ID, consumerId); - span.setAttribute(SemanticAttributes.PARENT_QUEUE, key); - - const valuesWithScores = await this.redis.zrangebyscore( - key, - "-inf", - Date.now(), - "WITHSCORES" - ); - - const result: Array<{ value: string; score: number }> = []; - - for (let i = 0; i < valuesWithScores.length; i += 2) { - result.push({ - value: valuesWithScores[i], - score: Number(valuesWithScores[i + 1]), - }); - } - - span.setAttribute("queue_count", result.length); - - if (result.length === 0) { - return result; - } - - this._consumerQueues.set([key, consumerId].join(":"), result); - - return result; - }); - } - #startRebalanceWorkers() { if (!this.options.enableRebalancing) { return; @@ -1903,19 +1604,44 @@ function getMarQSClient() { ...(env.REDIS_TLS_DISABLED === "true" ? 
{} : { tls: {} }), }; + const redis = new Redis(redisOptions); + const keysProducer = new MarQSShortKeyProducer(KEY_PREFIX); + return new MarQS({ name: "marqs", tracer: trace.getTracer("marqs"), - keysProducer: new MarQSShortKeyProducer(KEY_PREFIX), + keysProducer, visibilityTimeoutStrategy: new V3VisibilityTimeout(), - queuePriorityStrategy: new SimpleWeightedChoiceStrategy({ - queueSelectionCount: env.MARQS_SHARED_QUEUE_SELECTION_COUNT, + queuePriorityStrategy: new FairDequeuingStrategy({ + tracer: tracer, + redis, + parentQueueLimit: env.MARQS_SHARED_QUEUE_LIMIT, + keys: keysProducer, + defaultEnvConcurrency: env.DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT, + defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT, + checkForDisabledOrgs: true, + biases: { + concurrencyLimitBias: env.MARQS_CONCURRENCY_LIMIT_BIAS, + availableCapacityBias: env.MARQS_AVAILABLE_CAPACITY_BIAS, + queueAgeRandomization: env.MARQS_QUEUE_AGE_RANDOMIZATION_BIAS, + }, }), - envQueuePriorityStrategy: new SimpleWeightedChoiceStrategy({ - queueSelectionCount: env.MARQS_DEV_QUEUE_SELECTION_COUNT, + envQueuePriorityStrategy: new FairDequeuingStrategy({ + tracer: tracer, + redis, + parentQueueLimit: env.MARQS_DEV_QUEUE_LIMIT, + keys: keysProducer, + defaultEnvConcurrency: env.DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT, + defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT, + checkForDisabledOrgs: false, + biases: { + concurrencyLimitBias: 0.0, + availableCapacityBias: 0.0, + queueAgeRandomization: 0.1, + }, }), workers: 1, - redis: redisOptions, + redis, defaultEnvConcurrency: env.DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT, defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT, visibilityTimeoutInMs: env.MARQS_VISIBILITY_TIMEOUT_MS, diff --git a/apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts b/apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts index 0a6dd0ecb8..227a0a9dd2 100644 --- a/apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts +++ b/apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts @@ -36,22 +36,49 @@ export class MarQSShortKeyProducer implements MarQSKeyProducer { return [this.queueKey(env, queue), constants.CONCURRENCY_LIMIT_PART].join(":"); } - envConcurrencyLimitKey(env: AuthenticatedEnvironment) { - return [this.envKeySection(env.id), constants.CONCURRENCY_LIMIT_PART].join(":"); + envConcurrencyLimitKey(envId: string): string; + envConcurrencyLimitKey(env: AuthenticatedEnvironment): string; + envConcurrencyLimitKey(envOrId: AuthenticatedEnvironment | string): string { + return [ + this.envKeySection(typeof envOrId === "string" ? envOrId : envOrId.id), + constants.CONCURRENCY_LIMIT_PART, + ].join(":"); } - orgConcurrencyLimitKey(env: AuthenticatedEnvironment) { - return [this.orgKeySection(env.organizationId), constants.CONCURRENCY_LIMIT_PART].join(":"); + orgConcurrencyLimitKey(orgId: string): string; + orgConcurrencyLimitKey(env: AuthenticatedEnvironment): string; + orgConcurrencyLimitKey(envOrOrgId: AuthenticatedEnvironment | string) { + return [ + this.orgKeySection(typeof envOrOrgId === "string" ? envOrOrgId : envOrOrgId.organizationId), + constants.CONCURRENCY_LIMIT_PART, + ].join(":"); } - queueKey(env: AuthenticatedEnvironment, queue: string, concurrencyKey?: string) { - return [ - this.orgKeySection(env.organizationId), - this.envKeySection(env.id), - this.queueSection(queue), - ] - .concat(concurrencyKey ? 
this.concurrencyKeySection(concurrencyKey) : []) - .join(":"); + queueKey(orgId: string, envId: string, queue: string, concurrencyKey?: string): string; + queueKey(env: AuthenticatedEnvironment, queue: string, concurrencyKey?: string): string; + queueKey( + envOrOrgId: AuthenticatedEnvironment | string, + queueOrEnvId: string, + queueOrConcurrencyKey: string, + concurrencyKey?: string + ): string { + if (typeof envOrOrgId === "string") { + return [ + this.orgKeySection(envOrOrgId), + this.envKeySection(queueOrEnvId), + this.queueSection(queueOrConcurrencyKey), + ] + .concat(concurrencyKey ? this.concurrencyKeySection(concurrencyKey) : []) + .join(":"); + } else { + return [ + this.orgKeySection(envOrOrgId.organizationId), + this.envKeySection(envOrOrgId.id), + this.queueSection(queueOrEnvId), + ] + .concat(queueOrConcurrencyKey ? this.concurrencyKeySection(queueOrConcurrencyKey) : []) + .join(":"); + } } envSharedQueueKey(env: AuthenticatedEnvironment) { @@ -93,6 +120,10 @@ export class MarQSShortKeyProducer implements MarQSKeyProducer { disabledConcurrencyLimitKeyFromQueue(queue: string) { const orgId = this.normalizeQueue(queue).split(":")[1]; + return this.disabledConcurrencyLimitKey(orgId); + } + + disabledConcurrencyLimitKey(orgId: string) { return `${constants.ORG_PART}:${orgId}:${constants.DISABLED_CONCURRENCY_LIMIT_PART}`; } @@ -120,12 +151,22 @@ export class MarQSShortKeyProducer implements MarQSKeyProducer { return `${constants.ENV_PART}:${envId}:${constants.CURRENT_CONCURRENCY_PART}`; } - orgCurrentConcurrencyKey(env: AuthenticatedEnvironment): string { - return [this.orgKeySection(env.organizationId), constants.CURRENT_CONCURRENCY_PART].join(":"); + orgCurrentConcurrencyKey(orgId: string): string; + orgCurrentConcurrencyKey(env: AuthenticatedEnvironment): string; + orgCurrentConcurrencyKey(envOrOrgId: AuthenticatedEnvironment | string): string { + return [ + this.orgKeySection(typeof envOrOrgId === "string" ? envOrOrgId : envOrOrgId.organizationId), + constants.CURRENT_CONCURRENCY_PART, + ].join(":"); } - envCurrentConcurrencyKey(env: AuthenticatedEnvironment): string { - return [this.envKeySection(env.id), constants.CURRENT_CONCURRENCY_PART].join(":"); + envCurrentConcurrencyKey(envId: string): string; + envCurrentConcurrencyKey(env: AuthenticatedEnvironment): string; + envCurrentConcurrencyKey(envOrId: AuthenticatedEnvironment | string): string { + return [ + this.envKeySection(typeof envOrId === "string" ? 
envOrId : envOrId.id), + constants.CURRENT_CONCURRENCY_PART, + ].join(":"); } envQueueKeyFromQueue(queue: string) { @@ -146,6 +187,14 @@ export class MarQSShortKeyProducer implements MarQSKeyProducer { return `${constants.MESSAGE_PART}:${messageId}:nacks`; } + orgIdFromQueue(queue: string) { + return this.normalizeQueue(queue).split(":")[1]; + } + + envIdFromQueue(queue: string) { + return this.normalizeQueue(queue).split(":")[3]; + } + private shortId(id: string) { // Return the last 12 characters of the id return id.slice(-12); diff --git a/apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts b/apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts deleted file mode 100644 index 525046c7b6..0000000000 --- a/apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts +++ /dev/null @@ -1,150 +0,0 @@ -import { nanoid } from "nanoid"; -import { - MarQSQueuePriorityStrategy, - PriorityStrategyChoice, - QueueRange, - QueueWithScores, -} from "./types"; - -export type SimpleWeightedChoiceStrategyOptions = { - queueSelectionCount: number; - randomSeed?: string; - excludeEnvCapacity?: boolean; -}; - -export class SimpleWeightedChoiceStrategy implements MarQSQueuePriorityStrategy { - private _nextRangesByParentQueue: Map = new Map(); - - constructor(private options: SimpleWeightedChoiceStrategyOptions) {} - - private nextRangeForParentQueue(parentQueue: string, consumerId: string): QueueRange { - return ( - this._nextRangesByParentQueue.get(`${consumerId}:${parentQueue}`) ?? { - offset: 0, - count: this.options.queueSelectionCount, - } - ); - } - - moveToNextRange( - parentQueue: string, - consumerId: string, - currentRange: QueueRange, - parentQueueSize: number - ): QueueRange { - const nextRange: QueueRange = { - offset: currentRange.offset + currentRange.count, - count: currentRange.count, - }; - - // if the nextRange is within the parentQueueSize, set it on the this._nextRangesByParentQueue map and return it - // if the nextRange is outside the parentQueueSize, reset the range to the beginning by deleting the entry from the map - if (nextRange.offset < parentQueueSize) { - this._nextRangesByParentQueue.set(`${consumerId}:${parentQueue}`, nextRange); - return nextRange; - } else { - this._nextRangesByParentQueue.delete(`${consumerId}:${parentQueue}`); - return { offset: 0, count: this.options.queueSelectionCount }; - } - } - - distributeQueues(queues: QueueWithScores[]): Array { - const filteredQueues = filterQueuesAtCapacity(queues); - - if (filteredQueues.length === 0) { - return []; - } - - const queueWeights = this.#calculateQueueWeights(filteredQueues); - - // Sort queues by weight in descending order - const sortedQueues = [...queueWeights].sort((a, b) => b.totalWeight - a.totalWeight); - - // Convert weights to probabilities - const totalQueueWeight = sortedQueues.reduce((sum, queue) => sum + queue.totalWeight, 0); - const weightedQueues = sortedQueues.map(({ queue, totalWeight }) => ({ - queue, - probability: totalQueueWeight > 0 ? 
totalWeight / totalQueueWeight : 1.0, - })); - - // Apply some randomization while maintaining general weight order - // This helps prevent all consumers from always picking the same highest-weight queue - const shuffledWeightedQueues = weightedQueues - .map((queueInfo, index) => ({ - ...queueInfo, - // Add some controlled randomness while maintaining general weight order - randomFactor: Math.random() * 0.2 - 0.1, // ±10% random adjustment - originalIndex: index, - })) - .sort((a, b) => { - // If probability difference is significant (>20%), maintain order - if (Math.abs(a.probability - b.probability) > 0.2) { - return b.probability - a.probability; - } - // Otherwise, allow some randomization while keeping similar weights roughly together - return b.probability + b.randomFactor - (a.probability + a.randomFactor); - }) - .map(({ queue }) => queue); - - return shuffledWeightedQueues; - } - - async nextCandidateSelection( - parentQueue: string, - consumerId: string - ): Promise<{ range: QueueRange }> { - return { - range: this.nextRangeForParentQueue(parentQueue, consumerId), - }; - } - - #calculateQueueWeights(queues: QueueWithScores[]) { - const avgQueueSize = queues.reduce((acc, { size }) => acc + size, 0) / queues.length; - const avgMessageAge = queues.reduce((acc, { age }) => acc + age, 0) / queues.length; - - return queues.map(({ capacities, age, queue, size }) => { - let totalWeight = 1; - - if (size > avgQueueSize) { - totalWeight += Math.min(size / avgQueueSize, 4); - } - - if (age > avgMessageAge) { - totalWeight += Math.min(age / avgMessageAge, 4); - } - - return { - queue, - totalWeight: age, - }; - }); - } -} - -function filterQueuesAtCapacity(queues: QueueWithScores[]) { - return queues.filter( - (queue) => - queue.capacities.queue.current < queue.capacities.queue.limit && - queue.capacities.env.current < queue.capacities.env.limit && - queue.capacities.org.current < queue.capacities.org.limit - ); -} - -export class NoopWeightedChoiceStrategy implements MarQSQueuePriorityStrategy { - nextCandidateSelection(parentQueue: string): Promise<{ range: QueueRange; selectionId: string }> { - return Promise.resolve({ range: { offset: 0, count: 0 }, selectionId: nanoid(24) }); - } - - distributeQueues(queues: Array): Array { - return []; - } - - moveToNextRange( - parentQueue: string, - consumerId: string, - currentRange: QueueRange, - queueSize: number - ): QueueRange { - return { offset: 0, count: 0 }; - } -} diff --git a/apps/webapp/app/v3/marqs/types.ts b/apps/webapp/app/v3/marqs/types.ts index 425a143773..e769a71206 100644 --- a/apps/webapp/app/v3/marqs/types.ts +++ b/apps/webapp/app/v3/marqs/types.ts @@ -1,31 +1,26 @@ import { z } from "zod"; import { type AuthenticatedEnvironment } from "~/services/apiAuth.server"; -export type QueueCapacity = { - current: number; - limit: number; -}; - -export type QueueCapacities = { - queue: QueueCapacity; - env: QueueCapacity; - org: QueueCapacity; -}; - -export type QueueWithScores = { - queue: string; - capacities: QueueCapacities; - age: number; - size: number; -}; - export type QueueRange = { offset: number; count: number }; export interface MarQSKeyProducer { queueConcurrencyLimitKey(env: AuthenticatedEnvironment, queue: string): string; + + envConcurrencyLimitKey(envId: string): string; envConcurrencyLimitKey(env: AuthenticatedEnvironment): string; + + orgConcurrencyLimitKey(orgId: string): string; orgConcurrencyLimitKey(env: AuthenticatedEnvironment): string; + + orgCurrentConcurrencyKey(orgId: string): string; + 
orgCurrentConcurrencyKey(env: AuthenticatedEnvironment): string; + + envCurrentConcurrencyKey(envId: string): string; + envCurrentConcurrencyKey(env: AuthenticatedEnvironment): string; + + queueKey(orgId: string, envId: string, queue: string, concurrencyKey?: string): string; queueKey(env: AuthenticatedEnvironment, queue: string, concurrencyKey?: string): string; + envQueueKey(env: AuthenticatedEnvironment): string; envSharedQueueKey(env: AuthenticatedEnvironment): string; sharedQueueKey(): string; @@ -38,42 +33,25 @@ export interface MarQSKeyProducer { queue: string, concurrencyKey?: string ): string; + disabledConcurrencyLimitKey(orgId: string): string; disabledConcurrencyLimitKeyFromQueue(queue: string): string; orgConcurrencyLimitKeyFromQueue(queue: string): string; orgCurrentConcurrencyKeyFromQueue(queue: string): string; envConcurrencyLimitKeyFromQueue(queue: string): string; envCurrentConcurrencyKeyFromQueue(queue: string): string; - orgCurrentConcurrencyKey(env: AuthenticatedEnvironment): string; - envCurrentConcurrencyKey(env: AuthenticatedEnvironment): string; envQueueKeyFromQueue(queue: string): string; messageKey(messageId: string): string; nackCounterKey(messageId: string): string; stripKeyPrefix(key: string): string; + orgIdFromQueue(queue: string): string; + envIdFromQueue(queue: string): string; } -export type PriorityStrategyChoice = string | { abort: true }; - -export interface MarQSQueuePriorityStrategy { - /** - * This function is called to get the next candidate selection for the queue - * The `range` is used to select the set of queues that will be considered for the next selection (passed to chooseQueue) - * The `selectionId` is used to identify the selection and should be passed to chooseQueue - * - * @param parentQueue The parent queue that holds the candidate queues - * @param consumerId The consumerId that is making the request - * - * @returns The scores and the selectionId for the next candidate selection - */ - nextCandidateSelection(parentQueue: string, consumerId: string): Promise<{ range: QueueRange }>; - - distributeQueues(queues: Array): Array; - - moveToNextRange( +export interface MarQSFairDequeueStrategy { + distributeFairQueuesFromParentQueue( parentQueue: string, - consumerId: string, - currentRange: QueueRange, - queueSize: number - ): QueueRange; + consumerId: string + ): Promise>; } export const MessagePayload = z.object({ diff --git a/apps/webapp/app/v3/marqs/v2.server.ts b/apps/webapp/app/v3/marqs/v2.server.ts index f0ca2c62e0..ba637715ba 100644 --- a/apps/webapp/app/v3/marqs/v2.server.ts +++ b/apps/webapp/app/v3/marqs/v2.server.ts @@ -11,11 +11,10 @@ import { generateFriendlyId } from "../friendlyIdentifiers"; import { MarQS } from "./index.server"; import { MarQSShortKeyProducer } from "./marqsKeyProducer.server"; import { RequeueV2Message } from "./requeueV2Message.server"; -import { - NoopWeightedChoiceStrategy, - SimpleWeightedChoiceStrategy, -} from "./simpleWeightedPriorityStrategy.server"; import { VisibilityTimeoutStrategy } from "./types"; +import Redis from "ioredis"; +import { FairDequeuingStrategy, NoopFairDequeuingStrategy } from "./fairDequeuingStrategy.server"; +import { tracer } from "../tracer.server"; const KEY_PREFIX = "marqsv2:"; const SHARED_QUEUE_NAME = "sharedQueue"; @@ -67,23 +66,31 @@ function getMarQSClient() { ...(env.REDIS_TLS_DISABLED === "true" ? 
{} : { tls: {} }), }; + const redis = new Redis(redisOptions); + return new MarQS({ verbose: env.V2_MARQS_VERBOSE === "1", name: "marqsv2", tracer: trace.getTracer("marqsv2"), visibilityTimeoutStrategy: new V2VisibilityTimeout(), keysProducer: new MarQSV2KeyProducer(KEY_PREFIX), - queuePriorityStrategy: new SimpleWeightedChoiceStrategy({ - queueSelectionCount: env.V2_MARQS_QUEUE_SELECTION_COUNT, + queuePriorityStrategy: new FairDequeuingStrategy({ + tracer, + redis, + parentQueueLimit: 100, + keys: new MarQSV2KeyProducer(KEY_PREFIX), + defaultEnvConcurrency: env.V2_MARQS_DEFAULT_ENV_CONCURRENCY, + defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT, + checkForDisabledOrgs: true, }), - envQueuePriorityStrategy: new NoopWeightedChoiceStrategy(), // We don't use this in v2, since all queues go through the shared queue + envQueuePriorityStrategy: new NoopFairDequeuingStrategy(), // We don't use this in v2, since all queues go through the shared queue workers: 0, - redis: redisOptions, - defaultEnvConcurrency: env.V2_MARQS_DEFAULT_ENV_CONCURRENCY, // this is so we aren't limited by the environment concurrency - defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT, + redis, visibilityTimeoutInMs: env.V2_MARQS_VISIBILITY_TIMEOUT_MS, // 15 minutes maximumNackCount: 10, enableRebalancing: false, + defaultEnvConcurrency: env.V2_MARQS_DEFAULT_ENV_CONCURRENCY, + defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT, }); } diff --git a/apps/webapp/app/v3/tracer.server.ts b/apps/webapp/app/v3/tracer.server.ts index 04db57a762..7b6db8240c 100644 --- a/apps/webapp/app/v3/tracer.server.ts +++ b/apps/webapp/app/v3/tracer.server.ts @@ -11,7 +11,10 @@ import { diag, trace, } from "@opentelemetry/api"; +import { logs, SeverityNumber } from "@opentelemetry/api-logs"; import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http"; +import { OTLPLogExporter } from "@opentelemetry/exporter-logs-otlp-http"; +import { BatchLogRecordProcessor, LoggerProvider } from "@opentelemetry/sdk-logs"; import { type Instrumentation, registerInstrumentations } from "@opentelemetry/instrumentation"; import { ExpressInstrumentation } from "@opentelemetry/instrumentation-express"; import { HttpInstrumentation } from "@opentelemetry/instrumentation-http"; @@ -32,6 +35,8 @@ import { env } from "~/env.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { singleton } from "~/utils/singleton"; import { LoggerSpanExporter } from "./telemetry/loggerExporter.server"; +import { logger } from "~/services/logger.server"; +import { flattenAttributes } from "@trigger.dev/core/v3"; export const SEMINTATTRS_FORCE_RECORDING = "forceRecording"; @@ -77,7 +82,7 @@ class CustomWebappSampler implements Sampler { } } -export const tracer = singleton("tracer", getTracer); +export const { tracer, logger: otelLogger } = singleton("tracer", getTracer); export async function startActiveSpan( name: string, @@ -96,7 +101,12 @@ export async function startActiveSpan( span.recordException(new Error(String(error))); } - span.setStatus({ code: SpanStatusCode.ERROR }); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error instanceof Error ? 
error.message : String(error), + }); + + logger.debug(`Error in span: ${name}`, { error }); throw error; } finally { @@ -105,11 +115,46 @@ export async function startActiveSpan( }); } +export async function emitDebugLog(message: string, params: Record = {}) { + otelLogger.emit({ + severityNumber: SeverityNumber.DEBUG, + body: message, + attributes: { ...flattenAttributes(params, "params") }, + }); +} + +export async function emitInfoLog(message: string, params: Record = {}) { + otelLogger.emit({ + severityNumber: SeverityNumber.INFO, + body: message, + attributes: { ...flattenAttributes(params, "params") }, + }); +} + +export async function emitErrorLog(message: string, params: Record = {}) { + otelLogger.emit({ + severityNumber: SeverityNumber.ERROR, + body: message, + attributes: { ...flattenAttributes(params, "params") }, + }); +} + +export async function emitWarnLog(message: string, params: Record = {}) { + otelLogger.emit({ + severityNumber: SeverityNumber.WARN, + body: message, + attributes: { ...flattenAttributes(params, "params") }, + }); +} + function getTracer() { if (env.INTERNAL_OTEL_TRACE_DISABLED === "1") { console.log(`🔦 Tracer disabled, returning a noop tracer`); - return trace.getTracer("trigger.dev", "3.0.0.dp.1"); + return { + tracer: trace.getTracer("trigger.dev", "3.3.12"), + logger: logs.getLogger("trigger.dev", "3.3.12"), + }; } diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.ERROR); @@ -160,6 +205,40 @@ function getTracer() { } } + if (env.INTERNAL_OTEL_LOG_EXPORTER_URL) { + const headers = parseInternalTraceHeaders() ?? {}; + + const logExporter = new OTLPLogExporter({ + url: env.INTERNAL_OTEL_LOG_EXPORTER_URL, + timeoutMillis: 15_000, + headers, + }); + + const loggerProvider = new LoggerProvider({ + resource: new Resource({ + [SEMRESATTRS_SERVICE_NAME]: env.SERVICE_NAME, + }), + logRecordLimits: { + attributeCountLimit: 1000, + }, + }); + + loggerProvider.addLogRecordProcessor( + new BatchLogRecordProcessor(logExporter, { + maxExportBatchSize: 64, + scheduledDelayMillis: 200, + exportTimeoutMillis: 30000, + maxQueueSize: 512, + }) + ); + + logs.setGlobalLoggerProvider(loggerProvider); + + console.log( + `🔦 Tracer: OTLP log exporter enabled to ${env.INTERNAL_OTEL_LOG_EXPORTER_URL} (sampling = ${samplingRate})` + ); + } + provider.register(); let instrumentations: Instrumentation[] = [ @@ -173,10 +252,14 @@ function getTracer() { registerInstrumentations({ tracerProvider: provider, + loggerProvider: logs.getLoggerProvider(), instrumentations, }); - return provider.getTracer("trigger.dev", "3.0.0.dp.1"); + return { + tracer: provider.getTracer("trigger.dev", "3.3.12"), + logger: logs.getLogger("trigger.dev", "3.3.12"), + }; } const SemanticEnvResources = { diff --git a/apps/webapp/app/v3/tracing.server.ts b/apps/webapp/app/v3/tracing.server.ts new file mode 100644 index 0000000000..16359c9dd4 --- /dev/null +++ b/apps/webapp/app/v3/tracing.server.ts @@ -0,0 +1,81 @@ +import { Span, SpanOptions, SpanStatusCode, Tracer } from "@opentelemetry/api"; +import { Logger, SeverityNumber } from "@opentelemetry/api-logs"; +import { flattenAttributes } from "@trigger.dev/core/v3/utils/flattenAttributes"; + +export async function startSpan( + tracer: Tracer, + name: string, + fn: (span: Span) => Promise, + options?: SpanOptions +): Promise { + return tracer.startActiveSpan(name, options ?? 
{}, async (span) => { + try { + return await fn(span); + } catch (error) { + if (error instanceof Error) { + span.recordException(error); + } else if (typeof error === "string") { + span.recordException(new Error(error)); + } else { + span.recordException(new Error(String(error))); + } + + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error instanceof Error ? error.message : String(error), + }); + + throw error; + } finally { + span.end(); + } + }); +} + +export async function emitDebugLog( + logger: Logger, + message: string, + params: Record = {} +) { + logger.emit({ + severityNumber: SeverityNumber.DEBUG, + body: message, + attributes: { ...flattenAttributes(params, "params") }, + }); +} + +export async function emitInfoLog( + logger: Logger, + message: string, + params: Record = {} +) { + logger.emit({ + severityNumber: SeverityNumber.INFO, + body: message, + attributes: { ...flattenAttributes(params, "params") }, + }); +} + +export async function emitErrorLog( + logger: Logger, + message: string, + params: Record = {} +) { + logger.emit({ + severityNumber: SeverityNumber.ERROR, + body: message, + attributes: { ...flattenAttributes(params, "params") }, + }); +} + +export async function emitWarnLog( + logger: Logger, + message: string, + params: Record = {} +) { + logger.emit({ + severityNumber: SeverityNumber.WARN, + body: message, + attributes: { ...flattenAttributes(params, "params") }, + }); +} diff --git a/apps/webapp/package.json b/apps/webapp/package.json index 6299d7709d..f7f95c831e 100644 --- a/apps/webapp/package.json +++ b/apps/webapp/package.json @@ -65,6 +65,7 @@ "@opentelemetry/sdk-trace-base": "1.25.1", "@opentelemetry/sdk-trace-node": "1.25.1", "@opentelemetry/semantic-conventions": "1.25.1", + "@opentelemetry/api-logs": "0.52.1", "@popperjs/core": "^2.11.8", "@prisma/instrumentation": "^5.11.0", "@radix-ui/react-alert-dialog": "^1.0.4", diff --git a/apps/webapp/test/fairDequeuingStrategy.test.ts b/apps/webapp/test/fairDequeuingStrategy.test.ts new file mode 100644 index 0000000000..466c466d0a --- /dev/null +++ b/apps/webapp/test/fairDequeuingStrategy.test.ts @@ -0,0 +1,692 @@ +import { redisTest } from "@internal/testcontainers"; +import { describe, expect, vi } from "vitest"; +import { FairDequeuingStrategy } from "../app/v3/marqs/fairDequeuingStrategy.server.js"; +import { + calculateStandardDeviation, + createKeyProducer, + setupConcurrency, + setupQueue, +} from "./utils/marqs.js"; +import { trace } from "@opentelemetry/api"; + +const tracer = trace.getTracer("test"); + +vi.setConfig({ testTimeout: 30_000 }); // 30 seconds timeout + +describe("FairDequeuingStrategy", () => { + redisTest("should distribute a single queue from a single org/env", async ({ redis }) => { + const keyProducer = createKeyProducer("test"); + const strategy = new FairDequeuingStrategy({ + tracer, + redis, + keys: keyProducer, + defaultOrgConcurrency: 10, + defaultEnvConcurrency: 5, + parentQueueLimit: 100, + checkForDisabledOrgs: true, + seed: "test-seed-1", // for deterministic shuffling + }); + + // Setup a single queue + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: Date.now() - 1000, // 1 second ago + queueId: "queue-1", + orgId: "org-1", + envId: "env-1", + }); + + const result = await strategy.distributeFairQueuesFromParentQueue("parent-queue", "consumer-1"); + + expect(result).toHaveLength(1); + expect(result[0]).toBe("org:org-1:env:env-1:queue:queue-1"); + }); + + redisTest("should respect org concurrency limits", async ({ redis }) => { 
+ const keyProducer = createKeyProducer("test"); + const strategy = new FairDequeuingStrategy({ + tracer, + redis, + keys: keyProducer, + defaultOrgConcurrency: 2, + defaultEnvConcurrency: 5, + parentQueueLimit: 100, + checkForDisabledOrgs: true, + seed: "test-seed-2", + }); + + // Setup queue + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: Date.now() - 1000, + queueId: "queue-1", + orgId: "org-1", + envId: "env-1", + }); + + // Set org-1 to be at its concurrency limit + await setupConcurrency({ + redis, + keyProducer, + org: { id: "org-1", currentConcurrency: 2, limit: 2 }, + env: { id: "env-1", currentConcurrency: 0 }, + }); + + const result = await strategy.distributeFairQueuesFromParentQueue("parent-queue", "consumer-1"); + expect(result).toHaveLength(0); + }); + + redisTest("should respect env concurrency limits", async ({ redis }) => { + const keyProducer = createKeyProducer("test"); + const strategy = new FairDequeuingStrategy({ + tracer, + redis, + keys: keyProducer, + defaultOrgConcurrency: 10, + defaultEnvConcurrency: 2, + parentQueueLimit: 100, + checkForDisabledOrgs: true, + seed: "test-seed-3", + }); + + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: Date.now() - 1000, + queueId: "queue-1", + orgId: "org-1", + envId: "env-1", + }); + + await setupConcurrency({ + redis, + keyProducer, + org: { id: "org-1", currentConcurrency: 0 }, + env: { id: "env-1", currentConcurrency: 2, limit: 2 }, + }); + + const result = await strategy.distributeFairQueuesFromParentQueue("parent-queue", "consumer-1"); + expect(result).toHaveLength(0); + }); + + redisTest("should handle disabled orgs", async ({ redis }) => { + const keyProducer = createKeyProducer("test"); + const strategy = new FairDequeuingStrategy({ + tracer, + redis, + keys: keyProducer, + defaultOrgConcurrency: 10, + defaultEnvConcurrency: 5, + parentQueueLimit: 100, + checkForDisabledOrgs: true, + seed: "test-seed-4", + }); + + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: Date.now() - 1000, + queueId: "queue-1", + orgId: "org-1", + envId: "env-1", + }); + + await setupConcurrency({ + redis, + keyProducer, + org: { id: "org-1", currentConcurrency: 0, isDisabled: true }, + env: { id: "env-1", currentConcurrency: 0 }, + }); + + const result = await strategy.distributeFairQueuesFromParentQueue("parent-queue", "consumer-1"); + expect(result).toHaveLength(0); + }); + + redisTest("should respect parentQueueLimit", async ({ redis }) => { + const keyProducer = createKeyProducer("test"); + const strategy = new FairDequeuingStrategy({ + tracer, + redis, + keys: keyProducer, + defaultOrgConcurrency: 10, + defaultEnvConcurrency: 5, + parentQueueLimit: 2, // Only take 2 queues + checkForDisabledOrgs: true, + seed: "test-seed-6", + }); + + const now = Date.now(); + + // Setup 3 queues but parentQueueLimit is 2 + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - 3000, + queueId: "queue-1", + orgId: "org-1", + envId: "env-1", + }); + + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - 2000, + queueId: "queue-2", + orgId: "org-1", + envId: "env-1", + }); + + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - 1000, + queueId: "queue-3", + orgId: "org-1", + envId: "env-1", + }); + + const result = await strategy.distributeFairQueuesFromParentQueue("parent-queue", "consumer-1"); + + expect(result).toHaveLength(2); + // Should only 
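[Editor's note] The three limit tests above all exercise the same rule: an org or env whose current concurrency has reached its limit contributes no queues. A simplified sketch of that check, assuming current concurrency is the cardinality of the Redis set seeded by setupConcurrency; the real check lives in fairDequeuingStrategy.server.ts and may differ in detail.

import Redis from "ioredis";

// Hypothetical helper: true when more runs may be started for the given concurrency key.
async function hasAvailableCapacity(
  redis: Redis,
  currentConcurrencyKey: string,
  limit: number
): Promise<boolean> {
  // setupConcurrency models in-flight runs as members of a set, so SCARD gives current concurrency.
  const current = await redis.scard(currentConcurrencyKey);
  return current < limit;
}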
get the two oldest queues + const queue1 = keyProducer.queueKey("org-1", "env-1", "queue-1"); + const queue2 = keyProducer.queueKey("org-1", "env-1", "queue-2"); + expect(result).toEqual([queue1, queue2]); + }); + + redisTest("should fairly distribute queues across environments over time", async ({ redis }) => { + const keyProducer = createKeyProducer("test"); + const strategy = new FairDequeuingStrategy({ + tracer, + redis, + keys: keyProducer, + defaultOrgConcurrency: 10, + defaultEnvConcurrency: 5, + parentQueueLimit: 100, + checkForDisabledOrgs: true, + seed: "test-seed-5", + }); + + const now = Date.now(); + + // Test configuration + const orgs = ["org-1", "org-2", "org-3"]; + const envsPerOrg = 3; // Each org has 3 environments + const queuesPerEnv = 5; // Each env has 5 queues + const iterations = 1000; + + // Setup queues + for (const orgId of orgs) { + for (let envNum = 1; envNum <= envsPerOrg; envNum++) { + const envId = `env-${orgId}-${envNum}`; + + for (let queueNum = 1; queueNum <= queuesPerEnv; queueNum++) { + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + // Vary the ages slightly + score: now - Math.random() * 10000, + queueId: `queue-${orgId}-${envId}-${queueNum}`, + orgId, + envId, + }); + } + + // Setup reasonable concurrency limits + await setupConcurrency({ + redis, + keyProducer, + org: { id: orgId, currentConcurrency: 2, limit: 10 }, + env: { id: envId, currentConcurrency: 1, limit: 5 }, + }); + } + } + + // Track distribution statistics + type PositionStats = { + firstPosition: number; // Count of times this env/org was first + positionSums: number; // Sum of positions (for averaging) + appearances: number; // Total number of appearances + }; + + const envStats: Record = {}; + const orgStats: Record = {}; + + // Initialize stats objects + for (const orgId of orgs) { + orgStats[orgId] = { firstPosition: 0, positionSums: 0, appearances: 0 }; + for (let envNum = 1; envNum <= envsPerOrg; envNum++) { + const envId = `env-${orgId}-${envNum}`; + envStats[envId] = { firstPosition: 0, positionSums: 0, appearances: 0 }; + } + } + + // Run multiple iterations + for (let i = 0; i < iterations; i++) { + const result = await strategy.distributeFairQueuesFromParentQueue( + "parent-queue", + `consumer-${i % 3}` // Simulate 3 different consumers + ); + + // Track positions of queues + result.forEach((queueId, position) => { + const orgId = keyProducer.orgIdFromQueue(queueId); + const envId = keyProducer.envIdFromQueue(queueId); + + // Update org stats + orgStats[orgId].appearances++; + orgStats[orgId].positionSums += position; + if (position === 0) orgStats[orgId].firstPosition++; + + // Update env stats + envStats[envId].appearances++; + envStats[envId].positionSums += position; + if (position === 0) envStats[envId].firstPosition++; + }); + } + + // Calculate and log statistics + console.log("\nOrganization Statistics:"); + for (const [orgId, stats] of Object.entries(orgStats)) { + const avgPosition = stats.positionSums / stats.appearances; + const firstPositionPercentage = (stats.firstPosition / iterations) * 100; + console.log(`${orgId}: + First Position: ${firstPositionPercentage.toFixed(2)}% + Average Position: ${avgPosition.toFixed(2)} + Total Appearances: ${stats.appearances}`); + } + + console.log("\nEnvironment Statistics:"); + for (const [envId, stats] of Object.entries(envStats)) { + const avgPosition = stats.positionSums / stats.appearances; + const firstPositionPercentage = (stats.firstPosition / iterations) * 100; + console.log(`${envId}: + 
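[Editor's note] The parentQueueLimit expectation earlier follows from the parent queue being a sorted set scored by enqueue time, so honouring the limit means reading only the lowest-scored (oldest) members. A sketch of such a read, assuming a plain ZRANGEBYSCORE with a LIMIT clause; the production strategy may use a different command or a Lua script.

import Redis from "ioredis";

// Return at most `parentQueueLimit` queue keys, oldest first.
async function readParentQueue(redis: Redis, parentQueue: string, parentQueueLimit: number) {
  return redis.zrangebyscore(parentQueue, "-inf", Date.now(), "LIMIT", 0, parentQueueLimit);
}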
First Position: ${firstPositionPercentage.toFixed(2)}% + Average Position: ${avgPosition.toFixed(2)} + Total Appearances: ${stats.appearances}`); + } + + // Verify fairness of first position distribution + const expectedFirstPositionPercentage = 100 / orgs.length; + const firstPositionStdDevOrgs = calculateStandardDeviation( + Object.values(orgStats).map((stats) => (stats.firstPosition / iterations) * 100) + ); + + const expectedEnvFirstPositionPercentage = 100 / (orgs.length * envsPerOrg); + const firstPositionStdDevEnvs = calculateStandardDeviation( + Object.values(envStats).map((stats) => (stats.firstPosition / iterations) * 100) + ); + + // Assert reasonable fairness for first position + expect(firstPositionStdDevOrgs).toBeLessThan(5); // Allow 5% standard deviation for orgs + expect(firstPositionStdDevEnvs).toBeLessThan(5); // Allow 5% standard deviation for envs + + // Verify that each org and env gets a fair chance at first position + for (const [orgId, stats] of Object.entries(orgStats)) { + const firstPositionPercentage = (stats.firstPosition / iterations) * 100; + expect(firstPositionPercentage).toBeGreaterThan(expectedFirstPositionPercentage * 0.7); // Within 30% of expected + expect(firstPositionPercentage).toBeLessThan(expectedFirstPositionPercentage * 1.3); + } + + for (const [envId, stats] of Object.entries(envStats)) { + const firstPositionPercentage = (stats.firstPosition / iterations) * 100; + expect(firstPositionPercentage).toBeGreaterThan(expectedEnvFirstPositionPercentage * 0.7); // Within 30% of expected + expect(firstPositionPercentage).toBeLessThan(expectedEnvFirstPositionPercentage * 1.3); + } + + // Verify average positions are reasonably distributed + const avgPositionsOrgs = Object.values(orgStats).map( + (stats) => stats.positionSums / stats.appearances + ); + const avgPositionsEnvs = Object.values(envStats).map( + (stats) => stats.positionSums / stats.appearances + ); + + const avgPositionStdDevOrgs = calculateStandardDeviation(avgPositionsOrgs); + const avgPositionStdDevEnvs = calculateStandardDeviation(avgPositionsEnvs); + + expect(avgPositionStdDevOrgs).toBeLessThan(1); // Average positions should be fairly consistent + expect(avgPositionStdDevEnvs).toBeLessThan(1); + }); + + redisTest( + "should shuffle environments while maintaining age order within environments", + async ({ redis }) => { + const keyProducer = createKeyProducer("test"); + const strategy = new FairDequeuingStrategy({ + tracer, + redis, + keys: keyProducer, + defaultOrgConcurrency: 10, + defaultEnvConcurrency: 5, + parentQueueLimit: 100, + checkForDisabledOrgs: true, + seed: "fixed-seed", + }); + + const now = Date.now(); + + // Setup three environments, each with two queues of different ages + await Promise.all([ + // env-1: one old queue (3000ms old) and one new queue (1000ms old) + setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - 3000, + queueId: "queue-1-old", + orgId: "org-1", + envId: "env-1", + }), + setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - 1000, + queueId: "queue-1-new", + orgId: "org-1", + envId: "env-1", + }), + + // env-2: same pattern + setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - 3000, + queueId: "queue-2-old", + orgId: "org-1", + envId: "env-2", + }), + setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - 1000, + queueId: "queue-2-new", + orgId: "org-1", + envId: "env-2", + }), + ]); + + // Setup basic concurrency settings + await 
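[Editor's note] For concreteness, the fairness bounds asserted above work out to the following numbers for this test's 3-org, 3-envs-per-org setup (plain arithmetic on values already in the test).

const orgCount = 3;
const envCount = 3 * 3;
const expectedOrgShare = 100 / orgCount; // ≈ 33.33% of first positions per org
const expectedEnvShare = 100 / envCount; // ≈ 11.11% of first positions per env
const orgBand = [0.7 * expectedOrgShare, 1.3 * expectedOrgShare]; // ≈ [23.3%, 43.3%]
const envBand = [0.7 * expectedEnvShare, 1.3 * expectedEnvShare]; // ≈ [7.8%, 14.4%]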
setupConcurrency({ + redis, + keyProducer, + org: { id: "org-1", currentConcurrency: 0, limit: 10 }, + env: { id: "env-1", currentConcurrency: 0, limit: 5 }, + }); + await setupConcurrency({ + redis, + keyProducer, + org: { id: "org-1", currentConcurrency: 0, limit: 10 }, + env: { id: "env-2", currentConcurrency: 0, limit: 5 }, + }); + + const result = await strategy.distributeFairQueuesFromParentQueue( + "parent-queue", + "consumer-1" + ); + + // Group queues by environment + const queuesByEnv = result.reduce((acc, queueId) => { + const envId = keyProducer.envIdFromQueue(queueId); + if (!acc[envId]) { + acc[envId] = []; + } + acc[envId].push(queueId); + return acc; + }, {} as Record); + + // Verify that: + // 1. We got all queues + expect(result).toHaveLength(4); + + // 2. Queues are grouped by environment + for (const envQueues of Object.values(queuesByEnv)) { + expect(envQueues).toHaveLength(2); + + // 3. Within each environment, older queue comes before newer queue + const [firstQueue, secondQueue] = envQueues; + expect(firstQueue).toContain("old"); + expect(secondQueue).toContain("new"); + } + } + ); + + redisTest( + "should bias shuffling based on concurrency limits and available capacity", + async ({ redis }) => { + const keyProducer = createKeyProducer("test"); + const now = Date.now(); + + // Setup three environments with different concurrency settings + const envSetups = [ + { + envId: "env-1", + limit: 100, + current: 20, // Lots of available capacity + queueCount: 3, + }, + { + envId: "env-2", + limit: 50, + current: 40, // Less available capacity + queueCount: 3, + }, + { + envId: "env-3", + limit: 10, + current: 5, // Some available capacity + queueCount: 3, + }, + ]; + + // Setup queues and concurrency for each environment + for (const setup of envSetups) { + await setupConcurrency({ + redis, + keyProducer, + org: { id: "org-1", currentConcurrency: 0, limit: 200 }, + env: { + id: setup.envId, + currentConcurrency: setup.current, + limit: setup.limit, + }, + }); + + for (let i = 0; i < setup.queueCount; i++) { + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - 1000 * (i + 1), + queueId: `queue-${i}`, + orgId: "org-1", + envId: setup.envId, + }); + } + } + + // Create multiple strategies with different seeds + const numStrategies = 5; + const strategies = Array.from( + { length: numStrategies }, + (_, i) => + new FairDequeuingStrategy({ + tracer, + redis, + keys: keyProducer, + defaultOrgConcurrency: 10, + defaultEnvConcurrency: 5, + parentQueueLimit: 100, + checkForDisabledOrgs: true, + seed: `test-seed-${i}`, + biases: { + concurrencyLimitBias: 0.8, + availableCapacityBias: 0.5, + queueAgeRandomization: 0.0, + }, + }) + ); + + // Run iterations across all strategies + const iterationsPerStrategy = 100; + const allResults: Record[] = []; + + for (const strategy of strategies) { + const firstPositionCounts: Record = {}; + + for (let i = 0; i < iterationsPerStrategy; i++) { + const result = await strategy.distributeFairQueuesFromParentQueue( + "parent-queue", + `consumer-${i % 3}` + ); + + expect(result.length).toBeGreaterThan(0); + + const firstEnv = keyProducer.envIdFromQueue(result[0]); + firstPositionCounts[firstEnv] = (firstPositionCounts[firstEnv] || 0) + 1; + } + + allResults.push(firstPositionCounts); + } + + // Calculate average distributions across all strategies + const avgDistribution: Record = {}; + const envIds = ["env-1", "env-2", "env-3"]; + + for (const envId of envIds) { + const sum = allResults.reduce((acc, result) => 
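[Editor's note] The bias test above only pins down relative outcomes: environments with higher limits and more free capacity should win the first slot more often. It does not fix an exact formula. A generic weighted pick that produces that kind of skew, using the same seedrandom dependency the strategy already imports; the weights shown are purely illustrative.

import seedrandom from "seedrandom";

type WeightedEnv = { envId: string; weight: number };

// Pick one env with probability proportional to its weight.
function weightedPick(envs: WeightedEnv[], rng: () => number): string {
  const total = envs.reduce((sum, e) => sum + e.weight, 0);
  let target = rng() * total;
  for (const env of envs) {
    target -= env.weight;
    if (target <= 0) return env.envId;
  }
  return envs[envs.length - 1].envId;
}

const rng = seedrandom("test-seed-0");
// Illustrative weights loosely shaped like free capacity (limit minus current concurrency).
weightedPick(
  [
    { envId: "env-1", weight: 100 - 20 },
    { envId: "env-2", weight: 50 - 40 },
    { envId: "env-3", weight: 10 - 5 },
  ],
  rng
);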
acc + (result[envId] || 0), 0); + avgDistribution[envId] = sum / numStrategies; + } + + // Log individual strategy results and the average + console.log("\nResults by strategy:"); + allResults.forEach((result, i) => { + console.log(`Strategy ${i + 1}:`, result); + }); + + console.log("\nAverage distribution:", avgDistribution); + + // Calculate percentages from average distribution + const totalCount = Object.values(avgDistribution).reduce((sum, count) => sum + count, 0); + const highLimitPercentage = (avgDistribution["env-1"] / totalCount) * 100; + const lowLimitPercentage = (avgDistribution["env-3"] / totalCount) * 100; + + console.log("\nPercentages:"); + console.log("High limit percentage:", highLimitPercentage); + console.log("Low limit percentage:", lowLimitPercentage); + + // Verify distribution across all strategies + expect(highLimitPercentage).toBeLessThan(60); + expect(lowLimitPercentage).toBeGreaterThan(10); + expect(highLimitPercentage).toBeGreaterThan(lowLimitPercentage); + } + ); + + redisTest("should respect ageInfluence parameter for queue ordering", async ({ redis }) => { + const keyProducer = createKeyProducer("test"); + const now = Date.now(); + + // Setup queues with different ages in the same environment + const queueAges = [ + { id: "queue-1", age: 5000 }, // oldest + { id: "queue-2", age: 3000 }, + { id: "queue-3", age: 1000 }, // newest + ]; + + // Helper function to run iterations with a specific age influence + async function runWithQueueAgeRandomization(queueAgeRandomization: number) { + const strategy = new FairDequeuingStrategy({ + tracer, + redis, + keys: keyProducer, + defaultOrgConcurrency: 10, + defaultEnvConcurrency: 5, + parentQueueLimit: 100, + checkForDisabledOrgs: true, + seed: "fixed-seed", + biases: { + concurrencyLimitBias: 0, + availableCapacityBias: 0, + queueAgeRandomization, + }, + }); + + const positionCounts: Record = { + "queue-1": [0, 0, 0], + "queue-2": [0, 0, 0], + "queue-3": [0, 0, 0], + }; + + const iterations = 1000; + for (let i = 0; i < iterations; i++) { + const result = await strategy.distributeFairQueuesFromParentQueue( + "parent-queue", + "consumer-1" + ); + + result.forEach((queueId, position) => { + const baseQueueId = queueId.split(":").pop()!; + positionCounts[baseQueueId][position]++; + }); + } + + return positionCounts; + } + + // Setup test data + for (const { id, age } of queueAges) { + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - age, + queueId: id, + orgId: "org-1", + envId: "env-1", + }); + } + + await setupConcurrency({ + redis, + keyProducer, + org: { id: "org-1", currentConcurrency: 0, limit: 10 }, + env: { id: "env-1", currentConcurrency: 0, limit: 5 }, + }); + + // Test with different age influence values + const strictAge = await runWithQueueAgeRandomization(0); // Strict age-based ordering + const mixed = await runWithQueueAgeRandomization(0.5); // Mix of age and random + const fullyRandom = await runWithQueueAgeRandomization(1); // Completely random + + console.log("Distribution with strict age ordering (0.0):", strictAge); + console.log("Distribution with mixed ordering (0.5):", mixed); + console.log("Distribution with random ordering (1.0):", fullyRandom); + + // With strict age ordering (0.0), oldest should always be first + expect(strictAge["queue-1"][0]).toBe(1000); // Always in first position + expect(strictAge["queue-3"][0]).toBe(0); // Never in first position + + // With fully random (1.0), positions should still allow for some age bias + const 
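[Editor's note] The queueAgeRandomization assertions in this test only require that 0 yields strict oldest-first ordering and 1 approaches a uniform shuffle. One way to get that behaviour is to sort on a score that interpolates between age rank and a random draw; this illustrates the knob's semantics, not the strategy's actual interpolation.

// Queues are given ageRank 0, 1, 2, ... in oldest-first order.
function blendedOrder(
  queues: { id: string; ageRank: number }[],
  queueAgeRandomization: number,
  rng: () => number
): string[] {
  return queues
    .map((q) => ({
      id: q.id,
      // 0 => pure age ordering, 1 => pure random ordering
      score:
        (1 - queueAgeRandomization) * q.ageRank +
        queueAgeRandomization * rng() * queues.length,
    }))
    .sort((a, b) => a.score - b.score)
    .map((q) => q.id);
}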
randomFirstPositionSpread = Math.abs( + fullyRandom["queue-1"][0] - fullyRandom["queue-3"][0] + ); + expect(randomFirstPositionSpread).toBeLessThan(200); // Allow for larger spread in distribution + + // With mixed (0.5), should show preference for age but not absolute + expect(mixed["queue-1"][0]).toBeGreaterThan(mixed["queue-3"][0]); // Older preferred + expect(mixed["queue-3"][0]).toBeGreaterThan(0); // But newer still gets chances + }); +}); diff --git a/apps/webapp/test/utils/marqs.ts b/apps/webapp/test/utils/marqs.ts new file mode 100644 index 0000000000..246ce413d3 --- /dev/null +++ b/apps/webapp/test/utils/marqs.ts @@ -0,0 +1,126 @@ +import { MarQSKeyProducer } from "~/v3/marqs/types"; +import { MarQSShortKeyProducer } from "~/v3/marqs/marqsKeyProducer.server.js"; +import Redis from "ioredis"; + +export function createKeyProducer(prefix: string): MarQSKeyProducer { + return new MarQSShortKeyProducer(prefix); +} + +export type SetupQueueOptions = { + parentQueue: string; + redis: Redis; + score: number; + queueId: string; + orgId: string; + envId: string; + keyProducer: MarQSKeyProducer; +}; + +export type ConcurrencySetupOptions = { + keyProducer: MarQSKeyProducer; + redis: Redis; + orgId: string; + envId: string; + currentConcurrency?: number; + orgLimit?: number; + envLimit?: number; + isOrgDisabled?: boolean; +}; + +/** + * Adds a queue to Redis with the given parameters + */ +export async function setupQueue({ + redis, + keyProducer, + parentQueue, + score, + queueId, + orgId, + envId, +}: SetupQueueOptions) { + // Add the queue to the parent queue's sorted set + const queue = keyProducer.queueKey(orgId, envId, queueId); + + await redis.zadd(parentQueue, score, queue); +} + +type SetupConcurrencyOptions = { + redis: Redis; + keyProducer: MarQSKeyProducer; + org: { id: string; currentConcurrency: number; limit?: number; isDisabled?: boolean }; + env: { id: string; currentConcurrency: number; limit?: number }; +}; + +/** + * Sets up concurrency-related Redis keys for orgs and envs + */ +export async function setupConcurrency({ redis, keyProducer, org, env }: SetupConcurrencyOptions) { + // Set org concurrency limit if provided + if (typeof org.limit === "number") { + await redis.set(keyProducer.orgConcurrencyLimitKey(org.id), org.limit.toString()); + } + + if (org.currentConcurrency > 0) { + // Set current concurrency by adding dummy members to the set + const orgCurrentKey = keyProducer.orgCurrentConcurrencyKey(org.id); + + // Add dummy running job IDs to simulate current concurrency + const dummyJobs = Array.from( + { length: org.currentConcurrency }, + (_, i) => `dummy-job-${i}-${Date.now()}` + ); + + await redis.sadd(orgCurrentKey, ...dummyJobs); + } + + if (org.isDisabled) { + await redis.set(keyProducer.disabledConcurrencyLimitKey(org.id), "1"); + } + + // Set env concurrency limit + if (typeof env.limit === "number") { + await redis.set(keyProducer.envConcurrencyLimitKey(env.id), env.limit.toString()); + } + + if (env.currentConcurrency > 0) { + // Set current concurrency by adding dummy members to the set + const envCurrentKey = keyProducer.envCurrentConcurrencyKey(env.id); + + // Add dummy running job IDs to simulate current concurrency + const dummyJobs = Array.from( + { length: env.currentConcurrency }, + (_, i) => `dummy-job-${i}-${Date.now()}` + ); + + await redis.sadd(envCurrentKey, ...dummyJobs); + } +} + +/** + * Calculates the standard deviation of a set of numbers. + * Standard deviation measures the amount of variation of a set of values from their mean. 
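[Editor's note] A short sketch of what setupQueue actually writes and how a test could inspect it. The Redis instance here is created ad hoc for illustration (in the real tests it comes from the redisTest fixture) and the helpers are the ones defined in this utils module.

import Redis from "ioredis";

async function inspectParentQueue() {
  const redis = new Redis();
  const keyProducer = createKeyProducer("test");

  await setupQueue({
    redis,
    keyProducer,
    parentQueue: "parent-queue",
    score: Date.now() - 5_000, // enqueued 5 seconds ago
    queueId: "queue-1",
    orgId: "org-1",
    envId: "env-1",
  });

  // The parent queue is a plain sorted set: member = full queue key, score = enqueue timestamp,
  // so the lowest-scored member is the oldest queue.
  return redis.zrange("parent-queue", 0, -1, "WITHSCORES");
}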
+ * A low standard deviation indicates that the values tend to be close to the mean. + * + * @param values Array of numbers to calculate standard deviation for + * @returns The standard deviation of the values + */ +export function calculateStandardDeviation(values: number[]): number { + // If there are no values or only one value, the standard deviation is 0 + if (values.length <= 1) { + return 0; + } + + // Calculate the mean (average) of the values + const mean = values.reduce((sum, value) => sum + value, 0) / values.length; + + // Calculate the sum of squared differences from the mean + const squaredDifferences = values.map((value) => Math.pow(value - mean, 2)); + const sumOfSquaredDifferences = squaredDifferences.reduce((sum, value) => sum + value, 0); + + // Calculate the variance (average of squared differences) + const variance = sumOfSquaredDifferences / (values.length - 1); // Using n-1 for sample standard deviation + + // Standard deviation is the square root of the variance + return Math.sqrt(variance); +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 7e67336f7a..1c578c9300 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -252,6 +252,9 @@ importers: '@opentelemetry/api': specifier: 1.9.0 version: 1.9.0 + '@opentelemetry/api-logs': + specifier: 0.52.1 + version: 0.52.1 '@opentelemetry/core': specifier: 1.25.1 version: 1.25.1(@opentelemetry/api@1.9.0) diff --git a/references/v3-catalog/src/trigger/batch.ts b/references/v3-catalog/src/trigger/batch.ts index 3f463a45bd..056589e76a 100644 --- a/references/v3-catalog/src/trigger/batch.ts +++ b/references/v3-catalog/src/trigger/batch.ts @@ -298,7 +298,13 @@ export const batchV2TestTask = task({ retry: { maxAttempts: 1, }, - run: async ({ triggerSequentially }: { triggerSequentially?: boolean }) => { + run: async ({ + triggerSequentially, + largeBatchSize = 21, + }: { + triggerSequentially?: boolean; + largeBatchSize?: number; + }) => { // First lets try triggering with too many items try { await tasks.batchTrigger( @@ -604,7 +610,7 @@ export const batchV2TestTask = task({ // Now batchTriggerAndWait with 21 items const response10 = await batchV2TestChild.batchTriggerAndWait( - Array.from({ length: 21 }, (_, i) => ({ + Array.from({ length: largeBatchSize }, (_, i) => ({ payload: { foo: `bar${i}` }, })), { @@ -615,7 +621,7 @@ export const batchV2TestTask = task({ logger.debug("Response 10", { response10 }); assert.match(response10.id, /^batch_[a-z0-9]{21}$/, "response10: Batch ID is invalid"); - assert.equal(response10.runs.length, 21, "response10: Items length is invalid"); + assert.equal(response10.runs.length, largeBatchSize, "response10: Items length is invalid"); // Now repeat the first few tests using `tasks.batchTrigger`: const response11 = await tasks.batchTrigger(
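[Editor's note] A worked check for calculateStandardDeviation as defined above (sample standard deviation, n - 1 in the denominator): for the values below the mean is 5, the squared differences sum to 32, the sample variance is 32 / 7 ≈ 4.571, and the result is sqrt(4.571) ≈ 2.14.

import { calculateStandardDeviation } from "./utils/marqs.js";

const spread = calculateStandardDeviation([2, 4, 4, 4, 5, 5, 7, 9]);
// spread ≈ 2.138 (the population standard deviation of the same data would be 2.0)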