Skip to content

Commit 5b2001a

Browse files
committed
Add deployed tasks run timeline metrics
1 parent ffcb239 commit 5b2001a

File tree

13 files changed

+199
-82
lines changed

13 files changed

+199
-82
lines changed

apps/coordinator/src/index.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -663,9 +663,25 @@ class TaskCoordinator {
663663

664664
await chaosMonkey.call();
665665

666+
const lazyPayload = {
667+
...lazyAttempt.lazyPayload,
668+
metrics: [
669+
...(message.startTime
670+
? [
671+
{
672+
name: "start",
673+
event: "lazy_payload",
674+
timestamp: message.startTime,
675+
duration: Date.now() - message.startTime,
676+
},
677+
]
678+
: []),
679+
],
680+
};
681+
666682
socket.emit("EXECUTE_TASK_RUN_LAZY_ATTEMPT", {
667683
version: "v1",
668-
lazyPayload: lazyAttempt.lazyPayload,
684+
lazyPayload,
669685
});
670686
} catch (error) {
671687
if (error instanceof ChaosMonkey.Error) {

apps/docker-provider/src/index.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,14 +122,18 @@ class DockerTaskOperations implements TaskOperations {
122122
`--env=POD_NAME=${containerName}`,
123123
`--env=COORDINATOR_HOST=${COORDINATOR_HOST}`,
124124
`--env=COORDINATOR_PORT=${COORDINATOR_PORT}`,
125-
`--env=SCHEDULED_AT_MS=${Date.now()}`,
125+
`--env=TRIGGER_POD_SCHEDULED_AT_MS=${Date.now()}`,
126126
`--name=${containerName}`,
127127
];
128128

129129
if (process.env.ENFORCE_MACHINE_PRESETS) {
130130
runArgs.push(`--cpus=${opts.machine.cpu}`, `--memory=${opts.machine.memory}G`);
131131
}
132132

133+
if (opts.dequeuedAt) {
134+
runArgs.push(`--env=TRIGGER_RUN_DEQUEUED_AT_MS=${opts.dequeuedAt}`);
135+
}
136+
133137
runArgs.push(`${opts.image}`);
134138

135139
try {

apps/kubernetes-provider/src/index.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,9 @@ class KubernetesTaskOperations implements TaskOperations {
202202
name: "TRIGGER_RUN_ID",
203203
value: opts.runId,
204204
},
205+
...(opts.dequeuedAt
206+
? [{ name: "TRIGGER_RUN_DEQUEUED_AT_MS", value: String(opts.dequeuedAt) }]
207+
: []),
205208
],
206209
volumeMounts: [
207210
{
@@ -518,7 +521,7 @@ class KubernetesTaskOperations implements TaskOperations {
518521
},
519522
},
520523
{
521-
name: "SCHEDULED_AT_MS",
524+
name: "TRIGGER_POD_SCHEDULED_AT_MS",
522525
value: Date.now().toString(),
523526
},
524527
...this.#coordinatorEnvVars,

apps/webapp/app/components/run/RunTimeline.tsx

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,12 @@ function getFriendlyNameForEvent(event: string): string {
360360
case "import": {
361361
return "Imported task file";
362362
}
363+
case "lazy_payload": {
364+
return "Lazy attempt initialized";
365+
}
366+
case "pod_scheduled": {
367+
return "Pod scheduled";
368+
}
363369
default: {
364370
return event;
365371
}
@@ -380,6 +386,12 @@ function getAdminOnlyForEvent(event: string): boolean {
380386
case "import": {
381387
return true;
382388
}
389+
case "lazy_payload": {
390+
return true;
391+
}
392+
case "pod_scheduled": {
393+
return true;
394+
}
383395
default: {
384396
return true;
385397
}

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 66 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,8 @@ export class SharedQueueConsumer {
439439
};
440440
}
441441

442+
const dequeuedAt = new Date();
443+
442444
logger.log("dequeueMessageInSharedQueue()", { queueMessage: message });
443445

444446
const messageBody = SharedQueueMessageBody.safeParse(message.data);
@@ -472,7 +474,7 @@ export class SharedQueueConsumer {
472474
this._currentMessage = message;
473475
this._currentMessageData = messageBody.data;
474476

475-
const messageResult = await this.#handleMessage(message, messageBody.data);
477+
const messageResult = await this.#handleMessage(message, messageBody.data, dequeuedAt);
476478

477479
switch (messageResult.action) {
478480
case "noop": {
@@ -525,37 +527,39 @@ export class SharedQueueConsumer {
525527

526528
async #handleMessage(
527529
message: MessagePayload,
528-
data: SharedQueueMessageBody
530+
data: SharedQueueMessageBody,
531+
dequeuedAt: Date
529532
): Promise<HandleMessageResult> {
530533
return await this.#startActiveSpan("handleMessage()", async (span) => {
531534
// TODO: For every ACK, decide what should be done with the existing run and attempts. Make sure to check the current statuses first.
532535
switch (data.type) {
533536
// MARK: EXECUTE
534537
case "EXECUTE": {
535-
return await this.#handleExecuteMessage(message, data);
538+
return await this.#handleExecuteMessage(message, data, dequeuedAt);
536539
}
537540
// MARK: DEP RESUME
538541
// Resume after dependency completed with no remaining retries
539542
case "RESUME": {
540-
return await this.#handleResumeMessage(message, data);
543+
return await this.#handleResumeMessage(message, data, dequeuedAt);
541544
}
542545
// MARK: DURATION RESUME
543546
// Resume after duration-based wait
544547
case "RESUME_AFTER_DURATION": {
545-
return await this.#handleResumeAfterDurationMessage(message, data);
548+
return await this.#handleResumeAfterDurationMessage(message, data, dequeuedAt);
546549
}
547550
// MARK: FAIL
548551
// Fail for whatever reason, usually runs that have been resumed but stopped heartbeating
549552
case "FAIL": {
550-
return await this.#handleFailMessage(message, data);
553+
return await this.#handleFailMessage(message, data, dequeuedAt);
551554
}
552555
}
553556
});
554557
}
555558

556559
async #handleExecuteMessage(
557560
message: MessagePayload,
558-
data: SharedQueueExecuteMessageBody
561+
data: SharedQueueExecuteMessageBody,
562+
dequeuedAt: Date
559563
): Promise<HandleMessageResult> {
560564
const existingTaskRun = await prisma.taskRun.findFirst({
561565
where: {
@@ -711,7 +715,7 @@ export class SharedQueueConsumer {
711715
taskVersion: worker.version,
712716
sdkVersion: worker.sdkVersion,
713717
cliVersion: worker.cliVersion,
714-
startedAt: existingTaskRun.startedAt ?? new Date(),
718+
startedAt: existingTaskRun.startedAt ?? dequeuedAt,
715719
baseCostInCents: env.CENTS_PER_RUN,
716720
machinePreset:
717721
existingTaskRun.machinePreset ??
@@ -884,55 +888,56 @@ export class SharedQueueConsumer {
884888
action: "noop",
885889
reason: "retry_checkpoints_disabled",
886890
};
887-
} else {
888-
const machine =
889-
machinePresetFromRun(lockedTaskRun) ??
890-
machinePresetFromConfig(lockedTaskRun.lockedBy?.machineConfig ?? {});
891-
892-
return await this.#startActiveSpan("scheduleAttemptOnProvider", async (span) => {
893-
span.setAttributes({
894-
run_id: lockedTaskRun.id,
895-
});
891+
}
896892

897-
if (await this._providerSender.validateCanSendMessage()) {
898-
await this._providerSender.send("BACKGROUND_WORKER_MESSAGE", {
899-
backgroundWorkerId: worker.friendlyId,
900-
data: {
901-
type: "SCHEDULE_ATTEMPT",
902-
image: imageReference,
903-
version: deployment.version,
904-
machine,
905-
nextAttemptNumber,
906-
// identifiers
907-
id: "placeholder", // TODO: Remove this completely in a future release
908-
envId: lockedTaskRun.runtimeEnvironment.id,
909-
envType: lockedTaskRun.runtimeEnvironment.type,
910-
orgId: lockedTaskRun.runtimeEnvironment.organizationId,
911-
projectId: lockedTaskRun.runtimeEnvironment.projectId,
912-
runId: lockedTaskRun.id,
913-
},
914-
});
893+
const machine =
894+
machinePresetFromRun(lockedTaskRun) ??
895+
machinePresetFromConfig(lockedTaskRun.lockedBy?.machineConfig ?? {});
915896

916-
return {
917-
action: "noop",
918-
reason: "scheduled_attempt",
919-
attrs: {
920-
next_attempt_number: nextAttemptNumber,
921-
},
922-
};
923-
} else {
924-
return {
925-
action: "nack_and_do_more_work",
926-
reason: "provider_not_connected",
927-
attrs: {
928-
run_id: lockedTaskRun.id,
929-
},
930-
interval: this._options.nextTickInterval,
931-
retryInMs: 5_000,
932-
};
933-
}
897+
return await this.#startActiveSpan("scheduleAttemptOnProvider", async (span) => {
898+
span.setAttributes({
899+
run_id: lockedTaskRun.id,
934900
});
935-
}
901+
902+
if (await this._providerSender.validateCanSendMessage()) {
903+
await this._providerSender.send("BACKGROUND_WORKER_MESSAGE", {
904+
backgroundWorkerId: worker.friendlyId,
905+
data: {
906+
type: "SCHEDULE_ATTEMPT",
907+
image: imageReference,
908+
version: deployment.version,
909+
machine,
910+
nextAttemptNumber,
911+
// identifiers
912+
id: "placeholder", // TODO: Remove this completely in a future release
913+
envId: lockedTaskRun.runtimeEnvironment.id,
914+
envType: lockedTaskRun.runtimeEnvironment.type,
915+
orgId: lockedTaskRun.runtimeEnvironment.organizationId,
916+
projectId: lockedTaskRun.runtimeEnvironment.projectId,
917+
runId: lockedTaskRun.id,
918+
dequeuedAt: dequeuedAt.getTime(),
919+
},
920+
});
921+
922+
return {
923+
action: "noop",
924+
reason: "scheduled_attempt",
925+
attrs: {
926+
next_attempt_number: nextAttemptNumber,
927+
},
928+
};
929+
} else {
930+
return {
931+
action: "nack_and_do_more_work",
932+
reason: "provider_not_connected",
933+
attrs: {
934+
run_id: lockedTaskRun.id,
935+
},
936+
interval: this._options.nextTickInterval,
937+
retryInMs: 5_000,
938+
};
939+
}
940+
});
936941
} catch (e) {
937942
// We now need to unlock the task run and delete the task run attempt
938943
await prisma.$transaction([
@@ -966,7 +971,8 @@ export class SharedQueueConsumer {
966971

967972
async #handleResumeMessage(
968973
message: MessagePayload,
969-
data: SharedQueueResumeMessageBody
974+
data: SharedQueueResumeMessageBody,
975+
dequeuedAt: Date
970976
): Promise<HandleMessageResult> {
971977
if (data.checkpointEventId) {
972978
try {
@@ -1332,7 +1338,8 @@ export class SharedQueueConsumer {
13321338

13331339
async #handleResumeAfterDurationMessage(
13341340
message: MessagePayload,
1335-
data: SharedQueueResumeAfterDurationMessageBody
1341+
data: SharedQueueResumeAfterDurationMessageBody,
1342+
dequeuedAt: Date
13361343
): Promise<HandleMessageResult> {
13371344
try {
13381345
const restoreService = new RestoreCheckpointService();
@@ -1374,7 +1381,8 @@ export class SharedQueueConsumer {
13741381

13751382
async #handleFailMessage(
13761383
message: MessagePayload,
1377-
data: SharedQueueFailMessageBody
1384+
data: SharedQueueFailMessageBody,
1385+
dequeuedAt: Date
13781386
): Promise<HandleMessageResult> {
13791387
const existingTaskRun = await prisma.taskRun.findFirst({
13801388
where: {
@@ -2057,6 +2065,7 @@ class SharedQueueTasks {
20572065
messageId: run.id,
20582066
isTest: run.isTest,
20592067
attemptCount,
2068+
metrics: [],
20602069
} satisfies TaskRunExecutionLazyAttemptPayload;
20612070
}
20622071

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- AlterTable
2+
ALTER TABLE
3+
"TaskRun"
4+
ADD
5+
COLUMN "executedAt" TIMESTAMP(3);

internal-packages/database/prisma/schema.prisma

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1703,7 +1703,10 @@ model TaskRun {
17031703
17041704
checkpoints Checkpoint[]
17051705
1706+
/// startedAt marks the point at which a run is dequeued from MarQS
17061707
startedAt DateTime?
1708+
/// executedAt is set when the first attempt is about to execute
1709+
executedAt DateTime?
17071710
completedAt DateTime?
17081711
machinePreset String?
17091712
@@ -1915,8 +1918,8 @@ model TaskRunDependency {
19151918
dependentBatchRun BatchTaskRun? @relation("dependentBatchRun", fields: [dependentBatchRunId], references: [id])
19161919
dependentBatchRunId String?
19171920
1918-
createdAt DateTime @default(now())
1919-
updatedAt DateTime @updatedAt
1921+
createdAt DateTime @default(now())
1922+
updatedAt DateTime @updatedAt
19201923
resumedAt DateTime?
19211924
19221925
@@index([dependentAttemptId])

0 commit comments

Comments
 (0)