perf(runs-replication): Improve the CPU efficiency and throughput of the runs replication to clickhouse #2866
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Important

Review skipped: review was skipped due to path filters.

⛔ Files ignored due to path filters (1)

CodeRabbit blocks several paths by default. You can override this behavior by explicitly including those paths in the path filters.

Walkthrough

Adds a RunsReplicationService performance profiling harness (harness coordinator, producer/consumer processes, runners, data generator, metrics collector, process managers, CLI profiling script, README, and MockClickHouse). Introduces compact array formats and types for ClickHouse task run and payload inserts (TASK_RUN_COLUMNS/TASK_RUN_INDEX, PAYLOAD_COLUMNS/PAYLOAD_INDEX, TaskRunInsertArray, PayloadInsertArray) and compact insert APIs across the ClickHouse client, noop, types, and taskRuns modules. Extends pgoutput parsing with array-based message parsing and updates service code and tests to use the new array representations. Adds Clinic devDependencies and .gitignore rules for profiling outputs.

Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes

🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (2 warnings)
PR Review: perf(runs-replication) - CPU efficiency and throughput improvements

Overview

This PR introduces performance optimizations for the runs replication service by switching from object-based data representation to a compact array format for ClickHouse inserts. It also adds a comprehensive performance test harness for profiling.

Code Quality and Best Practices

Positives:

Concerns:

Potential Bugs or Issues

Performance Considerations

Positives:

Suggestions:

Security Concerns

No security issues identified. The changes are internal data representation optimizations without external input handling changes.

Test Coverage

Concerns:

Summary

This PR delivers meaningful performance improvements through a compact array format for ClickHouse inserts. The main concerns are:

Recommendations:

Overall, the performance optimization approach is sound, but the implementation could benefit from better maintainability safeguards.
```ts
console.log("Stopping producer process");
this.process.send({ type: "shutdown" });
```
ProducerProcessManager.stop() missing try/catch around IPC send
Medium Severity
The ProducerProcessManager.stop() method calls this.process.send({ type: "shutdown" }) without a try/catch block. If the producer process has already crashed or exited, the IPC channel will be closed and this call will throw an error, causing the harness teardown to fail. The ConsumerProcessManager.stop() method correctly wraps the same call in try/catch (lines 121-125), but ProducerProcessManager doesn't follow the same pattern.
Actionable comments posted: 18
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal-packages/clickhouse/src/taskRuns.ts (1)
55-115: Guard against column-order drift between TASK_RUN_COLUMNS and row builders.

While `insertCompactRaw` uses the `JSONCompactEachRowWithNames` format (name-based matching, not position-based), the fragile synchronization between `TASK_RUN_COLUMNS` (line 56) and `#prepareTaskRunInsert` creates maintenance risk. If these drift apart (e.g., during future refactoring or schema changes), misaligned arrays will silently insert data into wrong columns. Recommended mitigations:

- Add a compile-time guard using TypeScript `satisfies` so `TASK_RUN_COLUMNS` only accepts valid `TaskRunV2` keys and stays in sync when fields are renamed.
- Centralize row building in this module with a single source of truth (e.g., `toTaskRunV2Row()`) so callers don't re-implement ordering separately.
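The suggested compile-time guard could be sketched as follows. The `TaskRunV2` shape and column names here are illustrative assumptions, not the real module's definitions:

```typescript
// Hypothetical sketch: keep a column list in sync with a row type.
type TaskRunV2 = {
  environment_id: string;
  organization_id: string;
  project_id: string;
  run_id: string;
  created_at: number;
};

// `satisfies` rejects any entry that is not a key of TaskRunV2, so renaming a
// field in the type surfaces a compile error here instead of silent drift.
const TASK_RUN_COLUMNS = [
  "environment_id",
  "organization_id",
  "project_id",
  "run_id",
  "created_at",
] as const satisfies readonly (keyof TaskRunV2)[];

// A single row builder keeps ordering in one place, so callers cannot
// re-implement (and mis-order) the column layout.
function toTaskRunV2Row(run: TaskRunV2): unknown[] {
  return TASK_RUN_COLUMNS.map((col) => run[col]);
}
```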
🤖 Fix all issues with AI agents
In @apps/webapp/app/services/runsReplicationService.server.ts:
- Around line 579-598: The sort comparator in runsReplicationService.server.ts
uses hardcoded numeric indices (e.g., a[1], a[2], a[5]) which tightly couples it
to the column order from #prepareTaskRunInsert; define a clear constant map
(e.g., TASK_RUN_COLUMNS with ENVIRONMENT_ID, ORGANIZATION_ID, PROJECT_ID,
RUN_ID, CREATED_AT) at module level or import from a shared place and replace
all numeric indices in the .sort comparator with TASK_RUN_COLUMNS.<NAME> lookups
so the comparator uses named indices and remains correct if column order
changes.
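The named-index refactor above could look like this sketch; the `COL` map, `Row` tuple, and sort keys are illustrative assumptions rather than the service's real column order:

```typescript
// Hypothetical named index map replacing hardcoded tuple indices.
const COL = {
  ENVIRONMENT_ID: 0,
  ORGANIZATION_ID: 1,
  PROJECT_ID: 2,
  RUN_ID: 3,
  CREATED_AT: 4,
} as const;

type Row = [string, string, string, string, number];

function sortRows(rows: Row[]): Row[] {
  // Sort by organization, then creation time; named indices stay correct as
  // long as COL is updated alongside any column-order change.
  return [...rows].sort(
    (a, b) =>
      a[COL.ORGANIZATION_ID].localeCompare(b[COL.ORGANIZATION_ID]) ||
      a[COL.CREATED_AT] - b[COL.CREATED_AT]
  );
}
```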
In @apps/webapp/scripts/profile-runs-replication.ts:
- Around line 95-105: The local constant timestamp declared as "const timestamp
= ..." is unused and triggers noUnusedLocals; remove that declaration (leave
timeWithSeconds, runFolder, outputDir and the assignments to
config.profiling.outputDir and config.output.metricsFile untouched) so the
function returns config without the unused variable.
In @apps/webapp/test/performance/clickhouse-mock.ts:
- Around line 1-53: MockClickHouse.taskRuns is missing insertCompactArrays and
insertPayloadsCompactArrays methods required by RunsReplicationService; add both
methods to the taskRuns object with signatures (compactRows: readonly any[][],
options?: any) and (compactPayloads: readonly any[][], options?: any)
respectively, implement the same delay handling as insert/insertPayloads,
increment the appropriate counters using compactRows.length /
compactPayloads.length, and return the same Promise<[Error | null, { rows:
number } | null]> shape as the existing insert methods.
In @apps/webapp/test/performance/consumer-runner.ts:
- Around line 9-23: The code in main() blindly calls
JSON.parse(process.env.CONSUMER_CONFIG!) which can throw a cryptic error or
parse undefined; replace this with a fast-fail validation: verify
process.env.CONSUMER_CONFIG exists, parse it inside a try/catch, and validate
the result with a zod schema (e.g., ConsumerConfigSchema.parse) to produce a
clear, structured error message before exiting or rethrowing; update the
ConsumerConfig usage (the const config assignment near shutdownFilePath
creation) to use the validated object and ensure processLogger or console.error
logs the validation error and calls process.exit(1).
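A dependency-free sketch of the fail-fast shape described above (the real fix should use a zod schema per the repo guidelines; the field names here are illustrative assumptions about `ConsumerConfig`):

```typescript
// Hypothetical minimal config shape for illustration.
type ConsumerConfig = {
  databaseUrl: string;
  batchSize: number;
};

function loadConsumerConfig(raw: string | undefined): ConsumerConfig {
  // Fail fast with a clear message instead of a cryptic JSON.parse error.
  if (!raw) throw new Error("CONSUMER_CONFIG is not set");
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch (err) {
    throw new Error(`CONSUMER_CONFIG is not valid JSON: ${String(err)}`);
  }
  const cfg = parsed as Partial<ConsumerConfig>;
  if (typeof cfg.databaseUrl !== "string" || cfg.databaseUrl.length === 0) {
    throw new Error("CONSUMER_CONFIG.databaseUrl must be a non-empty string");
  }
  if (typeof cfg.batchSize !== "number" || !Number.isInteger(cfg.batchSize) || cfg.batchSize <= 0) {
    throw new Error("CONSUMER_CONFIG.batchSize must be a positive integer");
  }
  return { databaseUrl: cfg.databaseUrl, batchSize: cfg.batchSize };
}
```

With zod, the manual checks collapse into a schema plus a single `ConsumerConfigSchema.parse(parsed)` call that produces a structured error listing every invalid field.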
In @apps/webapp/test/performance/consumer.ts:
- Around line 280-284: The exit listener on this.process uses
this.process?.on("exit", ...) which can accumulate orphaned listeners; change it
to use this.process?.once("exit", ...) so the handler runs only once, preventing
listener buildup—update the listener registration that currently calls
clearTimeout(timeout) and resolve() to use once instead of on.
- Around line 265-288: The producer stop() method can throw if IPC send fails
because the child process already exited; update stop() (the async stop method
in the producer class) to guard and handle send failures by first checking
this.process?.connected (or this.process) before calling this.process.send and
wrapping the send in a try/catch; on error simply log/warn the failure to send
shutdown and continue to the timeout/exit-wait logic so the function always
resolves and cleans up this.process and this.ready.
- Around line 129-144: In stop(), the "exit" listener uses this.process?.on
which can accumulate if stop() is called multiple times; change it to
this.process?.once("exit", ...) so the handler is removed after it runs (and
keep the existing clearTimeout(timeout) and resolve() behavior), or
alternatively attach a named handler and call
this.process?.removeListener("exit", handler) before resolving to ensure the
listener is not left registered.
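The two fixes above (a one-shot exit listener and a guarded IPC send) could be combined as in this sketch; `ChildLike` is a stand-in for Node's `ChildProcess`:

```typescript
// Minimal stand-in for the ChildProcess surface used here.
type ChildLike = {
  connected: boolean;
  send: (msg: unknown) => void;
  once: (event: "exit", cb: () => void) => void;
};

function requestShutdown(proc: ChildLike | null): Promise<void> {
  return new Promise((resolve) => {
    if (!proc) return resolve();
    // Guard: send() throws if the IPC channel is already closed.
    if (proc.connected) {
      try {
        proc.send({ type: "shutdown" });
      } catch (err) {
        console.warn("failed to send shutdown, continuing teardown", err);
      }
    }
    const timeout = setTimeout(resolve, 5000);
    // once() removes the handler after it fires, so repeated stop() calls
    // don't accumulate orphaned listeners.
    proc.once("exit", () => {
      clearTimeout(timeout);
      resolve();
    });
  });
}
```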
In @apps/webapp/test/performance/data-generator.ts:
- Around line 6-85: Change the exported DataGeneratorOptions interface into a
type alias and make payloadType deterministic before payload creation: in
TaskRunDataGenerator.generateInsert decide payloadType first (using
this.options.includeComplexPayloads to influence the decision), then call
generatePayload with that payloadType (update generatePayload signature to
accept payloadType or a boolean indicating whether complex payloads are allowed)
so the generated payload shape matches the serializer (superjson vs
JSON.stringify). Update generatePayload implementation to produce only
JSON-serializable values when payloadType is "application/json" and to allow
BigInt/Map/Set when payloadType is "application/super+json"; update all
references to the old generatePayload signature accordingly.
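The deterministic payload-shape fix could be sketched as below; the type names and 50/50 split are illustrative assumptions about the harness:

```typescript
type PayloadType = "application/json" | "application/super+json";

// Decide the content type first, so the payload shape can match the serializer.
function pickPayloadType(includeComplexPayloads: boolean, rand: () => number): PayloadType {
  return includeComplexPayloads && rand() < 0.5
    ? "application/super+json"
    : "application/json";
}

function generatePayload(payloadType: PayloadType): unknown {
  if (payloadType === "application/super+json") {
    // superjson can round-trip BigInt / Map / Set values.
    return { id: 1n, tags: new Set(["a", "b"]) };
  }
  // Plain JSON payloads must stay JSON-serializable.
  return { id: 1, tags: ["a", "b"] };
}
```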
In @apps/webapp/test/performance/harness.ts:
- Around line 457-471: The parseRedisUrl function uses a fragile regex that
misses usernames, missing ports, and TLS scheme; replace its implementation to
use the Node.js URL parser inside parseRedisUrl: construct new URL(url) in a
try/catch, extract hostname -> host (fallback "localhost"), parsed.port -> port
(default 6379 if empty), parsed.password -> password (or undefined),
parsed.username -> username (or undefined), and accept both redis:// and
rediss:// schemes; on parse failure return the default { host: "localhost",
port: 6379 } to preserve existing behavior.
- Around line 350-358: The SQL uses unescaped identifiers from config
(config.consumer.publicationName and config.consumer.slotName) interpolated into
this.prisma!.$executeRawUnsafe, creating SQL injection risk; add a strict
identifier validator (e.g., allow only [A-Za-z_][A-Za-z0-9_]* or use a safe
identifier-quoting helper) and run it on publicationName and slotName before
building the DROP/CREATE SQL, throwing an error if validation fails, then use
the validated/quoted identifier values when calling
this.prisma!.$executeRawUnsafe (or switch to a safe identifier-quoting utility
such as pg-format's ident) to ensure only safe names are used.
- Line 243: The CREATE DATABASE call interpolates dbName directly into
adminPrisma.$executeRawUnsafe which is an SQL injection risk; fix by
validating/sanitizing dbName before use (e.g. assert it matches a strict
whitelist regex like /^[A-Za-z0-9_]+$/ and throw if it fails), then use the
validated value in the CREATE DATABASE statement (or safely quote an identifier
after validation) so the database name cannot contain SQL control characters;
update the code around adminPrisma.$executeRawUnsafe and the dbName assignment
in harness.ts accordingly.
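Both harness fixes above can be sketched together: URL-based Redis parsing and a strict identifier whitelist applied before any value reaches `$executeRawUnsafe`. The defaults mirror the review's description; everything else is an illustrative assumption:

```typescript
type RedisConn = { host: string; port: number; username?: string; password?: string };

function parseRedisUrl(url: string): RedisConn {
  try {
    const parsed = new URL(url);
    // Accept both plain and TLS schemes.
    if (parsed.protocol !== "redis:" && parsed.protocol !== "rediss:") {
      return { host: "localhost", port: 6379 };
    }
    return {
      host: parsed.hostname || "localhost",
      port: parsed.port ? Number(parsed.port) : 6379,
      username: parsed.username || undefined,
      password: parsed.password || undefined,
    };
  } catch {
    // Preserve the existing default behavior on parse failure.
    return { host: "localhost", port: 6379 };
  }
}

// Only plain SQL identifiers pass; anything else throws before any raw SQL
// string is built.
function assertSqlIdentifier(name: string): string {
  if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(name)) {
    throw new Error(`unsafe SQL identifier: ${JSON.stringify(name)}`);
  }
  return name;
}
```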
In @apps/webapp/test/performance/producer-runner.ts:
- Line 14: The config is being JSON.parsed without runtime validation; define a
Zod schema (e.g., producerConfigSchema) that matches the ProducerConfig shape
and use producerConfigSchema.parse or safeParse to validate the parsed value
from process.env.PRODUCER_CONFIG before assigning to config, handle missing or
invalid PRODUCER_CONFIG by throwing or exiting with a clear error, and replace
the direct JSON.parse assignment (const config: ProducerConfig =
JSON.parse(process.env.PRODUCER_CONFIG!);) with the validated result to ensure
runtime safety.
In @apps/webapp/test/performance/producer.ts:
- Around line 5-45: The stop() method currently clears the timeout but does not
resolve the awaited promise in runProducerLoop(), so awaiting sleep can hang;
make the sleep used by runProducerLoop() interruptible by wiring the existing
this.timer and a resolver: implement a class-bound sleep that sets this.timer =
setTimeout(...) and returns a Promise whose resolve is stored (e.g.,
this.timerResolver), and update stop() to clearTimeout(this.timer), null the
timer, and call the stored resolver so the sleep promise always resolves; update
runProducerLoop() to use this new interruptible sleep and ensure
timer/timerResolver are cleared after resolution.
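The interruptible sleep described above could be wired like this sketch (class and method names are assumptions about the producer):

```typescript
class Producer {
  private timer: ReturnType<typeof setTimeout> | null = null;
  private timerResolver: (() => void) | null = null;
  private stopped = false;

  // Sleep whose pending promise can be resolved early by stop().
  private sleep(ms: number): Promise<void> {
    return new Promise((resolve) => {
      this.timerResolver = resolve;
      this.timer = setTimeout(() => {
        this.timer = null;
        this.timerResolver = null;
        resolve();
      }, ms);
    });
  }

  async runProducerLoop(tick: () => void): Promise<void> {
    while (!this.stopped) {
      tick();
      await this.sleep(1000); // resolves early when stop() is called
    }
  }

  stop(): void {
    this.stopped = true;
    if (this.timer) clearTimeout(this.timer);
    this.timer = null;
    this.timerResolver?.(); // wake the pending sleep so the loop can exit
    this.timerResolver = null;
  }
}
```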
In @internal-packages/clickhouse/src/client/client.ts:
- Around line 819-824: The heuristic using isSingleRow = events.length > 0 &&
!Array.isArray(events[0]) can misclassify when the first element itself is an
array; update the detection to explicitly check the shape of the whole input
(e.g., treat as multiple rows only if events is an array and every element is an
array via events.every(Array.isArray)), or add an explicit parameter (e.g.,
forceSingleRow or isBatch) so the caller declares single vs multiple rows; then
compute eventsArray from that reliable signal (refer to isSingleRow and
eventsArray) and adjust callers accordingly.
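A sketch of that shape check, combining the `every(Array.isArray)` test with an explicit caller override (function name and option are hypothetical):

```typescript
// Treat the input as a batch only when every element is itself an array;
// callers can force single-row handling when a row's columns are arrays.
function toRowBatch(events: unknown[], opts?: { singleRow?: boolean }): unknown[][] {
  if (opts?.singleRow) return [events];
  const isBatch = events.length > 0 && events.every(Array.isArray);
  return isBatch ? (events as unknown[][]) : [events];
}
```

Note that `every(Array.isArray)` still cannot distinguish a batch from a single row whose columns are all arrays, which is why the explicit `singleRow` signal is the more reliable fix.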
In @internal-packages/replication/src/pgoutput.ts:
- Around line 468-500: The switch in readTupleAsArray declares consts (e.g.,
bsize, valsize, valbuf, valtext) directly in case clauses which violates Biome's
noSwitchDeclarations; fix by wrapping each case body in its own block { ... } so
the const declarations are scoped correctly (apply to cases for 0x62, 0x74,
0x6e, 0x75 and default), keeping existing logic for reading via reader, calling
parser from columns[i], and using unchangedToastFallback?.[i].
- Around line 23-33: The array-style message variants (MessageUpdateArray,
MessageDeleteArray) currently receive full-width arrays while object-style
variants use readKeyTuple() which projects keys to relation.keyColumns
(converting null→undefined); make the array variants consistent by projecting
their key arrays using readKeyTuple() (or the same projection logic used by
readKeyTuple) when assigning the key field, or explicitly document that array
variants intentionally keep full tuples; also change any[] to unknown[] for
tuple types to improve type safety and update PgoutputMessageArray accordingly
so MessageUpdateArray/MessageDeleteArray reflect the projected key type.
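The `noSwitchDeclarations` fix above amounts to wrapping each case body in its own block so `const` declarations are case-scoped. A simplified sketch (the tag bytes mirror pgoutput tuple markers; the `Reader` interface is an assumption):

```typescript
type Reader = { readInt32(): number; readString(len: number): string };

function readTupleValue(reader: Reader, tag: number): unknown {
  switch (tag) {
    case 0x6e: {
      // 'n' — null column
      return null;
    }
    case 0x75: {
      // 'u' — unchanged TOAST value
      return undefined;
    }
    case 0x74: {
      // 't' — length-prefixed text value; the block scopes these consts
      const valsize = reader.readInt32();
      const valtext = reader.readString(valsize);
      return valtext;
    }
    default: {
      throw new Error(`unknown tuple tag: 0x${tag.toString(16)}`);
    }
  }
}
```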
🧹 Nitpick comments (19)
internal-packages/replication/src/pgoutput.ts (1)
228-255: `parseArray()` duplication risk: keep handler dispatch in sync with `parse()`.

`parseArray()` largely duplicates `parse()` with different routing for I/U/D. To reduce drift, consider consolidating the shared switch and only swapping the insert/update/delete handlers (e.g., via a small dispatch table). Not required for correctness, but it will prevent subtle future mismatches.

apps/webapp/test/performance/README.md (1)
19-28: Add language specifiers to fenced code blocks.

The fenced code blocks at lines 19, 110, and 220 are missing language specifiers, which can affect rendering in various markdown viewers.
As per coding guidelines to format markdown using Prettier, consider adding language specifiers:
📝 Suggested improvements
Line 19 (architecture diagram): tag the opening fence as `text` before the "Main Orchestrator Process" block.

Line 110 (usage text): tag the opening fence as `text` before the "Usage: profile-runs-replication [options]" block.

Line 220 (file tree): tag the opening fence as `text` before the "profiling-results/" block.

Also applies to: 110-122, 220-228
internal-packages/clickhouse/src/taskRuns.ts (1)
125-145: Consider adding the same guard/row-builder for payload compact inserts.

Same drift risk applies to `PAYLOAD_COLUMNS` + `insertRawTaskRunPayloadsCompactArrays()`.

apps/webapp/test/performance/consumer-runner.ts (1)
49-116: Graceful shutdown should stop the service and close clients (not just `process.exit`).

Both the shutdown-file path and the IPC shutdown path exit without attempting to stop `RunsReplicationService` or close ClickHouse / Redis connections. Even for profiling tooling, this can skew profiling output and leave partial writes in "real" mode.

apps/webapp/test/performance/producer.ts (1)
46-58: Validate `targetThroughput`, `batchSize`, and `insertUpdateRatio` to avoid invalid pacing math.

`targetDuration = (batchSize / targetThroughput) * 1000` will explode for `targetThroughput <= 0`, and ratios outside `[0, 1]` can yield negative insert/update counts.

Also applies to: 79-83
apps/webapp/scripts/profile-runs-replication.ts (1)
247-287: `__dirname` usage may break under `tsx` depending on ESM/CJS mode.

If this script runs as ESM, `__dirname` is undefined; the Clinic binary discovery and `cwd` could fail.

internal-packages/clickhouse/src/client/noop.ts (1)
163-218: LGTM: noop compact insert surfaces match the new writer API.

Optional: consider lightweight validation in `insertCompact()` (e.g., `toArray(record).length === columns.length`) to catch mapping mistakes in tests when using `NoopClient`.

apps/webapp/test/performance/data-generator.ts (1)
87-108: Avoid potential `noUnusedParameters` failure: `runId` is unused.

If `noUnusedParameters` is enabled, this will fail builds. Consider renaming to `_runId` or using it (e.g., to bias updates).

apps/webapp/test/performance/metrics-collector.ts (2)
5-16: Consider exporting or reusing these interfaces from a shared location.
`ConsumerMetrics` and `BatchFlushedEvent` are defined locally here, but similar interfaces exist in `consumer.ts`. This duplication could lead to drift. Consider exporting from a shared location (e.g., `config.ts`) to maintain consistency.
159-160: Consider adding a TODO or tracking issue for the placeholder.
`flushDurationP50` is hardcoded to 0 with a comment about needing OpenTelemetry metrics. If this is a known limitation, consider adding a tracking mechanism so it doesn't get lost.

Would you like me to open an issue to track implementing flush duration metrics extraction from OpenTelemetry?
apps/webapp/test/performance/harness.ts (2)
573-573: Accessing internal client property.
`(clickhouse.writer as any).client.command` casts to `any` to access internal properties. This could break if the internal structure changes. Consider exposing a proper method for raw command execution.

519-520: Hardcoded relative path to migrations.

The migrations path uses `__dirname` with a relative path that assumes a specific directory structure. This could break during build/bundle or if the file is moved. Consider using a more robust path resolution or making it configurable.
apps/webapp/app/services/runsReplicationService.server.ts (2)
83-83: Loss of type safety with `any[][]`.

The `batchFlushed` event type changed from `TaskRunV2[]` and `RawTaskRunPayloadV1[]` to `any[][]`. This sacrifices compile-time type checking. Consider defining tuple types or using branded types to maintain some safety.

💡 Type-safe alternative

```ts
// Define tuple types matching column order
type TaskRunInsertRow = [
  string, // environment_id
  string, // organization_id
  string, // project_id
  string, // run_id
  // ... etc
];

type PayloadInsertRow = [
  string, // run_id
  number, // created_at
  { data: unknown }, // payload
];

batchFlushed: [{ flushId: string; taskRunInserts: TaskRunInsertRow[]; payloadInserts: PayloadInsertRow[] }];
```
827-829: Silent skip for runs without environmentType or organizationId.

When a run lacks `environmentType` or `organizationId`, it returns an empty object silently. Consider logging this skip for debugging/monitoring purposes.

```diff
 if (!run.environmentType || !run.organizationId) {
+  this.logger.debug("Skipping run without required fields", {
+    runId: run.id,
+    hasEnvironmentType: !!run.environmentType,
+    hasOrganizationId: !!run.organizationId,
+  });
   return {};
 }
```

internal-packages/clickhouse/src/client/client.ts (1)
703-716: Missing summary attributes in insertCompact success path.

Unlike `insertUnsafe` and `insertCompactRaw`, the success path of `insertCompact` doesn't set summary attributes from the result. This creates inconsistent observability.

♻️ Add summary attributes for consistency

```diff
 if (clickhouseError) {
   this.logger.error("Error inserting into clickhouse", {
     name: req.name,
     error: clickhouseError,
     table: req.table,
   });
   recordSpanError(span, clickhouseError);
   return [new InsertError(clickhouseError.message), null];
 }

+span.setAttributes({
+  "clickhouse.query_id": result.query_id,
+  "clickhouse.executed": result.executed,
+  "clickhouse.summary.read_rows": result.summary?.read_rows,
+  "clickhouse.summary.written_rows": result.summary?.written_rows,
+  "clickhouse.summary.written_bytes": result.summary?.written_bytes,
+});
+
 return [null, result];
```

apps/webapp/test/performance/config.ts (2)
6-7: Module-level side effect: dotenv loading.

Loading `.env.local` with `override: true` at module import time is a side effect that could unexpectedly affect other code that imports from this module. Consider making this explicit via an initialization function.

💡 Alternative approach

```ts
let envLoaded = false;

export function loadProfilingEnv(): void {
  if (!envLoaded) {
    loadEnv({ path: path.join(__dirname, ".env.local"), override: true });
    envLoaded = true;
  }
}

// Call explicitly in harness setup instead of at module load
```
9-71: Coding guideline: Use types over interfaces.

Per the coding guidelines for `**/*.{ts,tsx}`, types should be preferred over interfaces. Consider converting these to type aliases:

```ts
export type TestPhase = {
  name: string;
  durationSec: number;
  targetThroughput: number;
};
```

However, since these are test infrastructure files, this is a minor concern.
apps/webapp/test/performance/consumer.ts (2)
90-92: Exit handler doesn't prevent potential double-resolution.

The exit handler in `start()` logs but doesn't interact with the readiness promise. If the process exits before becoming ready, `waitForReady()` will timeout rather than fail immediately. Consider rejecting the readiness promise on unexpected exit:

```ts
private readyPromise: Promise<void> | null = null;
private rejectReady: ((reason: Error) => void) | null = null;

// In start():
this.readyPromise = new Promise((resolve, reject) => {
  this.rejectReady = reject;
  // existing logic...
});

this.process.on("exit", (code, signal) => {
  if (!this.ready && this.rejectReady) {
    this.rejectReady(new Error(`Process exited before ready: code=${code}, signal=${signal}`));
  }
});
```
220-220: ProducerProcessManager uses loose `any` typing.

The constructor accepts `any` for config and metrics callbacks. Consider using the proper `ProducerConfig` type from `./config`:

```diff
+import type { ConsumerConfig, ProducerConfig, ProfilingConfig } from "./config";

 export class ProducerProcessManager {
   private process: ChildProcess | null = null;
   private ready = false;
-  private onMetrics?: (metrics: any) => void;
+  private onMetrics?: (metrics: ProducerMetrics) => void;
   private onError?: (error: string) => void;

-  constructor(private readonly config: any) {}
+  constructor(private readonly config: ProducerConfig) {}
```
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`pnpm-lock.yaml` is excluded by `!**/pnpm-lock.yaml`
📒 Files selected for processing (21)
- .gitignore
- apps/webapp/app/services/runsReplicationService.server.ts
- apps/webapp/package.json
- apps/webapp/scripts/profile-runs-replication.ts
- apps/webapp/test/performance/README.md
- apps/webapp/test/performance/clickhouse-mock.ts
- apps/webapp/test/performance/config.ts
- apps/webapp/test/performance/consumer-runner.ts
- apps/webapp/test/performance/consumer.ts
- apps/webapp/test/performance/data-generator.ts
- apps/webapp/test/performance/harness.ts
- apps/webapp/test/performance/metrics-collector.ts
- apps/webapp/test/performance/producer-runner.ts
- apps/webapp/test/performance/producer.ts
- apps/webapp/test/runsReplicationService.part2.test.ts
- internal-packages/clickhouse/src/client/client.ts
- internal-packages/clickhouse/src/client/noop.ts
- internal-packages/clickhouse/src/client/types.ts
- internal-packages/clickhouse/src/index.ts
- internal-packages/clickhouse/src/taskRuns.ts
- internal-packages/replication/src/pgoutput.ts
🧰 Additional context used
📓 Path-based instructions (11)
**/*.{js,ts,jsx,tsx,json,md,css,scss}
📄 CodeRabbit inference engine (AGENTS.md)
Format code using Prettier
Files:
apps/webapp/test/performance/README.md, apps/webapp/test/runsReplicationService.part2.test.ts, apps/webapp/test/performance/producer-runner.ts, apps/webapp/test/performance/producer.ts, internal-packages/replication/src/pgoutput.ts, internal-packages/clickhouse/src/taskRuns.ts, internal-packages/clickhouse/src/index.ts, internal-packages/clickhouse/src/client/noop.ts, internal-packages/clickhouse/src/client/types.ts, apps/webapp/test/performance/harness.ts, apps/webapp/test/performance/consumer-runner.ts, apps/webapp/test/performance/data-generator.ts, apps/webapp/test/performance/metrics-collector.ts, apps/webapp/test/performance/clickhouse-mock.ts, apps/webapp/app/services/runsReplicationService.server.ts, apps/webapp/test/performance/config.ts, apps/webapp/scripts/profile-runs-replication.ts, apps/webapp/package.json, internal-packages/clickhouse/src/client/client.ts, apps/webapp/test/performance/consumer.ts
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead
Files:
apps/webapp/test/runsReplicationService.part2.test.ts, apps/webapp/test/performance/producer-runner.ts, apps/webapp/test/performance/producer.ts, internal-packages/replication/src/pgoutput.ts, internal-packages/clickhouse/src/taskRuns.ts, internal-packages/clickhouse/src/index.ts, internal-packages/clickhouse/src/client/noop.ts, internal-packages/clickhouse/src/client/types.ts, apps/webapp/test/performance/harness.ts, apps/webapp/test/performance/consumer-runner.ts, apps/webapp/test/performance/data-generator.ts, apps/webapp/test/performance/metrics-collector.ts, apps/webapp/test/performance/clickhouse-mock.ts, apps/webapp/app/services/runsReplicationService.server.ts, apps/webapp/test/performance/config.ts, apps/webapp/scripts/profile-runs-replication.ts, internal-packages/clickhouse/src/client/client.ts, apps/webapp/test/performance/consumer.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use zod for validation in packages/core and apps/webapp
Files:
apps/webapp/test/runsReplicationService.part2.test.ts, apps/webapp/test/performance/producer-runner.ts, apps/webapp/test/performance/producer.ts, apps/webapp/test/performance/harness.ts, apps/webapp/test/performance/consumer-runner.ts, apps/webapp/test/performance/data-generator.ts, apps/webapp/test/performance/metrics-collector.ts, apps/webapp/test/performance/clickhouse-mock.ts, apps/webapp/app/services/runsReplicationService.server.ts, apps/webapp/test/performance/config.ts, apps/webapp/scripts/profile-runs-replication.ts, apps/webapp/test/performance/consumer.ts
**/*.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use function declarations instead of default exports
Files:
apps/webapp/test/runsReplicationService.part2.test.ts, apps/webapp/test/performance/producer-runner.ts, apps/webapp/test/performance/producer.ts, internal-packages/replication/src/pgoutput.ts, internal-packages/clickhouse/src/taskRuns.ts, internal-packages/clickhouse/src/index.ts, internal-packages/clickhouse/src/client/noop.ts, internal-packages/clickhouse/src/client/types.ts, apps/webapp/test/performance/harness.ts, apps/webapp/test/performance/consumer-runner.ts, apps/webapp/test/performance/data-generator.ts, apps/webapp/test/performance/metrics-collector.ts, apps/webapp/test/performance/clickhouse-mock.ts, apps/webapp/app/services/runsReplicationService.server.ts, apps/webapp/test/performance/config.ts, apps/webapp/scripts/profile-runs-replication.ts, internal-packages/clickhouse/src/client/client.ts, apps/webapp/test/performance/consumer.ts
**/*.{test,spec}.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use vitest for all tests in the Trigger.dev repository
Files:
apps/webapp/test/runsReplicationService.part2.test.ts
apps/webapp/**/*.test.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Test files should only import classes and functions from `app/**/*.ts` files and should not import `env.server.ts` directly or indirectly; pass configuration through options instead
Files:
apps/webapp/test/runsReplicationService.part2.test.ts
apps/webapp/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
`apps/webapp/**/*.{ts,tsx}`: When importing from `@trigger.dev/core` in the webapp, use subpath exports from the package.json instead of importing from the root path
Follow the Remix 2.1.0 and Express server conventions when updating the main trigger.dev webapp
Files:
apps/webapp/test/runsReplicationService.part2.test.ts, apps/webapp/test/performance/producer-runner.ts, apps/webapp/test/performance/producer.ts, apps/webapp/test/performance/harness.ts, apps/webapp/test/performance/consumer-runner.ts, apps/webapp/test/performance/data-generator.ts, apps/webapp/test/performance/metrics-collector.ts, apps/webapp/test/performance/clickhouse-mock.ts, apps/webapp/app/services/runsReplicationService.server.ts, apps/webapp/test/performance/config.ts, apps/webapp/scripts/profile-runs-replication.ts, apps/webapp/test/performance/consumer.ts
**/*.test.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (AGENTS.md)
`**/*.test.{ts,tsx,js,jsx}`: Test files should live beside the files under test and use descriptive `describe` and `it` blocks
Avoid mocks or stubs in tests; use helpers from `@internal/testcontainers` when Redis or Postgres are needed
Use vitest for unit tests
Files:
apps/webapp/test/runsReplicationService.part2.test.ts
**/*.ts
📄 CodeRabbit inference engine (.cursor/rules/otel-metrics.mdc)
**/*.ts: When creating or editing OTEL metrics (counters, histograms, gauges), ensure metric attributes have low cardinality by using only enums, booleans, bounded error codes, or bounded shard IDs
Do not use high-cardinality attributes in OTEL metrics such as UUIDs/IDs (envId, userId, runId, projectId, organizationId), unbounded integers (itemCount, batchSize, retryCount), timestamps (createdAt, startTime), or free-form strings (errorMessage, taskName, queueName)
When exporting OTEL metrics via OTLP to Prometheus, be aware that the exporter automatically adds unit suffixes to metric names (e.g., 'my_duration_ms' becomes 'my_duration_ms_milliseconds', 'my_counter' becomes 'my_counter_total'). Account for these transformations when writing Grafana dashboards or Prometheus queries
Files:
apps/webapp/test/runsReplicationService.part2.test.ts, apps/webapp/test/performance/producer-runner.ts, apps/webapp/test/performance/producer.ts, internal-packages/replication/src/pgoutput.ts, internal-packages/clickhouse/src/taskRuns.ts, internal-packages/clickhouse/src/index.ts, internal-packages/clickhouse/src/client/noop.ts, internal-packages/clickhouse/src/client/types.ts, apps/webapp/test/performance/harness.ts, apps/webapp/test/performance/consumer-runner.ts, apps/webapp/test/performance/data-generator.ts, apps/webapp/test/performance/metrics-collector.ts, apps/webapp/test/performance/clickhouse-mock.ts, apps/webapp/app/services/runsReplicationService.server.ts, apps/webapp/test/performance/config.ts, apps/webapp/scripts/profile-runs-replication.ts, internal-packages/clickhouse/src/client/client.ts, apps/webapp/test/performance/consumer.ts
apps/webapp/app/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Access all environment variables through the `env` export of `env.server.ts` instead of directly accessing `process.env` in the Trigger.dev webapp
Files:
apps/webapp/app/services/runsReplicationService.server.ts
apps/webapp/app/services/**/*.server.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Separate testable services from configuration files; follow the pattern of `realtimeClient.server.ts` (testable service) and `realtimeClientGlobal.server.ts` (configuration) in the webapp
Files:
apps/webapp/app/services/runsReplicationService.server.ts
🧠 Learnings (20)
📓 Common learnings
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2264
File: apps/webapp/app/services/runsRepository.server.ts:172-174
Timestamp: 2025-07-12T18:06:04.133Z
Learning: In apps/webapp/app/services/runsRepository.server.ts, the in-memory status filtering after fetching runs from Prisma is intentionally used as a workaround for ClickHouse data delays. This approach is acceptable because the result set is limited to a maximum of 100 runs due to pagination, making the performance impact negligible.
📚 Learning: 2025-07-12T18:06:04.133Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2264
File: apps/webapp/app/services/runsRepository.server.ts:172-174
Timestamp: 2025-07-12T18:06:04.133Z
Learning: In apps/webapp/app/services/runsRepository.server.ts, the in-memory status filtering after fetching runs from Prisma is intentionally used as a workaround for ClickHouse data delays. This approach is acceptable because the result set is limited to a maximum of 100 runs due to pagination, making the performance impact negligible.
Applied to files:
- apps/webapp/test/performance/README.md
- apps/webapp/test/runsReplicationService.part2.test.ts
- apps/webapp/test/performance/producer.ts
- internal-packages/clickhouse/src/index.ts
- apps/webapp/test/performance/harness.ts
- apps/webapp/test/performance/clickhouse-mock.ts
- apps/webapp/app/services/runsReplicationService.server.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Export tasks with unique IDs within the project to enable proper task discovery and execution
Applied to files:
- apps/webapp/test/performance/producer-runner.ts
- apps/webapp/test/performance/producer.ts
- apps/webapp/test/performance/data-generator.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Subscribe to run updates using `runs.subscribeToRun()` for realtime monitoring of task execution
Applied to files:
- apps/webapp/test/performance/producer-runner.ts
- apps/webapp/test/performance/producer.ts
- apps/webapp/test/performance/consumer-runner.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `.withStreams()` to subscribe to realtime streams from task metadata in addition to run changes
Applied to files:
apps/webapp/test/performance/producer-runner.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger.config.ts : Configure OpenTelemetry instrumentations and exporters in trigger.config.ts for enhanced logging
Applied to files:
- apps/webapp/test/performance/producer-runner.ts
- apps/webapp/test/performance/config.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Generate example payloads for tasks when possible
Applied to files:
- apps/webapp/test/performance/producer-runner.ts
- apps/webapp/test/performance/data-generator.ts
📚 Learning: 2025-11-26T14:40:07.146Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 2710
File: packages/schema-to-json/package.json:0-0
Timestamp: 2025-11-26T14:40:07.146Z
Learning: Node.js 24+ has native TypeScript support and can execute .ts files directly without tsx or ts-node for scripts that use only erasable TypeScript syntax (type annotations, interfaces, etc.). The trigger.dev repository uses Node.js 24.11.1+ and scripts like updateVersion.ts can be run with `node` instead of `tsx`.
Applied to files:
apps/webapp/test/performance/producer-runner.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use the `task()` function from `trigger.dev/sdk/v3` to define tasks with id and run properties
Applied to files:
- apps/webapp/test/performance/producer-runner.ts
- internal-packages/clickhouse/src/taskRuns.ts
- apps/webapp/app/services/runsReplicationService.server.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Attach metadata to task runs using the metadata option when triggering, and access/update it inside runs using metadata functions
Applied to files:
- apps/webapp/test/performance/producer.ts
- internal-packages/clickhouse/src/taskRuns.ts
- apps/webapp/test/performance/data-generator.ts
- apps/webapp/app/services/runsReplicationService.server.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `runs.subscribeToBatch()` to subscribe to changes for all runs in a batch
Applied to files:
- apps/webapp/test/performance/producer.ts
- apps/webapp/app/services/runsReplicationService.server.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `schedules.task()` for scheduled/cron tasks instead of regular `task()`
Applied to files:
internal-packages/clickhouse/src/taskRuns.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `tasks.batchTrigger()` to trigger multiple runs of a single task with different payloads
Applied to files:
- internal-packages/clickhouse/src/taskRuns.ts
- apps/webapp/test/performance/data-generator.ts
- apps/webapp/app/services/runsReplicationService.server.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `yourTask.batchTrigger()` to trigger multiple runs of a task from inside another task
Applied to files:
internal-packages/clickhouse/src/taskRuns.ts
📚 Learning: 2025-06-14T08:07:46.625Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 2175
File: apps/webapp/app/services/environmentMetricsRepository.server.ts:202-207
Timestamp: 2025-06-14T08:07:46.625Z
Learning: In apps/webapp/app/services/environmentMetricsRepository.server.ts, the ClickHouse methods (getTaskActivity, getCurrentRunningStats, getAverageDurations) intentionally do not filter by the `tasks` parameter at the ClickHouse level, even though the tasks parameter is accepted by the public methods. This is done on purpose as there is not much benefit from adding that filtering at the ClickHouse layer.
Applied to files:
- internal-packages/clickhouse/src/index.ts
- apps/webapp/test/performance/clickhouse-mock.ts
- apps/webapp/app/services/runsReplicationService.server.ts
📚 Learning: 2025-11-27T16:27:48.109Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-11-27T16:27:48.109Z
Learning: Applies to **/*.test.{ts,tsx,js,jsx} : Avoid mocks or stubs in tests; use helpers from `internal/testcontainers` when Redis or Postgres are needed
Applied to files:
apps/webapp/test/performance/clickhouse-mock.ts
📚 Learning: 2025-11-27T16:26:58.661Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/webapp.mdc:0-0
Timestamp: 2025-11-27T16:26:58.661Z
Learning: Applies to apps/webapp/app/services/**/*.server.{ts,tsx} : Separate testable services from configuration files; follow the pattern of `realtimeClient.server.ts` (testable service) and `realtimeClientGlobal.server.ts` (configuration) in the webapp
Applied to files:
apps/webapp/test/performance/config.ts
📚 Learning: 2025-11-27T16:26:58.661Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/webapp.mdc:0-0
Timestamp: 2025-11-27T16:26:58.661Z
Learning: Applies to apps/webapp/**/*.test.{ts,tsx} : Test files should only import classes and functions from `app/**/*.ts` files and should not import `env.server.ts` directly or indirectly; pass configuration through options instead
Applied to files:
apps/webapp/test/performance/config.ts
📚 Learning: 2025-11-27T16:26:37.432Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2025-11-27T16:26:37.432Z
Learning: Applies to {packages/core,apps/webapp}/**/*.{ts,tsx} : Use zod for validation in packages/core and apps/webapp
Applied to files:
apps/webapp/package.json
📚 Learning: 2025-11-27T16:27:48.109Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-11-27T16:27:48.109Z
Learning: pnpm version `10.23.0` and Node.js version `20.11.1` are required for development
Applied to files:
apps/webapp/package.json
🧬 Code graph analysis (10)
apps/webapp/test/runsReplicationService.part2.test.ts (1)
- apps/webapp/app/services/runsReplicationService.server.ts (2)
  - `run` (851-908), `run` (910-919)

internal-packages/clickhouse/src/taskRuns.ts (1)
- internal-packages/clickhouse/src/client/types.ts (1)
  - `ClickhouseWriter` (209-245)

internal-packages/clickhouse/src/index.ts (1)
- internal-packages/clickhouse/src/taskRuns.ts (2)
  - `insertTaskRunsCompactArrays` (104-115), `insertRawTaskRunPayloadsCompactArrays` (127-145)

internal-packages/clickhouse/src/client/noop.ts (2)
- packages/core/src/v3/tryCatch.ts (1)
  - `Result` (5-5)
- internal-packages/clickhouse/src/client/errors.ts (1)
  - `InsertError` (24-32)

internal-packages/clickhouse/src/client/types.ts (2)
- packages/core/src/v3/tryCatch.ts (1)
  - `Result` (5-5)
- internal-packages/clickhouse/src/client/errors.ts (1)
  - `InsertError` (24-32)

apps/webapp/test/performance/data-generator.ts (1)
- apps/webapp/app/db.server.ts (1)
  - `Prisma` (99-99)

apps/webapp/test/performance/metrics-collector.ts (1)
- apps/webapp/test/performance/config.ts (2)
  - `PhaseMetrics` (140-163), `ProducerMetrics` (131-138)

apps/webapp/test/performance/config.ts (1)
- internal-packages/redis/src/index.ts (1)
  - `RedisOptions` (4-4)

internal-packages/clickhouse/src/client/client.ts (11)
- internal-packages/clickhouse/src/client/types.ts (1)
  - `ClickhouseInsertFunction` (201-207)
- apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts (2)
  - `events` (233-236), `span` (1926-1943)
- internal-packages/tracing/src/index.ts (3)
  - `startSpan` (60-90), `recordSpanError` (92-98), `Span` (42-42)
- packages/core/src/v3/utils/flattenAttributes.ts (1)
  - `flattenAttributes` (6-14)
- apps/webapp/memory-leak-detector.js (1)
  - `i` (886-886)
- packages/core/src/v3/workers/taskExecutor.ts (1)
  - `result` (1285-1332)
- packages/core/src/v3/tryCatch.ts (2)
  - `tryCatch` (8-15), `Result` (5-5)
- internal-packages/clickhouse/src/client/errors.ts (1)
  - `InsertError` (24-32)
- apps/webapp/app/presenters/v3/SpanPresenter.server.ts (1)
  - `Span` (25-25)
- apps/webapp/app/services/runsReplicationService.server.ts (1)
  - `error` (718-745)
- internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (1)
  - `error` (1905-1916)

apps/webapp/test/performance/consumer.ts (1)
- apps/webapp/test/performance/config.ts (2)
  - `ConsumerConfig` (30-42), `ProfilingConfig` (44-48)
🪛 Biome (2.1.2)
internal-packages/replication/src/pgoutput.ts
[error] 480-480: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Safe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
[error] 484-484: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Safe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
[error] 485-485: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Safe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
[error] 486-486: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Safe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
🪛 markdownlint-cli2 (0.18.1)
apps/webapp/test/performance/README.md
19-19: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
110-110: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
220-220: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (25)
- GitHub Check: Cursor Bugbot
- GitHub Check: typecheck / typecheck
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: claude-review
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (27)
.gitignore (1)
53-56: ✓ Gitignore entries for profiling artifacts are appropriate. The added ignore rules correctly exclude profiling outputs and tooling artifacts (Clinic.js, V8 logs, and results directories) from source control. The rules follow existing patterns and are well-commented.
apps/webapp/package.json (2)
224-225: Looks good. The addition of Clinic.js profiling tools aligns with the PR's performance optimization objectives and enables CPU profiling and performance analysis of the runs replication service. Both @clinic/doctor@11.0.0 and @clinic/flame@13.0.0 are valid, latest-release versions with MIT licenses and no known security vulnerabilities.
268-269: LGTM! Package versions are valid and secure. Both clinic@13.0.0 and commander@11.0.0 are valid versions with no security vulnerabilities. Note that commander has a newer major version (14.0.2) available; ensure compatibility testing if upgrading is considered. The JSON formatting is correct and properly alphabetized.
apps/webapp/test/runsReplicationService.part2.test.ts (3)
892-897: LGTM! Array indices match the documented structure. The array-based access aligns with the compact insert format from `#prepareTaskRunInsert`. The comments correctly document that index 3 is `run_id` and index 6 is `status`.
1062-1079: LGTM! Sorting validation correctly uses array indices. The array indices correctly correspond to the compact format, and the sorting order (organization_id, project_id, environment_id, created_at, run_id) aligns with the ClickHouse table schema for optimal insert performance.
1109-1111: LGTM! Payload insert validation uses correct array index. The array index 0 correctly corresponds to `run_id` from the `#preparePayloadInsert` compact format.
internal-packages/clickhouse/src/index.ts (2)
6-7: LGTM! Import names clearly indicate compact array format. The renamed imports (`insertTaskRunsCompactArrays`, `insertRawTaskRunPayloadsCompactArrays`) make the API intent explicit and align with the performance optimization goals.
171-172: Breaking API change: fully implemented and all call sites updated. The public API methods have been renamed from `insert`/`insertPayloads` to `insertCompactArrays`/`insertPayloadsCompactArrays`. All usages in the codebase have been updated, including the consumer in `runsReplicationService.server.ts`.
apps/webapp/test/performance/producer-runner.ts (2)
38-38: LGTM! BigInt serialization limitation is documented. Disabling complex payloads to avoid BigInt serialization issues with IPC is a reasonable workaround for the test harness, and the comment clearly documents the rationale.
61-106: LGTM! Error handling is appropriate for a test harness. The IPC message handling includes proper error handling, and the uncaught exception handlers appropriately notify the parent process while allowing the producer to continue running. This is suitable for a long-running performance test process.
internal-packages/clickhouse/src/client/types.ts (1)
internal-packages/clickhouse/src/client/types.ts (1)
223-242: LGTM! New compact insert methods have good type safety. The new `insertCompact` and `insertCompactRaw` methods provide both type-safe and high-performance insertion paths. The use of `readonly string[]` for columns prevents accidental modification, and the API design is consistent with existing patterns.
apps/webapp/test/performance/metrics-collector.ts (3)
180-184: Percentile calculation may be off-by-one for edge cases. The percentile calculation uses `Math.ceil(sorted.length * p) - 1`. For `p = 0.5` with a single-element array, this returns index 0, which is correct. For empty arrays, returning 0 is also appropriate. The logic looks sound for the intended use case.
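The nearest-rank rule discussed here can be sketched as a standalone function (a hypothetical version for illustration; the collector's actual method name and surrounding types may differ):

```typescript
// Nearest-rank percentile: p is a fraction in (0, 1].
// Returns 0 for an empty window, mirroring the collector's behavior.
function percentile(values: number[], p: number): number {
  if (values.length === 0) return 0;
  // Copy before sorting so the caller's array is not mutated.
  const sorted = [...values].sort((a, b) => a - b);
  const index = Math.ceil(sorted.length * p) - 1;
  return sorted[index];
}
```

For a single-element array and p = 0.5 the index resolves to 0, confirming the edge case noted above; p = 1 always selects the maximum.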
88-101: LGTM! Proper directory creation before file write. The `exportToJSON` method correctly ensures the directory exists before writing. The use of `recursive: true` handles nested paths properly.
103-139: Producer metrics aggregation logic is correct. The approach of keeping the latest metric per worker (using cumulative counters) and summing across workers properly handles multi-worker scenarios where each worker reports independently.
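That aggregation rule can be sketched in isolation (the `WorkerMetric` shape below is illustrative, not the real `ProducerMetrics` type):

```typescript
// Each worker reports a cumulative counter; only its latest report matters.
interface WorkerMetric {
  workerId: string;
  timestamp: number;
  recordsProduced: number; // cumulative, not a delta
}

// Keep the most recent report per worker, then sum across workers.
function aggregateProduced(reports: WorkerMetric[]): number {
  const latest = new Map<string, WorkerMetric>();
  for (const r of reports) {
    const prev = latest.get(r.workerId);
    if (!prev || r.timestamp > prev.timestamp) latest.set(r.workerId, r);
  }
  let total = 0;
  for (const m of latest.values()) total += m.recordsProduced;
  return total;
}
```

Because the counters are cumulative, summing latest-per-worker avoids double-counting older reports from the same worker.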
apps/webapp/test/performance/harness.ts (2)
319-340: Retry loop with a bounded, fixed-delay backoff. The retry logic for dropping the replication slot, with a fixed 2-second delay between attempts, is reasonable for this use case. The bounded retry count prevents infinite loops.
473-486: `waitForReplicationLag` appears to be a placeholder implementation. The method always returns after 5 seconds regardless of actual lag, and the `maxLagMs` parameter is unused. This seems intentional as an approximation, but it may not accurately detect when replication catches up. Is this placeholder behavior intentional? The comment says "approximated", but the actual lag isn't being measured.
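If measuring real lag is desired later, one possible sketch polls the slot's flush position instead of sleeping. This assumes a generic `query` helper against the primary and the standard `pg_replication_slots` catalog; in real code the slot name should be parameterized, not interpolated:

```typescript
// Poll the replication slot until its lag (in WAL bytes) drops below a
// threshold, or give up at the deadline. Returns true if it caught up.
async function waitForReplicationLag(
  query: (sql: string) => Promise<Array<{ lag_bytes: string }>>,
  slotName: string,
  maxLagBytes: number,
  timeoutMs: number,
  pollIntervalMs = 500
): Promise<boolean> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    // pg_wal_lsn_diff returns the byte distance between two LSNs.
    const rows = await query(
      `SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
       FROM pg_replication_slots WHERE slot_name = '${slotName}'`
    );
    if (rows.length > 0 && Number(rows[0].lag_bytes) <= maxLagBytes) return true;
    await new Promise((r) => setTimeout(r, pollIntervalMs));
  }
  return false; // timed out before the slot caught up
}
```

The harness could then pass a real Postgres client's query function; the fake below just demonstrates the control flow.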
apps/webapp/app/services/runsReplicationService.server.ts (3)
759-770: Behavior change: async insert settings now always returned for the async strategy. Previously, `#getClickhouseInsertSettings()` returned `{}` when the strategy was "insert". Now it returns async insert settings for the "insert_async" case, while the "insert" strategy still gets an empty object (lines 761-762). The logic appears correct (returns `{}` for the "insert" strategy, async settings otherwise), but confirm this matches the intended behavior, since the AI summary mentioned "always returns the default async insert configuration".
860-907: Array construction matches documented column order. The array returned by `#prepareTaskRunInsert` is well-documented with inline comments matching the column names. This is good practice for maintainability with positional arrays.
910-919: Payload insert array structure is clear and concise. The simplified 3-element array format `[run_id, created_at, payload]` is well-documented and matches the `PAYLOAD_COLUMNS` order.
internal-packages/clickhouse/src/client/client.ts (4)
650-718: Well-structured `insertCompact` implementation. The `insertCompact` method properly:
- Builds the compact format with column headers as the first row
- Uses the `JSONCompactEachRowWithNames` format
- Includes comprehensive span attributes for observability
- Handles both single events and arrays
903-912: Improved error recording flow. The refactored `recordClickhouseError` now always calls `recordSpanError`, even for non-`ClickHouseError` instances. This ensures all errors are properly recorded in spans.
914-927: Explicit return type added. Adding `ClickHouseLogLevel` as the explicit return type improves type safety and documentation.
729-755: LGTM: `insertUnsafe` unified array handling. The introduction of `eventsArray` to normalize input handling is clean and consistent with the other insert methods.
apps/webapp/test/performance/config.ts (2)
73-129: `getDefaultConfig` returns a well-structured partial configuration. The defaults are sensible for development/testing:
- Reasonable throughput targets (1000-5000 rec/sec)
- Appropriate batch sizes and intervals
- Proper fallbacks for infrastructure URLs
123-127: Environment variable access in default config. The infrastructure section accesses `process.env` directly. Per coding guidelines for `apps/webapp/app/**/*.{ts,tsx}`, environment variables should be accessed through the `env` export. However, since this is test infrastructure (`apps/webapp/test/`), this may be acceptable. Confirm that test infrastructure files are exempt from the `env.server.ts` requirement, as this file is under `test/`, not `app/`.
apps/webapp/test/performance/consumer.ts (2)
40-49: Process spawn with proper IPC configuration. The stdio configuration correctly adapts between profiling (needs stdout capture for readiness detection) and non-profiling modes (uses inherit for simplicity). The IPC channel is properly included.
172-201: Clinic.js integration is well-implemented. The `buildClinicArgs()` method properly:
- Uses `--collect-only` for deferred visualization
- Passes `--dest` for the output location
- Uses the `node --import tsx` loader pattern for TypeScript support
- Resolves clinic from `node_modules` directly for reliability
```ts
import type { RawTaskRunPayloadV1, TaskRunV2 } from "@internal/clickhouse";

/**
 * Mock ClickHouse client for CPU-only profiling.
 * Implements the minimal interface needed by RunsReplicationService
 * without actually writing to ClickHouse.
 */
export class MockClickHouse {
  private insertCount = 0;
  private payloadInsertCount = 0;

  constructor(private readonly insertDelayMs: number = 0) {}

  taskRuns = {
    insert: async (
      runs: TaskRunV2[],
      options?: any
    ): Promise<[Error | null, { rows: number } | null]> => {
      if (this.insertDelayMs > 0) {
        await new Promise((resolve) => setTimeout(resolve, this.insertDelayMs));
      }

      this.insertCount += runs.length;

      return [null, { rows: runs.length }];
    },

    insertPayloads: async (
      payloads: RawTaskRunPayloadV1[],
      options?: any
    ): Promise<[Error | null, { rows: number } | null]> => {
      if (this.insertDelayMs > 0) {
        await new Promise((resolve) => setTimeout(resolve, this.insertDelayMs));
      }

      this.payloadInsertCount += payloads.length;

      return [null, { rows: payloads.length }];
    },
  };

  getStats() {
    return {
      totalInserts: this.insertCount,
      totalPayloadInserts: this.payloadInsertCount,
    };
  }

  reset() {
    this.insertCount = 0;
    this.payloadInsertCount = 0;
  }
}
```
🧩 Analysis chain
Repository: triggerdotdev/trigger.dev
🏁 Script executed:
# Find RunsReplicationService
fd -t f "RunsReplicationService" --extension ts --extension tsx
Length of output: 51
🏁 Script executed:
# Search for RunsReplicationService definition
rg "class RunsReplicationService|export.*RunsReplicationService" -t ts -t tsx
Length of output: 96
🏁 Script executed:
# Look for insertCompactRaw method references
rg "insertCompactRaw" -t ts -t tsx
Length of output: 96
🏁 Script executed:
# Check the actual ClickHouse client interface/types
rg "interface.*ClickHouse|class.*ClickHouse" -t ts -t tsx -A 10
Length of output: 96
🏁 Script executed:
# Search for RunsReplicationService more broadly
rg "RunsReplicationService" -t ts
Length of output: 6586
🏁 Script executed:
# Look at imports and actual types in the mock file
cat apps/webapp/test/performance/clickhouse-mock.ts
Length of output: 1421
🏁 Script executed:
# Check for ClickHouse client interfaces and definitions
rg "taskRuns.*insert|insertCompact" -t ts
Length of output: 1344
🏁 Script executed:
# Find files that import from clickhouse
fd -t f -e ts -e tsx | xargs grep -l "from.*clickhouse" | head -20
Length of output: 1103
🏁 Script executed:
# Look at the actual calls in RunsReplicationService
rg "insertCompactArrays|insertPayloadsCompactArrays" -t ts -B 5 -A 5
Length of output: 3194
🏁 Script executed:
# Check the ClickHouse client types for these methods
cat internal-packages/clickhouse/src/client/types.ts | head -100
Length of output: 2993
🏁 Script executed:
# Look at the taskRuns.ts implementation to understand the compact arrays signature
cat internal-packages/clickhouse/src/taskRuns.ts | grep -A 20 "insertCompactArrays\|insertPayloadsCompactArrays"
Length of output: 51
🏁 Script executed:
# View the taskRuns.ts file
cat internal-packages/clickhouse/src/taskRuns.ts
Length of output: 10202
🏁 Script executed:
# Check what the ClickHouse client noop.ts has for these methods
cat internal-packages/clickhouse/src/client/noop.ts | head -150
Length of output: 4462
🏁 Script executed:
# Find where MockClickHouse is imported/used
rg "MockClickHouse" -t ts -B 2 -A 5
Length of output: 2012
🏁 Script executed:
# Look at the profile-runs-replication script to see if it uses the mock
cat apps/webapp/scripts/profile-runs-replication.ts | head -100
Length of output: 3372
Add missing insertCompactArrays and insertPayloadsCompactArrays methods to MockClickHouse.taskRuns.
RunsReplicationService now calls insertCompactArrays() and insertPayloadsCompactArrays() on the taskRuns object, but MockClickHouse only implements the old insert() and insertPayloads() methods. This causes runtime failures when using the mock with the replication service. The taskRuns property must expose both compact insert methods with signatures accepting readonly any[][] instead of the old object array types.
Required changes
```diff
 export class MockClickHouse {
   private insertCount = 0;
   private payloadInsertCount = 0;

   constructor(private readonly insertDelayMs: number = 0) {}

   taskRuns = {
-    insert: async (
-      runs: TaskRunV2[],
+    insertCompactArrays: async (
+      runs: readonly any[][],
       options?: any
     ): Promise<[Error | null, { rows: number } | null]> => {
       if (this.insertDelayMs > 0) {
         await new Promise((resolve) => setTimeout(resolve, this.insertDelayMs));
       }
       this.insertCount += runs.length;
       return [null, { rows: runs.length }];
     },
-    insertPayloads: async (
-      payloads: RawTaskRunPayloadV1[],
+    insertPayloadsCompactArrays: async (
+      payloads: readonly any[][],
       options?: any
     ): Promise<[Error | null, { rows: number } | null]> => {
       if (this.insertDelayMs > 0) {
         await new Promise((resolve) => setTimeout(resolve, this.insertDelayMs));
       }
       this.payloadInsertCount += payloads.length;
       return [null, { rows: payloads.length }];
     },
   };
```
🤖 Prompt for AI Agents
In @apps/webapp/test/performance/clickhouse-mock.ts around lines 1 - 53,
MockClickHouse.taskRuns is missing insertCompactArrays and
insertPayloadsCompactArrays methods required by RunsReplicationService; add both
methods to the taskRuns object with signatures (compactRows: readonly any[][],
options?: any) and (compactPayloads: readonly any[][], options?: any)
respectively, implement the same delay handling as insert/insertPayloads,
increment the appropriate counters using compactRows.length /
compactPayloads.length, and return the same Promise<[Error | null, { rows:
number } | null]> shape as the existing insert methods.
```ts
async function main() {
  const hasIPC = !!process.send;

  if (!hasIPC) {
    console.log(
      "Warning: IPC not available (likely running under profiler) - metrics will not be sent to parent"
    );
  }

  // Parse configuration from environment variable
  const config: ConsumerConfig = JSON.parse(process.env.CONSUMER_CONFIG!);

  // Create shutdown signal file path
  const shutdownFilePath = path.join(config.outputDir || "/tmp", ".shutdown-signal");
```
Fail fast with a clear error if CONSUMER_CONFIG is missing/invalid JSON.
JSON.parse(process.env.CONSUMER_CONFIG!) will throw a cryptic error (or parse undefined) and skip structured reporting. In apps/webapp/**/*.{ts,tsx}, you can lean on zod to validate the env payload (per guidelines).
🤖 Prompt for AI Agents
In @apps/webapp/test/performance/consumer-runner.ts around lines 9 - 23, The
code in main() blindly calls JSON.parse(process.env.CONSUMER_CONFIG!) which can
throw a cryptic error or parse undefined; replace this with a fast-fail
validation: verify process.env.CONSUMER_CONFIG exists, parse it inside a
try/catch, and validate the result with a zod schema (e.g.,
ConsumerConfigSchema.parse) to produce a clear, structured error message before
exiting or rethrowing; update the ConsumerConfig usage (the const config
assignment near shutdownFilePath creation) to use the validated object and
ensure processLogger or console.error logs the validation error and calls
process.exit(1).
```ts
await new Promise<void>((resolve) => {
  // With shutdown signal file, consumer-runner should exit within a few seconds
  // With --collect-only, Clinic.js then quickly packages the data and exits
  const timeoutMs = isProfiling ? 15000 : 30000;

  const timeout = setTimeout(() => {
    console.warn(`Consumer process did not exit after ${timeoutMs}ms, killing`);
    this.process?.kill("SIGKILL");
    resolve();
  }, timeoutMs);

  this.process?.on("exit", (code, signal) => {
    clearTimeout(timeout);
    console.log(`Consumer process exited with code ${code}, signal ${signal}`);
    resolve();
  });
```
Potential for orphaned exit listener.
The exit listener added in stop() may accumulate if stop() is called multiple times without the process exiting. Consider using `once` instead of `on`.
🐛 Fix

```diff
-    this.process?.on("exit", (code, signal) => {
+    this.process?.once("exit", (code, signal) => {
       clearTimeout(timeout);
       console.log(`Consumer process exited with code ${code}, signal ${signal}`);
       resolve();
     });
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
await new Promise<void>((resolve) => {
  // With shutdown signal file, consumer-runner should exit within a few seconds
  // With --collect-only, Clinic.js then quickly packages the data and exits
  const timeoutMs = isProfiling ? 15000 : 30000;

  const timeout = setTimeout(() => {
    console.warn(`Consumer process did not exit after ${timeoutMs}ms, killing`);
    this.process?.kill("SIGKILL");
    resolve();
  }, timeoutMs);

  this.process?.once("exit", (code, signal) => {
    clearTimeout(timeout);
    console.log(`Consumer process exited with code ${code}, signal ${signal}`);
    resolve();
  });
```
🤖 Prompt for AI Agents
In @apps/webapp/test/performance/consumer.ts around lines 129 - 144, In stop(),
the "exit" listener uses this.process?.on which can accumulate if stop() is
called multiple times; change it to this.process?.once("exit", ...) so the
handler is removed after it runs (and keep the existing clearTimeout(timeout)
and resolve() behavior), or alternatively attach a named handler and call
this.process?.removeListener("exit", handler) before resolving to ensure the
listener is not left registered.
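The difference the suggestion is driving at — `once` deregisters its handler after the first `"exit"` event, while `on` leaves handlers attached across repeated `stop()` calls — can be demonstrated in isolation with a plain `EventEmitter`:

```typescript
import { EventEmitter } from "node:events";

const proc = new EventEmitter();

// Simulate calling stop() twice with `on`: both handlers stay registered.
proc.on("exit", () => {});
proc.on("exit", () => {});
const withOn = proc.listenerCount("exit"); // listeners accumulate: 2

proc.removeAllListeners("exit");

// With `once`, the handler is removed as soon as "exit" fires.
proc.once("exit", () => {});
const beforeEmit = proc.listenerCount("exit"); // 1
proc.emit("exit", 0, null);
const afterEmit = proc.listenerCount("exit"); // 0 — nothing left to leak

console.log(withOn, beforeEmit, afterEmit); // 2 1 0
```

A real `ChildProcess` behaves the same way here, since it is itself an `EventEmitter`.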
```ts
export interface TaskRunProducerOptions {
  prisma: PrismaClient;
  dataGenerator: TaskRunDataGenerator;
  workerId?: string;
  targetThroughput: number;
  insertUpdateRatio: number;
  batchSize: number;
}

export class TaskRunProducer {
  private running = false;
  private totalInserts = 0;
  private totalUpdates = 0;
  private errors = 0;
  private latencies: number[] = [];
  private createdRunIds: string[] = [];
  private timer: NodeJS.Timeout | null = null;
  private startTime: number = 0;

  constructor(private readonly options: TaskRunProducerOptions) {}

  async start(): Promise<void> {
    if (this.running) {
      throw new Error("Producer is already running");
    }

    this.running = true;
    this.startTime = Date.now();
    this.resetMetrics();

    await this.runProducerLoop();
  }

  async stop(): Promise<void> {
    this.running = false;
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }
  }
```
stop() can hang forever: clearing the timeout prevents the awaited sleep from resolving.
runProducerLoop() awaits a Promise that only resolves via setTimeout (Line 85-88 / 95-98). stop() clears that timeout (Line 40-43), so the promise may never resolve → loop never iterates again → start() never returns / process shutdown can stall.
Proposed diff (interruptible sleep)

```diff
-export interface TaskRunProducerOptions {
+export type TaskRunProducerOptions = {
   prisma: PrismaClient;
   dataGenerator: TaskRunDataGenerator;
   workerId?: string;
   targetThroughput: number;
   insertUpdateRatio: number;
   batchSize: number;
-}
+};

 export class TaskRunProducer {
   private running = false;
@@
   private timer: NodeJS.Timeout | null = null;
+  private wake: (() => void) | null = null;
@@
   async stop(): Promise<void> {
     this.running = false;
     if (this.timer) {
       clearTimeout(this.timer);
       this.timer = null;
     }
+    this.wake?.();
+    this.wake = null;
   }
+
+  private sleep(ms: number): Promise<void> {
+    return new Promise((resolve) => {
+      this.wake = resolve;
+      this.timer = setTimeout(() => {
+        this.timer = null;
+        this.wake = null;
+        resolve();
+      }, ms);
+    });
+  }
@@
-    if (this.running && delay > 0) {
-      await new Promise((resolve) => {
-        this.timer = setTimeout(resolve, delay);
-      });
-    }
+    if (this.running && delay > 0) await this.sleep(delay);
@@
-    if (this.running) {
-      await new Promise((resolve) => {
-        this.timer = setTimeout(resolve, 1000);
-      });
-    }
+    if (this.running) await this.sleep(1000);
```

Also applies to: 60-101
🤖 Prompt for AI Agents
In @apps/webapp/test/performance/producer.ts around lines 5 - 45, The stop()
method currently clears the timeout but does not resolve the awaited promise in
runProducerLoop(), so awaiting sleep can hang; make the sleep used by
runProducerLoop() interruptible by wiring the existing this.timer and a
resolver: implement a class-bound sleep that sets this.timer = setTimeout(...)
and returns a Promise whose resolve is stored (e.g., this.timerResolver), and
update stop() to clearTimeout(this.timer), null the timer, and call the stored
resolver so the sleep promise always resolves; update runProducerLoop() to use
this new interruptible sleep and ensure timer/timerResolver are cleared after
resolution.
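The core of the proposed fix — a sleep whose resolver is stored so that `stop()` can settle it early — can be sketched in isolation. The `Loop` class and its members below are illustrative stand-ins, not the producer's real API:

```typescript
// Minimal interruptible-sleep sketch: stop() both clears the timer and
// resolves any in-flight sleep, so a loop awaiting that sleep can never hang.
class Loop {
  private running = false;
  private timer: ReturnType<typeof setTimeout> | null = null;
  private wake: (() => void) | null = null;
  public iterations = 0;

  async run(intervalMs: number): Promise<void> {
    this.running = true;
    while (this.running) {
      this.iterations++;
      await this.sleep(intervalMs);
    }
  }

  stop(): void {
    this.running = false;
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    this.wake?.(); // settle the awaited promise immediately
    this.wake = null;
  }

  private sleep(ms: number): Promise<void> {
    return new Promise((resolve) => {
      this.wake = resolve;
      this.timer = setTimeout(() => {
        this.timer = null;
        this.wake = null;
        resolve();
      }, ms);
    });
  }
}
```

Calling `stop()` while `run()` is parked in `sleep()` resolves the pending promise, so the loop observes `running === false` and returns promptly instead of waiting out (or hanging on) the cleared timeout.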
```ts
// Check if events is a single row (array) or multiple rows (array of arrays)
// If first element is not an array, treat as single row
const isSingleRow = events.length > 0 && !Array.isArray(events[0]);
const eventsArray: readonly any[][] = isSingleRow
  ? [events as any[]]
  : (events as readonly any[][]);
```
Single row detection heuristic may have edge cases.
The check events.length > 0 && !Array.isArray(events[0]) assumes all rows are arrays. If the first element of a single row happens to be an array (e.g., [["nested"], "value"]), it would be misinterpreted as multiple rows.
Consider using a more explicit API where the caller indicates whether they're passing a single row or multiple rows, or document this limitation clearly.
🤖 Prompt for AI Agents
In @internal-packages/clickhouse/src/client/client.ts around lines 819 - 824,
The heuristic using isSingleRow = events.length > 0 && !Array.isArray(events[0])
can misclassify when the first element itself is an array; update the detection
to explicitly check the shape of the whole input (e.g., treat as multiple rows
only if events is an array and every element is an array via
events.every(Array.isArray)), or add an explicit parameter (e.g., forceSingleRow
or isBatch) so the caller declares single vs multiple rows; then compute
eventsArray from that reliable signal (refer to isSingleRow and eventsArray) and
adjust callers accordingly.
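The edge case called out above is easy to reproduce. A single row whose first cell is itself an array defeats the `events[0]` heuristic, while the suggested `every(Array.isArray)` check at least classifies it correctly (though it still cannot distinguish a single row whose cells are *all* arrays from a batch, which is why the explicit caller-supplied flag remains the safer API). The helper names below are for illustration only:

```typescript
// Hypothetical reproduction of the first-element heuristic's edge case.
// Both helpers return true when the input should be treated as ONE row.
const firstElementHeuristic = (events: readonly unknown[]) =>
  events.length > 0 && !Array.isArray(events[0]);

const everyHeuristic = (events: readonly unknown[]) =>
  !(events.length > 0 && events.every(Array.isArray));

const singleRowWithNestedFirstCell = [["nested"], "value"]; // ONE row, two cells
const batchOfTwoRows = [["a", 1], ["b", 2]]; // TWO rows

// The first-element check misreads the single row as a batch:
const misread = firstElementHeuristic(singleRowWithNestedFirstCell); // false => "batch"
// every(Array.isArray) classifies the same input correctly:
const correct = everyHeuristic(singleRowWithNestedFirstCell); // true => single row
// ...and still detects a genuine batch:
const batchDetected = everyHeuristic(batchOfTwoRows); // false => batch

console.log(misread, correct, batchDetected); // false true false
```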
```ts
export type PgoutputMessageArray =
  | MessageBegin
  | MessageCommit
  | MessageDeleteArray
  | MessageInsertArray
  | MessageMessage
  | MessageOrigin
  | MessageRelation
  | MessageTruncate
  | MessageType
  | MessageUpdateArray;
```
🧩 Analysis chain
🏁 Script executed:

```shell
# First, let's locate and examine the pgoutput.ts file
find . -name "pgoutput.ts" -type f
```

Repository: triggerdotdev/trigger.dev
Length of output: 116

🏁 Script executed:

```shell
# Get the file size to understand how to read it
wc -l internal-packages/replication/src/pgoutput.ts
```

Length of output: 118

🏁 Script executed:

```shell
# Read the specific lines mentioned in the review (23-33)
sed -n '23,33p' internal-packages/replication/src/pgoutput.ts
```

Length of output: 301

🏁 Script executed:

```shell
# Read lines 110-128 as mentioned in "Also applies to"
sed -n '110,128p' internal-packages/replication/src/pgoutput.ts
```

Length of output: 482

🏁 Script executed:

```shell
# Look for the readKeyTuple and readTupleAsArray functions
rg -n "readKeyTuple|readTupleAsArray" internal-packages/replication/src/pgoutput.ts
```

Length of output: 768

🏁 Script executed:

```shell
# Look for switch declarations and const declarations within them (lines around 480-486)
sed -n '475,495p' internal-packages/replication/src/pgoutput.ts
```

Length of output: 780

🏁 Script executed:

```shell
# Read the readKeyTuple implementation (around line 425)
sed -n '425,467p' internal-packages/replication/src/pgoutput.ts
```

Length of output: 1498

🏁 Script executed:

```shell
# Read the full readTupleAsArray implementation (around line 468)
sed -n '468,530p' internal-packages/replication/src/pgoutput.ts
```

Length of output: 2035

🏁 Script executed:

```shell
# Check if there's a lint configuration and what lint errors exist
find . -name "biome.json" -o -name ".biomerc*" | head -5
```

Length of output: 81

🏁 Script executed:

```shell
# Look at the object variant message interfaces to compare with array variants
sed -n '35,110p' internal-packages/replication/src/pgoutput.ts
```

Length of output: 1750

🏁 Script executed:

```shell
# Check the biome.json configuration to see lint rules
cat biome.json | head -80
```

Length of output: 1843

🏁 Script executed:

```shell
# Look for where the array message variants are constructed to see how key/old/new are used
rg -n "MessageInsertArray|MessageUpdateArray|MessageDeleteArray" internal-packages/replication/src/pgoutput.ts -B 2 -A 2
```

Length of output: 1476

🏁 Script executed:

```shell
# Check the specific lines where key is assigned in array variants
sed -n '385,425p' internal-packages/replication/src/pgoutput.ts
```

Length of output: 1772

🏁 Script executed:

```shell
# Check for usage of MessageUpdateArray, MessageDeleteArray to see if downstream code expects key to be projected
rg -n "MessageUpdateArray|MessageDeleteArray|\.key" internal-packages/replication/src/ -A 2 -B 2 | head -100
```

Length of output: 3651

🏁 Script executed:

```shell
# Look for imports and usage of these message types outside pgoutput.ts
rg -l "MessageUpdateArray|MessageDeleteArray|MessageInsertArray" internal-packages/replication/src/ --type ts
```

Length of output: 114

🏁 Script executed:

```shell
# Check if there's parsing logic that depends on key shape
rg -n "relation\.keyColumns" internal-packages/replication/src/pgoutput.ts -B 3 -A 3
```

Length of output: 402

🏁 Script executed:

```shell
# Verify the actual switch case structure to understand the noSwitchDeclarations issue
sed -n '468,507p' internal-packages/replication/src/pgoutput.ts | cat -A
```

Length of output: 1413
Remove lint error claim and fix semantic divergence between object and array key variants.
The code is linter-compliant (biome has noInnerDeclarations disabled). However, the array variants diverge from the object variants in a meaningful way: readKeyTuple() projects the key to relation.keyColumns only with null→undefined conversion, but readTupleAsArray() returns the full-width tuple without projection. When MessageUpdateArray and MessageDeleteArray assign key, they currently get a full-width array rather than a key-projected one. Either project the key to match the object variant's semantics, or document the full-width behavior explicitly. Additionally, consider changing any[] to unknown[] for better type safety.
🤖 Prompt for AI Agents
In @internal-packages/replication/src/pgoutput.ts around lines 23 - 33, The
array-style message variants (MessageUpdateArray, MessageDeleteArray) currently
receive full-width arrays while object-style variants use readKeyTuple() which
projects keys to relation.keyColumns (converting null→undefined); make the array
variants consistent by projecting their key arrays using readKeyTuple() (or the
same projection logic used by readKeyTuple) when assigning the key field, or
explicitly document that array variants intentionally keep full tuples; also
change any[] to unknown[] for tuple types to improve type safety and update
PgoutputMessageArray accordingly so MessageUpdateArray/MessageDeleteArray
reflect the projected key type.
```ts
// Array variants - skip object creation for performance
private msgInsertArray(reader: BinaryReader): MessageInsertArray {
  const relation = this._relationCache.get(reader.readInt32());
  if (!relation) throw Error("missing relation");
  reader.readUint8(); // consume the 'N' key
  return {
    tag: "insert",
    relation,
    new: this.readTupleAsArray(reader, relation),
  };
}

private msgUpdateArray(reader: BinaryReader): MessageUpdateArray {
  const relation = this._relationCache.get(reader.readInt32());
  if (!relation) throw Error("missing relation");
  let key: any[] | null = null;
  let old: any[] | null = null;
  let new_: any[] | null = null;
  const subMsgKey = reader.readUint8();
  if (subMsgKey === 0x4b) {
    key = this.readTupleAsArray(reader, relation);
    reader.readUint8();
    new_ = this.readTupleAsArray(reader, relation);
  } else if (subMsgKey === 0x4f) {
    old = this.readTupleAsArray(reader, relation);
    reader.readUint8();
    new_ = this.readTupleAsArray(reader, relation, old);
  } else if (subMsgKey === 0x4e) {
    new_ = this.readTupleAsArray(reader, relation);
  } else {
    throw Error(`unknown submessage key ${String.fromCharCode(subMsgKey)}`);
  }
  return { tag: "update", relation, key, old, new: new_ };
}

private msgDeleteArray(reader: BinaryReader): MessageDeleteArray {
  const relation = this._relationCache.get(reader.readInt32());
  if (!relation) throw Error("missing relation");
  let key: any[] | null = null;
  let old: any[] | null = null;
  const subMsgKey = reader.readUint8();
  if (subMsgKey === 0x4b) {
    key = this.readTupleAsArray(reader, relation);
  } else if (subMsgKey === 0x4f) {
    old = this.readTupleAsArray(reader, relation);
  } else {
    throw Error(`unknown submessage key ${String.fromCharCode(subMsgKey)}`);
  }
  return { tag: "delete", relation, key, old };
}
```
🧩 Analysis chain
🏁 Script executed:

```shell
# First, let's locate and explore the pgoutput.ts file
find . -name "pgoutput.ts" -type f | head -5
```

Length of output: 116

🏁 Script executed:

```shell
# Get file size to understand how much we need to read
wc -l internal-packages/replication/src/pgoutput.ts
```

Length of output: 118

🏁 Script executed:

```shell
# Read the specific lines mentioned in the review (377-424) plus surrounding context
sed -n '350,450p' internal-packages/replication/src/pgoutput.ts | cat -n
```

Length of output: 4706

🏁 Script executed:

```shell
# Find readTupleAsArray implementation
sed -n '100,250p' internal-packages/replication/src/pgoutput.ts | cat -n
```

Length of output: 5123

🏁 Script executed:

```shell
# Continue looking for readTupleAsArray
grep -n "readTupleAsArray" internal-packages/replication/src/pgoutput.ts
```

Length of output: 563

🏁 Script executed:

```shell
# Read the readTupleAsArray implementation
sed -n '468,520p' internal-packages/replication/src/pgoutput.ts | cat -n
```

Length of output: 2113

🏁 Script executed:

```shell
# Also read the full context around readKeyTuple and readTuple to compare
sed -n '320,480p' internal-packages/replication/src/pgoutput.ts | cat -n
```

Length of output: 7079

🏁 Script executed:

```shell
# Let me check if there are any tests or documentation about tuple shapes
grep -r "keyColumns\|readKeyTuple" internal-packages/replication/src --include="*.ts" -B 2 -A 2 | head -50
```

Length of output: 2908

🏁 Script executed:

```shell
# Check MessageRelation interface to understand keyColumns
sed -n '1,100p' internal-packages/replication/src/pgoutput.ts | grep -A 10 "interface MessageRelation"
```

Length of output: 345

🏁 Script executed:

```shell
# Let's look for any tests or consumers of these array variants
find . -name "*.test.ts" -o -name "*.spec.ts" | xargs grep -l "msgDeleteArray\|MessageDeleteArray\|readTupleAsArray" 2>/dev/null | head -3
```

Length of output: 51

🏁 Script executed:

```shell
# Check if there are any usages or consumers of the array variants
grep -r "parseArray\|MessageInsertArray\|MessageUpdateArray\|MessageDeleteArray" internal-packages --include="*.ts" | grep -v pgoutput.ts | head -20
```

Length of output: 51

🏁 Script executed:

```shell
# Look at the PostgreSQL WAL protocol comments or any documentation about K tuples
sed -n '1,50p' internal-packages/replication/src/pgoutput.ts | cat -n
```

Length of output: 1478

🏁 Script executed:

```shell
# Search for replica_identity or replica identity related code
grep -n "replica" internal-packages/replication/src/pgoutput.ts
```

Length of output: 316

🏁 Script executed:

```shell
# Let me check the PostgreSQL pgoutput protocol documentation in comments or understand if K tuples are full-width
grep -n "0x4b\|0x4f\|0x4e\|KEY\|OLD\|NEW" internal-packages/replication/src/pgoutput.ts | head -30
```

Length of output: 492

🏁 Script executed:

```shell
# Check if there's any documentation or README about how the array tuples should be used
find internal-packages/replication -name "*.md" -o -name "README*" | xargs cat 2>/dev/null
```

Length of output: 82

🏁 Script executed:

```shell
# Let me search for usage of the parseArray function to understand consumer expectations
grep -r "parseArray" . --include="*.ts" -B 3 -A 3 | head -50
```

Length of output: 569

🏁 Script executed:

```shell
# Let me check if there are any PostgreSQL protocol references or inline documentation
grep -n "pgoutput\|WAL\|replication\|protocol" internal-packages/replication/src/pgoutput.ts | head -20
```

Length of output: 174

🏁 Script executed:

```shell
# Check the git history or any commit messages about these methods
git log --oneline -20 -- internal-packages/replication/src/pgoutput.ts 2>/dev/null | head -10
```

Length of output: 86

🏁 Script executed:

```shell
# Let's look more carefully at how K tuples are actually handled in the wire protocol
# by checking if there are similar implementations in other files
find . -name "*.ts" -type f | xargs grep -l "readInt16\|nfields" | head -5
```

Length of output: 116

🏁 Script executed:

```shell
# Let me check the PostgreSQL pgoutput documentation by looking at what nfields would be for K tuples
# Let's trace through: when K is sent, does it include all columns or only replica identity columns?
# Look for any comments or inline documentation about tuple sizes
sed -n '1,547p' internal-packages/replication/src/pgoutput.ts | grep -i "tuple\|field\|column\|key" | head -30
```

Length of output: 1382

🏁 Script executed:

```shell
# Check if there's documentation about REPLICA IDENTITY and how K tuples work
# Let me search for any external documentation links or references
grep -i "replica.identity\|pgoutput\|wal\|documentation" internal-packages/replication/src/pgoutput.ts
```

Length of output: 554

🏁 Script executed:

```shell
# Let me look at what happens with the key tuple - check if the array variant should match the object variant
# by comparing the null handling between readKeyTuple and readTupleAsArray more carefully
cat > /tmp/analyze.py << 'EOF'
# Analyzing the difference between object and array key handling
object_path = """
// msgUpdate/msgDelete object path when K (key) is present:
key = this.readKeyTuple(reader, relation);
// readKeyTuple does:
1. const tuple = this.readTuple(reader, relation); // reads ALL columns
2. for (const k of relation.keyColumns) {
     key[k] = tuple[k] === null ? undefined : tuple[k]; // converts null->undefined
   }
// Result: key is an object with only keyColumns set, nulls are undefined
"""
array_path = """
// msgUpdateArray/msgDeleteArray array path when K (key) is present:
key = this.readTupleAsArray(reader, relation);
// readTupleAsArray does:
1. const nfields = reader.readInt16(); // number of fields in the tuple
2. const tuple = new Array(nfields); // array with length = nfields
3. for (let i = 0; i < nfields; i++) {
     // ... reads values, nulls stay as null
   }
// Result: key is an array with length nfields, no projection, nulls are null
"""
print("Object path (uses readKeyTuple for K):")
print(object_path)
print("\nArray path (uses readTupleAsArray for K):")
print(array_path)
print("\nDifferences:")
print("1. Projection: Object projects to keyColumns; Array includes all nfields as-is")
print("2. Null handling: Object converts null→undefined; Array keeps null")
print("3. Index alignment: Object uses column names; Array uses indices but misaligned with relation.columns")
EOF
python /tmp/analyze.py
```

Length of output: 155
Array handlers: K (key) tuple handling differs from the object variant and may cause index misalignment.

The object variant correctly handles K tuples via readKeyTuple(), which:

- Reads the full tuple (all columns)
- Projects to only `relation.keyColumns` by name
- Converts null to undefined

The array variant uses readTupleAsArray() directly, which has two problems:

- Index misalignment: Per PostgreSQL's pgoutput protocol, K tuples contain only replica identity columns (not full-width). When `nfields < relation.columns.length`, array indices won't align with `relation.columns` indices, making the tuple meaningless without a mapping.
- Null semantics: K tuples keep nulls as-is, while the object variant converts null→undefined for keys. This parity gap could confuse consumers.

To keep behavior consistent between paths, implement a dedicated readKeyTupleAsArray() helper that projects to only the key column indices (analogous to readKeyTuple()).
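Such a projection helper might look like the sketch below. The `Column`/`Relation` shapes are simplified stand-ins for `MessageRelation` (the real type carries parsers and more flags), and the null→undefined conversion deliberately mirrors `readKeyTuple()`'s object-variant semantics:

```typescript
// Simplified sketch: project a full-width tuple array down to its
// replica-identity (key) columns, converting null to undefined to match
// the object variant's readKeyTuple() behavior.
type Column = { name: string; flags: number }; // flags bit 0b1 => part of the key
type Relation = { columns: Column[]; keyColumns: string[] };

function projectKeyTupleAsArray(
  fullTuple: readonly unknown[],
  relation: Relation
): unknown[] {
  const key: unknown[] = [];
  for (let i = 0; i < relation.columns.length; i++) {
    if ((relation.columns[i].flags & 0b1) === 0b1) {
      // Key columns keep their relative order; nulls become undefined.
      key.push(fullTuple[i] === null ? undefined : fullTuple[i]);
    }
  }
  return key;
}

// Example: a 3-column relation where only "id" is part of the key.
const relation: Relation = {
  columns: [
    { name: "id", flags: 1 },
    { name: "status", flags: 0 },
    { name: "output", flags: 0 },
  ],
  keyColumns: ["id"],
};
const projected = projectKeyTupleAsArray(["run_123", null, "{}"], relation);
console.log(projected); // ["run_123"]
```

Note this sketch assumes a full-width input tuple; handling the protocol case where the K tuple is already key-only would additionally require mapping by key-column position rather than by full-relation index.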
```ts
private readTupleAsArray(
  reader: BinaryReader,
  { columns }: MessageRelation,
  unchangedToastFallback?: any[] | null
): any[] {
  const nfields = reader.readInt16();
  const tuple = new Array(nfields);
  for (let i = 0; i < nfields; i++) {
    const { parser } = columns[i];
    const kind = reader.readUint8();
    switch (kind) {
      case 0x62: // 'b' binary
        const bsize = reader.readInt32();
        tuple[i] = reader.read(bsize);
        break;
      case 0x74: // 't' text
        const valsize = reader.readInt32();
        const valbuf = reader.read(valsize);
        const valtext = reader.decodeText(valbuf);
        tuple[i] = parser(valtext);
        break;
      case 0x6e: // 'n' null
        tuple[i] = null;
        break;
      case 0x75: // 'u' unchanged toast datum
        tuple[i] = unchangedToastFallback?.[i];
        break;
      default:
        throw Error(`unknown attribute kind ${String.fromCharCode(kind)}`);
    }
  }
  return tuple;
}
```
Fix Biome noSwitchDeclarations (likely CI blocker).
Biome is correctly flagging const declarations directly under case clauses in readTupleAsArray() (shared switch scope). Wrap each case body in a block.
Proposed fix

```diff
 private readTupleAsArray(
   reader: BinaryReader,
   { columns }: MessageRelation,
   unchangedToastFallback?: any[] | null
 ): any[] {
   const nfields = reader.readInt16();
   const tuple = new Array(nfields);
   for (let i = 0; i < nfields; i++) {
     const { parser } = columns[i];
     const kind = reader.readUint8();
     switch (kind) {
-      case 0x62: // 'b' binary
-        const bsize = reader.readInt32();
-        tuple[i] = reader.read(bsize);
-        break;
-      case 0x74: // 't' text
-        const valsize = reader.readInt32();
-        const valbuf = reader.read(valsize);
-        const valtext = reader.decodeText(valbuf);
-        tuple[i] = parser(valtext);
-        break;
+      case 0x62: { // 'b' binary
+        const bsize = reader.readInt32();
+        tuple[i] = reader.read(bsize);
+        break;
+      }
+      case 0x74: { // 't' text
+        const valsize = reader.readInt32();
+        const valbuf = reader.read(valsize);
+        const valtext = reader.decodeText(valbuf);
+        tuple[i] = parser(valtext);
+        break;
+      }
       case 0x6e: // 'n' null
         tuple[i] = null;
         break;
       case 0x75: // 'u' unchanged toast datum
         tuple[i] = unchangedToastFallback?.[i];
         break;
       default:
         throw Error(`unknown attribute kind ${String.fromCharCode(kind)}`);
     }
   }
   return tuple;
 }
```

🧰 Tools
🪛 Biome (2.1.2)
[error] 480-480: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Safe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
[error] 484-484: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Safe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
[error] 485-485: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Safe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
[error] 486-486: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Safe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
🤖 Prompt for AI Agents
In @internal-packages/replication/src/pgoutput.ts around lines 468 - 500, The
switch in readTupleAsArray declares consts (e.g., bsize, valsize, valbuf,
valtext) directly in case clauses which violates Biome's noSwitchDeclarations;
fix by wrapping each case body in its own block { ... } so the const
declarations are scoped correctly (apply to cases for 0x62, 0x74, 0x6e, 0x75 and
default), keeping existing logic for reading via reader, calling parser from
columns[i], and using unchangedToastFallback?.[i].
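The rule exists because a `switch` body is a single lexical scope: without braces, a `const` declared under one `case` is visible (in its temporal dead zone) to every other clause. Wrapping each case body in a block gives each declaration its own scope, as this minimal standalone example (hypothetical names, not the parser's real code) shows:

```typescript
// Without the braces, the two `const size` declarations below would share
// the switch's single scope and clash — which is what Biome's
// noSwitchDeclarations rule flags. Block-scoped case bodies fix it.
function describe(kind: number): string {
  switch (kind) {
    case 0x62: { // 'b' binary
      const size = 4; // scoped to this case only
      return `binary(${size})`;
    }
    case 0x74: { // 't' text
      const size = 8; // same name, no clash: separate block scope
      return `text(${size})`;
    }
    default:
      return "unknown";
  }
}

console.log(describe(0x62), describe(0x74)); // binary(4) text(8)
```

Reusing the same identifier in both blocks compiles cleanly precisely because each `{ ... }` introduces a fresh scope; remove the braces and TypeScript rejects the duplicate declaration.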
PR Review: perf(runs-replication): Improve the CPU efficiency and throughput of the runs replication to clickhouse

Overview

This PR introduces a significant performance optimization to the runs replication service.

Code Quality & Best Practices

Strengths

Suggestions for Improvement

Potential Bugs or Issues

Medium Priority

Low Priority

Performance Considerations

Positive Impact

Questions

Security Concerns

No security concerns identified. The changes are internal data format optimizations that don't affect input validation, authentication, or data exposure.

Test Coverage

Strengths

Suggestions

Summary

This is a well-executed performance optimization with strong type safety guarantees. The code is clean and the test harness is comprehensive. The main areas for improvement are noted in the suggestions above.

Recommendation: Approve with minor suggestions above.
Code Review for PR #2866: Performance Improvements for Runs Replication

Summary

This PR improves the CPU efficiency and throughput of the runs replication service to ClickHouse by switching from object-based data representation to array-based (compact) format. This is a well-structured performance optimization with comprehensive tooling for profiling.

👍 Strengths

1. Performance Optimization Approach

2. Type Safety

3. Comprehensive Testing Infrastructure

4. Tests Updated
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In @apps/webapp/test/performance/config.ts:
- Around line 9-71: Replace each exported interface declaration with an exported
type alias preserving the exact shape and property names/optionality for
TestPhase, ProducerConfig, ConsumerConfig, ProfilingConfig, OutputConfig,
InfrastructureConfig, and HarnessConfig; e.g., convert "export interface X { ...
}" to "export type X = { ... }", keep union literal types (like tool) and
imported types (RuntimeEnvironmentType, RedisOptions) unchanged, and run a quick
project-wide type search to update any ambient interface-specific usages (none
should be required since names remain identical).
In @apps/webapp/test/performance/consumer-runner.ts:
- Around line 34-47: When config.useMockClickhouse is false, validate that
config.clickhouseUrl is present before using it in the new ClickHouse call;
update the block that imports ClickHouse and constructs clickhouse (the dynamic
import + new ClickHouse({... url: config.clickhouseUrl!, ... })) to check for a
non-empty config.clickhouseUrl, and if missing throw or log a clear error (or
exit) rather than using the non-null assertion so you avoid a runtime crash.
In @apps/webapp/test/performance/data-generator.ts:
- Around line 5-12: DataGeneratorOptions is declared as an interface but your
coding guidelines require types; change the declaration of DataGeneratorOptions
from an interface to a type alias (export type DataGeneratorOptions = { ... })
preserving all property names, types (including RuntimeEnvironmentType), and
optional modifiers (includeComplexPayloads?), and keep the export name unchanged
so any usages continue to work.
🧹 Nitpick comments (8)
apps/webapp/test/performance/producer-runner.ts (1)
14-14: Add error handling for config parsing.

The JSON.parse with non-null assertion assumes PRODUCER_CONFIG is always valid JSON. Consider adding validation or error handling.

🛡️ Proposed validation for config parsing

```diff
-  const config = JSON.parse(process.env.PRODUCER_CONFIG!) as ProducerConfig;
+  if (!process.env.PRODUCER_CONFIG) {
+    throw new Error("PRODUCER_CONFIG environment variable is required");
+  }
+
+  let config: ProducerConfig;
+  try {
+    config = JSON.parse(process.env.PRODUCER_CONFIG) as ProducerConfig;
+  } catch (error) {
+    throw new Error(`Failed to parse PRODUCER_CONFIG: ${error instanceof Error ? error.message : String(error)}`);
+  }
```

apps/webapp/test/performance/consumer-runner.ts (1)
20-20: Add error handling for config parsing.

The JSON.parse with non-null assertion assumes CONSUMER_CONFIG is always valid JSON. This is the same issue as in producer-runner.ts. Consider adding validation or error handling.

🛡️ Proposed validation for config parsing

```diff
-  const config = JSON.parse(process.env.CONSUMER_CONFIG!) as ConsumerConfig;
+  if (!process.env.CONSUMER_CONFIG) {
+    throw new Error("CONSUMER_CONFIG environment variable is required");
+  }
+
+  let config: ConsumerConfig;
+  try {
+    config = JSON.parse(process.env.CONSUMER_CONFIG) as ConsumerConfig;
+  } catch (error) {
+    throw new Error(`Failed to parse CONSUMER_CONFIG: ${error instanceof Error ? error.message : String(error)}`);
+  }
```

apps/webapp/scripts/profile-runs-replication.ts (2)
95-99: Remove unused `timestamp` variable.

The `timestamp` variable on line 96 is declared but never used; only `timeWithSeconds` is used for the `runFolder` construction.

🧹 Proposed fix
  // Organize output directory: profiling-results/[runName]-[timestamp]/
- const timestamp = new Date().toISOString().replace(/[:.]/g, "-").replace("T", "_").split("_")[0];
  const timeWithSeconds = new Date().toISOString().replace(/[:.]/g, "-").replace("T", "_").substring(0, 19);
  const runFolder = `${config.runName}-${timeWithSeconds}`;
141-163: Use `PhaseMetrics[]` instead of `any[]` for the phases parameter.

`PhaseMetrics` is already imported from `./config` but not used for the `phases` parameter in `printSummary` and `createSummaryReport`. Using proper types provides compile-time validation for property access.

♻️ Proposed fix
-function printSummary(phases: any[]): void {
+function printSummary(phases: PhaseMetrics[]): void {

 async function createSummaryReport(
   config: HarnessConfig,
-  phases: any[],
+  phases: PhaseMetrics[],
   outputPath: string
 ): Promise<void> {

As per coding guidelines, prefer proper types over `any` for type safety.

apps/webapp/test/performance/harness.ts (1)
458-472: Define a return type for `parseRedisUrl`.

The method returns `any`, losing type safety. Consider defining an explicit return type matching the `RedisOptions` interface from the config.

♻️ Proposed fix
- private parseRedisUrl(url: string): any {
+ private parseRedisUrl(url: string): { host: string; port: number; password?: string } {
    const match = url.match(/^redis:\/\/(?::([^@]+)@)?([^:]+):(\d+)/);
    if (!match) {
      return {
        host: "localhost",
        port: 6379,
      };
    }
    return {
      host: match[2],
      port: parseInt(match[3], 10),
      password: match[1] || undefined,
    };
  }

apps/webapp/test/performance/consumer.ts (3)
6-17: Use `type` instead of `interface` per coding guidelines.

The coding guidelines specify using types over interfaces for TypeScript definitions.
♻️ Proposed fix
-interface ConsumerMetrics {
+type ConsumerMetrics = {
   heapUsed: number;
   heapTotal: number;
   rss: number;
   eventLoopUtilization: number;
-}
+};

-interface BatchFlushedEvent {
+type BatchFlushedEvent = {
   flushId: string;
-  taskRunInserts: any[];
-  payloadInserts: any[];
-}
+  taskRunInserts: unknown[];
+  payloadInserts: unknown[];
+};

As per coding guidelines, prefer types over interfaces.
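For context, a snapshot matching the converted `ConsumerMetrics` shape can be gathered with Node's built-in `perf_hooks` and `process.memoryUsage()`; this is a minimal sketch, and the `collectConsumerMetrics` helper name is hypothetical, not code from the PR:

```typescript
import { performance } from "node:perf_hooks";

type ConsumerMetrics = {
  heapUsed: number;
  heapTotal: number;
  rss: number;
  eventLoopUtilization: number;
};

// Hypothetical helper: samples process memory and event loop utilization.
function collectConsumerMetrics(): ConsumerMetrics {
  const mem = process.memoryUsage();
  // eventLoopUtilization() reports cumulative utilization (0..1) since process start.
  const elu = performance.eventLoopUtilization();
  return {
    heapUsed: mem.heapUsed,
    heapTotal: mem.heapTotal,
    rss: mem.rss,
    eventLoopUtilization: elu.utilization,
  };
}
```

In a real consumer this would typically be sampled on an interval and sent over IPC, with `eventLoopUtilization(previousSample)` used to get per-interval rather than cumulative values.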
214-220: Add proper types for `config` and the `onMetrics` callback.

Using `any` loses type safety. Consider importing `ProducerConfig` from `./config` and defining a metrics type.

♻️ Proposed fix
+import type { ConsumerConfig, ProfilingConfig, ProducerConfig } from "./config";
+
+type ProducerMetrics = {
+  recordsProduced: number;
+  throughput: number;
+};

 export class ProducerProcessManager {
   private process: ChildProcess | null = null;
   private ready = false;
-  private onMetrics?: (metrics: any) => void;
+  private onMetrics?: (metrics: ProducerMetrics) => void;
   private onError?: (error: string) => void;

-  constructor(private readonly config: any) {}
+  constructor(private readonly config: ProducerConfig) {}
265-295: Add error handling for `send()` calls to match the ConsumerProcessManager pattern.

`process.send()` can throw if the IPC channel is closed. The ConsumerProcessManager wraps this in try-catch (line 122), but ProducerProcessManager does not.

♻️ Proposed fix for stop()
  async stop(): Promise<void> {
    if (!this.process) {
      return;
    }

    console.log("Stopping producer process");

-   this.process.send({ type: "shutdown" });
+   try {
+     this.process.send({ type: "shutdown" });
+   } catch (error) {
+     console.warn("Could not send shutdown message, process may have already exited");
+   }

♻️ Proposed fix for send()
  send(message: any): void {
    if (!this.process) {
      throw new Error("Producer process not started");
    }

-   this.process.send(message);
+   try {
+     this.process.send(message);
+   } catch (error) {
+     console.warn("Could not send message to producer, IPC channel may be closed");
+   }
  }
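The guarded-send pattern above can be factored into a small helper that takes an injected sender, which also makes it testable without spawning a real child process; `safeSend` is a hypothetical name for illustration, not part of the PR:

```typescript
// Minimal shape of anything with an optional IPC send (ChildProcess, process, a stub).
type Sender = { send?: (message: unknown) => void };

// Hypothetical helper: attempts an IPC send and reports success instead of throwing.
function safeSend(target: Sender | null, message: unknown): boolean {
  if (!target || typeof target.send !== "function") {
    return false; // no process, or the IPC channel never existed
  }
  try {
    target.send(message);
    return true;
  } catch {
    // The channel can close between the check and the send; swallow and report.
    return false;
  }
}
```

Both `stop()` and `send()` could then call `safeSend(this.process, message)` and log a warning on `false`, keeping the try-catch in one place.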
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- apps/webapp/scripts/profile-runs-replication.ts
- apps/webapp/test/performance/clickhouse-mock.ts
- apps/webapp/test/performance/config.ts
- apps/webapp/test/performance/consumer-runner.ts
- apps/webapp/test/performance/consumer.ts
- apps/webapp/test/performance/data-generator.ts
- apps/webapp/test/performance/harness.ts
- apps/webapp/test/performance/producer-runner.ts
🧰 Additional context used
📓 Path-based instructions (6)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead
Files:
- apps/webapp/scripts/profile-runs-replication.ts
- apps/webapp/test/performance/data-generator.ts
- apps/webapp/test/performance/harness.ts
- apps/webapp/test/performance/clickhouse-mock.ts
- apps/webapp/test/performance/consumer.ts
- apps/webapp/test/performance/producer-runner.ts
- apps/webapp/test/performance/consumer-runner.ts
- apps/webapp/test/performance/config.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use zod for validation in packages/core and apps/webapp
Files:
- apps/webapp/scripts/profile-runs-replication.ts
- apps/webapp/test/performance/data-generator.ts
- apps/webapp/test/performance/harness.ts
- apps/webapp/test/performance/clickhouse-mock.ts
- apps/webapp/test/performance/consumer.ts
- apps/webapp/test/performance/producer-runner.ts
- apps/webapp/test/performance/consumer-runner.ts
- apps/webapp/test/performance/config.ts
**/*.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use function declarations instead of default exports
Files:
- apps/webapp/scripts/profile-runs-replication.ts
- apps/webapp/test/performance/data-generator.ts
- apps/webapp/test/performance/harness.ts
- apps/webapp/test/performance/clickhouse-mock.ts
- apps/webapp/test/performance/consumer.ts
- apps/webapp/test/performance/producer-runner.ts
- apps/webapp/test/performance/consumer-runner.ts
- apps/webapp/test/performance/config.ts
apps/webapp/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
apps/webapp/**/*.{ts,tsx}: When importing from `@trigger.dev/core` in the webapp, use subpath exports from the package.json instead of importing from the root path
Follow the Remix 2.1.0 and Express server conventions when updating the main trigger.dev webapp
Files:
- apps/webapp/scripts/profile-runs-replication.ts
- apps/webapp/test/performance/data-generator.ts
- apps/webapp/test/performance/harness.ts
- apps/webapp/test/performance/clickhouse-mock.ts
- apps/webapp/test/performance/consumer.ts
- apps/webapp/test/performance/producer-runner.ts
- apps/webapp/test/performance/consumer-runner.ts
- apps/webapp/test/performance/config.ts
**/*.{js,ts,jsx,tsx,json,md,css,scss}
📄 CodeRabbit inference engine (AGENTS.md)
Format code using Prettier
Files:
- apps/webapp/scripts/profile-runs-replication.ts
- apps/webapp/test/performance/data-generator.ts
- apps/webapp/test/performance/harness.ts
- apps/webapp/test/performance/clickhouse-mock.ts
- apps/webapp/test/performance/consumer.ts
- apps/webapp/test/performance/producer-runner.ts
- apps/webapp/test/performance/consumer-runner.ts
- apps/webapp/test/performance/config.ts
**/*.ts
📄 CodeRabbit inference engine (.cursor/rules/otel-metrics.mdc)
**/*.ts: When creating or editing OTEL metrics (counters, histograms, gauges), ensure metric attributes have low cardinality by using only enums, booleans, bounded error codes, or bounded shard IDs
Do not use high-cardinality attributes in OTEL metrics such as UUIDs/IDs (envId, userId, runId, projectId, organizationId), unbounded integers (itemCount, batchSize, retryCount), timestamps (createdAt, startTime), or free-form strings (errorMessage, taskName, queueName)
When exporting OTEL metrics via OTLP to Prometheus, be aware that the exporter automatically adds unit suffixes to metric names (e.g., 'my_duration_ms' becomes 'my_duration_ms_milliseconds', 'my_counter' becomes 'my_counter_total'). Account for these transformations when writing Grafana dashboards or Prometheus queries
Files:
- apps/webapp/scripts/profile-runs-replication.ts
- apps/webapp/test/performance/data-generator.ts
- apps/webapp/test/performance/harness.ts
- apps/webapp/test/performance/clickhouse-mock.ts
- apps/webapp/test/performance/consumer.ts
- apps/webapp/test/performance/producer-runner.ts
- apps/webapp/test/performance/consumer-runner.ts
- apps/webapp/test/performance/config.ts
🧠 Learnings (23)
📓 Common learnings
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2264
File: apps/webapp/app/services/runsRepository.server.ts:172-174
Timestamp: 2025-07-12T18:06:04.133Z
Learning: In apps/webapp/app/services/runsRepository.server.ts, the in-memory status filtering after fetching runs from Prisma is intentionally used as a workaround for ClickHouse data delays. This approach is acceptable because the result set is limited to a maximum of 100 runs due to pagination, making the performance impact negligible.
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Export tasks with unique IDs within the project to enable proper task discovery and execution
Applied to files:
- apps/webapp/test/performance/data-generator.ts
- apps/webapp/test/performance/producer-runner.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Generate example payloads for tasks when possible
Applied to files:
- apps/webapp/test/performance/data-generator.ts
- apps/webapp/test/performance/producer-runner.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Attach metadata to task runs using the metadata option when triggering, and access/update it inside runs using metadata functions
Applied to files:
apps/webapp/test/performance/data-generator.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `tasks.batchTrigger()` to trigger multiple runs of a single task with different payloads
Applied to files:
apps/webapp/test/performance/data-generator.ts
📚 Learning: 2025-07-12T18:06:04.133Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2264
File: apps/webapp/app/services/runsRepository.server.ts:172-174
Timestamp: 2025-07-12T18:06:04.133Z
Learning: In apps/webapp/app/services/runsRepository.server.ts, the in-memory status filtering after fetching runs from Prisma is intentionally used as a workaround for ClickHouse data delays. This approach is acceptable because the result set is limited to a maximum of 100 runs due to pagination, making the performance impact negligible.
Applied to files:
- apps/webapp/test/performance/harness.ts
- apps/webapp/test/performance/clickhouse-mock.ts
📚 Learning: 2025-11-27T16:26:37.432Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2025-11-27T16:26:37.432Z
Learning: Applies to internal-packages/database/**/*.{ts,tsx} : Use Prisma for database interactions in internal-packages/database with PostgreSQL
Applied to files:
- apps/webapp/test/performance/harness.ts
- apps/webapp/test/performance/producer-runner.ts
📚 Learning: 2025-07-21T12:52:44.342Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 2284
File: apps/webapp/app/services/realtimeClient.server.ts:111-127
Timestamp: 2025-07-21T12:52:44.342Z
Learning: Electric (the database service used in the realtimeClient) has built-in SQL injection protection and safely handles whereClause parameters passed via URL parameters, so direct string interpolation of runId values into SQL where clauses is safe when using Electric.
Applied to files:
apps/webapp/test/performance/harness.ts
📚 Learning: 2025-11-27T16:27:48.109Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-11-27T16:27:48.109Z
Learning: Applies to **/*.test.{ts,tsx,js,jsx} : Avoid mocks or stubs in tests; use helpers from `internal/testcontainers` when Redis or Postgres are needed
Applied to files:
apps/webapp/test/performance/clickhouse-mock.ts
📚 Learning: 2024-10-16T01:08:01.788Z
Learnt from: nicktrn
Repo: triggerdotdev/trigger.dev PR: 1389
File: apps/coordinator/src/index.ts:403-0
Timestamp: 2024-10-16T01:08:01.788Z
Learning: In `apps/coordinator/src/index.ts`, moving the `exitRun` and `crashRun` functions outside the `onConnection` method is not feasible because of TypeScript type issues with `socket`. Keeping these functions inside `onConnection` is preferred.
Applied to files:
apps/webapp/test/performance/consumer.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Subscribe to run updates using `runs.subscribeToRun()` for realtime monitoring of task execution
Applied to files:
apps/webapp/test/performance/producer-runner.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `.withStreams()` to subscribe to realtime streams from task metadata in addition to run changes
Applied to files:
apps/webapp/test/performance/producer-runner.ts
📚 Learning: 2025-11-27T16:26:37.432Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2025-11-27T16:26:37.432Z
Learning: Applies to {packages/core,apps/webapp}/**/*.{ts,tsx} : Use zod for validation in packages/core and apps/webapp
Applied to files:
- apps/webapp/test/performance/producer-runner.ts
- apps/webapp/test/performance/consumer-runner.ts
📚 Learning: 2025-08-14T18:35:44.370Z
Learnt from: nicktrn
Repo: triggerdotdev/trigger.dev PR: 2390
File: apps/webapp/app/env.server.ts:764-765
Timestamp: 2025-08-14T18:35:44.370Z
Learning: The BoolEnv helper in apps/webapp/app/utils/boolEnv.ts uses z.preprocess with inconsistent default value types across the codebase - some usages pass boolean defaults (correct) while others pass string defaults (incorrect), leading to type confusion. The helper should enforce boolean-only defaults or have clearer documentation.
Applied to files:
- apps/webapp/test/performance/producer-runner.ts
- apps/webapp/test/performance/consumer-runner.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `schemaTask()` from `trigger.dev/sdk/v3` with Zod schema for payload validation
Applied to files:
apps/webapp/test/performance/producer-runner.ts
📚 Learning: 2025-09-03T14:34:41.781Z
Learnt from: myftija
Repo: triggerdotdev/trigger.dev PR: 2464
File: apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.settings/route.tsx:357-371
Timestamp: 2025-09-03T14:34:41.781Z
Learning: When using Zod's safeParse, the .data property is undefined when parsing fails, but TypeScript may still complain about accessing .data without checking .success first. The suggested approach of checking .success before accessing .data improves type safety and code clarity.
Applied to files:
apps/webapp/test/performance/producer-runner.ts
📚 Learning: 2025-11-26T14:40:07.146Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 2710
File: packages/schema-to-json/package.json:0-0
Timestamp: 2025-11-26T14:40:07.146Z
Learning: Node.js 24+ has native TypeScript support and can execute .ts files directly without tsx or ts-node for scripts that use only erasable TypeScript syntax (type annotations, interfaces, etc.). The trigger.dev repository uses Node.js 24.11.1+ and scripts like updateVersion.ts can be run with `node` instead of `tsx`.
Applied to files:
apps/webapp/test/performance/producer-runner.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use the `task()` function from `trigger.dev/sdk/v3` to define tasks with id and run properties
Applied to files:
apps/webapp/test/performance/producer-runner.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger.config.ts : Configure OpenTelemetry instrumentations and exporters in trigger.config.ts for enhanced logging
Applied to files:
apps/webapp/test/performance/config.ts
📚 Learning: 2025-11-27T16:26:58.661Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/webapp.mdc:0-0
Timestamp: 2025-11-27T16:26:58.661Z
Learning: Applies to apps/webapp/app/services/**/*.server.{ts,tsx} : Separate testable services from configuration files; follow the pattern of `realtimeClient.server.ts` (testable service) and `realtimeClientGlobal.server.ts` (configuration) in the webapp
Applied to files:
apps/webapp/test/performance/config.ts
📚 Learning: 2025-11-27T16:26:58.661Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/webapp.mdc:0-0
Timestamp: 2025-11-27T16:26:58.661Z
Learning: Applies to apps/webapp/**/*.test.{ts,tsx} : Test files should only import classes and functions from `app/**/*.ts` files and should not import `env.server.ts` directly or indirectly; pass configuration through options instead
Applied to files:
apps/webapp/test/performance/config.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger.config.ts : Define global lifecycle functions (onStart, onSuccess, onFailure) in trigger.config.ts to apply to all tasks
Applied to files:
apps/webapp/test/performance/config.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger.config.ts : Use build extensions in trigger.config.ts (additionalFiles, additionalPackages, aptGet, prismaExtension, etc.) to customize the build
Applied to files:
apps/webapp/test/performance/config.ts
🧬 Code graph analysis (5)
apps/webapp/test/performance/clickhouse-mock.ts (2)
internal-packages/clickhouse/src/taskRuns.ts (2)
- TaskRunInsertArray (206-252)
- PayloadInsertArray (258-262)

internal-packages/clickhouse/src/index.ts (1)

- ClickHouse (87-208)
apps/webapp/test/performance/consumer.ts (1)
apps/webapp/test/performance/config.ts (2)
- ConsumerConfig (30-42)
- ProfilingConfig (44-48)
apps/webapp/test/performance/producer-runner.ts (3)
apps/webapp/test/performance/config.ts (1)
- ProducerConfig (15-28)

apps/webapp/test/performance/data-generator.ts (1)

- TaskRunDataGenerator (14-143)

apps/webapp/test/performance/producer.ts (1)

- TaskRunProducer (14-191)
apps/webapp/test/performance/consumer-runner.ts (3)
apps/webapp/test/performance/config.ts (1)
- ConsumerConfig (30-42)

apps/webapp/test/performance/clickhouse-mock.ts (2)

- asMockClickHouse (107-109)
- MockClickHouse (25-104)

apps/webapp/app/services/runsReplicationService.server.ts (2)

- data (918-955)
- error (715-742)
apps/webapp/test/performance/config.ts (1)
internal-packages/redis/src/index.ts (1)
RedisOptions(4-4)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (25)
- GitHub Check: Cursor Bugbot
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: typecheck / typecheck
- GitHub Check: claude-review
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (15)
apps/webapp/test/performance/clickhouse-mock.ts (1)
1-109: LGTM - Mock implementation is appropriate for performance profiling.

The MockClickHouse implementation correctly simulates the ClickHouse compact array insert APIs with optional delays and metrics tracking. The approach of avoiding external dependencies (like @clickhouse/client) and using type casting in the helper function is acceptable for performance profiling scenarios.
apps/webapp/test/performance/producer-runner.ts (1)
89-106: Good error handling approach for producer resilience.

The error handlers correctly avoid exiting on uncaught exceptions, allowing the producer loop to continue and maintain throughput during profiling. This is appropriate for performance testing scenarios.
apps/webapp/test/performance/data-generator.ts (1)
111-138: Well-designed payload generation with size control.

The payload generation correctly handles sizing by calculating padding before adding complex types. This ensures accurate payload size targets for performance testing.
apps/webapp/scripts/profile-runs-replication.ts (4)
10-27: LGTM! CLI option parsing with Commander is well-structured with sensible defaults and clear option descriptions.

107-139: LGTM! Configuration display is well-organized with clear formatting and informative output.

247-287: LGTM! Graceful error handling ensures visualization failures don't abort the entire profiling run.

289-329: LGTM! Proper orchestration with try/catch ensures teardown runs even on failure.
apps/webapp/test/performance/harness.ts (5)
1-8: LGTM! Imports are well-organized and appropriate for the harness functionality.

25-149: LGTM! Comprehensive setup flow with clear logging and proper sequencing of database creation, migrations, replication configuration, and process management.

151-191: LGTM! Phase orchestration with per-worker throughput distribution and consumer catch-up wait is well-implemented.

193-218: LGTM! Clean teardown with a helpful message about preserving the profiling database for inspection.

508-598: LGTM! Migration parsing handles the goose format correctly with appropriate error handling for idempotent operations.
apps/webapp/test/performance/consumer.ts (3)
31-96: LGTM! Process spawning with proper IPC setup, profiling mode detection, and readiness handling is well-implemented.

98-149: LGTM! Shutdown logic correctly handles both profiling (file-based signal) and non-profiling (IPC) modes with appropriate timeouts and fallback to SIGKILL.

203-211: LGTM! Polling-based ready wait with configurable timeout is appropriate for IPC coordination.
Also applies to: 309-317
export interface TestPhase {
  name: string;
  durationSec: number;
  targetThroughput: number; // records/sec
}

export interface ProducerConfig {
  enabled: boolean;
  workerCount: number; // Number of parallel producer processes
  workerId?: string; // Unique identifier for this specific worker
  targetThroughput: number;
  insertUpdateRatio: number; // 0.0-1.0, e.g. 0.8 = 80% inserts, 20% updates
  batchSize: number;
  payloadSizeKB: number;
  databaseUrl: string;
  organizationId: string;
  projectId: string;
  runtimeEnvironmentId: string;
  environmentType: RuntimeEnvironmentType;
}

export interface ConsumerConfig {
  flushBatchSize: number;
  flushIntervalMs: number;
  maxFlushConcurrency: number;
  useMockClickhouse: boolean;
  mockClickhouseDelay: number; // milliseconds
  pgConnectionUrl: string;
  clickhouseUrl?: string;
  redisOptions: RedisOptions;
  slotName: string;
  publicationName: string;
  outputDir?: string; // For shutdown signal file
}

export interface ProfilingConfig {
  enabled: boolean;
  tool: "doctor" | "flame" | "both" | "none";
  outputDir: string;
}

export interface OutputConfig {
  metricsFile: string;
  verbose: boolean;
}

export interface InfrastructureConfig {
  databaseUrl: string;
  profilingDatabaseName?: string; // Defaults to "trigger_profiling"
  redisUrl?: string;
  clickhouseUrl?: string;
}

export interface HarnessConfig {
  runName: string; // Short identifier for this run (e.g. "baseline", "optimized-v1")
  runDescription?: string; // Optional longer description of what this run is testing
  phases: TestPhase[];
  producer: ProducerConfig;
  consumer: ConsumerConfig;
  profiling: ProfilingConfig;
  output: OutputConfig;
  infrastructure: InfrastructureConfig;
}
🛠️ Refactor suggestion | 🟠 Major
Convert interfaces to types per coding guidelines.
The codebase coding guidelines specify: "Use types over interfaces for TypeScript". All interface declarations in this file should be converted to type declarations.
♻️ Proposed refactor to use types instead of interfaces
-export interface TestPhase {
+export type TestPhase = {
name: string;
durationSec: number;
targetThroughput: number; // records/sec
-}
+};
-export interface ProducerConfig {
+export type ProducerConfig = {
enabled: boolean;
workerCount: number; // Number of parallel producer processes
workerId?: string; // Unique identifier for this specific worker
targetThroughput: number;
insertUpdateRatio: number; // 0.0-1.0, e.g. 0.8 = 80% inserts, 20% updates
batchSize: number;
payloadSizeKB: number;
databaseUrl: string;
organizationId: string;
projectId: string;
runtimeEnvironmentId: string;
environmentType: RuntimeEnvironmentType;
-}
+};
-export interface ConsumerConfig {
+export type ConsumerConfig = {
flushBatchSize: number;
flushIntervalMs: number;
maxFlushConcurrency: number;
useMockClickhouse: boolean;
mockClickhouseDelay: number; // milliseconds
pgConnectionUrl: string;
clickhouseUrl?: string;
redisOptions: RedisOptions;
slotName: string;
publicationName: string;
outputDir?: string; // For shutdown signal file
-}
+};
-export interface ProfilingConfig {
+export type ProfilingConfig = {
enabled: boolean;
tool: "doctor" | "flame" | "both" | "none";
outputDir: string;
-}
+};
-export interface OutputConfig {
+export type OutputConfig = {
metricsFile: string;
verbose: boolean;
-}
+};
-export interface InfrastructureConfig {
+export type InfrastructureConfig = {
databaseUrl: string;
profilingDatabaseName?: string; // Defaults to "trigger_profiling"
redisUrl?: string;
clickhouseUrl?: string;
-}
+};
-export interface HarnessConfig {
+export type HarnessConfig = {
runName: string; // Short identifier for this run (e.g. "baseline", "optimized-v1")
runDescription?: string; // Optional longer description of what this run is testing
phases: TestPhase[];
producer: ProducerConfig;
consumer: ConsumerConfig;
profiling: ProfilingConfig;
output: OutputConfig;
infrastructure: InfrastructureConfig;
-}
+};
-export interface ProducerMetrics {
+export type ProducerMetrics = {
workerId?: string; // Unique identifier for this producer worker
totalInserts: number;
totalUpdates: number;
actualThroughput: number;
errors: number;
latencies: number[]; // for calculating percentiles
-}
+};
-export interface PhaseMetrics {
+export type PhaseMetrics = {
phase: string;
durationMs: number;
// Producer
recordsProduced: number;
producerThroughput: number;
// Consumer
batchesFlushed: number;
recordsConsumed: number;
consumerThroughput: number;
replicationLagP50: number;
replicationLagP95: number;
replicationLagP99: number;
// Performance
eventLoopUtilization: number;
flushDurationP50: number;
// Memory
heapUsedMB: number;
heapTotalMB: number;
-}
+};As per coding guidelines.
Also applies to: 131-163
🤖 Prompt for AI Agents
In @apps/webapp/test/performance/config.ts around lines 9 - 71, Replace each
exported interface declaration with an exported type alias preserving the exact
shape and property names/optionality for TestPhase, ProducerConfig,
ConsumerConfig, ProfilingConfig, OutputConfig, InfrastructureConfig, and
HarnessConfig; e.g., convert "export interface X { ... }" to "export type X = {
... }", keep union literal types (like tool) and imported types
(RuntimeEnvironmentType, RedisOptions) unchanged, and run a quick project-wide
type search to update any ambient interface-specific usages (none should be
required since names remain identical).
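Since the harness drives load from `TestPhase` entries (`durationSec` × `targetThroughput`), the expected record count per run can be derived directly from the converted type aliases; this is an illustrative sketch, and the `expectedRecords` helper is an assumption, not code from the PR:

```typescript
type TestPhase = {
  name: string;
  durationSec: number;
  targetThroughput: number; // records/sec
};

// Hypothetical helper: total records the producers should emit across all phases.
function expectedRecords(phases: TestPhase[]): number {
  return phases.reduce((sum, p) => sum + p.durationSec * p.targetThroughput, 0);
}

const phases: TestPhase[] = [
  { name: "warmup", durationSec: 30, targetThroughput: 100 },
  { name: "steady", durationSec: 120, targetThroughput: 500 },
];
// 30 * 100 + 120 * 500 = 63000 records expected for this run
```

A number like this gives the harness a simple sanity check to compare against the consumer's `recordsConsumed` metric at teardown.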
if (config.useMockClickhouse) {
  clickhouse = asMockClickHouse(new MockClickHouse(config.mockClickhouseDelay));
} else {
  // Use dynamic import to avoid module resolution issues with tsx
  const { ClickHouse } = await import("@internal/clickhouse");
  clickhouse = new ClickHouse({
    url: config.clickhouseUrl!,
    name: "runs-replication-profiling",
    compression: {
      request: true,
    },
    logLevel: "info",
  });
}
Validate clickhouseUrl when useMockClickhouse is false.
Line 40 uses a non-null assertion on config.clickhouseUrl, but this could be undefined when useMockClickhouse is false, leading to a runtime error.
🛡️ Proposed fix to validate clickhouseUrl
let clickhouse: Pick<ClickHouse, "taskRuns">;
if (config.useMockClickhouse) {
clickhouse = asMockClickHouse(new MockClickHouse(config.mockClickhouseDelay));
} else {
+ if (!config.clickhouseUrl) {
+ throw new Error("clickhouseUrl is required when useMockClickhouse is false");
+ }
// Use dynamic import to avoid module resolution issues with tsx
const { ClickHouse } = await import("@internal/clickhouse");
clickhouse = new ClickHouse({
- url: config.clickhouseUrl!,
+ url: config.clickhouseUrl,
name: "runs-replication-profiling",
compression: {
request: true,
},
logLevel: "info",
});
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let clickhouse: Pick<ClickHouse, "taskRuns">;
if (config.useMockClickhouse) {
  clickhouse = asMockClickHouse(new MockClickHouse(config.mockClickhouseDelay));
} else {
  if (!config.clickhouseUrl) {
    throw new Error("clickhouseUrl is required when useMockClickhouse is false");
  }
  // Use dynamic import to avoid module resolution issues with tsx
  const { ClickHouse } = await import("@internal/clickhouse");
  clickhouse = new ClickHouse({
    url: config.clickhouseUrl,
    name: "runs-replication-profiling",
    compression: {
      request: true,
    },
    logLevel: "info",
  });
}
🤖 Prompt for AI Agents
In @apps/webapp/test/performance/consumer-runner.ts around lines 34 - 47, When
config.useMockClickhouse is false, validate that config.clickhouseUrl is present
before using it in the new ClickHouse call; update the block that imports
ClickHouse and constructs clickhouse (the dynamic import + new ClickHouse({...
url: config.clickhouseUrl!, ... })) to check for a non-empty
config.clickhouseUrl, and if missing throw or log a clear error (or exit) rather
than using the non-null assertion so you avoid a runtime crash.
```ts
export interface DataGeneratorOptions {
  organizationId: string;
  projectId: string;
  runtimeEnvironmentId: string;
  environmentType: RuntimeEnvironmentType;
  payloadSizeKB: number;
  includeComplexPayloads?: boolean;
}
```
🛠️ Refactor suggestion | 🟠 Major
Convert interface to type per coding guidelines.
The codebase coding guidelines specify: "Use types over interfaces for TypeScript". Convert DataGeneratorOptions to a type declaration.
♻️ Proposed refactor
```diff
-export interface DataGeneratorOptions {
+export type DataGeneratorOptions = {
   organizationId: string;
   projectId: string;
   runtimeEnvironmentId: string;
   environmentType: RuntimeEnvironmentType;
   payloadSizeKB: number;
   includeComplexPayloads?: boolean;
-}
+};
```

As per coding guidelines.
📝 Committable suggestion
```ts
export type DataGeneratorOptions = {
  organizationId: string;
  projectId: string;
  runtimeEnvironmentId: string;
  environmentType: RuntimeEnvironmentType;
  payloadSizeKB: number;
  includeComplexPayloads?: boolean;
};
```
🤖 Prompt for AI Agents
In @apps/webapp/test/performance/data-generator.ts around lines 5 - 12,
DataGeneratorOptions is declared as an interface but your coding guidelines
require types; change the declaration of DataGeneratorOptions from an interface
to a type alias (export type DataGeneratorOptions = { ... }) preserving all
property names, types (including RuntimeEnvironmentType), and optional modifiers
(includeComplexPayloads?), and keep the export name unchanged so any usages
continue to work.
## PR Review: Performance Optimization for Runs Replication to ClickHouse

This is a well-designed performance optimization PR that converts object-based data structures to compact arrays for ClickHouse inserts. The changes are focused and the approach is sound. Here's my detailed review:

### ✅ Strengths

**1. Smart Data Structure Optimization**

**2. Type-Safe Array Access**

```ts
(function verifyTaskRunColumnIndices() {
  for (let i = 0; i < TASK_RUN_COLUMNS.length; i++) {
    const column = TASK_RUN_COLUMNS[i];
    const index = TASK_RUN_INDEX[column];
    if (index !== i) {
      throw new Error(...);
    }
  }
})();
```

This ensures the indices stay synchronized with the column order.

**3. Comprehensive Performance Test Harness**

**4. NoopClient Implementation**

**5. Buffer Parsing Optimization in pgoutput**
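The data-structure optimization praised above can be illustrated with a rough standalone sketch (illustrative only, not code from the PR): positional rows carry no per-row property names, so a serialized batch of array rows is smaller and cheaper to build than the equivalent objects.

```typescript
// Illustrative sketch (not from the PR): compact rows drop per-row property
// names, so a JSON-serialized batch of array rows is strictly smaller than
// the same batch of object rows.
const objectRow = { environment_id: "env_1", run_id: "run_1", status: "PENDING" };
const arrayRow = ["env_1", "run_1", "PENDING"];

const objectBatch = JSON.stringify(Array(1000).fill(objectRow));
const arrayBatch = JSON.stringify(Array(1000).fill(arrayRow));

console.log(objectBatch.length > arrayBatch.length); // true
```

The same effect applies to allocation and GC pressure: building plain arrays avoids creating thousands of keyed objects per replication batch.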
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal-packages/clickhouse/src/taskRuns.test.ts (1)
85-100: Add WHERE clauses to filter byrun_idso tests validate parameterization and isolate rows properly.The queries pass
run_idparameters but don't use them in SQL WHERE clauses, causing tests to return all rows instead of filtered results. Use the{run_id: String}placeholder format in WHERE clauses and add theparamsschema toqueryPayloadsqueries that currently lack it.Proposed diff
```diff
@@
   const query = client.query({
     name: "query-task-runs",
-    query: "SELECT * FROM trigger_dev.task_runs_v2",
+    query: "SELECT * FROM trigger_dev.task_runs_v2 WHERE run_id = {run_id: String}",
     schema: z.object({
       environment_id: z.string(),
       run_id: z.string(),
       concurrency_key: z.string(),
       bulk_action_group_ids: z.array(z.string()),
     }),
     params: z.object({
       run_id: z.string(),
     }),
   });
@@
   const queryPayloads = client.query({
     name: "query-raw-task-run-payloads",
-    query: "SELECT * FROM trigger_dev.raw_task_runs_payload_v1",
+    query: "SELECT * FROM trigger_dev.raw_task_runs_payload_v1 WHERE run_id = {run_id: String}",
     schema: z.object({
       run_id: z.string(),
       created_at: z.coerce.date(),
       payload: z.unknown(),
     }),
+    params: z.object({
+      run_id: z.string(),
+    }),
   });
@@
   const query = client.query({
     name: "query-task-runs",
-    query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL",
+    query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id = {run_id: String}",
     schema: z.object({
       environment_id: z.string(),
       run_id: z.string(),
       status: z.string(),
     }),
     params: z.object({
       run_id: z.string(),
     }),
   });
@@
   const queryPayloads = client.query({
     name: "query-raw-task-run-payloads",
-    query: "SELECT * FROM trigger_dev.raw_task_runs_payload_v1",
+    query: "SELECT * FROM trigger_dev.raw_task_runs_payload_v1 WHERE run_id = {run_id: String}",
     schema: z.object({
       run_id: z.string(),
       created_at: z.coerce.date(),
       payload: z.unknown(),
     }),
+    params: z.object({
+      run_id: z.string(),
+    }),
   });
```
🧹 Nitpick comments (3)
internal-packages/clickhouse/src/taskRuns.test.ts (3)
4-12: Make compact-array ordering self-checking (and actually use*_COLUMNS).
Right now the tests rely on comments for field order; adding simple assertions will catch column-order drift early and also givesTASK_RUN_COLUMNS/PAYLOAD_COLUMNSa concrete purpose.Proposed diff
```diff
@@
   const now = Date.now();
   const taskRunData: TaskRunInsertArray = [
@@
     null, // max_duration_in_seconds
   ];
+
+  expect(taskRunData).toHaveLength(TASK_RUN_COLUMNS.length);
@@
   const payloadData: PayloadInsertArray = [
     "run_1234", // run_id
     Date.now(), // created_at
     { data: { key: "value" } }, // payload
   ];
+
+  expect(payloadData).toHaveLength(PAYLOAD_COLUMNS.length);
```
113-116: Use deterministic/portable timestamps in tests.
new Date("2025-04-30 16:34:04.312")is timezone/parse-format dependent, andDate.now()adds avoidable nondeterminism. Prefer explicit ISO strings (withZ) or reuse a single fixednow.Proposed diff
```diff
@@
-  const payloadData: PayloadInsertArray = [
+  const now = Date.now();
+  const payloadData: PayloadInsertArray = [
     "run_1234", // run_id
-    Date.now(), // created_at
+    now, // created_at
     { data: { key: "value" } }, // payload
   ];
@@
-  const createdAt = new Date("2025-04-30 16:34:04.312").getTime();
-  const queuedAt = new Date("2025-04-30 16:34:04.311").getTime();
+  const createdAt = new Date("2025-04-30T16:34:04.312Z").getTime();
+  const queuedAt = new Date("2025-04-30T16:34:04.311Z").getTime();
@@
-  const createdAt = new Date("2025-04-30 16:34:04.312").getTime();
-  const queuedAt = new Date("2025-04-30 16:34:04.311").getTime();
+  const createdAt = new Date("2025-04-30T16:34:04.312Z").getTime();
+  const queuedAt = new Date("2025-04-30T16:34:04.311Z").getTime();
@@
-  const payloadData: PayloadInsertArray = [
+  const now = Date.now();
+  const payloadData: PayloadInsertArray = [
     "run_1234", // run_id
-    Date.now(), // created_at
+    now, // created_at
     { data: { data: {
```
30-77: (Optional) Reduce the hugeTaskRunInsertArrayduplication with a tiny builder.
This will make future column additions/reordering much cheaper to update across tests.Example approach
```ts
function makeBaseTaskRun(overrides: Partial<Record<number, unknown>> = {}): TaskRunInsertArray {
  const base: unknown[] = [ /* ... one canonical array in correct order ... */ ];
  for (const [k, v] of Object.entries(overrides)) base[Number(k)] = v;
  return base as TaskRunInsertArray;
}
```
TASK_RUN_INDEX, prefer those over raw numeric indices.)Also applies to: 156-250, 299-345
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
internal-packages/clickhouse/src/taskRuns.test.ts
🧰 Additional context used
📓 Path-based instructions (6)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead
Files:
internal-packages/clickhouse/src/taskRuns.test.ts
**/*.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use function declarations instead of default exports
Files:
internal-packages/clickhouse/src/taskRuns.test.ts
**/*.{test,spec}.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use vitest for all tests in the Trigger.dev repository
Files:
internal-packages/clickhouse/src/taskRuns.test.ts
**/*.{js,ts,jsx,tsx,json,md,css,scss}
📄 CodeRabbit inference engine (AGENTS.md)
Format code using Prettier
Files:
internal-packages/clickhouse/src/taskRuns.test.ts
**/*.test.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (AGENTS.md)
`**/*.test.{ts,tsx,js,jsx}`: Test files should live beside the files under test and use descriptive `describe` and `it` blocks
Avoid mocks or stubs in tests; use helpers from `@internal/testcontainers` when Redis or Postgres are needed
Use vitest for unit tests
Files:
internal-packages/clickhouse/src/taskRuns.test.ts
**/*.ts
📄 CodeRabbit inference engine (.cursor/rules/otel-metrics.mdc)
**/*.ts: When creating or editing OTEL metrics (counters, histograms, gauges), ensure metric attributes have low cardinality by using only enums, booleans, bounded error codes, or bounded shard IDs
Do not use high-cardinality attributes in OTEL metrics such as UUIDs/IDs (envId, userId, runId, projectId, organizationId), unbounded integers (itemCount, batchSize, retryCount), timestamps (createdAt, startTime), or free-form strings (errorMessage, taskName, queueName)
When exporting OTEL metrics via OTLP to Prometheus, be aware that the exporter automatically adds unit suffixes to metric names (e.g., 'my_duration_ms' becomes 'my_duration_ms_milliseconds', 'my_counter' becomes 'my_counter_total'). Account for these transformations when writing Grafana dashboards or Prometheus queries
Files:
internal-packages/clickhouse/src/taskRuns.test.ts
🧠 Learnings (7)
📓 Common learnings
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2264
File: apps/webapp/app/services/runsRepository.server.ts:172-174
Timestamp: 2025-07-12T18:06:04.133Z
Learning: In apps/webapp/app/services/runsRepository.server.ts, the in-memory status filtering after fetching runs from Prisma is intentionally used as a workaround for ClickHouse data delays. This approach is acceptable because the result set is limited to a maximum of 100 runs due to pagination, making the performance impact negligible.
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Attach metadata to task runs using the metadata option when triggering, and access/update it inside runs using metadata functions
Applied to files:
internal-packages/clickhouse/src/taskRuns.test.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `tasks.batchTrigger()` to trigger multiple runs of a single task with different payloads
Applied to files:
internal-packages/clickhouse/src/taskRuns.test.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `schedules.task()` for scheduled/cron tasks instead of regular `task()`
Applied to files:
internal-packages/clickhouse/src/taskRuns.test.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Subscribe to run updates using `runs.subscribeToRun()` for realtime monitoring of task execution
Applied to files:
internal-packages/clickhouse/src/taskRuns.test.ts
📚 Learning: 2025-11-27T16:27:48.109Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-11-27T16:27:48.109Z
Learning: Applies to **/*.test.{ts,tsx,js,jsx} : Avoid mocks or stubs in tests; use helpers from `internal/testcontainers` when Redis or Postgres are needed
Applied to files:
internal-packages/clickhouse/src/taskRuns.test.ts
📚 Learning: 2025-07-12T18:06:04.133Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2264
File: apps/webapp/app/services/runsRepository.server.ts:172-174
Timestamp: 2025-07-12T18:06:04.133Z
Learning: In apps/webapp/app/services/runsRepository.server.ts, the in-memory status filtering after fetching runs from Prisma is intentionally used as a workaround for ClickHouse data delays. This approach is acceptable because the result set is limited to a maximum of 100 runs due to pagination, making the performance impact negligible.
Applied to files:
internal-packages/clickhouse/src/taskRuns.test.ts
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (25)
- GitHub Check: Cursor Bugbot
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: typecheck / typecheck
- GitHub Check: claude-review
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (1)
internal-packages/clickhouse/src/taskRuns.test.ts (1)
14-16: Ensure this runs under vitest (repo guideline).
If vitest globals aren’t enabled, import `describe`/`expect` from `vitest`.
## PR Review: perf(runs-replication): Improve CPU efficiency and throughput

Thanks for this comprehensive performance optimization PR! This is a well-thought-out approach to improving the runs replication pipeline. Here is my detailed review:

### ✅ Strengths

**1. Performance Optimization Strategy**

**2. Type Safety Mechanisms**

**3. Comprehensive Performance Testing Harness**

**4. Clean Refactoring**
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @internal-packages/clickhouse/src/taskRuns.ts:
- Around line 220-266: TaskRunInsertArray and PayloadInsertArray expect
JSON-wrapped objects ({ data: ... }) but the zod schemas TaskRunV2
(output/error) and RawTaskRunPayloadV1 (payload) allow raw values, causing a
type mismatch; fix by either (A) changing the zod definitions to require the
wrapper (e.g., output: z.object({ data: z.unknown() }), error: z.object({ data:
z.unknown() }), payload: z.object({ data: z.unknown() })) so validated data
matches TaskRunInsertArray/PayloadInsertArray, or (B) relax the tuple types to
accept unknown for those fields to match existing zod schemas; also add a short
comment documenting the chosen convention and mention enable_json_type = 1 if
ClickHouse expects the { data: ... } wrapper.
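The `{ data: ... }` convention the fix prompt refers to can be sketched in isolation (hypothetical helper names, not the actual service API; the real service performs this wrapping in `#prepareJson`):

```typescript
// Hypothetical sketch of the `{ data: ... }` wrapper convention described
// above; `wrapPayload` and `isJsonWrapped` are illustrative names only.
type JsonWrapped = { data: unknown };

function isJsonWrapped(value: unknown): value is JsonWrapped {
  return typeof value === "object" && value !== null && "data" in value;
}

// Wrapping every payload unconditionally before insert means the compact
// tuple types can safely require the wrapped shape.
function wrapPayload(value: unknown): JsonWrapped {
  return { data: value };
}

const wrapped = wrapPayload({ key: "value" });
console.log(isJsonWrapped(wrapped)); // true
```

Under this convention, option (A) from the prompt (tightening the zod schemas to `z.object({ data: z.unknown() })`) makes the validated shape line up with the tuple types.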
🧹 Nitpick comments (3)
internal-packages/clickhouse/src/taskRuns.test.ts (1)
28-75: Consider extracting a test data factory function.The 45-element tuple construction is very verbose and will need to be updated in multiple places if column order changes. Consider extracting a helper function that builds
TaskRunInsertArrayfrom a partial object with sensible defaults.Example factory pattern
function createTaskRunArray(overrides: Partial<{ environment_id: string; run_id: string; status: string; // ... other commonly varied fields }>): TaskRunInsertArray { const now = Date.now(); return [ overrides.environment_id ?? "env_default", "org_default", "project_default", overrides.run_id ?? "run_default", now, now, overrides.status ?? "PENDING", // ... rest with defaults ]; }This would make tests more maintainable and reduce duplication.
internal-packages/clickhouse/src/taskRuns.ts (2)
170-181: Settings override could disable critical JSON type configuration.Both compact insert functions allow the provided
settingsparameter to override critical defaults likeenable_json_typeandtype_json_skip_duplicated_paths(lines 176-177, 291-292). If these settings are required for correctness, consider making them non-overridable or documenting why overrides might be needed.Make critical settings non-overridable
export function insertTaskRunsCompactArrays(ch: ClickhouseWriter, settings?: ClickHouseSettings) { return ch.insertCompactRaw({ name: "insertTaskRunsCompactArrays", table: "trigger_dev.task_runs_v2", columns: TASK_RUN_COLUMNS, settings: { + ...settings, enable_json_type: 1, type_json_skip_duplicated_paths: 1, - ...settings, }, }); }This ensures JSON type settings cannot be overridden while still allowing other settings to be customized.
Also applies to: 278-296
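The suggestion above hinges on object-spread ordering, which can be shown with a tiny standalone illustration (not PR code):

```typescript
// Later properties win in object spreads: spreading caller `settings` first
// makes the explicit defaults non-overridable, while spreading it last lets
// callers override them.
type Settings = Record<string, number>;

function mergeProtected(settings?: Settings): Settings {
  return { ...settings, enable_json_type: 1 }; // default always wins
}

function mergeOverridable(settings?: Settings): Settings {
  return { enable_json_type: 1, ...settings }; // caller wins
}

console.log(mergeProtected({ enable_json_type: 0 }).enable_json_type);   // 1
console.log(mergeOverridable({ enable_json_type: 0 }).enable_json_type); // 0
```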
158-168: Consider lazy verification instead of module-load time.The runtime verification runs at module load time via an IIFE. While this catches errors early, it executes even if the code is never used. Consider one of these alternatives:
- Run verification in a test instead of production code
- Use a lazy initialization pattern that verifies on first use
- Keep current approach but add a comment explaining the tradeoff
The current approach is acceptable but adds startup cost to all module consumers.
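The lazy-initialization alternative mentioned above could look roughly like this (illustrative helper names, not the actual source): the check runs once, on first use, instead of at module load.

```typescript
// Illustrative lazy-verification sketch: the column-index check is deferred
// from import time to the first real use of the compact-row path.
const COLUMNS = ["environment_id", "run_id", "status"] as const;
const INDEX: Record<string, number> = { environment_id: 0, run_id: 1, status: 2 };

let verified = false;

function ensureIndicesVerified(): void {
  if (verified) return;
  COLUMNS.forEach((column, i) => {
    if (INDEX[column] !== i) {
      throw new Error(`Index mismatch for ${column}`);
    }
  });
  verified = true;
}

function toCompactRow(run: { environment_id: string; run_id: string; status: string }): string[] {
  ensureIndicesVerified(); // pay the check on first insert, not at import time
  return [run.environment_id, run.run_id, run.status];
}

console.log(toCompactRow({ environment_id: "env_1", run_id: "run_1", status: "PENDING" }));
```

The tradeoff: module consumers that never insert rows pay nothing, but a drifted index map is now reported at first insert rather than at startup.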
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
internal-packages/clickhouse/src/taskRuns.test.tsinternal-packages/clickhouse/src/taskRuns.ts
🧰 Additional context used
📓 Path-based instructions (6)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead
Files:
internal-packages/clickhouse/src/taskRuns.tsinternal-packages/clickhouse/src/taskRuns.test.ts
**/*.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use function declarations instead of default exports
Files:
internal-packages/clickhouse/src/taskRuns.tsinternal-packages/clickhouse/src/taskRuns.test.ts
**/*.{js,ts,jsx,tsx,json,md,css,scss}
📄 CodeRabbit inference engine (AGENTS.md)
Format code using Prettier
Files:
internal-packages/clickhouse/src/taskRuns.tsinternal-packages/clickhouse/src/taskRuns.test.ts
**/*.ts
📄 CodeRabbit inference engine (.cursor/rules/otel-metrics.mdc)
**/*.ts: When creating or editing OTEL metrics (counters, histograms, gauges), ensure metric attributes have low cardinality by using only enums, booleans, bounded error codes, or bounded shard IDs
Do not use high-cardinality attributes in OTEL metrics such as UUIDs/IDs (envId, userId, runId, projectId, organizationId), unbounded integers (itemCount, batchSize, retryCount), timestamps (createdAt, startTime), or free-form strings (errorMessage, taskName, queueName)
When exporting OTEL metrics via OTLP to Prometheus, be aware that the exporter automatically adds unit suffixes to metric names (e.g., 'my_duration_ms' becomes 'my_duration_ms_milliseconds', 'my_counter' becomes 'my_counter_total'). Account for these transformations when writing Grafana dashboards or Prometheus queries
Files:
internal-packages/clickhouse/src/taskRuns.tsinternal-packages/clickhouse/src/taskRuns.test.ts
**/*.{test,spec}.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use vitest for all tests in the Trigger.dev repository
Files:
internal-packages/clickhouse/src/taskRuns.test.ts
**/*.test.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (AGENTS.md)
`**/*.test.{ts,tsx,js,jsx}`: Test files should live beside the files under test and use descriptive `describe` and `it` blocks
Avoid mocks or stubs in tests; use helpers from `@internal/testcontainers` when Redis or Postgres are needed
Use vitest for unit tests
Files:
internal-packages/clickhouse/src/taskRuns.test.ts
🧠 Learnings (7)
📓 Common learnings
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2264
File: apps/webapp/app/services/runsRepository.server.ts:172-174
Timestamp: 2025-07-12T18:06:04.133Z
Learning: In apps/webapp/app/services/runsRepository.server.ts, the in-memory status filtering after fetching runs from Prisma is intentionally used as a workaround for ClickHouse data delays. This approach is acceptable because the result set is limited to a maximum of 100 runs due to pagination, making the performance impact negligible.
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use the `task()` function from `trigger.dev/sdk/v3` to define tasks with id and run properties
Applied to files:
internal-packages/clickhouse/src/taskRuns.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Attach metadata to task runs using the metadata option when triggering, and access/update it inside runs using metadata functions
Applied to files:
internal-packages/clickhouse/src/taskRuns.tsinternal-packages/clickhouse/src/taskRuns.test.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `yourTask.batchTrigger()` to trigger multiple runs of a task from inside another task
Applied to files:
internal-packages/clickhouse/src/taskRuns.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `tasks.batchTrigger()` to trigger multiple runs of a single task with different payloads
Applied to files:
internal-packages/clickhouse/src/taskRuns.tsinternal-packages/clickhouse/src/taskRuns.test.ts
📚 Learning: 2025-11-27T16:27:48.109Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-11-27T16:27:48.109Z
Learning: Applies to **/*.test.{ts,tsx,js,jsx} : Avoid mocks or stubs in tests; use helpers from `internal/testcontainers` when Redis or Postgres are needed
Applied to files:
internal-packages/clickhouse/src/taskRuns.test.ts
📚 Learning: 2025-07-12T18:06:04.133Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2264
File: apps/webapp/app/services/runsRepository.server.ts:172-174
Timestamp: 2025-07-12T18:06:04.133Z
Learning: In apps/webapp/app/services/runsRepository.server.ts, the in-memory status filtering after fetching runs from Prisma is intentionally used as a workaround for ClickHouse data delays. This approach is acceptable because the result set is limited to a maximum of 100 runs due to pagination, making the performance impact negligible.
Applied to files:
internal-packages/clickhouse/src/taskRuns.test.ts
🧬 Code graph analysis (2)
internal-packages/clickhouse/src/taskRuns.ts (2)
internal-packages/clickhouse/src/index.ts (1)
TASK_RUN_COLUMNS(36-36)internal-packages/clickhouse/src/client/types.ts (1)
ClickhouseWriter(209-245)
internal-packages/clickhouse/src/taskRuns.test.ts (1)
internal-packages/clickhouse/src/taskRuns.ts (4)
insertTaskRunsCompactArrays(170-181)insertRawTaskRunPayloadsCompactArrays(278-296)TaskRunInsertArray(220-266)PayloadInsertArray(272-276)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (25)
- GitHub Check: Cursor Bugbot
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: typecheck / typecheck
- GitHub Check: claude-review
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (3)
internal-packages/clickhouse/src/taskRuns.test.ts (2)
4-10: LGTM: Clean migration to compact array APIs.

The import updates correctly reference the new compact array insert functions and tuple types.

111-115: No action needed. The `{ data: ... }` wrapper format is correct and required by the insert path.

The `PayloadInsertArray` tuple type requires `payload: { data: unknown }` because the `#prepareJson` method explicitly wraps payloads in this format before insertion (see `runsReplicationService.server.ts` lines 918–935). The `RawTaskRunPayloadV1` schema defines `payload: z.unknown()` for reading because the read schema is intentionally generic; the actual stored format is the wrapped structure. The test code correctly follows the insertion format requirement.

Likely an incorrect or invalid review comment.
internal-packages/clickhouse/src/taskRuns.ts (1)
56-152: Excellent type-safe column mapping with runtime verification.

The combination of:

- `TASK_RUN_COLUMNS` as the source of truth
- `TASK_RUN_INDEX` with a `satisfies` constraint for compile-time checking
- Runtime verification at module load time

...provides strong guarantees against index drift. This is a well-designed pattern for maintaining synchronization between column order and index mappings.
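A simplified sketch of that pattern (illustrative names, not the actual source): a readonly tuple is the single source of truth, the index map is checked at compile time with `satisfies`, and a load-time guard catches any drift the type system misses.

```typescript
// Simplified sketch of the columns-plus-index pattern described above.
const COLUMNS = ["environment_id", "run_id", "status"] as const;
type Column = (typeof COLUMNS)[number];

const INDEX = {
  environment_id: 0,
  run_id: 1,
  status: 2,
} satisfies Record<Column, number>;

// Runtime guard: throws at module load if the map drifts from tuple order.
COLUMNS.forEach((column, i) => {
  if (INDEX[column] !== i) {
    throw new Error(`Index mismatch for ${column}: expected ${i}, got ${INDEX[column]}`);
  }
});

// Rows can then be read positionally without allocating keyed objects.
const row = ["env_1", "run_1", "PENDING"];
console.log(row[INDEX.status]); // "PENDING"
```

`satisfies` (TypeScript 4.9+) checks the map covers every column without widening the inferred value types, and the load-time loop covers reorderings that still type-check.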
```ts
export type TaskRunInsertArray = [
  environment_id: string,
  organization_id: string,
  project_id: string,
  run_id: string,
  updated_at: number,
  created_at: number,
  status: string,
  environment_type: string,
  friendly_id: string,
  attempt: number,
  engine: string,
  task_identifier: string,
  queue: string,
  schedule_id: string,
  batch_id: string,
  completed_at: number | null,
  started_at: number | null,
  executed_at: number | null,
  delay_until: number | null,
  queued_at: number | null,
  expired_at: number | null,
  usage_duration_ms: number,
  cost_in_cents: number,
  base_cost_in_cents: number,
  output: { data: unknown },
  error: { data: unknown },
  tags: string[],
  task_version: string,
  sdk_version: string,
  cli_version: string,
  machine_preset: string,
  root_run_id: string,
  parent_run_id: string,
  depth: number,
  span_id: string,
  trace_id: string,
  idempotency_key: string,
  expiration_ttl: string,
  is_test: boolean,
  _version: string,
  _is_deleted: number,
  concurrency_key: string,
  bulk_action_group_ids: string[],
  worker_queue: string,
  max_duration_in_seconds: number | null,
];
```
🧩 Analysis chain
🏁 Scripts executed (repeated "Repository: triggerdotdev/trigger.dev" and "Length of output" markers collapsed):

find . -name "taskRuns.ts" -path "*/clickhouse/*" -type f
cat -n internal-packages/clickhouse/src/taskRuns.ts | head -100
cat -n internal-packages/clickhouse/src/taskRuns.ts | sed -n '200,210p'
cat -n internal-packages/clickhouse/src/taskRuns.ts | sed -n '30,40p'
cat -n internal-packages/clickhouse/src/taskRuns.ts | sed -n '270,280p'
grep -n "TaskRunInsertArray\|PayloadInsertArray" internal-packages/clickhouse/src/taskRuns.ts | head -20
grep -n "insertRawTaskRunCompactArrays\|insertRawTaskRunPayloadsCompactArrays" internal-packages/clickhouse/src/taskRuns.ts
cat -n internal-packages/clickhouse/src/taskRuns.ts | sed -n '230,260p'
grep -rn "insertRawTaskRunCompactArrays\|insertRawTaskRunPayloadsCompactArrays" internal-packages/clickhouse/src/ --include="*.ts" | grep -v "export function\|export const"
cat -n internal-packages/clickhouse/src/taskRuns.ts | sed -n '278,330p'
grep -n "insertRawTaskRunCompactArrays\|export function insert" internal-packages/clickhouse/src/taskRuns.ts
cat -n internal-packages/clickhouse/src/taskRuns.ts | sed -n '100,180p'
grep -rn "TaskRunInsertArray\|PayloadInsertArray" internal-packages/clickhouse/ --include="*.ts" | grep -v "^[^:]*:export type"
grep -rn "insertTaskRunsCompactArrays\|insertRawTaskRunPayloadsCompactArrays" internal-packages/ --include="*.ts" -A 3 | head -50
cat -n internal-packages/clickhouse/src/taskRuns.test.ts | sed -n '20,50p'
cat -n internal-packages/clickhouse/src/taskRuns.test.ts | sed -n '50,90p'
cat -n internal-packages/clickhouse/src/taskRuns.test.ts | sed -n '110,130p'
rg "RawTaskRunPayloadV1|insertRawTaskRunPayloads" internal-packages/ --include="*.ts" -B 2 -A 5 | head -80
rg "RawTaskRunPayloadV1|insertRawTaskRunPayloads" internal-packages/ -t ts -B 2 -A 5 | head -80
grep -rn "\.insert(" internal-packages/clickhouse/src/taskRuns.ts -A 5
cat -n internal-packages/clickhouse/src/taskRuns.ts | sed -n '184,195p'
cat -n internal-packages/clickhouse/src/taskRuns.ts | sed -n '298,315p'
grep -rn "enable_json_type\|type_json_skip_duplicated_paths" internal-packages/clickhouse/ -B 5 -A 2
grep -rn "data:" internal-packages/clickhouse/src/ -t ts | grep -E "output|error|payload" | head -20
grep -rn "{ data:" internal-packages/clickhouse/ -t ts -B 3 -A 3 | head -60
cat -n internal-packages/clickhouse/schema/004_create_task_runs_v2.sql | sed -n '1,80p'
grep -rn "output\|error" internal-packages/clickhouse/schema/004_create_task_runs_v2.sql
cat -n internal-packages/clickhouse/schema/004_create_task_runs_v2.sql
grep -rn "insertCompactRaw\|\.insert(" internal-packages/clickhouse/src/client/ --include="*.ts" | head -20
find internal-packages/clickhouse/src/client -name "*.ts" -type f
cat -n internal-packages/clickhouse/src/client/client.ts | sed -n '803,870p'
grep -rn "insertTaskRuns\|insertRawTaskRunPayloads" --include="*.ts" --include="*.tsx" internal-packages/ | grep -v "export\|test\|schema" | head -30
grep -rn "insertCompactArrays\|insertPayloadsCompactArrays" --include="*.ts" --include="*.tsx" internal-packages/ | grep -v "export\|test\|schema" | head -20
grep -rn "insertCompactArrays\|insertPayloadsCompactArrays" --include="*.ts" --include="*.tsx" . --exclude-dir=node_modules --exclude-dir=.git | grep -v "internal-packages/clickhouse" | head -20
find . -name "*.ts" -o -name "*.tsx" | xargs grep -l "TaskRunInsertArray\|PayloadInsertArray" | grep -v test | grep -v node_modules
grep -rn "output\|error\|payload" internal-packages/clickhouse/src/taskRuns.test.ts | grep "{ data" | head -10
cat -n ./apps/webapp/app/services/runsReplicationService.server.ts | sed -n '765,800p'
cat -n ./apps/webapp/app/services/runsReplicationService.server.ts | sed -n '700,775p'
grep -rn "output.*data\|error.*data" ./apps/webapp/app/services/runsReplicationService.server.ts -B 5 -A 5 | head -60
grep -rn "{ data:" ./apps/webapp/app/ --include="*.ts" | grep -E "output|error|payload" | head -20
cat -n ./apps/webapp/app/services/runsReplicationService.server.ts | sed -n '1,100p'
cat -n ./apps/webapp/app/services/runsReplicationService.server.ts | sed -n '875,920p'
grep -rn "TASK_RUN_INDEX\|output\|error" ./apps/webapp/app/services/runsReplicationService.server.ts -B 2 -A 2 | head -80
Type inconsistency between zod schema and tuple types needs clarification.
The zod schemas accept unstructured data while the tuple types enforce a specific wrapper format:
- `TaskRunV2`: `output: z.unknown()` and `error: z.unknown()` (lines 30-31)
- `RawTaskRunPayloadV1`: `payload: z.unknown()` (line 200)
But the corresponding tuple types require a `{ data: ... }` wrapper:
- `TaskRunInsertArray`: `output` and `error` as `{ data: unknown }` (lines 245-246)
- `PayloadInsertArray`: `payload` as `{ data: unknown }` (line 275)
This mismatch means data prepared with the zod schemas won't satisfy the tuple type constraints. Either the schemas should be more restrictive (requiring the wrapper structure), or the tuple types should match the schema flexibility. Consider documenting the intent behind the `{ data: ... }` wrapper format, especially if it's required by ClickHouse's JSON column handling with `enable_json_type = 1`.
🤖 Prompt for AI Agents
In @internal-packages/clickhouse/src/taskRuns.ts around lines 220 - 266,
TaskRunInsertArray and PayloadInsertArray expect JSON-wrapped objects ({ data:
... }) but the zod schemas TaskRunV2 (output/error) and RawTaskRunPayloadV1
(payload) allow raw values, causing a type mismatch; fix by either (A) changing
the zod definitions to require the wrapper (e.g., output: z.object({ data:
z.unknown() }), error: z.object({ data: z.unknown() }), payload: z.object({
data: z.unknown() })) so validated data matches
TaskRunInsertArray/PayloadInsertArray, or (B) relax the tuple types to accept
unknown for those fields to match existing zod schemas; also add a short comment
documenting the chosen convention and mention enable_json_type = 1 if ClickHouse
expects the { data: ... } wrapper.
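Option (A) above, enforcing the wrapper at a single boundary rather than loosening the tuple types, could be sketched like this. The names (`JsonWrapped`, `toJsonWrapped`) are hypothetical illustrations, not the PR's actual code:

```typescript
// Hypothetical sketch: enforce the `{ data: ... }` wrapper convention in one
// place so values validated upstream always satisfy the tuple types.
type JsonWrapped = { data: unknown };

function isJsonWrapped(value: unknown): value is JsonWrapped {
  return typeof value === "object" && value !== null && "data" in value;
}

// Idempotent: already-wrapped values pass through, raw values get wrapped.
function toJsonWrapped(value: unknown): JsonWrapped {
  return isJsonWrapped(value) ? value : { data: value };
}
```

Centralizing the wrapping like this also gives one obvious place to document why the wrapper exists at all.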
- Add back insertTaskRuns and insertRawTaskRunPayloads for tsql tests
- Generate TASK_RUN_INDEX and PAYLOAD_INDEX programmatically from column arrays
- Fix maxDurationInSeconds default from 0 to null
- Update runsReplicationService.part2.test.ts to use index constants

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Code Review for PR #2866

Title: perf(runs-replication): Improve the CPU efficiency and throughput of the runs replication to clickhouse

Summary

This PR introduces a significant performance optimization to the runs replication service by switching from object-based to array-based data structures for ClickHouse inserts. The changes are well-designed and include comprehensive testing infrastructure.

Code Quality and Best Practices

Strengths:
Suggestions:
Potential Issues
Performance Considerations

Positive:
Potential Concerns:
Security Concerns

No security issues identified. The changes are purely performance-related and do not affect authentication, authorization, or input validation.

Test Coverage

Strengths:
Suggestions:
Overall Assessment

This is a well-executed performance optimization. The code changes are clean, type-safe, and include proper testing. The main risk is the implicit coupling between column order and array construction, which should be validated either at compile time (via TypeScript) or at runtime (via assertions).

Recommendation: Approve with minor suggestions. Consider adding the column count validation to prevent silent data corruption if the arrays get out of sync.
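The column-count validation recommended above could be as small as the following sketch. The column names and the shape of the index map are illustrative stand-ins for the real constants in `taskRuns.ts`:

```typescript
// Hypothetical sketch: fail fast if a prepared row's length ever diverges
// from the declared column list. Column names here are illustrative.
const TASK_RUN_COLUMNS = ["environment_id", "organization_id", "run_id"] as const;

type TaskRunColumnName = (typeof TASK_RUN_COLUMNS)[number];

// Index map derived from the column list, so the two can't drift apart.
const TASK_RUN_INDEX = Object.fromEntries(
  TASK_RUN_COLUMNS.map((col, idx) => [col, idx])
) as { readonly [K in TaskRunColumnName]: number };

// Runtime guard used at the insert boundary.
function assertRowShape(row: unknown[]): void {
  if (row.length !== TASK_RUN_COLUMNS.length) {
    throw new Error(
      `task run row has ${row.length} values, expected ${TASK_RUN_COLUMNS.length}`
    );
  }
}
```

A guard like this turns a silent column-shift corruption into an immediate, descriptive error at insert time.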
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In @apps/webapp/app/services/runsReplicationService.server.ts:
- Around line 583-596: The comparator used to sort task-run rows (the anonymous
comparator that references TASK_RUN_INDEX and fields like organization_id,
project_id, environment_id, created_at, and run_id) incorrectly returns 1 when
values are equal, which can violate comparator contracts; update each comparison
to return 0 when the compared fields are equal (e.g., after comparing created_at
if subtraction is zero return 0, and change the final return to return 0 when
a[TASK_RUN_INDEX.run_id] === b[TASK_RUN_INDEX.run_id] instead of always
returning 1); apply the same fix to the other comparator block that uses the
same fields (the one noted around the other occurrence).
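A comparator satisfying that contract (returning 0 on ties instead of 1) could look like this sketch. The index values and row shape are simplified stand-ins for the real `TASK_RUN_INDEX` tuple:

```typescript
// Hypothetical sketch of a contract-respecting comparator for task-run rows.
type Row = [string, number, string];

const TASK_RUN_INDEX = { organization_id: 0, created_at: 1, run_id: 2 } as const;

function compareRows(a: Row, b: Row): number {
  const ao = a[TASK_RUN_INDEX.organization_id];
  const bo = b[TASK_RUN_INDEX.organization_id];
  if (ao !== bo) return ao < bo ? -1 : 1;

  const diff = a[TASK_RUN_INDEX.created_at] - b[TASK_RUN_INDEX.created_at];
  if (diff !== 0) return diff;

  const ar = a[TASK_RUN_INDEX.run_id];
  const br = b[TASK_RUN_INDEX.run_id];
  // Equal keys must return 0, not 1, to satisfy the sort comparator contract.
  return ar === br ? 0 : ar < br ? -1 : 1;
}
```

Returning a nonzero value for equal elements makes the comparison non-reflexive, which some sort implementations tolerate but none guarantee; returning 0 keeps the ordering stable and well-defined.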
In @internal-packages/clickhouse/src/taskRuns.ts:
- Around line 218-236: The function insertRawTaskRunPayloadsCompactArrays
currently spreads caller settings after hardcoded async defaults which can
produce surprising precedence; change to define a single DEFAULT_ASYNC_SETTINGS
constant and merge so caller-provided settings override defaults (e.g., merge
with defaults first then ...settings, or use Object.assign({},
DEFAULT_ASYNC_SETTINGS, settings)), or centralize these defaults in one place
and document the exact precedence, and apply the merged object to the settings
property in the insertCompactRaw call; ensure you update the settings merge in
insertRawTaskRunPayloadsCompactArrays to use that pattern and add a brief
comment about precedence.
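The defaults-then-overrides merge described above could be sketched as follows. The setting names mirror common ClickHouse async-insert settings, but the constant and function names are hypothetical:

```typescript
// Hypothetical sketch: one place defines the async-insert defaults, and
// caller-provided settings explicitly win on conflicts.
const DEFAULT_ASYNC_SETTINGS = {
  async_insert: 1,
  wait_for_async_insert: 0,
} as const;

type ClickhouseSettings = Record<string, number | string>;

function mergeSettings(settings?: ClickhouseSettings): ClickhouseSettings {
  // Defaults first, then caller settings, so the caller overrides defaults.
  return { ...DEFAULT_ASYNC_SETTINGS, ...settings };
}
```

Spread order is the entire precedence rule here, so a one-line comment at the merge site documents it completely.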
- Around line 156-217: The TaskRunInsertArray defines output and error as {
data: unknown } which mismatches the ClickHouse JSON columns; update the tuple
type in TaskRunInsertArray to use output: unknown and error: unknown (or
alternatively handle wrapping/unwrapping at the insertion point if you
intentionally want the nested shape) so the TypeScript types align with the
ClickHouse schema for columns named output and error.
🧹 Nitpick comments (5)
internal-packages/clickhouse/src/taskRuns.ts (2)
55-123: Avoid the unsafe cast for `TASK_RUN_INDEX`; prefer `satisfies` to keep compile-time guarantees.
`as { readonly [K in TaskRunColumnName]: number }` can mask mistakes during refactors. Prefer `satisfies` so TS validates the mapping without widening.

Proposed change:
 export const TASK_RUN_INDEX = Object.fromEntries(
   TASK_RUN_COLUMNS.map((col, idx) => [col, idx])
-) as { readonly [K in TaskRunColumnName]: number };
+) satisfies { readonly [K in TaskRunColumnName]: number };
147-155: Apply the same `satisfies` pattern to `PAYLOAD_INDEX` for consistency/safety.

Proposed change:
 export const PAYLOAD_INDEX = Object.fromEntries(
   PAYLOAD_COLUMNS.map((col, idx) => [col, idx])
-) as { readonly [K in PayloadColumnName]: number };
+) satisfies { readonly [K in PayloadColumnName]: number };

apps/webapp/test/runsReplicationService.part2.test.ts (2)
891-900: Consider removing optional chaining after you assert `batchFlushedEvents[0]` exists.
Right now `batchFlushedEvents?.[0]...` can turn "missing event" into a less-direct failure. Once you've asserted the array has an element, prefer direct indexing for clearer failures.
compareTuple(prevKey, currKey)helper would make future changes less error-prone.Also applies to: 1111-1112
apps/webapp/app/services/runsReplicationService.server.ts (1)
700-703: Make retry metric attribute selection explicit (avoid string-contains heuristic).
`operationName.includes("task run")` is a little brittle. Consider passing `operation: "task_runs" | "payloads"` into `#insertWithRetry` instead.
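That suggestion could look like this minimal sketch. The union values mirror the two insert paths; the function name is illustrative:

```typescript
// Hypothetical sketch: an explicit union keeps the OTEL attribute
// low-cardinality and removes the string-contains heuristic.
type InsertOperation = "task_runs" | "payloads";

function retryMetricAttributes(operation: InsertOperation): { operation: InsertOperation } {
  return { operation };
}
```

Because the union has exactly two members, the resulting metric attribute is bounded by construction rather than by convention.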
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- apps/webapp/app/services/runsReplicationService.server.ts
- apps/webapp/test/runsReplicationService.part2.test.ts
- internal-packages/clickhouse/src/taskRuns.ts
🧰 Additional context used
📓 Path-based instructions (11)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead
Files:
- internal-packages/clickhouse/src/taskRuns.ts
- apps/webapp/app/services/runsReplicationService.server.ts
- apps/webapp/test/runsReplicationService.part2.test.ts
**/*.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use function declarations instead of default exports
Files:
- internal-packages/clickhouse/src/taskRuns.ts
- apps/webapp/app/services/runsReplicationService.server.ts
- apps/webapp/test/runsReplicationService.part2.test.ts
**/*.{js,ts,jsx,tsx,json,md,css,scss}
📄 CodeRabbit inference engine (AGENTS.md)
Format code using Prettier
Files:
- internal-packages/clickhouse/src/taskRuns.ts
- apps/webapp/app/services/runsReplicationService.server.ts
- apps/webapp/test/runsReplicationService.part2.test.ts
**/*.ts
📄 CodeRabbit inference engine (.cursor/rules/otel-metrics.mdc)
**/*.ts: When creating or editing OTEL metrics (counters, histograms, gauges), ensure metric attributes have low cardinality by using only enums, booleans, bounded error codes, or bounded shard IDs
Do not use high-cardinality attributes in OTEL metrics such as UUIDs/IDs (envId, userId, runId, projectId, organizationId), unbounded integers (itemCount, batchSize, retryCount), timestamps (createdAt, startTime), or free-form strings (errorMessage, taskName, queueName)
When exporting OTEL metrics via OTLP to Prometheus, be aware that the exporter automatically adds unit suffixes to metric names (e.g., 'my_duration_ms' becomes 'my_duration_ms_milliseconds', 'my_counter' becomes 'my_counter_total'). Account for these transformations when writing Grafana dashboards or Prometheus queries
Files:
- internal-packages/clickhouse/src/taskRuns.ts
- apps/webapp/app/services/runsReplicationService.server.ts
- apps/webapp/test/runsReplicationService.part2.test.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use zod for validation in packages/core and apps/webapp
Files:
- apps/webapp/app/services/runsReplicationService.server.ts
- apps/webapp/test/runsReplicationService.part2.test.ts
apps/webapp/app/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Access all environment variables through the
envexport ofenv.server.tsinstead of directly accessingprocess.envin the Trigger.dev webapp
Files:
apps/webapp/app/services/runsReplicationService.server.ts
apps/webapp/app/services/**/*.server.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Separate testable services from configuration files; follow the pattern of
realtimeClient.server.ts(testable service) andrealtimeClientGlobal.server.ts(configuration) in the webapp
Files:
apps/webapp/app/services/runsReplicationService.server.ts
apps/webapp/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
apps/webapp/**/*.{ts,tsx}: When importing from@trigger.dev/corein the webapp, use subpath exports from the package.json instead of importing from the root path
Follow the Remix 2.1.0 and Express server conventions when updating the main trigger.dev webapp
Files:
- apps/webapp/app/services/runsReplicationService.server.ts
- apps/webapp/test/runsReplicationService.part2.test.ts
**/*.{test,spec}.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use vitest for all tests in the Trigger.dev repository
Files:
apps/webapp/test/runsReplicationService.part2.test.ts
apps/webapp/**/*.test.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Test files should only import classes and functions from
app/**/*.tsfiles and should not importenv.server.tsdirectly or indirectly; pass configuration through options instead
Files:
apps/webapp/test/runsReplicationService.part2.test.ts
**/*.test.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (AGENTS.md)
**/*.test.{ts,tsx,js,jsx}: Test files should live beside the files under test and use descriptivedescribeanditblocks
Avoid mocks or stubs in tests; use helpers from@internal/testcontainerswhen Redis or Postgres are needed
Use vitest for unit tests
Files:
apps/webapp/test/runsReplicationService.part2.test.ts
🧠 Learnings (20)
📓 Common learnings
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2264
File: apps/webapp/app/services/runsRepository.server.ts:172-174
Timestamp: 2025-07-12T18:06:04.133Z
Learning: In apps/webapp/app/services/runsRepository.server.ts, the in-memory status filtering after fetching runs from Prisma is intentionally used as a workaround for ClickHouse data delays. This approach is acceptable because the result set is limited to a maximum of 100 runs due to pagination, making the performance impact negligible.
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Attach metadata to task runs using the metadata option when triggering, and access/update it inside runs using metadata functions
Applied to files:
- internal-packages/clickhouse/src/taskRuns.ts
- apps/webapp/app/services/runsReplicationService.server.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Export tasks with unique IDs within the project to enable proper task discovery and execution
Applied to files:
- internal-packages/clickhouse/src/taskRuns.ts
- apps/webapp/test/runsReplicationService.part2.test.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use the `task()` function from `trigger.dev/sdk/v3` to define tasks with id and run properties
Applied to files:
- internal-packages/clickhouse/src/taskRuns.ts
- apps/webapp/app/services/runsReplicationService.server.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Subscribe to run updates using `runs.subscribeToRun()` for realtime monitoring of task execution
Applied to files:
internal-packages/clickhouse/src/taskRuns.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `tasks.batchTrigger()` to trigger multiple runs of a single task with different payloads
Applied to files:
- internal-packages/clickhouse/src/taskRuns.ts
- apps/webapp/app/services/runsReplicationService.server.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `yourTask.batchTrigger()` to trigger multiple runs of a task from inside another task
Applied to files:
internal-packages/clickhouse/src/taskRuns.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `schedules.task()` for scheduled/cron tasks instead of regular `task()`
Applied to files:
internal-packages/clickhouse/src/taskRuns.ts
📚 Learning: 2025-08-14T18:35:44.370Z
Learnt from: nicktrn
Repo: triggerdotdev/trigger.dev PR: 2390
File: apps/webapp/app/env.server.ts:764-765
Timestamp: 2025-08-14T18:35:44.370Z
Learning: The BoolEnv helper in apps/webapp/app/utils/boolEnv.ts uses z.preprocess with inconsistent default value types across the codebase - some usages pass boolean defaults (correct) while others pass string defaults (incorrect), leading to type confusion. The helper should enforce boolean-only defaults or have clearer documentation.
Applied to files:
internal-packages/clickhouse/src/taskRuns.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `schemaTask()` from `trigger.dev/sdk/v3` with Zod schema for payload validation
Applied to files:
internal-packages/clickhouse/src/taskRuns.ts
📚 Learning: 2026-01-08T15:57:09.298Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/otel-metrics.mdc:0-0
Timestamp: 2026-01-08T15:57:09.298Z
Learning: Applies to **/*.ts : Do not use high-cardinality attributes in OTEL metrics such as UUIDs/IDs (envId, userId, runId, projectId, organizationId), unbounded integers (itemCount, batchSize, retryCount), timestamps (createdAt, startTime), or free-form strings (errorMessage, taskName, queueName)
Applied to files:
internal-packages/clickhouse/src/taskRuns.ts
📚 Learning: 2025-11-27T16:26:37.432Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2025-11-27T16:26:37.432Z
Learning: Applies to {packages/core,apps/webapp}/**/*.{ts,tsx} : Use zod for validation in packages/core and apps/webapp
Applied to files:
internal-packages/clickhouse/src/taskRuns.ts
📚 Learning: 2025-09-03T14:34:41.781Z
Learnt from: myftija
Repo: triggerdotdev/trigger.dev PR: 2464
File: apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.settings/route.tsx:357-371
Timestamp: 2025-09-03T14:34:41.781Z
Learning: When using Zod's safeParse, the .data property is undefined when parsing fails, but TypeScript may still complain about accessing .data without checking .success first. The suggested approach of checking .success before accessing .data improves type safety and code clarity.
Applied to files:
internal-packages/clickhouse/src/taskRuns.ts
📚 Learning: 2025-07-12T18:06:04.133Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2264
File: apps/webapp/app/services/runsRepository.server.ts:172-174
Timestamp: 2025-07-12T18:06:04.133Z
Learning: In apps/webapp/app/services/runsRepository.server.ts, the in-memory status filtering after fetching runs from Prisma is intentionally used as a workaround for ClickHouse data delays. This approach is acceptable because the result set is limited to a maximum of 100 runs due to pagination, making the performance impact negligible.
Applied to files:
- apps/webapp/app/services/runsReplicationService.server.ts
- apps/webapp/test/runsReplicationService.part2.test.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `runs.subscribeToBatch()` to subscribe to changes for all runs in a batch
Applied to files:
apps/webapp/app/services/runsReplicationService.server.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `.withStreams()` to subscribe to realtime streams from task metadata in addition to run changes
Applied to files:
apps/webapp/app/services/runsReplicationService.server.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Limit task duration using the `maxDuration` property (in seconds)
Applied to files:
apps/webapp/app/services/runsReplicationService.server.ts
📚 Learning: 2025-11-27T16:26:58.661Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/webapp.mdc:0-0
Timestamp: 2025-11-27T16:26:58.661Z
Learning: Applies to apps/webapp/**/*.test.{ts,tsx} : Test files should only import classes and functions from `app/**/*.ts` files and should not import `env.server.ts` directly or indirectly; pass configuration through options instead
Applied to files:
apps/webapp/test/runsReplicationService.part2.test.ts
📚 Learning: 2025-11-27T16:27:48.109Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-11-27T16:27:48.109Z
Learning: Applies to **/*.test.{ts,tsx,js,jsx} : Avoid mocks or stubs in tests; use helpers from `internal/testcontainers` when Redis or Postgres are needed
Applied to files:
apps/webapp/test/runsReplicationService.part2.test.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `batch.triggerAndWait()` to batch trigger multiple different tasks and wait for results
Applied to files:
apps/webapp/test/runsReplicationService.part2.test.ts
🧬 Code graph analysis (2)
apps/webapp/app/services/runsReplicationService.server.ts (5)
- internal-packages/clickhouse/src/taskRuns.ts (4): TaskRunInsertArray (160-206), PayloadInsertArray (212-216), TASK_RUN_INDEX (108-110), PAYLOAD_INDEX (152-154)
- internal-packages/clickhouse/src/index.ts (2): TASK_RUN_INDEX (37-37), PAYLOAD_INDEX (39-39)
- apps/webapp/app/v3/tracing.server.ts (1): startSpan (7-35)
- apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts (1): span (1926-1943)
- apps/webapp/app/v3/services/triggerTaskV1.server.ts (2): payload (733-764), payload (766-776)

apps/webapp/test/runsReplicationService.part2.test.ts (2)
- internal-packages/clickhouse/src/taskRuns.ts (2): TASK_RUN_INDEX (108-110), PAYLOAD_INDEX (152-154)
- internal-packages/clickhouse/src/index.ts (2): TASK_RUN_INDEX (37-37), PAYLOAD_INDEX (39-39)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (25)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: Cursor Bugbot
- GitHub Check: claude-review
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (5)
apps/webapp/test/runsReplicationService.part2.test.ts (1)
1-1: Nice: tests now use `TASK_RUN_INDEX`/`PAYLOAD_INDEX` to stay aligned with column order.

apps/webapp/app/services/runsReplicationService.server.ts (4)
1-2: Good API shift: `batchFlushed` now exposes typed array inserts (matches perf goal).
Also applies to: 84-86
175-177: OTEL metric attributes are low-cardinality (👍); keep it that way.
`event_type` is bounded to insert/update/delete, which aligns with the repo's OTEL cardinality guidance. Based on learnings, keep avoiding IDs/unbounded values in metric attributes.
815-846: Array construction matches the declared column order; looks consistent.
The positional array returned by `#prepareTaskRunInsert()` appears aligned with `TASK_RUN_COLUMNS`, and the payload insert aligns with `PAYLOAD_COLUMNS`.
Also applies to: 854-905, 907-916
760-767: Pattern is correct and well-supported. The `params.clickhouse_settings` approach is the standard @clickhouse/client API for per-request settings. The internal ClickHouse wrapper explicitly merges `options?.params?.clickhouse_settings` into the request, confirming this is intentional and works as expected. Even `{}` is safe: it simply spreads to an empty settings object. No action needed.
```typescript
/**
 * Type-safe tuple representing a task run insert array.
 * Order matches TASK_RUN_COLUMNS exactly.
 */
export type TaskRunInsertArray = [
  environment_id: string,
  organization_id: string,
  project_id: string,
  run_id: string,
  updated_at: number,
  created_at: number,
  status: string,
  environment_type: string,
  friendly_id: string,
  attempt: number,
  engine: string,
  task_identifier: string,
  queue: string,
  schedule_id: string,
  batch_id: string,
  completed_at: number | null,
  started_at: number | null,
  executed_at: number | null,
  delay_until: number | null,
  queued_at: number | null,
  expired_at: number | null,
  usage_duration_ms: number,
  cost_in_cents: number,
  base_cost_in_cents: number,
  output: { data: unknown },
  error: { data: unknown },
  tags: string[],
  task_version: string,
  sdk_version: string,
  cli_version: string,
  machine_preset: string,
  root_run_id: string,
  parent_run_id: string,
  depth: number,
  span_id: string,
  trace_id: string,
  idempotency_key: string,
  expiration_ttl: string,
  is_test: boolean,
  _version: string,
  _is_deleted: number,
  concurrency_key: string,
  bulk_action_group_ids: string[],
  worker_queue: string,
  max_duration_in_seconds: number | null,
];

/**
 * Type-safe tuple representing a payload insert array.
 * Order matches PAYLOAD_COLUMNS exactly.
 */
export type PayloadInsertArray = [
  run_id: string,
  created_at: number,
  payload: { data: unknown },
];
```
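The tuple order contract above can be kept honest mechanically. Here is a minimal TypeScript sketch, using a shortened four-column stand-in for the real TASK_RUN_COLUMNS list, of deriving an index map from the column array itself so that tuple positions and index constants cannot drift apart:

```typescript
// Shortened stand-in for the real column list (the actual table has ~45 columns).
const TASK_RUN_COLUMNS = [
  "environment_id",
  "organization_id",
  "project_id",
  "run_id",
] as const;

type TaskRunColumn = (typeof TASK_RUN_COLUMNS)[number];

// Build { environment_id: 0, organization_id: 1, ... } once, at module load,
// directly from the column list, so there is no second source of truth.
const TASK_RUN_INDEX = Object.fromEntries(
  TASK_RUN_COLUMNS.map((col, i) => [col, i] as const)
) as Record<TaskRunColumn, number>;

// Positional access into a compact row; no per-field object allocation.
const row: [string, string, string, string] = ["env_1", "org_1", "proj_1", "run_1"];
const runId = row[TASK_RUN_INDEX.run_id];
```

Because the index map is computed from the same `as const` array that defines the insert order, reordering a column updates both in one place.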
🧩 Analysis chain

🏁 Script executed:

```shell
# First, let's find the schema definition files and the TASK_RUN_COLUMNS constant
fd -t f -e ts -e sql -e js | grep -E "(schema|migration|column)" | head -20
```

Repository: triggerdotdev/trigger.dev
Length of output: 854

🏁 Script executed:

```shell
# Search for TASK_RUN_COLUMNS constant definition
rg "TASK_RUN_COLUMNS" -A 5
```

Repository: triggerdotdev/trigger.dev
Length of output: 3240

🏁 Script executed:

```shell
# Look for ClickHouse table creation or schema definition
rg "CREATE TABLE.*task_run" -i -A 30
```

Repository: triggerdotdev/trigger.dev
Length of output: 8501

🏁 Script executed:

```shell
# Check for any schema definition in the taskRuns.ts file itself
head -n 200 internal-packages/clickhouse/src/taskRuns.ts | tail -n 100
```

Repository: triggerdotdev/trigger.dev
Length of output: 2870

🏁 Script executed:

```shell
cat internal-packages/clickhouse/schema/004_create_task_runs_v2.sql
```

Repository: triggerdotdev/trigger.dev
Length of output: 3512

🏁 Script executed:

```shell
# Also check if there's a v2 payload table
grep -A 20 "raw_task_runs_payload_v2\|payload.*table" internal-packages/clickhouse/schema/004_create_task_runs_v2.sql
```

Repository: triggerdotdev/trigger.dev
Length of output: 51
Fix output and error types to match ClickHouse schema.
The ClickHouse schema defines output and error as plain JSON types (not wrapped in { data: ... }):
```sql
output JSON(max_dynamic_paths = 1024),
error JSON(max_dynamic_paths = 64),
```
However, TaskRunInsertArray types them as { data: unknown }. This mismatch will cause data to be persisted with an unexpected nested structure. Change to output: unknown and error: unknown to match the schema, or explicitly wrap/unwrap values during insertion if the nested structure is intentional.
🤖 Prompt for AI Agents
In internal-packages/clickhouse/src/taskRuns.ts around lines 156-217: TaskRunInsertArray defines output and error as { data: unknown }, which mismatches the ClickHouse JSON columns. Update the tuple type to use output: unknown and error: unknown (or alternatively handle wrapping/unwrapping at the insertion point if the nested shape is intentional) so the TypeScript types align with the ClickHouse schema for the output and error columns.
```typescript
export function insertRawTaskRunPayloadsCompactArrays(
  ch: ClickhouseWriter,
  settings?: ClickHouseSettings
) {
  return ch.insertCompactRaw({
    name: "insertRawTaskRunPayloadsCompactArrays",
    table: "trigger_dev.raw_task_runs_payload_v1",
    columns: PAYLOAD_COLUMNS,
    settings: {
      async_insert: 1,
      wait_for_async_insert: 0,
      async_insert_max_data_size: "1000000",
      async_insert_busy_timeout_ms: 1000,
      enable_json_type: 1,
      type_json_skip_duplicated_paths: 1,
      ...settings,
    },
  });
}
```
Double-check async insert defaults won’t fight caller-provided settings.
insertRawTaskRunPayloadsCompactArrays bakes in async insert settings (e.g. wait_for_async_insert: 0) while also allowing ...settings. If upstream also injects settings (via client params or per-request overrides), it’s easy to end up with surprising precedence. Consider documenting precedence explicitly or centralizing the default settings in one place.
🤖 Prompt for AI Agents
In internal-packages/clickhouse/src/taskRuns.ts around lines 218-236: insertRawTaskRunPayloadsCompactArrays currently mixes hardcoded async defaults with a spread of caller settings, which can produce surprising precedence. Define a single DEFAULT_ASYNC_SETTINGS constant and merge so caller-provided settings override the defaults (merge the defaults first, then ...settings, or use Object.assign({}, DEFAULT_ASYNC_SETTINGS, settings)), or centralize these defaults in one place and document the exact precedence. Apply the merged object to the settings property in the insertCompactRaw call and add a brief comment about precedence.
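A minimal sketch of the defaults-first merge described above; DEFAULT_ASYNC_INSERT_SETTINGS is a hypothetical constant name and the settings type is a simplified stand-in. Because later spreads win, caller-provided settings override every default:

```typescript
// Simplified stand-in for the client's settings type.
type InsertSettings = Record<string, string | number>;

// Defaults defined once, in one place.
const DEFAULT_ASYNC_INSERT_SETTINGS: InsertSettings = {
  async_insert: 1,
  wait_for_async_insert: 0,
  async_insert_max_data_size: "1000000",
  async_insert_busy_timeout_ms: 1000,
};

function mergeInsertSettings(overrides?: InsertSettings): InsertSettings {
  // Later keys override earlier ones, so callers can flip any default.
  return { ...DEFAULT_ASYNC_INSERT_SETTINGS, ...overrides };
}

const merged = mergeInsertSettings({ wait_for_async_insert: 1 });
```

Documenting this one merge point makes the precedence explicit: hardcoded defaults first, caller overrides last.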
- Add type predicates to filter functions for proper type narrowing
- Add type assertions for tuple index accesses in sort comparisons

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Code Review: PR #2866 - Improve CPU efficiency and throughput of runs replication to ClickHouse

Summary

This PR transforms the RunsReplicationService from using an object-based data format to a compact array-based format when inserting data into ClickHouse. The goal is to improve CPU efficiency and throughput during high-volume replication from PostgreSQL to ClickHouse.

✅ Strengths
Actionable comments posted: 0
🧹 Nitpick comments (2)
apps/webapp/app/services/runsReplicationService.server.ts (2)
579-606: Consider extracting sort functions for maintainability. The sorting logic correctly implements composite key ordering for ClickHouse primary keys, but the inline comparisons are verbose and repeated type assertions reduce readability.

♻️ Optional refactor to extract sort functions

Consider extracting the sort logic into helper functions:

```typescript
function compareTaskRunInserts(a: TaskRunInsertArray, b: TaskRunInsertArray): number {
  const aOrgId = a[TASK_RUN_INDEX.organization_id] as string;
  const bOrgId = b[TASK_RUN_INDEX.organization_id] as string;
  if (aOrgId !== bOrgId) return aOrgId < bOrgId ? -1 : 1;

  const aProjId = a[TASK_RUN_INDEX.project_id] as string;
  const bProjId = b[TASK_RUN_INDEX.project_id] as string;
  if (aProjId !== bProjId) return aProjId < bProjId ? -1 : 1;

  const aEnvId = a[TASK_RUN_INDEX.environment_id] as string;
  const bEnvId = b[TASK_RUN_INDEX.environment_id] as string;
  if (aEnvId !== bEnvId) return aEnvId < bEnvId ? -1 : 1;

  const aCreatedAt = a[TASK_RUN_INDEX.created_at] as number;
  const bCreatedAt = b[TASK_RUN_INDEX.created_at] as number;
  if (aCreatedAt !== bCreatedAt) return aCreatedAt - bCreatedAt;

  const aRunId = a[TASK_RUN_INDEX.run_id] as string;
  const bRunId = b[TASK_RUN_INDEX.run_id] as string;
  return aRunId < bRunId ? -1 : 1;
}

function comparePayloadInserts(a: PayloadInsertArray, b: PayloadInsertArray): number {
  const aRunId = a[PAYLOAD_INDEX.run_id] as string;
  const bRunId = b[PAYLOAD_INDEX.run_id] as string;
  return aRunId < bRunId ? -1 : 1;
}
```

Then simplify the usage:

```diff
- .sort((a, b) => {
-   const aOrgId = a[TASK_RUN_INDEX.organization_id] as string;
-   // ... rest of the sorting logic
- });
+ .sort(compareTaskRunInserts);
```

Also applies to: 610-617
836-838: Consider logging when runs are skipped due to missing required fields. The guard correctly returns an empty object (both fields are optional), but silently skipping runs without environmentType or organizationId could make debugging data issues difficult.

💡 Add logging for skipped runs

```diff
  if (!run.environmentType || !run.organizationId) {
+   this.logger.warn("Skipping run insert due to missing required fields", {
+     runId: run.id,
+     hasEnvironmentType: !!run.environmentType,
+     hasOrganizationId: !!run.organizationId,
+   });
    return {};
  }
```
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
apps/webapp/app/services/runsReplicationService.server.ts
🧰 Additional context used
📓 Path-based instructions (8)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead
Files:
apps/webapp/app/services/runsReplicationService.server.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use zod for validation in packages/core and apps/webapp
Files:
apps/webapp/app/services/runsReplicationService.server.ts
**/*.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use function declarations instead of default exports
Files:
apps/webapp/app/services/runsReplicationService.server.ts
apps/webapp/app/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Access all environment variables through the
envexport ofenv.server.tsinstead of directly accessingprocess.envin the Trigger.dev webapp
Files:
apps/webapp/app/services/runsReplicationService.server.ts
apps/webapp/app/services/**/*.server.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Separate testable services from configuration files; follow the pattern of
realtimeClient.server.ts(testable service) andrealtimeClientGlobal.server.ts(configuration) in the webapp
Files:
apps/webapp/app/services/runsReplicationService.server.ts
apps/webapp/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
apps/webapp/**/*.{ts,tsx}: When importing from@trigger.dev/corein the webapp, use subpath exports from the package.json instead of importing from the root path
Follow the Remix 2.1.0 and Express server conventions when updating the main trigger.dev webapp
Files:
apps/webapp/app/services/runsReplicationService.server.ts
**/*.{js,ts,jsx,tsx,json,md,css,scss}
📄 CodeRabbit inference engine (AGENTS.md)
Format code using Prettier
Files:
apps/webapp/app/services/runsReplicationService.server.ts
**/*.ts
📄 CodeRabbit inference engine (.cursor/rules/otel-metrics.mdc)
**/*.ts: When creating or editing OTEL metrics (counters, histograms, gauges), ensure metric attributes have low cardinality by using only enums, booleans, bounded error codes, or bounded shard IDs
Do not use high-cardinality attributes in OTEL metrics such as UUIDs/IDs (envId, userId, runId, projectId, organizationId), unbounded integers (itemCount, batchSize, retryCount), timestamps (createdAt, startTime), or free-form strings (errorMessage, taskName, queueName)
When exporting OTEL metrics via OTLP to Prometheus, be aware that the exporter automatically adds unit suffixes to metric names (e.g., 'my_duration_ms' becomes 'my_duration_ms_milliseconds', 'my_counter' becomes 'my_counter_total'). Account for these transformations when writing Grafana dashboards or Prometheus queries
Files:
apps/webapp/app/services/runsReplicationService.server.ts
🧠 Learnings (7)
📓 Common learnings
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2264
File: apps/webapp/app/services/runsRepository.server.ts:172-174
Timestamp: 2025-07-12T18:06:04.133Z
Learning: In apps/webapp/app/services/runsRepository.server.ts, the in-memory status filtering after fetching runs from Prisma is intentionally used as a workaround for ClickHouse data delays. This approach is acceptable because the result set is limited to a maximum of 100 runs due to pagination, making the performance impact negligible.
📚 Learning: 2025-07-12T18:06:04.133Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2264
File: apps/webapp/app/services/runsRepository.server.ts:172-174
Timestamp: 2025-07-12T18:06:04.133Z
Learning: In apps/webapp/app/services/runsRepository.server.ts, the in-memory status filtering after fetching runs from Prisma is intentionally used as a workaround for ClickHouse data delays. This approach is acceptable because the result set is limited to a maximum of 100 runs due to pagination, making the performance impact negligible.
Applied to files:
apps/webapp/app/services/runsReplicationService.server.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `runs.subscribeToBatch()` to subscribe to changes for all runs in a batch
Applied to files:
apps/webapp/app/services/runsReplicationService.server.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Limit task duration using the `maxDuration` property (in seconds)
Applied to files:
apps/webapp/app/services/runsReplicationService.server.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `tasks.batchTrigger()` to trigger multiple runs of a single task with different payloads
Applied to files:
apps/webapp/app/services/runsReplicationService.server.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use the `task()` function from `trigger.dev/sdk/v3` to define tasks with id and run properties
Applied to files:
apps/webapp/app/services/runsReplicationService.server.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Attach metadata to task runs using the metadata option when triggering, and access/update it inside runs using metadata functions
Applied to files:
apps/webapp/app/services/runsReplicationService.server.ts
🧬 Code graph analysis (1)
apps/webapp/app/services/runsReplicationService.server.ts (2)
internal-packages/clickhouse/src/taskRuns.ts (4)
- TaskRunInsertArray (160-206)
- PayloadInsertArray (212-216)
- TASK_RUN_INDEX (108-110)
- PAYLOAD_INDEX (152-154)

internal-packages/clickhouse/src/index.ts (2)
- TASK_RUN_INDEX (37-37)
- PAYLOAD_INDEX (39-39)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (25)
- GitHub Check: Cursor Bugbot
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: typecheck / typecheck
- GitHub Check: claude-review
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (7)
apps/webapp/app/services/runsReplicationService.server.ts (7)
1-2: LGTM! Type changes align with the performance optimization. The migration from object-based types (TaskRunV2, RawTaskRunPayloadV1) to array-based types (TaskRunInsertArray, PayloadInsertArray), along with the index constants (TASK_RUN_INDEX, PAYLOAD_INDEX), provides a good balance between performance and type safety. The event type update correctly reflects the new data structures. Also applies to: 84-86.

684-766: LGTM! Well-designed retry logic with proper error classification. The retry implementation follows best practices:
- Exponential backoff with jitter prevents thundering herd
- Comprehensive permanent error pattern matching
- Proper metric recording for observability
- Generic implementation allows reuse for both insert types
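The bullets above can be sketched as follows; this is a hedged illustration of exponential backoff with full jitter, not the service's actual implementation (its parameter values and permanent-error classification are not reproduced here):

```typescript
// Full-jitter backoff: delay grows exponentially with the attempt number,
// capped at maxMs, then a uniform random fraction of that cap is used.
function backoffDelayMs(attempt: number, baseMs = 100, maxMs = 5_000): number {
  const cap = Math.min(maxMs, baseMs * 2 ** attempt);
  return Math.random() * cap; // uniform in [0, cap)
}

// Generic retry wrapper usable for any async insert, mirroring the idea
// that one implementation serves both insert types.
async function withRetry<T>(fn: () => Promise<T>, maxAttempts = 5): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (attempt < maxAttempts - 1) {
        await new Promise((resolve) => setTimeout(resolve, backoffDelayMs(attempt)));
      }
    }
  }
  throw lastError;
}
```

The random jitter is what prevents the thundering-herd effect: concurrent retriers spread out instead of retrying in lockstep.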
768-779: LGTM! Appropriate ClickHouse async insert configuration. The settings provide proper control over async insert behavior with configurable wait semantics. The async_insert_max_data_size and async_insert_busy_timeout_ms values are reasonable defaults for batching.

781-825: LGTM! Insert methods correctly use compact array APIs. Both methods properly:
- Accept array-based insert types
- Apply ClickHouse insert settings for async/sync operation
- Record errors to spans for observability
- Throw errors for retry logic to handle
860-917: LGTM! Array construction correctly matches TASK_RUN_COLUMNS order. The method properly:
- Maps all PostgreSQL TaskRun fields to the array positions
- Handles nullable fields with appropriate defaults
- Converts BigInt version to string for ClickHouse
- Sets deletion flag based on event type
The explicit field ordering with comments makes future maintenance easier.
919-928: LGTM! Payload array construction correctly matches PAYLOAD_COLUMNS order. The straightforward 3-element array properly maps the run ID, creation timestamp, and payload data.

175-177: LGTM! Improved metric description with appropriate low-cardinality attributes. The updated description clearly indicates what events are counted, and the event_type attribute used at line 504 has bounded cardinality (insert, update, delete), following OTEL best practices per coding guidelines.
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
PR Review: Runs Replication Performance Improvements

Reviewer: Claude (Automated Review)

Overview

This PR implements significant performance optimizations for the runs replication service.

Statistics: +6,889 additions, -582 deletions across 22 files

Code Quality and Best Practices

✅ Strengths
|
| Category | Status |
|---|---|
| Code Quality | ✅ Good with minor suggestions |
| Potential Bugs | |
| Performance | ✅ Sound approach, recommend showing benchmarks |
| Security | ✅ No concerns |
| Test Coverage | |
Recommendation: Address the column order validation and error logging concerns before merge. The core optimization approach is sound and well-implemented.
This review was generated by Claude Code
Moving to a separate PR.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
PR Review: perf(runs-replication): Improve CPU efficiency and throughput

Overall Assessment

This is a well-designed performance optimization PR that converts the runs replication service from object-based inserts to compact array-based inserts for ClickHouse operations. The approach is sound and should deliver measurable performance improvements through reduced memory allocation and faster serialization.

✅ Strengths
🔍 Code Quality Observations
- Remove profile-runs-replication.ts script
- Remove profiling-related .gitignore entries
- Remove clinic/commander devDependencies

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
PR Review: Compact Array Format for ClickHouse Inserts

This is a well-executed performance optimization that introduces compact array-based inserts for ClickHouse, reducing JSON serialization overhead and improving insert performance through pre-sorted batches. Here's my detailed review:

Code Quality and Best Practices

Strengths:
Suggestions:
Potential Bugs or Issues
Performance Considerations

Positive:
Note:
Security Concerns

No security concerns identified. The changes are internal data transformation optimizations and don't introduce new attack surfaces.

Test Coverage

Positive:
Minor Note:
Summary

This is a solid performance optimization with good engineering practices. The PR achieves its goals of reducing CPU overhead and improving ClickHouse insert throughput while maintaining backward compatibility. The main suggestions are around improving error context in logs and documenting the column order dependency.

Recommendation: Approve with optional follow-up for the error logging context improvement.

🤖 Reviewed with Claude Code
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
PR Review: perf(runs-replication): Improve the CPU efficiency and throughput of the runs replication to clickhouse

Overall Assessment

This is a well-structured performance optimization PR that switches from object-based JSON serialization to a compact array format for ClickHouse inserts. The approach is sound and should provide meaningful performance improvements for high-throughput replication scenarios.

Code Quality and Best Practices

Strengths:
Suggestions:
Potential Bugs or Issues
Performance Considerations

Positive:
Potential concerns:
Security Concerns

No security concerns identified. The changes are purely internal data transformation and don't affect input validation or data handling boundaries.

Test Coverage

Strengths:
Suggestions:
Additional Observations
Summary

This PR makes solid performance improvements with good type safety mechanisms. The main recommendations are:

Verdict: Approve with minor suggestions
Summary
Optimizes the runs replication service for better CPU efficiency and throughput when inserting task runs into ClickHouse.
Key Changes
- Uses JSONCompactEachRowWithNames instead of JSONEachRow for ClickHouse inserts, reducing JSON serialization overhead
- Adds TaskRunInsertArray and PayloadInsertArray tuple types with compile-time column order validation
- TASK_RUN_INDEX and PAYLOAD_INDEX are generated from column arrays to prevent manual synchronization errors

Files Changed

- runsReplicationService.server.ts - Core optimization to use compact array inserts
- @internal/clickhouse - Added insertCompactRaw method and tuple types
- taskRuns.ts - Column definitions, index constants, and insert functions
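The JSONCompactEachRowWithNames format named under Key Changes can be sketched as below; the columns and rows here are illustrative, but the wire shape (one names row followed by compact value rows, one JSON array per line) shows why per-batch serialization is cheaper than JSONEachRow's repeated key names on every row:

```typescript
// Illustrative column list and rows (not the real table's full schema).
const columns = ["run_id", "created_at", "status"] as const;

const rows: Array<[string, number, string]> = [
  ["run_1", 1700000000000, "COMPLETED"],
  ["run_2", 1700000001000, "PENDING"],
];

// One JSON document per line: the column names first, then each compact row.
const body = [columns, ...rows].map((r) => JSON.stringify(r)).join("\n");
```

With JSONEachRow, every row would carry all three key names; here they appear exactly once per batch.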