Skip to content

Conversation

@yashuatla
Copy link
Owner

This PR contains changes from a range of commits from the original repository.

Commit Range: 3f2b748..ea6f66b
Files Changed: 105 (66 programming files)
Programming Ratio: 62.9%

Commits included:

ericallam and others added 15 commits June 10, 2025 17:02
…commits (triggerdotdev#2163)

* v4: fix stuck batch issue when processing happens before transaction commits

* Fix flaky test
### PR: Optimize **TaskRun** indexes for hot-path queries

**What changed**

| Object                          | Type                                                            | Purpose                                                                                                    |
| ------------------------------- | --------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------- |
| `taskrun_runtime_id_desc_idx`   | **BTREE** `(runtimeEnvironmentId, id DESC) INCLUDE (createdAt)` | Eliminates explicit sort for the “latest task runs” query (`ORDER BY id DESC`) while remaining index-only. |
| `taskrun_runtime_createdat_idx` | **BTREE** `(runtimeEnvironmentId, createdAt DESC) INCLUDE (id)` | Accelerates the filter-only path that scans by `createdAt >= …` without any ordering requirement.          |
| `taskrun_createdat_brin`        | **BRIN** on `createdAt` (`pages_per_range = 128`)               | Lets the planner skip whole blocks older than the time window for both queries at < 100 MB cost.           |
| *(cleanup)*                     | **DROP** `TaskRun_runtimeEnvironmentId_createdAt_id_idx`        | Retires the 3-column index once the new ones are built.                                                    |

**Key details**

* All indexes created **CONCURRENTLY** to avoid write blocking.
* `fillfactor = 90` on b-trees for balanced space vs. future growth.
* Net disk usage drops **≈ 15–20 GB** while each query now gets a purpose-built access path.

**Why**

* Remove planner Sort nodes for the top-N “latest runs” view.
* Speed up environment-filtered range scans.
* Shrink index bloat and improve cache efficiency.
* Added docs

* Improved docs and added video

* Added upgrade badge
* Install the kapa sdk

* WIP using the SDK for the Kapa Ask AI widget

* Removes old kapa from root

* Now rendering everything inside the dialog component

* Fixes min-height of dialog content

* Remove kapa from root

* prevents kapa using reCaptcha

* Adds more functionailty with temporary UI placement for now

* Reset conversation button

* Adds a new sparkle list icon

* Adds some example questions as a blank state

* Animate in the example questions

* use “marked” package to render markdown

* Improve some animations

* Submit a question from the URL param

* adds custom scroll bar styling

* fixes modal to correct height after re-opening it

* Add button to stop generating answer mid-stream

* Adds buttons states to show submitting, generating, submittable

* Adds a helpfull sentence in the blank state

* Show a message if the chat returns an error

* Adds reset chat and feedback buttons to the bottom of an answer

* Makes sure you can give feedback in the different states of chat

* Adds a suble background to the dialog

* Fix a button inside button error

* Improve the shortcut esc key on dialog and sheet component

* Fix classname error

* organize imports

* Use our custom focus-visible

* Move the Tooltip for the button into the AskAI component

* Improved error message

* Organize imports

* Animated the modal gradient

* Small layout improvements

* Adds most asked questions from Kapa

* border glow tweak

* AskAI component is now a hook that can take a question

* remove kapa script

* Add a delay before the modal opens when usign the URL params

* Remove old component

* Update to the latest Kapa version

* Rephrased error message

* Use correct types for conversation

* Fixed types for addFeedback

* Adds DOMPurify package

* removed unused const

* Removed unnecessary platform specification

* Reset the timeout when the ai panel pops up

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* Fix for coderabbit bad commit

* Clean up imports

---------

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
* Fix realtime re-subscribing stale data issue

Fixes an issue with realtime when re-subscribing to a run, that would temporarily display stale data and the changes. Now when re-subscribing to a run only the latest changes will be vended

* removed logs
* Move all the logic into a single component, preparing for client only

* Use ClientOnly

* Fix for search param not working
* Runs filter by org id and add created at to ordering

* CopyableText can accept an alternative value for copying

* The runs table now shows the ID instead of run number

* Paginating back/forwards fix

* The task stats need org id and project id too
* Delayed runs weren’t shown in the Tasks page mini graph

* The Tasks page Queued count didn’t include Delayed runs (since ClickHouse switchover)
* shallow clone

* fix image tag locking example

* electric should use db url env var

* add htpasswd note

* use local driver with log rotation by default

* make worker urls more easily configurable

* configure dev otel endpoint via .env

* increase min recommended worker specs

* move worker url section
) {}

public async call(
organizationId: string,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐛 Correctness Issue

Breaking API Change: Required Parameter Added.

Adding a required parameter 'organizationId' to a public method will break all existing callers that don't provide this parameter.

Current Code (Diff):

-     organizationId: string,
+     organizationId?: string,
📝 Committable suggestion

‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀

Suggested change
organizationId: string,
organizationId?: string,

🔄 Dependencies Affected

apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts

Function: NextRunListPresenter.call

Issue: Method implementation must handle potentially undefined organizationId

Suggestion: Update method implementation to handle the case when organizationId is undefined



export interface EnvironmentMetricsRepository {
getDailyTaskActivity(options: {
organizationId: string;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐛 Correctness Issue

Breaking interface change requires implementation updates.

Adding required parameters to the interface methods will break existing implementations that don't include these parameters.

🔄 Dependencies Affected

webapp/app/services/environmentMetricsRepository.server.ts

Function: EnvironmentMetricsRepository.getDailyTaskActivity

Issue: Implementation missing new required parameters

Suggestion: Update implementation to include organizationId and projectId parameters

Current Code (Diff):

  public async getDailyTaskActivity({
    environmentId,
    days,
    tasks,
  }: {
    environmentId: string;
    days: number;
    tasks: string[];
  }): Promise<DailyTaskActivity> {

  // Update to:
  public async getDailyTaskActivity({
    organizationId,
    projectId,
    environmentId,
    days,
    tasks,
  }: {
    organizationId: string;
    projectId: string;
    environmentId: string;
    days: number;
    tasks: string[];
  }): Promise<DailyTaskActivity> {

webapp/app/services/environmentMetricsRepository.server.ts

Function: EnvironmentMetricsRepository.getCurrentRunningStats

Issue: Implementation missing new required parameters

Suggestion: Update implementation to include organizationId and projectId parameters

Current Code (Diff):

  public async getCurrentRunningStats({
    environmentId,
    days,
    tasks,
  }: {
    environmentId: string;
    days: number;
    tasks: string[];
  }): Promise<CurrentRunningStats> {

  // Update to:
  public async getCurrentRunningStats({
    organizationId,
    projectId,
    environmentId,
    days,
    tasks,
  }: {
    organizationId: string;
    projectId: string;
    environmentId: string;
    days: number;
    tasks: string[];
  }): Promise<CurrentRunningStats> {

webapp/app/services/environmentMetricsRepository.server.ts

Function: EnvironmentMetricsRepository.getAverageDurations

Issue: Implementation missing new required parameters

Suggestion: Update implementation to include organizationId and projectId parameters

Current Code (Diff):

  public async getAverageDurations({
    environmentId,
    days,
    tasks,
  }: {
    environmentId: string;
    days: number;
    tasks: string[];
  }): Promise<AverageDurations> {

  // Update to:
  public async getAverageDurations({
    organizationId,
    projectId,
    environmentId,
    days,
    tasks,
  }: {
    organizationId: string;
    projectId: string;
    environmentId: string;
    days: number;
    tasks: string[];
  }): Promise<AverageDurations> {

case "backward": {
const reversedRunIds = [...runIds].reverse();
if (hasMore) {
previousCursor = reversedRunIds.at(1) ?? null;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐛 Correctness Issue

Potential index out-of-bounds error.

Using reversedRunIds.at(1) could return null if there's only one item in the array, causing unexpected behavior in pagination

Current Code (Diff):

-           previousCursor = reversedRunIds.at(1) ?? null;
+           previousCursor = reversedRunIds.length > 1 ? reversedRunIds[1] : null;
📝 Committable suggestion

‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀

Suggested change
previousCursor = reversedRunIds.at(1) ?? null;
previousCursor = reversedRunIds.length > 1 ? reversedRunIds[1] : null;

const reversedRunIds = [...runIds].reverse();
if (hasMore) {
previousCursor = reversedRunIds.at(1) ?? null;
nextCursor = reversedRunIds.at(options.page.size) ?? null;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐛 Correctness Issue

Potential index out-of-bounds error.

Using reversedRunIds.at(options.page.size) could access an index beyond array bounds if there are fewer items than page size

Current Code (Diff):

-           nextCursor = reversedRunIds.at(options.page.size) ?? null;
+           nextCursor = reversedRunIds.length > options.page.size ? reversedRunIds[options.page.size] : null;
📝 Committable suggestion

‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀

Suggested change
nextCursor = reversedRunIds.at(options.page.size) ?? null;
nextCursor = reversedRunIds.length > options.page.size ? reversedRunIds[options.page.size] : null;

previousCursor = reversedRunIds.at(1) ?? null;
nextCursor = reversedRunIds.at(options.page.size) ?? null;
} else {
nextCursor = reversedRunIds.at(options.page.size - 1) ?? null;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐛 Correctness Issue

Potential index out-of-bounds error.

Using reversedRunIds.at(options.page.size - 1) could access an invalid index if the array is empty or smaller than page size

Current Code (Diff):

-           nextCursor = reversedRunIds.at(options.page.size - 1) ?? null;
+           nextCursor = reversedRunIds.length >= options.page.size ? reversedRunIds[options.page.size - 1] : (reversedRunIds.length > 0 ? reversedRunIds[reversedRunIds.length - 1] : null);
📝 Committable suggestion

‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀

Suggested change
nextCursor = reversedRunIds.at(options.page.size - 1) ?? null;
nextCursor = reversedRunIds.length >= options.page.size ? reversedRunIds[options.page.size - 1] : (reversedRunIds.length > 0 ? reversedRunIds[reversedRunIds.length - 1] : null);

password: env.SCHEDULE_WORKER_REDIS_PASSWORD,
keyPrefix: "schedule:",
enableAutoPipelining: true,
...(env.SCHEDULE_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔒 Security Issue

Insecure TLS Configuration.

The TLS configuration defaults to an empty object when not explicitly disabled, which may not properly secure Redis connections.

Current Code (Diff):

-       ...(env.SCHEDULE_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
+       ...(env.SCHEDULE_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: { rejectUnauthorized: true } }),
📝 Committable suggestion

‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀

Suggested change
...(env.SCHEDULE_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
...(env.SCHEDULE_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: { rejectUnauthorized: true } }),

concurrency: env.SCHEDULE_WORKER_CONCURRENCY_LIMIT,
pollIntervalMs: env.SCHEDULE_WORKER_POLL_INTERVAL,
shutdownTimeoutMs: env.SCHEDULE_WORKER_SHUTDOWN_TIMEOUT_MS,
disabled: env.SCHEDULE_WORKER_ENABLED === "0",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐛 Correctness Issue

Inverted Worker Enabled Logic.

The worker disabled property uses inverted logic which could cause unexpected worker behavior in production.

Current Code (Diff):

-       disabled: env.SCHEDULE_WORKER_ENABLED === "0",
+       disabled: env.SCHEDULE_WORKER_ENABLED !== "1",
📝 Committable suggestion

‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀

Suggested change
disabled: env.SCHEDULE_WORKER_ENABLED === "0",
disabled: env.SCHEDULE_WORKER_ENABLED !== "1",

}
);

return { success: !!result };
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐛 Correctness Issue

Potential Data Loss in Success Reporting.

Using !!result could mask partial failures by converting any truthy value to true, potentially causing data loss.

Current Code (Diff):

-         return { success: !!result };
+         return { success: result !== null && result !== undefined && result !== false };
📝 Committable suggestion

‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀

Suggested change
return { success: !!result };
return { success: result !== null && result !== undefined && result !== false };

oneTimeUseToken: options.oneTimeUseToken,
},
});
const batch = await this._prisma.batchTaskRun.create({
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐛 Correctness Issue

Removed transaction protection causing potential data inconsistency.

Removing the transaction wrapper breaks atomicity between batch creation and subsequent operations, which could lead to inconsistent database state if operations fail.

Current Code (Diff):

- const batch = await this._prisma.batchTaskRun.create({
+ return await $transaction(this._prisma, "create batch run", async (tx) => {
+   const batch = await tx.batchTaskRun.create({
📝 Committable suggestion

‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀

Suggested change
const batch = await this._prisma.batchTaskRun.create({
return await $transaction(this._prisma, "create batch run", async (tx) => {
const batch = await tx.batchTaskRun.create({
data: {
friendlyId: batchId,
runtimeEnvironmentId: environment.id,
idempotencyKey: options.idempotencyKey,
idempotencyKeyExpiresAt: options.idempotencyKeyExpiresAt,
dependentTaskAttemptId: dependentAttempt?.id,
runCount: body.items.length,
runIds: runs.map((r) => r.id),
payload: payloadPacket.data,
payloadType: payloadPacket.dataType,
options,
batchVersion: "v3",
oneTimeUseToken: options.oneTimeUseToken,
},
});
switch (this._batchProcessingStrategy) {
case "sequential": {
await this.#enqueueBatchTaskRun({
batchId: batch.id,
processingId: batchId,
range: { start: 0, count: PROCESSING_BATCH_SIZE },
attemptCount: 0,
strategy: this._batchProcessingStrategy,
});
break;
}
case "parallel": {
const ranges = Array.from({
length: Math.ceil(newRunCount / PROCESSING_BATCH_SIZE),
}).map((_, index) => ({
start: index * PROCESSING_BATCH_SIZE,
count: PROCESSING_BATCH_SIZE,
}));
await tx.batchTaskRun.update({
where: { id: batch.id },
data: {
processingJobsExpectedCount: ranges.length,
},
});
await Promise.all(
ranges.map((range, index) =>
this.#enqueueBatchTaskRun({
batchId: batch.id,
processingId: `${index}`,
range,
attemptCount: 0,
strategy: this._batchProcessingStrategy,
})
)
);
break;
}
}
return batch;
});

🔄 Dependencies Affected

apps/webapp/app/v3/services/batchTriggerV3.server.ts

Function: BatchTriggerV3Service.#enqueueBatchTaskRun

Issue: The transaction parameter was removed from calls to #enqueueBatchTaskRun, but the method doesn't use transactions in its implementation

Suggestion: Update the #enqueueBatchTaskRun method to not expect a transaction parameter since it's not using it


apps/webapp/app/v3/services/batchTriggerV3.server.ts

Function: BatchTriggerV3Service.#createAndProcessBatchTaskRun

Issue: The update to the batch record to set processingJobsExpectedCount is now outside the transaction

Suggestion: Ensure the update to set processingJobsExpectedCount uses the transaction object

Current Code (Diff):

- await this._prisma.batchTaskRun.update({
+ await tx.batchTaskRun.update({

Comment on lines +337 to +344
return await commonWorker.enqueue({
id: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`,
job: "v3.resumeBatchRun",
payload: {
batchRunId,
},
{
tx,
runAt,
jobKey: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`,
}
);
availableAt: runAt,
});
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐛 Correctness Issue

Transaction parameter (tx) accepted but unused.

The method accepts a tx parameter but no longer uses it in the new implementation, which could break transaction integrity in calling code.

Current Code (Diff):

+     return await commonWorker.enqueue({
+       id: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`,
+       job: "v3.resumeBatchRun",
+       payload: {
        batchRunId,
      },
+       availableAt: runAt,
+       tx, // Add the transaction parameter
+     });
📝 Committable suggestion

‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀

Suggested change
return await commonWorker.enqueue({
id: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`,
job: "v3.resumeBatchRun",
payload: {
batchRunId,
},
{
tx,
runAt,
jobKey: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`,
}
);
availableAt: runAt,
});
return await commonWorker.enqueue({
id: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`,
job: "v3.resumeBatchRun",
payload: {
batchRunId,
},
availableAt: runAt,
tx, // Add the transaction parameter
});

🔄 Dependencies Affected

apps/webapp/app/v3/services/batchTriggerV3.server.ts

Function: Unknown function calling ResumeBatchRunService.enqueue

Issue: Caller passes a transaction that's no longer used, potentially breaking transaction integrity

Suggestion: Ensure the transaction is properly used in the ResumeBatchRunService.enqueue method


apps/webapp/app/v3/services/resumeDependentParents.server.ts

Function: Unknown function calling ResumeBatchRunService.enqueue

Issue: Caller passes a transaction that's no longer used, potentially breaking transaction integrity

Suggestion: Ensure the transaction is properly used in the ResumeBatchRunService.enqueue method


apps/webapp/app/v3/services/finalizeTaskRun.server.ts

Function: Unknown function calling ResumeBatchRunService.enqueue

Issue: Caller passes a transaction that's no longer used, potentially breaking transaction integrity

Suggestion: Ensure the transaction is properly used in the ResumeBatchRunService.enqueue method


apps/webapp/app/v3/services/createCheckpoint.server.ts

Function: Unknown function calling ResumeBatchRunService.enqueue

Issue: Caller passes a transaction that's no longer used, potentially breaking transaction integrity

Suggestion: Ensure the transaction is properly used in the ResumeBatchRunService.enqueue method


const registerNextService = new RegisterNextTaskScheduleInstanceService(tx);

//create the instances (links to environments)
const scheduleRecord = await this._prisma.taskSchedule.create({
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐛 Correctness Issue

Transaction Removed - Database Operations No Longer Atomic.

The transaction wrapper has been removed, which means database operations are no longer atomic and could lead to inconsistent data if any operation fails.

Current Code (Diff):

+ return await $transaction(
+   this._prisma,
+   "UpsertTaskSchedule.upsertNewSchedule",
+   async (tx) => {
+     const scheduleRecord = await tx.taskSchedule.create({
       data: {
         projectId,
         friendlyId: generateFriendlyId("sched"),
         taskIdentifier: options.taskIdentifier,
         deduplicationKey,
         userProvidedDeduplicationKey:
           options.deduplicationKey !== undefined && options.deduplicationKey !== "",
         generatorExpression: options.cron,
         generatorDescription: cronstrue.toString(options.cron),
         timezone: options.timezone ?? "UTC",
         externalId: options.externalId ? options.externalId : undefined,
       },
-     });
+     });
📝 Committable suggestion

‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀

Suggested change
const scheduleRecord = await this._prisma.taskSchedule.create({
return await $transaction(
this._prisma,
"UpsertTaskSchedule.upsertNewSchedule",
async (tx) => {
const scheduleRecord = await tx.taskSchedule.create({
data: {
projectId,
friendlyId: generateFriendlyId("sched"),
taskIdentifier: options.taskIdentifier,
deduplicationKey,
userProvidedDeduplicationKey:
options.deduplicationKey !== undefined && options.deduplicationKey !== "",
generatorExpression: options.cron,
generatorDescription: cronstrue.toString(options.cron),
timezone: options.timezone ?? "UTC",
externalId: options.externalId ? options.externalId : undefined,
},
});

},
//create the instances (links to environments)
for (const environmentId of options.environments) {
const instance = await this._prisma.taskScheduleInstance.create({
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐛 Correctness Issue

Non-Transactional Database Operations.

Database operations should use the transaction object to maintain atomicity with the schedule creation.

Current Code (Diff):

-      const instance = await this._prisma.taskScheduleInstance.create({
+      const instance = await tx.taskScheduleInstance.create({
📝 Committable suggestion

‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀

Suggested change
const instance = await this._prisma.taskScheduleInstance.create({
const instance = await tx.taskScheduleInstance.create({


await registerNextService.call(instance.id);
}
await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐛 Correctness Issue

Missing Error Handling for Schedule Registration.

The call to scheduleEngine.registerNextTaskScheduleInstance lacks error handling, which could leave the system in an inconsistent state if registration fails.

Current Code (Diff):

-      await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
+      try {
+        await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
+      } catch (error) {
+        console.error(`Failed to register next task schedule instance: ${error.message}`);
+        throw new Error(`Failed to register schedule instance: ${error.message}`);
+      }
📝 Committable suggestion

‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀

Suggested change
await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
try {
await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
} catch (error) {
console.error(`Failed to register next task schedule instance: ${error.message}`);
throw new Error(`Failed to register schedule instance: ${error.message}`);
}

return { scheduleRecord };
}
);
return { scheduleRecord };
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐛 Correctness Issue

Transaction Closure Missing.

The transaction closure is missing, which should wrap up all database operations to ensure atomicity.

Current Code (Diff):

-    return { scheduleRecord };
+    return { scheduleRecord };
+      }
+    );
📝 Committable suggestion

‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀

Suggested change
return { scheduleRecord };
return { scheduleRecord };
}
);

Comment on lines +155 to +231
const scheduleRecord = await this._prisma.taskSchedule.update({
where: {
id: existingSchedule.id,
},
data: {
generatorExpression: options.cron,
generatorDescription: cronstrue.toString(options.cron),
timezone: options.timezone ?? "UTC",
externalId: options.externalId ? options.externalId : null,
},
});

const scheduleHasChanged =
scheduleRecord.generatorExpression !== existingSchedule.generatorExpression ||
scheduleRecord.timezone !== existingSchedule.timezone;
const scheduleHasChanged =
scheduleRecord.generatorExpression !== existingSchedule.generatorExpression ||
scheduleRecord.timezone !== existingSchedule.timezone;

// create the new instances
const newInstances: InstanceWithEnvironment[] = [];
const updatingInstances: InstanceWithEnvironment[] = [];
// create the new instances
const newInstances: InstanceWithEnvironment[] = [];
const updatingInstances: InstanceWithEnvironment[] = [];

for (const environmentId of options.environments) {
const existingInstance = existingInstances.find((i) => i.environmentId === environmentId);
for (const environmentId of options.environments) {
const existingInstance = existingInstances.find((i) => i.environmentId === environmentId);

if (existingInstance) {
// Update the existing instance
updatingInstances.push(existingInstance);
} else {
// Create a new instance
const instance = await tx.taskScheduleInstance.create({
data: {
taskScheduleId: scheduleRecord.id,
environmentId,
},
if (existingInstance) {
// Update the existing instance
updatingInstances.push(existingInstance);
} else {
// Create a new instance
const instance = await this._prisma.taskScheduleInstance.create({
data: {
taskScheduleId: scheduleRecord.id,
environmentId,
},
include: {
environment: {
include: {
environment: {
orgMember: {
include: {
orgMember: {
include: {
user: true,
},
},
user: true,
},
},
},
});

newInstances.push(instance);
}
}
},
},
});

// find the instances that need to be removed
const instancesToDeleted = existingInstances.filter(
(i) => !options.environments.includes(i.environmentId)
);
newInstances.push(instance);
}
}

// delete the instances no longer selected
for (const instance of instancesToDeleted) {
await tx.taskScheduleInstance.delete({
where: {
id: instance.id,
},
});
}
// find the instances that need to be removed
const instancesToDeleted = existingInstances.filter(
(i) => !options.environments.includes(i.environmentId)
);

const registerService = new RegisterNextTaskScheduleInstanceService(tx);
// delete the instances no longer selected
for (const instance of instancesToDeleted) {
await this._prisma.taskScheduleInstance.delete({
where: {
id: instance.id,
},
});
}

for (const instance of newInstances) {
await registerService.call(instance.id);
}
for (const instance of newInstances) {
// Register the new task schedule instances
await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
}

if (scheduleHasChanged) {
for (const instance of updatingInstances) {
await registerService.call(instance.id);
}
}
if (scheduleHasChanged) {
for (const instance of updatingInstances) {
// Update the existing task schedule instances
await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
}
}

return { scheduleRecord };
},
{ timeout: 10_000 }
);
return { scheduleRecord };
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐛 Correctness Issue

Removed Database Transaction Protection.

The code now performs multiple database operations without transaction protection, which could lead to data inconsistency if any operation fails.

Current Code (Diff):

- const scheduleRecord = await this._prisma.taskSchedule.update({
-       where: {
-         id: existingSchedule.id,
-       },
-       data: {
-         generatorExpression: options.cron,
-         generatorDescription: cronstrue.toString(options.cron),
-         timezone: options.timezone ?? "UTC",
-         externalId: options.externalId ? options.externalId : null,
-       },
-     });
-
-     const scheduleHasChanged =
-       scheduleRecord.generatorExpression !== existingSchedule.generatorExpression ||
-       scheduleRecord.timezone !== existingSchedule.timezone;
-
-     // create the new instances
-     const newInstances: InstanceWithEnvironment[] = [];
-     const updatingInstances: InstanceWithEnvironment[] = [];
-
-     for (const environmentId of options.environments) {
-       const existingInstance = existingInstances.find((i) => i.environmentId === environmentId);
-
-       if (existingInstance) {
-         // Update the existing instance
-         updatingInstances.push(existingInstance);
-       } else {
-         // Create a new instance
-         const instance = await this._prisma.taskScheduleInstance.create({
-           data: {
-             taskScheduleId: scheduleRecord.id,
-             environmentId,
-           },
-           include: {
-             environment: {
                include: {
-                 orgMember: {
                    include: {
-                     user: true,
                    },
                  },
                },
-             },
-           },
-         });
-
-         newInstances.push(instance);
-       }
-     }
-
-     // find the instances that need to be removed
-     const instancesToDeleted = existingInstances.filter(
-       (i) => !options.environments.includes(i.environmentId)
-     );
-
-     // delete the instances no longer selected
-     for (const instance of instancesToDeleted) {
-       await this._prisma.taskScheduleInstance.delete({
-         where: {
-           id: instance.id,
-         },
-       });
-     }
-
-     for (const instance of newInstances) {
-       // Register the new task schedule instances
-       await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
-     }
-
-     if (scheduleHasChanged) {
-       for (const instance of updatingInstances) {
-         // Update the existing task schedule instances
-         await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
-       }
-     }
-
-     return { scheduleRecord };
+ return await $transaction(
+       this._prisma,
+       async (tx) => {
+         const scheduleRecord = await tx.taskSchedule.update({
+           where: {
+             id: existingSchedule.id,
+           },
+           data: {
+             generatorExpression: options.cron,
+             generatorDescription: cronstrue.toString(options.cron),
+             timezone: options.timezone ?? "UTC",
+             externalId: options.externalId ? options.externalId : null,
+           },
+         });
+
+         const scheduleHasChanged =
+           scheduleRecord.generatorExpression !== existingSchedule.generatorExpression ||
+           scheduleRecord.timezone !== existingSchedule.timezone;
+
+         // create the new instances
+         const newInstances: InstanceWithEnvironment[] = [];
+         const updatingInstances: InstanceWithEnvironment[] = [];
+
+         for (const environmentId of options.environments) {
+           const existingInstance = existingInstances.find((i) => i.environmentId === environmentId);
+
+           if (existingInstance) {
+             // Update the existing instance
+             updatingInstances.push(existingInstance);
+           } else {
+             // Create a new instance
+             const instance = await tx.taskScheduleInstance.create({
+               data: {
+                 taskScheduleId: scheduleRecord.id,
+                 environmentId,
+               },
+               include: {
+                 environment: {
+                   include: {
+                     orgMember: {
+                       include: {
+                         user: true,
+                       },
+                     },
+                   },
+                 },
+               },
+             });
+
+             newInstances.push(instance);
+           }
+         }
+
+         // find the instances that need to be removed
+         const instancesToDeleted = existingInstances.filter(
+           (i) => !options.environments.includes(i.environmentId)
+         );
+
+         // delete the instances no longer selected
+         for (const instance of instancesToDeleted) {
+           await tx.taskScheduleInstance.delete({
+             where: {
+               id: instance.id,
+             },
+           });
+         }
+
+         for (const instance of newInstances) {
+           // Register the new task schedule instances
+           await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
+         }
+
+         if (scheduleHasChanged) {
+           for (const instance of updatingInstances) {
+             // Update the existing task schedule instances
+             await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
+           }
+         }
+
+         return { scheduleRecord };
+       },
+       { timeout: 10_000 }
+     );
📝 Committable suggestion

‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀

Suggested change
const scheduleRecord = await this._prisma.taskSchedule.update({
where: {
id: existingSchedule.id,
},
data: {
generatorExpression: options.cron,
generatorDescription: cronstrue.toString(options.cron),
timezone: options.timezone ?? "UTC",
externalId: options.externalId ? options.externalId : null,
},
});
const scheduleHasChanged =
scheduleRecord.generatorExpression !== existingSchedule.generatorExpression ||
scheduleRecord.timezone !== existingSchedule.timezone;
const scheduleHasChanged =
scheduleRecord.generatorExpression !== existingSchedule.generatorExpression ||
scheduleRecord.timezone !== existingSchedule.timezone;
// create the new instances
const newInstances: InstanceWithEnvironment[] = [];
const updatingInstances: InstanceWithEnvironment[] = [];
// create the new instances
const newInstances: InstanceWithEnvironment[] = [];
const updatingInstances: InstanceWithEnvironment[] = [];
for (const environmentId of options.environments) {
const existingInstance = existingInstances.find((i) => i.environmentId === environmentId);
for (const environmentId of options.environments) {
const existingInstance = existingInstances.find((i) => i.environmentId === environmentId);
if (existingInstance) {
// Update the existing instance
updatingInstances.push(existingInstance);
} else {
// Create a new instance
const instance = await tx.taskScheduleInstance.create({
data: {
taskScheduleId: scheduleRecord.id,
environmentId,
},
if (existingInstance) {
// Update the existing instance
updatingInstances.push(existingInstance);
} else {
// Create a new instance
const instance = await this._prisma.taskScheduleInstance.create({
data: {
taskScheduleId: scheduleRecord.id,
environmentId,
},
include: {
environment: {
include: {
environment: {
orgMember: {
include: {
orgMember: {
include: {
user: true,
},
},
user: true,
},
},
},
});
newInstances.push(instance);
}
}
},
},
});
// find the instances that need to be removed
const instancesToDeleted = existingInstances.filter(
(i) => !options.environments.includes(i.environmentId)
);
newInstances.push(instance);
}
}
// delete the instances no longer selected
for (const instance of instancesToDeleted) {
await tx.taskScheduleInstance.delete({
where: {
id: instance.id,
},
});
}
// find the instances that need to be removed
const instancesToDeleted = existingInstances.filter(
(i) => !options.environments.includes(i.environmentId)
);
const registerService = new RegisterNextTaskScheduleInstanceService(tx);
// delete the instances no longer selected
for (const instance of instancesToDeleted) {
await this._prisma.taskScheduleInstance.delete({
where: {
id: instance.id,
},
});
}
for (const instance of newInstances) {
await registerService.call(instance.id);
}
for (const instance of newInstances) {
// Register the new task schedule instances
await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
}
if (scheduleHasChanged) {
for (const instance of updatingInstances) {
await registerService.call(instance.id);
}
}
if (scheduleHasChanged) {
for (const instance of updatingInstances) {
// Update the existing task schedule instances
await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
}
}
return { scheduleRecord };
},
{ timeout: 10_000 }
);
return { scheduleRecord };
return await $transaction(
this._prisma,
async (tx) => {
const scheduleRecord = await tx.taskSchedule.update({
where: {
id: existingSchedule.id,
},
data: {
generatorExpression: options.cron,
generatorDescription: cronstrue.toString(options.cron),
timezone: options.timezone ?? "UTC",
externalId: options.externalId ? options.externalId : null,
},
});
const scheduleHasChanged =
scheduleRecord.generatorExpression !== existingSchedule.generatorExpression ||
scheduleRecord.timezone !== existingSchedule.timezone;
// create the new instances
const newInstances: InstanceWithEnvironment[] = [];
const updatingInstances: InstanceWithEnvironment[] = [];
for (const environmentId of options.environments) {
const existingInstance = existingInstances.find((i) => i.environmentId === environmentId);
if (existingInstance) {
// Update the existing instance
updatingInstances.push(existingInstance);
} else {
// Create a new instance
const instance = await tx.taskScheduleInstance.create({
data: {
taskScheduleId: scheduleRecord.id,
environmentId,
},
include: {
environment: {
include: {
orgMember: {
include: {
user: true,
},
},
},
},
},
});
newInstances.push(instance);
}
}
// find the instances that need to be removed
const instancesToDeleted = existingInstances.filter(
(i) => !options.environments.includes(i.environmentId)
);
// delete the instances no longer selected
for (const instance of instancesToDeleted) {
await tx.taskScheduleInstance.delete({
where: {
id: instance.id,
},
});
}
for (const instance of newInstances) {
// Register the new task schedule instances
await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
}
if (scheduleHasChanged) {
for (const instance of updatingInstances) {
// Update the existing task schedule instances
await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
}
}
return { scheduleRecord };
},
{ timeout: 10_000 }
);

Comment on lines +116 to +117
organizationId: z.string(),
projectId: z.string(),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐛 Correctness Issue

Breaking Change: New Required Fields Added to Schema.

Adding required organizationId and projectId fields to TaskActivityQueryParams will break existing code that uses this schema without these fields.

Current Code (Diff):

-   organizationId: z.string(),
-   projectId: z.string(),
+   organizationId: z.string().optional(),
+   projectId: z.string().optional(),
📝 Committable suggestion

‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀

Suggested change
organizationId: z.string(),
projectId: z.string(),
organizationId: z.string().optional(),
projectId: z.string().optional(),

span.setAttribute("nested", true);
// We're already inside a lock with the same resources, just run the routine
return routine(currentContext.signal);
return routine();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐛 Correctness Issue

Removed AbortSignal parameter breaks cancellation propagation.

Removing the AbortSignal parameter could prevent proper cancellation propagation in nested contexts, potentially causing operations to continue running when they should be cancelled.

Current Code (Diff):

-           return routine();
+           return routine(currentContext.signal);
📝 Committable suggestion

‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀

Suggested change
return routine();
return routine(currentContext.signal);

[runId],
5000,
async (signal) => {
async () => {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐛 Correctness Issue

Removed critical timeout parameter from lock function.

Removing the 5000ms timeout parameter could cause indefinite hanging if the lock can't be acquired, potentially leading to deadlocks in production.

Current Code (Diff):

-             async () => {
+             5000,
+             async (signal) => {
📝 Committable suggestion

‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀

Suggested change
async () => {
5000,
async (signal) => {

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants