Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .cursor/rules/otel-metrics.mdc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ this.counter.add(1, {
});
```

## Prometheus Metric Naming

When metrics are exported via OTLP to Prometheus, the exporter automatically appends unit and type suffixes to metric names (for example, `_milliseconds` for a `ms` unit, and `_total` for counters):

| OTel Metric Name | Unit (instrument type) | Prometheus Name |
|------------------|------------------------|-----------------|
| `my_duration_ms` | `ms` (histogram) | `my_duration_ms_milliseconds` |
| `my_counter` | — (counter) | `my_counter_total` |
| `items_inserted` | `inserts` (counter) | `items_inserted_inserts_total` |
| `batch_size` | `items` (histogram) | `batch_size_items_bucket` (plus `_sum` and `_count`) |

Keep this in mind when writing Grafana dashboards or Prometheus queries—the metric names in Prometheus will differ from the names defined in code.

## Reference

See the schedule engine (`internal-packages/schedule-engine/src/engine/index.ts`) for a good example of low-cardinality metric attributes.
Expand Down
3 changes: 2 additions & 1 deletion apps/webapp/app/services/runsReplicationInstance.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { ClickHouse } from "@internal/clickhouse";
import invariant from "tiny-invariant";
import { env } from "~/env.server";
import { singleton } from "~/utils/singleton";
import { provider } from "~/v3/tracer.server";
import { meter, provider } from "~/v3/tracer.server";
import { RunsReplicationService } from "./runsReplicationService.server";
import { signalsEmitter } from "./signals.server";

Expand Down Expand Up @@ -62,6 +62,7 @@ function initializeRunsReplicationInstance() {
logLevel: env.RUN_REPLICATION_LOG_LEVEL,
waitForAsyncInsert: env.RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT === "1",
tracer: provider.getTracer("runs-replication-service"),
meter,
insertMaxRetries: env.RUN_REPLICATION_INSERT_MAX_RETRIES,
insertBaseDelayMs: env.RUN_REPLICATION_INSERT_BASE_DELAY_MS,
insertMaxDelayMs: env.RUN_REPLICATION_INSERT_MAX_DELAY_MS,
Expand Down
120 changes: 105 additions & 15 deletions apps/webapp/app/services/runsReplicationService.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,16 @@ import {
type MessageUpdate,
type PgoutputMessage,
} from "@internal/replication";
import { recordSpanError, startSpan, trace, type Tracer } from "@internal/tracing";
import {
getMeter,
recordSpanError,
startSpan,
trace,
type Counter,
type Histogram,
type Meter,
type Tracer,
} from "@internal/tracing";
import { Logger, type LogLevel } from "@trigger.dev/core/logger";
import { tryCatch } from "@trigger.dev/core/utils";
import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
Expand Down Expand Up @@ -51,6 +60,7 @@ export type RunsReplicationServiceOptions = {
logger?: Logger;
logLevel?: LogLevel;
tracer?: Tracer;
meter?: Meter;
waitForAsyncInsert?: boolean;
insertStrategy?: "insert" | "insert_async";
// Retry configuration for insert operations
Expand Down Expand Up @@ -90,6 +100,7 @@ export class RunsReplicationService {
private _isShuttingDown = false;
private _isShutDownComplete = false;
private _tracer: Tracer;
private _meter: Meter;
private _currentParseDurationMs: number | null = null;
private _lastAcknowledgedAt: number | null = null;
private _acknowledgeTimeoutMs: number;
Expand All @@ -103,13 +114,77 @@ export class RunsReplicationService {
private _insertStrategy: "insert" | "insert_async";
private _disablePayloadInsert: boolean;

// Metrics
private _replicationLagHistogram: Histogram;
private _batchesFlushedCounter: Counter;
private _batchSizeHistogram: Histogram;
private _taskRunsInsertedCounter: Counter;
private _payloadsInsertedCounter: Counter;
private _insertRetriesCounter: Counter;
private _eventsProcessedCounter: Counter;
private _flushDurationHistogram: Histogram;

public readonly events: EventEmitter<RunsReplicationServiceEvents>;

constructor(private readonly options: RunsReplicationServiceOptions) {
this.logger =
options.logger ?? new Logger("RunsReplicationService", options.logLevel ?? "info");
this.events = new EventEmitter();
this._tracer = options.tracer ?? trace.getTracer("runs-replication-service");
this._meter = options.meter ?? getMeter("runs-replication");

// Initialize metrics
this._replicationLagHistogram = this._meter.createHistogram(
"runs_replication.replication_lag_ms",
{
description: "Replication lag from Postgres commit to processing",
unit: "ms",
}
);

this._batchesFlushedCounter = this._meter.createCounter("runs_replication.batches_flushed", {
description: "Total batches flushed to ClickHouse",
});

this._batchSizeHistogram = this._meter.createHistogram("runs_replication.batch_size", {
description: "Number of items per batch flush",
unit: "items",
});

this._taskRunsInsertedCounter = this._meter.createCounter(
"runs_replication.task_runs_inserted",
{
description: "Task run inserts to ClickHouse",
unit: "inserts",
}
);

this._payloadsInsertedCounter = this._meter.createCounter(
"runs_replication.payloads_inserted",
{
description: "Payload inserts to ClickHouse",
unit: "inserts",
}
);

this._insertRetriesCounter = this._meter.createCounter("runs_replication.insert_retries", {
description: "Insert retry attempts",
});

this._eventsProcessedCounter = this._meter.createCounter(
"runs_replication.events_processed",
{
description: "Replication events processed (inserts, updates, deletes)",
}
);

this._flushDurationHistogram = this._meter.createHistogram(
"runs_replication.flush_duration_ms",
{
description: "Duration of batch flush operations",
unit: "ms",
}
);

this._acknowledgeTimeoutMs = options.acknowledgeTimeoutMs ?? 1_000;

Expand Down Expand Up @@ -423,20 +498,13 @@ export class RunsReplicationService {
}))
);

this._tracer
.startSpan("handle_transaction", {
attributes: {
"transaction.xid": transaction.xid,
"transaction.replication_lag_ms": transaction.replicationLagMs,
"transaction.events": transaction.events.length,
"transaction.commit_end_lsn": transaction.commitEndLsn,
"transaction.parse_duration_ms": this._currentParseDurationMs ?? undefined,
"transaction.lsn_to_uint64_ms": lsnToUInt64DurationMs,
"transaction.version": _version.toString(),
},
startTime: transaction.beginStartTimestamp,
})
.end();
// Record metrics
this._replicationLagHistogram.record(transaction.replicationLagMs);

// Count events by type
for (const event of transaction.events) {
this._eventsProcessedCounter.add(1, { event_type: event.tag });
}

this.logger.info("handle_transaction", {
transaction: {
Expand Down Expand Up @@ -501,6 +569,8 @@ export class RunsReplicationService {
batchSize: batch.length,
});

const flushStartTime = performance.now();

await startSpan(this._tracer, "flushBatch", async (span) => {
const preparedInserts = await startSpan(this._tracer, "prepare_inserts", async (span) => {
return await Promise.all(batch.map(this.#prepareRunInserts.bind(this)));
Expand Down Expand Up @@ -584,6 +654,22 @@ export class RunsReplicationService {
});

this.events.emit("batchFlushed", { flushId, taskRunInserts, payloadInserts });

// Record metrics
const flushDurationMs = performance.now() - flushStartTime;
const hasErrors = taskRunError !== null || payloadError !== null;

this._batchSizeHistogram.record(batch.length);
this._flushDurationHistogram.record(flushDurationMs);
this._batchesFlushedCounter.add(1, { success: !hasErrors });

if (!taskRunError) {
this._taskRunsInsertedCounter.add(taskRunInserts.length);
}

if (!payloadError) {
this._payloadsInsertedCounter.add(payloadInserts.length);
}
});
}

Expand Down Expand Up @@ -615,6 +701,10 @@ export class RunsReplicationService {
delay,
});

// Record retry metric
const operation = operationName.includes("task run") ? "task_runs" : "payloads";
this._insertRetriesCounter.add(1, { operation });

await new Promise((resolve) => setTimeout(resolve, delay));
continue;
}
Expand Down
Loading
Loading