From 3a7ed77bf49474f40c6de870bdbb02bebe9252e4 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 8 Jan 2026 14:50:28 +0000 Subject: [PATCH 1/2] feat(runs): use metrics instead of spans in the Runs Replication service --- .cursor/rules/otel-metrics.mdc | 13 + .../runsReplicationInstance.server.ts | 3 +- .../services/runsReplicationService.server.ts | 120 +++- .../test/runsReplicationService.part1.test.ts | 284 +++++++-- apps/webapp/test/utils/tracing.ts | 45 ++ .../dashboards/runs-replication.json | 593 ++++++++++++++++++ 6 files changed, 989 insertions(+), 69 deletions(-) create mode 100644 docker/config/grafana/provisioning/dashboards/runs-replication.json diff --git a/.cursor/rules/otel-metrics.mdc b/.cursor/rules/otel-metrics.mdc index 8efed4a7d5..218f07c41e 100644 --- a/.cursor/rules/otel-metrics.mdc +++ b/.cursor/rules/otel-metrics.mdc @@ -42,6 +42,19 @@ this.counter.add(1, { }); ``` +## Prometheus Metric Naming + +When metrics are exported via OTLP to Prometheus, the exporter automatically adds unit and instrument-type suffixes (e.g. `_total` for counters, `_bucket` for histograms) to metric names: + +| OTel Metric Name | Unit (Instrument) | Prometheus Name | +|------------------|-------------------|-----------------| +| `my_duration_ms` | `ms` | `my_duration_ms_milliseconds` | +| `my_counter` | none (counter) | `my_counter_total` | +| `items_inserted` | `inserts` (counter) | `items_inserted_inserts_total` | +| `batch_size` | `items` (histogram) | `batch_size_items_bucket` | + +Keep this in mind when writing Grafana dashboards or Prometheus queries—the metric names in Prometheus will differ from the names defined in code. + ## Reference See the schedule engine (`internal-packages/schedule-engine/src/engine/index.ts`) for a good example of low-cardinality metric attributes. 
diff --git a/apps/webapp/app/services/runsReplicationInstance.server.ts b/apps/webapp/app/services/runsReplicationInstance.server.ts index 3830d3dd6d..8dc078d338 100644 --- a/apps/webapp/app/services/runsReplicationInstance.server.ts +++ b/apps/webapp/app/services/runsReplicationInstance.server.ts @@ -2,7 +2,7 @@ import { ClickHouse } from "@internal/clickhouse"; import invariant from "tiny-invariant"; import { env } from "~/env.server"; import { singleton } from "~/utils/singleton"; -import { provider } from "~/v3/tracer.server"; +import { meter, provider } from "~/v3/tracer.server"; import { RunsReplicationService } from "./runsReplicationService.server"; import { signalsEmitter } from "./signals.server"; @@ -62,6 +62,7 @@ function initializeRunsReplicationInstance() { logLevel: env.RUN_REPLICATION_LOG_LEVEL, waitForAsyncInsert: env.RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT === "1", tracer: provider.getTracer("runs-replication-service"), + meter, insertMaxRetries: env.RUN_REPLICATION_INSERT_MAX_RETRIES, insertBaseDelayMs: env.RUN_REPLICATION_INSERT_BASE_DELAY_MS, insertMaxDelayMs: env.RUN_REPLICATION_INSERT_MAX_DELAY_MS, diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index c150eb2a00..2841813bc5 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -7,7 +7,16 @@ import { type MessageUpdate, type PgoutputMessage, } from "@internal/replication"; -import { recordSpanError, startSpan, trace, type Tracer } from "@internal/tracing"; +import { + getMeter, + recordSpanError, + startSpan, + trace, + type Counter, + type Histogram, + type Meter, + type Tracer, +} from "@internal/tracing"; import { Logger, type LogLevel } from "@trigger.dev/core/logger"; import { tryCatch } from "@trigger.dev/core/utils"; import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization"; @@ -51,6 +60,7 @@ export type 
RunsReplicationServiceOptions = { logger?: Logger; logLevel?: LogLevel; tracer?: Tracer; + meter?: Meter; waitForAsyncInsert?: boolean; insertStrategy?: "insert" | "insert_async"; // Retry configuration for insert operations @@ -90,6 +100,7 @@ export class RunsReplicationService { private _isShuttingDown = false; private _isShutDownComplete = false; private _tracer: Tracer; + private _meter: Meter; private _currentParseDurationMs: number | null = null; private _lastAcknowledgedAt: number | null = null; private _acknowledgeTimeoutMs: number; @@ -103,6 +114,16 @@ export class RunsReplicationService { private _insertStrategy: "insert" | "insert_async"; private _disablePayloadInsert: boolean; + // Metrics + private _replicationLagHistogram: Histogram; + private _batchesFlushedCounter: Counter; + private _batchSizeHistogram: Histogram; + private _taskRunsInsertedCounter: Counter; + private _payloadsInsertedCounter: Counter; + private _insertRetriesCounter: Counter; + private _transactionsProcessedCounter: Counter; + private _flushDurationHistogram: Histogram; + public readonly events: EventEmitter; constructor(private readonly options: RunsReplicationServiceOptions) { @@ -110,6 +131,60 @@ export class RunsReplicationService { options.logger ?? new Logger("RunsReplicationService", options.logLevel ?? "info"); this.events = new EventEmitter(); this._tracer = options.tracer ?? trace.getTracer("runs-replication-service"); + this._meter = options.meter ?? 
getMeter("runs-replication"); + + // Initialize metrics + this._replicationLagHistogram = this._meter.createHistogram( + "runs_replication.replication_lag_ms", + { + description: "Replication lag from Postgres commit to processing", + unit: "ms", + } + ); + + this._batchesFlushedCounter = this._meter.createCounter("runs_replication.batches_flushed", { + description: "Total batches flushed to ClickHouse", + }); + + this._batchSizeHistogram = this._meter.createHistogram("runs_replication.batch_size", { + description: "Number of items per batch flush", + unit: "items", + }); + + this._taskRunsInsertedCounter = this._meter.createCounter( + "runs_replication.task_runs_inserted", + { + description: "Task run inserts to ClickHouse", + unit: "inserts", + } + ); + + this._payloadsInsertedCounter = this._meter.createCounter( + "runs_replication.payloads_inserted", + { + description: "Payload inserts to ClickHouse", + unit: "inserts", + } + ); + + this._insertRetriesCounter = this._meter.createCounter("runs_replication.insert_retries", { + description: "Insert retry attempts", + }); + + this._transactionsProcessedCounter = this._meter.createCounter( + "runs_replication.transactions_processed", + { + description: "Transactions received from replication", + } + ); + + this._flushDurationHistogram = this._meter.createHistogram( + "runs_replication.flush_duration_ms", + { + description: "Duration of batch flush operations", + unit: "ms", + } + ); this._acknowledgeTimeoutMs = options.acknowledgeTimeoutMs ?? 1_000; @@ -423,20 +498,13 @@ export class RunsReplicationService { })) ); - this._tracer - .startSpan("handle_transaction", { - attributes: { - "transaction.xid": transaction.xid, - "transaction.replication_lag_ms": transaction.replicationLagMs, - "transaction.events": transaction.events.length, - "transaction.commit_end_lsn": transaction.commitEndLsn, - "transaction.parse_duration_ms": this._currentParseDurationMs ?? 
undefined, - "transaction.lsn_to_uint64_ms": lsnToUInt64DurationMs, - "transaction.version": _version.toString(), - }, - startTime: transaction.beginStartTimestamp, - }) - .end(); + // Record metrics + this._replicationLagHistogram.record(transaction.replicationLagMs); + + // Count events by type + for (const event of transaction.events) { + this._transactionsProcessedCounter.add(1, { event_type: event.tag }); + } this.logger.info("handle_transaction", { transaction: { @@ -501,6 +569,8 @@ export class RunsReplicationService { batchSize: batch.length, }); + const flushStartTime = performance.now(); + await startSpan(this._tracer, "flushBatch", async (span) => { const preparedInserts = await startSpan(this._tracer, "prepare_inserts", async (span) => { return await Promise.all(batch.map(this.#prepareRunInserts.bind(this))); @@ -584,6 +654,22 @@ export class RunsReplicationService { }); this.events.emit("batchFlushed", { flushId, taskRunInserts, payloadInserts }); + + // Record metrics + const flushDurationMs = performance.now() - flushStartTime; + const hasErrors = taskRunError !== null || payloadError !== null; + + this._batchSizeHistogram.record(batch.length); + this._flushDurationHistogram.record(flushDurationMs); + this._batchesFlushedCounter.add(1, { success: !hasErrors }); + + if (!taskRunError) { + this._taskRunsInsertedCounter.add(taskRunInserts.length); + } + + if (!payloadError) { + this._payloadsInsertedCounter.add(payloadInserts.length); + } }); } @@ -615,6 +701,10 @@ export class RunsReplicationService { delay, }); + // Record retry metric + const operation = operationName.includes("task run") ? 
"task_runs" : "payloads"; + this._insertRetriesCounter.add(1, { operation }); + await new Promise((resolve) => setTimeout(resolve, delay)); continue; } diff --git a/apps/webapp/test/runsReplicationService.part1.test.ts b/apps/webapp/test/runsReplicationService.part1.test.ts index aaaa9dd1b8..55d863d141 100644 --- a/apps/webapp/test/runsReplicationService.part1.test.ts +++ b/apps/webapp/test/runsReplicationService.part1.test.ts @@ -5,7 +5,7 @@ import { setTimeout } from "node:timers/promises"; import { z } from "zod"; import { TaskRunStatus } from "~/database-types"; import { RunsReplicationService } from "~/services/runsReplicationService.server"; -import { createInMemoryTracing } from "./utils/tracing"; +import { createInMemoryTracing, createInMemoryMetrics } from "./utils/tracing"; import superjson from "superjson"; vi.setConfig({ testTimeout: 60_000 }); @@ -72,7 +72,6 @@ describe("RunsReplicationService (part 1/2)", () => { }, }); - // Now we insert a row into the table const taskRun = await prisma.taskRun.create({ data: { friendlyId: "run_1234", @@ -91,7 +90,6 @@ describe("RunsReplicationService (part 1/2)", () => { await setTimeout(1000); - // Check that the row was replicated to clickhouse const queryRuns = clickhouse.reader.query({ name: "runs-replication", query: "SELECT * FROM trigger_dev.task_runs_v2", @@ -119,16 +117,8 @@ describe("RunsReplicationService (part 1/2)", () => { expect(spans.length).toBeGreaterThan(0); - const transactionSpan = spans.find( - (span) => - span.name === "handle_transaction" && - typeof span.attributes["transaction.events"] === "number" && - span.attributes["transaction.events"] > 0 - ); - - expect(transactionSpan).not.toBeNull(); - expect(transactionSpan?.attributes["transaction.parse_duration_ms"]).toBeGreaterThan(0); - expect(transactionSpan?.attributes["transaction.parse_duration_ms"]).toBeLessThan(5); + const flushBatchSpan = spans.find((span) => span.name === "flushBatch"); + expect(flushBatchSpan).toBeDefined(); await 
runsReplicationService.stop(); } @@ -197,7 +187,6 @@ describe("RunsReplicationService (part 1/2)", () => { const date = new Date(); - // Now we insert a row into the table const taskRun = await prisma.taskRun.create({ data: { friendlyId: "run_1234", @@ -222,7 +211,6 @@ describe("RunsReplicationService (part 1/2)", () => { await setTimeout(1000); - // Check that the row was replicated to clickhouse const queryRuns = clickhouse.reader.query({ name: "runs-replication", query: "SELECT * FROM trigger_dev.task_runs_v2", @@ -276,7 +264,7 @@ describe("RunsReplicationService (part 1/2)", () => { ); containerTest( - "should not produce any handle_transaction spans when no TaskRun events are produced", + "should not produce any flush spans when no TaskRun events are produced", async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); @@ -337,9 +325,9 @@ describe("RunsReplicationService (part 1/2)", () => { const spans = exporter.getFinishedSpans(); - const handleTransactionSpans = spans.filter((span) => span.name === "handle_transaction"); + const flushBatchSpans = spans.filter((span) => span.name === "flushBatch"); - expect(handleTransactionSpans.length).toBe(0); + expect(flushBatchSpans.length).toBe(0); await runsReplicationService.stop(); } @@ -400,7 +388,6 @@ describe("RunsReplicationService (part 1/2)", () => { }, }); - // Insert a row into the table with a unique friendlyId const uniqueFriendlyId = `run_batching_${Date.now()}`; const taskRun = await prisma.taskRun.create({ data: { @@ -418,10 +405,8 @@ describe("RunsReplicationService (part 1/2)", () => { }, }); - // Wait for replication await setTimeout(1000); - // Query ClickHouse for the replicated run const queryRuns = clickhouse.reader.query({ name: "runs-replication-batching", query: "SELECT * FROM trigger_dev.task_runs_v2 WHERE run_id = {run_id:String}", @@ -505,7 +490,6 @@ describe("RunsReplicationService 
(part 1/2)", () => { }, }); - // Insert a row into the table with a unique payload const uniquePayload = { foo: "payload-test", bar: Date.now() }; const taskRun = await prisma.taskRun.create({ data: { @@ -524,10 +508,8 @@ describe("RunsReplicationService (part 1/2)", () => { }, }); - // Wait for replication await setTimeout(1000); - // Query ClickHouse for the replicated payload const queryPayloads = clickhouse.reader.query({ name: "runs-replication-payload", query: "SELECT * FROM trigger_dev.raw_task_runs_payload_v1 WHERE run_id = {run_id:String}", @@ -607,7 +589,6 @@ describe("RunsReplicationService (part 1/2)", () => { }, }); - // Insert a row into the table with a unique payload const largePayload = { foo: Array.from({ length: 100 }, () => "foo").join(""), bar: Array.from({ length: 100 }, () => "bar").join(""), @@ -631,10 +612,8 @@ describe("RunsReplicationService (part 1/2)", () => { }, }); - // Wait for replication await setTimeout(1000); - // Query ClickHouse for the replicated payload const queryPayloads = clickhouse.reader.query({ name: "runs-replication-payload", query: "SELECT * FROM trigger_dev.raw_task_runs_payload_v1 WHERE run_id = {run_id:String}", @@ -714,7 +693,6 @@ describe("RunsReplicationService (part 1/2)", () => { }, }); - // Insert a row into the table const uniqueFriendlyId = `run_update_${Date.now()}`; const taskRun = await prisma.taskRun.create({ data: { @@ -734,19 +712,15 @@ describe("RunsReplicationService (part 1/2)", () => { }, }); - // Wait for initial replication await setTimeout(1000); - // Update the status field await prisma.taskRun.update({ where: { id: taskRun.id }, data: { status: TaskRunStatus.COMPLETED_SUCCESSFULLY }, }); - // Wait for replication await setTimeout(1000); - // Query ClickHouse for the replicated run const queryRuns = clickhouse.reader.query({ name: "runs-replication-update", query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id = {run_id:String}", @@ -824,7 +798,6 @@ 
describe("RunsReplicationService (part 1/2)", () => { }, }); - // Insert a row into the table const uniqueFriendlyId = `run_delete_${Date.now()}`; const taskRun = await prisma.taskRun.create({ data: { @@ -844,18 +817,14 @@ describe("RunsReplicationService (part 1/2)", () => { }, }); - // Wait for initial replication await setTimeout(1000); - // Delete the TaskRun await prisma.taskRun.delete({ where: { id: taskRun.id }, }); - // Wait for replication await setTimeout(1000); - // Query ClickHouse for the replicated run using FINAL const queryRuns = clickhouse.reader.query({ name: "runs-replication-delete", query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id = {run_id:String}", @@ -928,13 +897,10 @@ describe("RunsReplicationService (part 1/2)", () => { }, }); - // Insert Run 1 const run1Id = `run_shutdown_handover_1_${Date.now()}`; - // Initiate shutdown when the first insert message is received runsReplicationServiceA.events.on("message", async ({ message, service }) => { if (message.tag === "insert") { - // Initiate shutdown await service.shutdown(); } }); @@ -957,7 +923,6 @@ describe("RunsReplicationService (part 1/2)", () => { }, }); - // Insert Run 2 after shutdown is initiated const run2Id = `run_shutdown_handover_2_${Date.now()}`; const taskRun2 = await prisma.taskRun.create({ data: { @@ -977,17 +942,13 @@ describe("RunsReplicationService (part 1/2)", () => { }, }); - // Wait for flush to complete await setTimeout(1000); - // Query ClickHouse for both runs using FINAL const queryRuns = clickhouse.reader.query({ name: "runs-replication-shutdown-handover", query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL ORDER BY created_at ASC", schema: z.any(), }); - - // Make sure only the first run is in ClickHouse const [queryError, result] = await queryRuns({}); expect(queryError).toBeNull(); expect(result?.length).toBe(1); @@ -1011,7 +972,6 @@ describe("RunsReplicationService (part 1/2)", () => { await runsReplicationServiceB.start(); - // Wait for 
replication await setTimeout(1000); const [queryErrorB, resultB] = await queryRuns({}); @@ -1085,7 +1045,6 @@ describe("RunsReplicationService (part 1/2)", () => { }, }); - // Insert Run 1 const run1Id = `run_shutdown_after_processed_${Date.now()}`; const taskRun1 = await prisma.taskRun.create({ data: { @@ -1105,10 +1064,8 @@ describe("RunsReplicationService (part 1/2)", () => { }, }); - // Wait for replication to ensure transaction is processed await setTimeout(1000); - // Query ClickHouse for the run using FINAL const queryRuns = clickhouse.reader.query({ name: "runs-replication-shutdown-after-processed", query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id = {run_id:String}", @@ -1121,12 +1078,10 @@ describe("RunsReplicationService (part 1/2)", () => { expect(resultA?.length).toBe(1); expect(resultA?.[0]).toEqual(expect.objectContaining({ run_id: taskRun1.id })); - // Shutdown after all transactions are processed await runsReplicationServiceA.shutdown(); - await setTimeout(500); // Give a moment for shutdown + await setTimeout(500); - // Insert another run const taskRun2 = await prisma.taskRun.create({ data: { friendlyId: `run_shutdown_after_processed_${Date.now()}`, @@ -1165,7 +1120,6 @@ describe("RunsReplicationService (part 1/2)", () => { await setTimeout(1000); - // Query ClickHouse for the second run const [queryErrorB, resultB] = await queryRuns({ run_id: taskRun2.id }); expect(queryErrorB).toBeNull(); expect(resultB?.length).toBe(1); @@ -1174,4 +1128,228 @@ describe("RunsReplicationService (part 1/2)", () => { await runsReplicationServiceB.stop(); } ); + + containerTest( + "should record metrics with correct values when replicating runs", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication-metrics", + }); + + const { 
tracer } = createInMemoryTracing(); + const metricsHelper = createInMemoryMetrics(); + + const runsReplicationService = new RunsReplicationService({ + clickhouse, + pgConnectionUrl: postgresContainer.getConnectionUri(), + serviceName: "runs-replication-metrics", + slotName: "task_runs_to_clickhouse_v1", + publicationName: "task_runs_to_clickhouse_v1_publication", + redisOptions, + maxFlushConcurrency: 2, + flushIntervalMs: 100, + flushBatchSize: 5, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + tracer, + meter: metricsHelper.meter, + }); + + await runsReplicationService.start(); + + const organization = await prisma.organization.create({ + data: { + title: "test-metrics", + slug: "test-metrics", + }, + }); + + const project = await prisma.project.create({ + data: { + name: "test-metrics", + slug: "test-metrics", + organizationId: organization.id, + externalRef: "test-metrics", + }, + }); + + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test-metrics", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "test-metrics", + pkApiKey: "test-metrics", + shortcode: "test-metrics", + }, + }); + + const now = Date.now(); + const createdRuns: string[] = []; + + for (let i = 0; i < 5; i++) { + const run = await prisma.taskRun.create({ + data: { + friendlyId: `run_metrics_${now}_${i}`, + taskIdentifier: "my-task-metrics", + payload: JSON.stringify({ index: i }), + payloadType: "application/json", + traceId: `metrics-${now}-${i}`, + spanId: `metrics-${now}-${i}`, + queue: "test-metrics", + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: project.id, + organizationId: organization.id, + environmentType: "DEVELOPMENT", + engine: "V2", + status: "PENDING", + }, + }); + createdRuns.push(run.id); + } + + await setTimeout(1000); + + for (let i = 0; i < 3; i++) { + await prisma.taskRun.update({ + where: { id: createdRuns[i] }, + data: { status: 
"EXECUTING" }, + }); + } + + await setTimeout(1000); + + for (let i = 0; i < 2; i++) { + await prisma.taskRun.update({ + where: { id: createdRuns[i] }, + data: { + status: "COMPLETED_SUCCESSFULLY", + completedAt: new Date(), + output: JSON.stringify({ result: "success" }), + outputType: "application/json", + }, + }); + } + + await setTimeout(1000); + + const metrics = await metricsHelper.getMetrics(); + + function getMetricData(name: string) { + for (const resourceMetrics of metrics) { + for (const scopeMetrics of resourceMetrics.scopeMetrics) { + for (const metric of scopeMetrics.metrics) { + if (metric.descriptor.name === name) { + return metric; + } + } + } + } + return null; + } + + function sumCounterValues(metric: any): number { + if (!metric?.dataPoints) return 0; + return metric.dataPoints.reduce((sum: number, dp: any) => sum + (dp.value || 0), 0); + } + + function getCounterValueByAttributes( + metric: any, + attributes: Record + ): number { + if (!metric?.dataPoints) return 0; + for (const dp of metric.dataPoints) { + const dpAttrs = dp.attributes || {}; + const matches = Object.entries(attributes).every( + ([key, value]) => String(dpAttrs[key]) === String(value) + ); + if (matches) { + return dp.value || 0; + } + } + return 0; + } + + function getHistogramCount(metric: any): number { + if (!metric?.dataPoints) return 0; + return metric.dataPoints.reduce((sum: number, dp: any) => { + if (typeof dp.count === "number") return sum + dp.count; + if (typeof dp.value?.count === "number") return sum + dp.value.count; + if (typeof dp.zeroCount === "number") return sum + dp.zeroCount; + if (dp.buckets?.counts) { + return sum + dp.buckets.counts.reduce((s: number, c: number) => s + c, 0); + } + return sum; + }, 0); + } + + function histogramHasData(metric: any): boolean { + if (!metric?.dataPoints || metric.dataPoints.length === 0) return false; + return metric.dataPoints.some((dp: any) => { + return ( + (typeof dp.count === "number" && dp.count > 0) || + (typeof 
dp.value?.count === "number" && dp.value.count > 0) || + (Array.isArray(dp.buckets?.counts) && dp.buckets.counts.some((c: number) => c > 0)) || + (typeof dp.sum === "number" && dp.sum > 0) || + typeof dp.min === "number" || + typeof dp.max === "number" + ); + }); + } + + function getCounterAttributeValues(metric: any, attributeName: string): unknown[] { + if (!metric?.dataPoints) return []; + return metric.dataPoints + .filter((dp: any) => dp.attributes?.[attributeName] !== undefined) + .map((dp: any) => dp.attributes[attributeName]); + } + + const batchesFlushed = getMetricData("runs_replication.batches_flushed"); + expect(batchesFlushed).not.toBeNull(); + const totalBatchesFlushed = sumCounterValues(batchesFlushed); + expect(totalBatchesFlushed).toBeGreaterThanOrEqual(1); + + const successAttributeValues = getCounterAttributeValues(batchesFlushed, "success"); + expect(successAttributeValues.length).toBeGreaterThanOrEqual(1); + + const taskRunsInserted = getMetricData("runs_replication.task_runs_inserted"); + expect(taskRunsInserted).not.toBeNull(); + const totalTaskRunsInserted = sumCounterValues(taskRunsInserted); + expect(totalTaskRunsInserted).toBeGreaterThanOrEqual(5); + + const payloadsInserted = getMetricData("runs_replication.payloads_inserted"); + expect(payloadsInserted).not.toBeNull(); + const totalPayloadsInserted = sumCounterValues(payloadsInserted); + expect(totalPayloadsInserted).toBeGreaterThanOrEqual(1); + + const transactionsProcessed = getMetricData("runs_replication.transactions_processed"); + expect(transactionsProcessed).not.toBeNull(); + const totalTransactionsProcessed = sumCounterValues(transactionsProcessed); + expect(totalTransactionsProcessed).toBeGreaterThanOrEqual(1); + + const eventTypes = getCounterAttributeValues(transactionsProcessed, "event_type"); + expect(eventTypes.length).toBeGreaterThanOrEqual(1); + expect(eventTypes).toContain("insert"); + + const batchSize = getMetricData("runs_replication.batch_size"); + 
expect(batchSize).not.toBeNull(); + expect(histogramHasData(batchSize)).toBe(true); + + const replicationLag = getMetricData("runs_replication.replication_lag_ms"); + expect(replicationLag).not.toBeNull(); + expect(histogramHasData(replicationLag)).toBe(true); + + const flushDuration = getMetricData("runs_replication.flush_duration_ms"); + expect(flushDuration).not.toBeNull(); + expect(histogramHasData(flushDuration)).toBe(true); + + await runsReplicationService.stop(); + await metricsHelper.shutdown(); + } + ); }); diff --git a/apps/webapp/test/utils/tracing.ts b/apps/webapp/test/utils/tracing.ts index 33b821abe7..09500a6c35 100644 --- a/apps/webapp/test/utils/tracing.ts +++ b/apps/webapp/test/utils/tracing.ts @@ -1,6 +1,12 @@ import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node"; import { InMemorySpanExporter, SimpleSpanProcessor } from "@opentelemetry/sdk-trace-base"; import { trace } from "@opentelemetry/api"; +import { + MeterProvider, + InMemoryMetricExporter, + PeriodicExportingMetricReader, + AggregationTemporality, +} from "@opentelemetry/sdk-metrics"; export function createInMemoryTracing() { // Initialize the tracer provider and exporter @@ -18,3 +24,42 @@ export function createInMemoryTracing() { tracer, }; } + +export function createInMemoryMetrics() { + // Initialize the metric exporter with cumulative temporality for easier testing + const metricExporter = new InMemoryMetricExporter(AggregationTemporality.CUMULATIVE); + + // Create a metric reader that exports frequently for testing + const metricReader = new PeriodicExportingMetricReader({ + exporter: metricExporter, + exportIntervalMillis: 100, // Export frequently for tests + }); + + // Initialize the meter provider + const meterProvider = new MeterProvider({ + readers: [metricReader], + }); + + // Retrieve a meter + const meter = meterProvider.getMeter("test-meter"); + + return { + metricExporter, + metricReader, + meterProvider, + meter, + // Helper to force collection and get 
metrics + async getMetrics() { + await metricReader.forceFlush(); + return metricExporter.getMetrics(); + }, + // Helper to reset metrics between tests + reset() { + metricExporter.reset(); + }, + // Helper to shutdown the meter provider + async shutdown() { + await meterProvider.shutdown(); + }, + }; +} diff --git a/docker/config/grafana/provisioning/dashboards/runs-replication.json b/docker/config/grafana/provisioning/dashboards/runs-replication.json new file mode 100644 index 0000000000..235d1298e3 --- /dev/null +++ b/docker/config/grafana/provisioning/dashboards/runs-replication.json @@ -0,0 +1,593 @@ +{ + "annotations": { + "list": [] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 1, + "links": [], + "panels": [ + { + "collapsed": false, + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 0 }, + "id": 1, + "panels": [], + "title": "Throughput", + "type": "row" + }, + { + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "mappings": [], + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }] }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { "h": 6, "w": 6, "x": 0, "y": 1 }, + "id": 2, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, + "textMode": "auto" + }, + "pluginVersion": "11.3.0", + "targets": [ + { + "expr": "sum(rate(triggerdotdev_runs_replication_task_runs_inserted_inserts_total[5m]))", + "legendFormat": "Runs/sec", + "refId": "A" + } + ], + "title": "Task Runs Inserted Rate", + "type": "stat" + }, + { + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "mappings": [], + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }] }, + "unit": "short" + 
}, + "overrides": [] + }, + "gridPos": { "h": 6, "w": 6, "x": 6, "y": 1 }, + "id": 3, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, + "textMode": "auto" + }, + "pluginVersion": "11.3.0", + "targets": [ + { + "expr": "sum(rate(triggerdotdev_runs_replication_payloads_inserted_inserts_total[5m]))", + "legendFormat": "Payloads/sec", + "refId": "A" + } + ], + "title": "Payloads Inserted Rate", + "type": "stat" + }, + { + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "mappings": [], + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }] }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { "h": 6, "w": 6, "x": 12, "y": 1 }, + "id": 4, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, + "textMode": "auto" + }, + "pluginVersion": "11.3.0", + "targets": [ + { + "expr": "sum(rate(triggerdotdev_runs_replication_batches_flushed_total[5m]))", + "legendFormat": "Batches/sec", + "refId": "A" + } + ], + "title": "Batches Flushed Rate", + "type": "stat" + }, + { + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "thresholds" }, + "mappings": [], + "max": 1, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "red", "value": null }, + { "color": "yellow", "value": 0.9 }, + { "color": "green", "value": 0.99 } + ] + }, + "unit": "percentunit" + }, + "overrides": [] + }, + "gridPos": { "h": 6, "w": 6, "x": 18, "y": 1 }, + "id": 5, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { "calcs": 
["lastNotNull"], "fields": "", "values": false }, + "textMode": "auto" + }, + "pluginVersion": "11.3.0", + "targets": [ + { + "expr": "sum(rate(triggerdotdev_runs_replication_batches_flushed_total{success=\"true\"}[5m])) / sum(rate(triggerdotdev_runs_replication_batches_flushed_total[5m]))", + "legendFormat": "Success Rate", + "refId": "A" + } + ], + "title": "Flush Success Rate", + "type": "stat" + }, + { + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { "legend": false, "tooltip": false, "viz": false }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { "type": "linear" }, + "showPoints": "never", + "spanNulls": false, + "stacking": { "group": "A", "mode": "none" }, + "thresholdsStyle": { "mode": "off" } + }, + "mappings": [], + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }] }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 7 }, + "id": 6, + "options": { + "legend": { "calcs": ["mean", "max"], "displayMode": "table", "placement": "bottom", "showLegend": true }, + "tooltip": { "mode": "multi", "sort": "desc" } + }, + "targets": [ + { + "expr": "sum(rate(triggerdotdev_runs_replication_transactions_processed_total[5m])) by (event_type)", + "legendFormat": "{{event_type}}", + "refId": "A" + } + ], + "title": "Transactions Processed by Event Type", + "type": "timeseries" + }, + { + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + 
"axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { "legend": false, "tooltip": false, "viz": false }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { "type": "linear" }, + "showPoints": "never", + "spanNulls": false, + "stacking": { "group": "A", "mode": "none" }, + "thresholdsStyle": { "mode": "off" } + }, + "mappings": [], + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }] }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 7 }, + "id": 7, + "options": { + "legend": { "calcs": ["mean", "max"], "displayMode": "table", "placement": "bottom", "showLegend": true }, + "tooltip": { "mode": "multi", "sort": "desc" } + }, + "targets": [ + { + "expr": "sum(rate(triggerdotdev_runs_replication_task_runs_inserted_inserts_total[5m]))", + "legendFormat": "Task Runs", + "refId": "A" + }, + { + "expr": "sum(rate(triggerdotdev_runs_replication_payloads_inserted_inserts_total[5m]))", + "legendFormat": "Payloads", + "refId": "B" + } + ], + "title": "Insert Rate Over Time", + "type": "timeseries" + }, + { + "collapsed": false, + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 15 }, + "id": 8, + "panels": [], + "title": "Latency & Lag", + "type": "row" + }, + { + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { "legend": false, "tooltip": false, "viz": false }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { "type": "linear" }, + "showPoints": "never", + "spanNulls": false, + "stacking": { 
"group": "A", "mode": "none" }, + "thresholdsStyle": { "mode": "off" } + }, + "mappings": [], + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }] }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 16 }, + "id": 9, + "options": { + "legend": { "calcs": ["mean", "max"], "displayMode": "table", "placement": "bottom", "showLegend": true }, + "tooltip": { "mode": "multi", "sort": "desc" } + }, + "targets": [ + { + "expr": "histogram_quantile(0.50, sum(rate(triggerdotdev_runs_replication_replication_lag_ms_milliseconds_bucket[5m])) by (le))", + "legendFormat": "p50", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(triggerdotdev_runs_replication_replication_lag_ms_milliseconds_bucket[5m])) by (le))", + "legendFormat": "p95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(triggerdotdev_runs_replication_replication_lag_ms_milliseconds_bucket[5m])) by (le))", + "legendFormat": "p99", + "refId": "C" + } + ], + "title": "Replication Lag (Postgres → ClickHouse)", + "type": "timeseries" + }, + { + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { "legend": false, "tooltip": false, "viz": false }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { "type": "linear" }, + "showPoints": "never", + "spanNulls": false, + "stacking": { "group": "A", "mode": "none" }, + "thresholdsStyle": { "mode": "off" } + }, + "mappings": [], + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }] }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { "h": 8, "w": 12, "x": 12, 
"y": 16 }, + "id": 10, + "options": { + "legend": { "calcs": ["mean", "max"], "displayMode": "table", "placement": "bottom", "showLegend": true }, + "tooltip": { "mode": "multi", "sort": "desc" } + }, + "targets": [ + { + "expr": "histogram_quantile(0.50, sum(rate(triggerdotdev_runs_replication_flush_duration_ms_milliseconds_bucket[5m])) by (le))", + "legendFormat": "p50", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(triggerdotdev_runs_replication_flush_duration_ms_milliseconds_bucket[5m])) by (le))", + "legendFormat": "p95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(triggerdotdev_runs_replication_flush_duration_ms_milliseconds_bucket[5m])) by (le))", + "legendFormat": "p99", + "refId": "C" + } + ], + "title": "Flush Duration", + "type": "timeseries" + }, + { + "collapsed": false, + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 24 }, + "id": 11, + "panels": [], + "title": "Batching & Queue", + "type": "row" + }, + { + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { "legend": false, "tooltip": false, "viz": false }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { "type": "linear" }, + "showPoints": "never", + "spanNulls": false, + "stacking": { "group": "A", "mode": "none" }, + "thresholdsStyle": { "mode": "off" } + }, + "mappings": [], + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }] }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { "h": 8, "w": 8, "x": 0, "y": 25 }, + "id": 12, + "options": { + "legend": { "calcs": ["mean", "max"], "displayMode": "table", "placement": "bottom", 
"showLegend": true }, + "tooltip": { "mode": "multi", "sort": "desc" } + }, + "targets": [ + { + "expr": "triggerdotdev_runs_replication_flush_queue_depth", + "legendFormat": "Queue Depth", + "refId": "A" + } + ], + "title": "Flush Queue Depth", + "type": "timeseries" + }, + { + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { "legend": false, "tooltip": false, "viz": false }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { "type": "linear" }, + "showPoints": "never", + "spanNulls": false, + "stacking": { "group": "A", "mode": "none" }, + "thresholdsStyle": { "mode": "off" } + }, + "mappings": [], + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }] }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { "h": 8, "w": 8, "x": 8, "y": 25 }, + "id": 13, + "options": { + "legend": { "calcs": ["mean", "max"], "displayMode": "table", "placement": "bottom", "showLegend": true }, + "tooltip": { "mode": "multi", "sort": "desc" } + }, + "targets": [ + { + "expr": "histogram_quantile(0.50, sum(rate(triggerdotdev_runs_replication_batch_size_items_bucket[5m])) by (le))", + "legendFormat": "p50", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(triggerdotdev_runs_replication_batch_size_items_bucket[5m])) by (le))", + "legendFormat": "p95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(triggerdotdev_runs_replication_batch_size_items_bucket[5m])) by (le))", + "legendFormat": "p99", + "refId": "C" + } + ], + "title": "Batch Size Distribution", + "type": "timeseries" + }, + { + "datasource": { "type": "prometheus", "uid": 
"prometheus" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "thresholds" }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "yellow", "value": 1 }, + { "color": "red", "value": 10 } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { "h": 8, "w": 8, "x": 16, "y": 25 }, + "id": 14, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, + "textMode": "auto" + }, + "pluginVersion": "11.3.0", + "targets": [ + { + "expr": "sum(rate(triggerdotdev_runs_replication_insert_retries_total[5m]))", + "legendFormat": "Retries/sec", + "refId": "A" + } + ], + "title": "Insert Retries (should be low)", + "type": "stat" + }, + { + "collapsed": false, + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 33 }, + "id": 15, + "panels": [], + "title": "Retries by Operation", + "type": "row" + }, + { + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { "legend": false, "tooltip": false, "viz": false }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { "type": "linear" }, + "showPoints": "never", + "spanNulls": false, + "stacking": { "group": "A", "mode": "none" }, + "thresholdsStyle": { "mode": "off" } + }, + "mappings": [], + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }] }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { "h": 8, "w": 24, "x": 0, "y": 34 }, + "id": 16, + "options": { + "legend": { "calcs": ["mean", "max"], 
"displayMode": "table", "placement": "bottom", "showLegend": true }, + "tooltip": { "mode": "multi", "sort": "desc" } + }, + "targets": [ + { + "expr": "sum(rate(triggerdotdev_runs_replication_insert_retries_total[5m])) by (operation)", + "legendFormat": "{{operation}}", + "refId": "A" + } + ], + "title": "Insert Retries by Operation", + "type": "timeseries" + } + ], + "refresh": "10s", + "schemaVersion": 39, + "tags": ["trigger.dev", "runs-replication", "clickhouse"], + "templating": { "list": [] }, + "time": { "from": "now-15m", "to": "now" }, + "timepicker": {}, + "timezone": "browser", + "title": "Runs Replication Service", + "uid": "runs-replication-metrics", + "version": 1 +} From 2cf14343b3ecd9bbb037bab614c560e80e0b71df Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 8 Jan 2026 15:43:13 +0000 Subject: [PATCH 2/2] make it clear we are tracking events, not transactions --- .../services/runsReplicationService.server.ts | 10 ++-- .../test/runsReplicationService.part1.test.ts | 40 ++------------ .../dashboards/runs-replication.json | 55 ++----------------- 3 files changed, 14 insertions(+), 91 deletions(-) diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 2841813bc5..b1e6c42fc2 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -121,7 +121,7 @@ export class RunsReplicationService { private _taskRunsInsertedCounter: Counter; private _payloadsInsertedCounter: Counter; private _insertRetriesCounter: Counter; - private _transactionsProcessedCounter: Counter; + private _eventsProcessedCounter: Counter; private _flushDurationHistogram: Histogram; public readonly events: EventEmitter; @@ -171,10 +171,10 @@ export class RunsReplicationService { description: "Insert retry attempts", }); - this._transactionsProcessedCounter = this._meter.createCounter( - "runs_replication.transactions_processed", 
+ this._eventsProcessedCounter = this._meter.createCounter( + "runs_replication.events_processed", { - description: "Transactions received from replication", + description: "Replication events processed (inserts, updates, deletes)", } ); @@ -503,7 +503,7 @@ export class RunsReplicationService { // Count events by type for (const event of transaction.events) { - this._transactionsProcessedCounter.add(1, { event_type: event.tag }); + this._eventsProcessedCounter.add(1, { event_type: event.tag }); } this.logger.info("handle_transaction", { diff --git a/apps/webapp/test/runsReplicationService.part1.test.ts b/apps/webapp/test/runsReplicationService.part1.test.ts index 55d863d141..a8726d8221 100644 --- a/apps/webapp/test/runsReplicationService.part1.test.ts +++ b/apps/webapp/test/runsReplicationService.part1.test.ts @@ -1258,36 +1258,6 @@ describe("RunsReplicationService (part 1/2)", () => { return metric.dataPoints.reduce((sum: number, dp: any) => sum + (dp.value || 0), 0); } - function getCounterValueByAttributes( - metric: any, - attributes: Record - ): number { - if (!metric?.dataPoints) return 0; - for (const dp of metric.dataPoints) { - const dpAttrs = dp.attributes || {}; - const matches = Object.entries(attributes).every( - ([key, value]) => String(dpAttrs[key]) === String(value) - ); - if (matches) { - return dp.value || 0; - } - } - return 0; - } - - function getHistogramCount(metric: any): number { - if (!metric?.dataPoints) return 0; - return metric.dataPoints.reduce((sum: number, dp: any) => { - if (typeof dp.count === "number") return sum + dp.count; - if (typeof dp.value?.count === "number") return sum + dp.value.count; - if (typeof dp.zeroCount === "number") return sum + dp.zeroCount; - if (dp.buckets?.counts) { - return sum + dp.buckets.counts.reduce((s: number, c: number) => s + c, 0); - } - return sum; - }, 0); - } - function histogramHasData(metric: any): boolean { if (!metric?.dataPoints || metric.dataPoints.length === 0) return false; return 
metric.dataPoints.some((dp: any) => { @@ -1327,12 +1297,12 @@ describe("RunsReplicationService (part 1/2)", () => { const totalPayloadsInserted = sumCounterValues(payloadsInserted); expect(totalPayloadsInserted).toBeGreaterThanOrEqual(1); - const transactionsProcessed = getMetricData("runs_replication.transactions_processed"); - expect(transactionsProcessed).not.toBeNull(); - const totalTransactionsProcessed = sumCounterValues(transactionsProcessed); - expect(totalTransactionsProcessed).toBeGreaterThanOrEqual(1); + const eventsProcessed = getMetricData("runs_replication.events_processed"); + expect(eventsProcessed).not.toBeNull(); + const totalEventsProcessed = sumCounterValues(eventsProcessed); + expect(totalEventsProcessed).toBeGreaterThanOrEqual(1); - const eventTypes = getCounterAttributeValues(transactionsProcessed, "event_type"); + const eventTypes = getCounterAttributeValues(eventsProcessed, "event_type"); expect(eventTypes.length).toBeGreaterThanOrEqual(1); expect(eventTypes).toContain("insert"); diff --git a/docker/config/grafana/provisioning/dashboards/runs-replication.json b/docker/config/grafana/provisioning/dashboards/runs-replication.json index 235d1298e3..680f260daa 100644 --- a/docker/config/grafana/provisioning/dashboards/runs-replication.json +++ b/docker/config/grafana/provisioning/dashboards/runs-replication.json @@ -191,12 +191,12 @@ }, "targets": [ { - "expr": "sum(rate(triggerdotdev_runs_replication_transactions_processed_total[5m])) by (event_type)", + "expr": "sum(rate(triggerdotdev_runs_replication_events_processed_total[5m])) by (event_type)", "legendFormat": "{{event_type}}", "refId": "A" } ], - "title": "Transactions Processed by Event Type", + "title": "Events Processed by Type", "type": "timeseries" }, { @@ -412,54 +412,7 @@ }, "overrides": [] }, - "gridPos": { "h": 8, "w": 8, "x": 0, "y": 25 }, - "id": 12, - "options": { - "legend": { "calcs": ["mean", "max"], "displayMode": "table", "placement": "bottom", "showLegend": true }, - 
"tooltip": { "mode": "multi", "sort": "desc" } - }, - "targets": [ - { - "expr": "triggerdotdev_runs_replication_flush_queue_depth", - "legendFormat": "Queue Depth", - "refId": "A" - } - ], - "title": "Flush Queue Depth", - "type": "timeseries" - }, - { - "datasource": { "type": "prometheus", "uid": "prometheus" }, - "fieldConfig": { - "defaults": { - "color": { "mode": "palette-classic" }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 10, - "gradientMode": "none", - "hideFrom": { "legend": false, "tooltip": false, "viz": false }, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { "type": "linear" }, - "showPoints": "never", - "spanNulls": false, - "stacking": { "group": "A", "mode": "none" }, - "thresholdsStyle": { "mode": "off" } - }, - "mappings": [], - "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }] }, - "unit": "short" - }, - "overrides": [] - }, - "gridPos": { "h": 8, "w": 8, "x": 8, "y": 25 }, + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 25 }, "id": 13, "options": { "legend": { "calcs": ["mean", "max"], "displayMode": "table", "placement": "bottom", "showLegend": true }, @@ -503,7 +456,7 @@ }, "overrides": [] }, - "gridPos": { "h": 8, "w": 8, "x": 16, "y": 25 }, + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 25 }, "id": 14, "options": { "colorMode": "value",