-
Notifications
You must be signed in to change notification settings - Fork 0
Feature implementation from commits 3f2b748..ea6f66b #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature-base-1
Are you sure you want to change the base?
Conversation
…commits (triggerdotdev#2163) * v4: fix stuck batch issue when processing happens before transaction commits * Fix flaky test
### PR: Optimize **TaskRun** indexes for hot-path queries **What changed** | Object | Type | Purpose | | ------------------------------- | --------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------- | | `taskrun_runtime_id_desc_idx` | **BTREE** `(runtimeEnvironmentId, id DESC) INCLUDE (createdAt)` | Eliminates explicit sort for the “latest task runs” query (`ORDER BY id DESC`) while remaining index-only. | | `taskrun_runtime_createdat_idx` | **BTREE** `(runtimeEnvironmentId, createdAt DESC) INCLUDE (id)` | Accelerates the filter-only path that scans by `createdAt >= …` without any ordering requirement. | | `taskrun_createdat_brin` | **BRIN** on `createdAt` (`pages_per_range = 128`) | Lets the planner skip whole blocks older than the time window for both queries at < 100 MB cost. | | *(cleanup)* | **DROP** `TaskRun_runtimeEnvironmentId_createdAt_id_idx` | Retires the 3-column index once the new ones are built. | **Key details** * All indexes created **CONCURRENTLY** to avoid write blocking. * `fillfactor = 90` on b-trees for balanced space vs. future growth. * Net disk usage drops **≈ 15–20 GB** while each query now gets a purpose-built access path. **Why** * Remove planner Sort nodes for the top-N “latest runs” view. * Speed up environment-filtered range scans. * Shrink index bloat and improve cache efficiency.
* Added docs * Improved docs and added video * Added upgrade badge
* Install the kapa sdk * WIP using the SDK for the Kapa Ask AI widget * Removes old kapa from root * Now rendering everything inside the dialog component * Fixes min-height of dialog content * Remove kapa from root * prevents kapa using reCaptcha * Adds more functionailty with temporary UI placement for now * Reset conversation button * Adds a new sparkle list icon * Adds some example questions as a blank state * Animate in the example questions * use “marked” package to render markdown * Improve some animations * Submit a question from the URL param * adds custom scroll bar styling * fixes modal to correct height after re-opening it * Add button to stop generating answer mid-stream * Adds buttons states to show submitting, generating, submittable * Adds a helpfull sentence in the blank state * Show a message if the chat returns an error * Adds reset chat and feedback buttons to the bottom of an answer * Makes sure you can give feedback in the different states of chat * Adds a suble background to the dialog * Fix a button inside button error * Improve the shortcut esc key on dialog and sheet component * Fix classname error * organize imports * Use our custom focus-visible * Move the Tooltip for the button into the AskAI component * Improved error message * Organize imports * Animated the modal gradient * Small layout improvements * Adds most asked questions from Kapa * border glow tweak * AskAI component is now a hook that can take a question * remove kapa script * Add a delay before the modal opens when usign the URL params * Remove old component * Update to the latest Kapa version * Rephrased error message * Use correct types for conversation * Fixed types for addFeedback * Adds DOMPurify package * removed unused const * Removed unnecessary platform specification * Reset the timeout when the ai panel pops up Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> * Fix for coderabbit bad commit * Clean up imports --------- Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
* Fix realtime re-subscribing stale data issue Fixes an issue with realtime when re-subscribing to a run, that would temporarily display stale data and the changes. Now when re-subscribing to a run only the latest changes will be vended * removed logs
* Move all the logic into a single component, preparing for client only * Use ClientOnly * Fix for search param not working
* Runs filter by org id and add created at to ordering * CopyableText can accept an alternative value for copying * The runs table now shows the ID instead of run number * Paginating back/forwards fix * The task stats need org id and project id too
* Delayed runs weren’t shown in the Tasks page mini graph * The Tasks page Queued count didn’t include Delayed runs (since ClickHouse switchover)
* shallow clone * fix image tag locking example * electric should use db url env var * add htpasswd note * use local driver with log rotation by default * make worker urls more easily configurable * configure dev otel endpoint via .env * increase min recommended worker specs * move worker url section
| ) {} | ||
|
|
||
| public async call( | ||
| organizationId: string, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐛 Correctness Issue
Breaking API Change: Required Parameter Added.
Adding a required parameter 'organizationId' to a public method will break all existing callers that don't provide this parameter.
Current Code (Diff):
- organizationId: string,
+ organizationId?: string,📝 Committable suggestion
‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀
| organizationId: string, | |
| organizationId?: string, |
🔄 Dependencies Affected
apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts
Function: NextRunListPresenter.call
Issue: Method implementation must handle potentially undefined organizationId
Suggestion: Update method implementation to handle the case when organizationId is undefined
|
|
||
| export interface EnvironmentMetricsRepository { | ||
| getDailyTaskActivity(options: { | ||
| organizationId: string; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐛 Correctness Issue
Breaking interface change requires implementation updates.
Adding required parameters to the interface methods will break existing implementations that don't include these parameters.
🔄 Dependencies Affected
webapp/app/services/environmentMetricsRepository.server.ts
Function: EnvironmentMetricsRepository.getDailyTaskActivity
Issue: Implementation missing new required parameters
Suggestion: Update implementation to include organizationId and projectId parameters
Current Code (Diff):
public async getDailyTaskActivity({
environmentId,
days,
tasks,
}: {
environmentId: string;
days: number;
tasks: string[];
}): Promise<DailyTaskActivity> {
// Update to:
public async getDailyTaskActivity({
organizationId,
projectId,
environmentId,
days,
tasks,
}: {
organizationId: string;
projectId: string;
environmentId: string;
days: number;
tasks: string[];
}): Promise<DailyTaskActivity> {webapp/app/services/environmentMetricsRepository.server.ts
Function: EnvironmentMetricsRepository.getCurrentRunningStats
Issue: Implementation missing new required parameters
Suggestion: Update implementation to include organizationId and projectId parameters
Current Code (Diff):
public async getCurrentRunningStats({
environmentId,
days,
tasks,
}: {
environmentId: string;
days: number;
tasks: string[];
}): Promise<CurrentRunningStats> {
// Update to:
public async getCurrentRunningStats({
organizationId,
projectId,
environmentId,
days,
tasks,
}: {
organizationId: string;
projectId: string;
environmentId: string;
days: number;
tasks: string[];
}): Promise<CurrentRunningStats> {webapp/app/services/environmentMetricsRepository.server.ts
Function: EnvironmentMetricsRepository.getAverageDurations
Issue: Implementation missing new required parameters
Suggestion: Update implementation to include organizationId and projectId parameters
Current Code (Diff):
public async getAverageDurations({
environmentId,
days,
tasks,
}: {
environmentId: string;
days: number;
tasks: string[];
}): Promise<AverageDurations> {
// Update to:
public async getAverageDurations({
organizationId,
projectId,
environmentId,
days,
tasks,
}: {
organizationId: string;
projectId: string;
environmentId: string;
days: number;
tasks: string[];
}): Promise<AverageDurations> {| case "backward": { | ||
| const reversedRunIds = [...runIds].reverse(); | ||
| if (hasMore) { | ||
| previousCursor = reversedRunIds.at(1) ?? null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐛 Correctness Issue
Potential index out-of-bounds error.
Using reversedRunIds.at(1) could return null if there's only one item in the array, causing unexpected behavior in pagination
Current Code (Diff):
- previousCursor = reversedRunIds.at(1) ?? null;
+ previousCursor = reversedRunIds.length > 1 ? reversedRunIds[1] : null;📝 Committable suggestion
‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀
| previousCursor = reversedRunIds.at(1) ?? null; | |
| previousCursor = reversedRunIds.length > 1 ? reversedRunIds[1] : null; |
| const reversedRunIds = [...runIds].reverse(); | ||
| if (hasMore) { | ||
| previousCursor = reversedRunIds.at(1) ?? null; | ||
| nextCursor = reversedRunIds.at(options.page.size) ?? null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐛 Correctness Issue
Potential index out-of-bounds error.
Using reversedRunIds.at(options.page.size) could access an index beyond array bounds if there are fewer items than page size
Current Code (Diff):
- nextCursor = reversedRunIds.at(options.page.size) ?? null;
+ nextCursor = reversedRunIds.length > options.page.size ? reversedRunIds[options.page.size] : null;📝 Committable suggestion
‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀
| nextCursor = reversedRunIds.at(options.page.size) ?? null; | |
| nextCursor = reversedRunIds.length > options.page.size ? reversedRunIds[options.page.size] : null; |
| previousCursor = reversedRunIds.at(1) ?? null; | ||
| nextCursor = reversedRunIds.at(options.page.size) ?? null; | ||
| } else { | ||
| nextCursor = reversedRunIds.at(options.page.size - 1) ?? null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐛 Correctness Issue
Potential index out-of-bounds error.
Using reversedRunIds.at(options.page.size - 1) could access an invalid index if the array is empty or smaller than page size
Current Code (Diff):
- nextCursor = reversedRunIds.at(options.page.size - 1) ?? null;
+ nextCursor = reversedRunIds.length >= options.page.size ? reversedRunIds[options.page.size - 1] : (reversedRunIds.length > 0 ? reversedRunIds[reversedRunIds.length - 1] : null);📝 Committable suggestion
‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀
| nextCursor = reversedRunIds.at(options.page.size - 1) ?? null; | |
| nextCursor = reversedRunIds.length >= options.page.size ? reversedRunIds[options.page.size - 1] : (reversedRunIds.length > 0 ? reversedRunIds[reversedRunIds.length - 1] : null); |
| password: env.SCHEDULE_WORKER_REDIS_PASSWORD, | ||
| keyPrefix: "schedule:", | ||
| enableAutoPipelining: true, | ||
| ...(env.SCHEDULE_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔒 Security Issue
Insecure TLS Configuration.
The TLS configuration defaults to an empty object when not explicitly disabled, which may not properly secure Redis connections.
Current Code (Diff):
- ...(env.SCHEDULE_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
+ ...(env.SCHEDULE_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: { rejectUnauthorized: true } }),📝 Committable suggestion
‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀
| ...(env.SCHEDULE_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), | |
| ...(env.SCHEDULE_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: { rejectUnauthorized: true } }), |
| concurrency: env.SCHEDULE_WORKER_CONCURRENCY_LIMIT, | ||
| pollIntervalMs: env.SCHEDULE_WORKER_POLL_INTERVAL, | ||
| shutdownTimeoutMs: env.SCHEDULE_WORKER_SHUTDOWN_TIMEOUT_MS, | ||
| disabled: env.SCHEDULE_WORKER_ENABLED === "0", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐛 Correctness Issue
Inverted Worker Enabled Logic.
The worker disabled property uses inverted logic which could cause unexpected worker behavior in production.
Current Code (Diff):
- disabled: env.SCHEDULE_WORKER_ENABLED === "0",
+ disabled: env.SCHEDULE_WORKER_ENABLED !== "1",📝 Committable suggestion
‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀
| disabled: env.SCHEDULE_WORKER_ENABLED === "0", | |
| disabled: env.SCHEDULE_WORKER_ENABLED !== "1", |
| } | ||
| ); | ||
|
|
||
| return { success: !!result }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐛 Correctness Issue
Potential Data Loss in Success Reporting.
Using !!result could mask partial failures by converting any truthy value to true, potentially causing data loss.
Current Code (Diff):
- return { success: !!result };
+ return { success: result !== null && result !== undefined && result !== false };📝 Committable suggestion
‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀
| return { success: !!result }; | |
| return { success: result !== null && result !== undefined && result !== false }; |
| oneTimeUseToken: options.oneTimeUseToken, | ||
| }, | ||
| }); | ||
| const batch = await this._prisma.batchTaskRun.create({ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐛 Correctness Issue
Removed transaction protection causing potential data inconsistency.
Removing the transaction wrapper breaks atomicity between batch creation and subsequent operations, which could lead to inconsistent database state if operations fail.
Current Code (Diff):
- const batch = await this._prisma.batchTaskRun.create({
+ return await $transaction(this._prisma, "create batch run", async (tx) => {
+ const batch = await tx.batchTaskRun.create({📝 Committable suggestion
‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀
| const batch = await this._prisma.batchTaskRun.create({ | |
| return await $transaction(this._prisma, "create batch run", async (tx) => { | |
| const batch = await tx.batchTaskRun.create({ | |
| data: { | |
| friendlyId: batchId, | |
| runtimeEnvironmentId: environment.id, | |
| idempotencyKey: options.idempotencyKey, | |
| idempotencyKeyExpiresAt: options.idempotencyKeyExpiresAt, | |
| dependentTaskAttemptId: dependentAttempt?.id, | |
| runCount: body.items.length, | |
| runIds: runs.map((r) => r.id), | |
| payload: payloadPacket.data, | |
| payloadType: payloadPacket.dataType, | |
| options, | |
| batchVersion: "v3", | |
| oneTimeUseToken: options.oneTimeUseToken, | |
| }, | |
| }); | |
| switch (this._batchProcessingStrategy) { | |
| case "sequential": { | |
| await this.#enqueueBatchTaskRun({ | |
| batchId: batch.id, | |
| processingId: batchId, | |
| range: { start: 0, count: PROCESSING_BATCH_SIZE }, | |
| attemptCount: 0, | |
| strategy: this._batchProcessingStrategy, | |
| }); | |
| break; | |
| } | |
| case "parallel": { | |
| const ranges = Array.from({ | |
| length: Math.ceil(newRunCount / PROCESSING_BATCH_SIZE), | |
| }).map((_, index) => ({ | |
| start: index * PROCESSING_BATCH_SIZE, | |
| count: PROCESSING_BATCH_SIZE, | |
| })); | |
| await tx.batchTaskRun.update({ | |
| where: { id: batch.id }, | |
| data: { | |
| processingJobsExpectedCount: ranges.length, | |
| }, | |
| }); | |
| await Promise.all( | |
| ranges.map((range, index) => | |
| this.#enqueueBatchTaskRun({ | |
| batchId: batch.id, | |
| processingId: `${index}`, | |
| range, | |
| attemptCount: 0, | |
| strategy: this._batchProcessingStrategy, | |
| }) | |
| ) | |
| ); | |
| break; | |
| } | |
| } | |
| return batch; | |
| }); |
🔄 Dependencies Affected
apps/webapp/app/v3/services/batchTriggerV3.server.ts
Function: BatchTriggerV3Service.#enqueueBatchTaskRun
Issue: The transaction parameter was removed from calls to #enqueueBatchTaskRun, but the method doesn't use transactions in its implementation
Suggestion: Update the #enqueueBatchTaskRun method to not expect a transaction parameter since it's not using it
apps/webapp/app/v3/services/batchTriggerV3.server.ts
Function: BatchTriggerV3Service.#createAndProcessBatchTaskRun
Issue: The update to the batch record to set processingJobsExpectedCount is now outside the transaction
Suggestion: Ensure the update to set processingJobsExpectedCount uses the transaction object
Current Code (Diff):
- await this._prisma.batchTaskRun.update({
+ await tx.batchTaskRun.update({| return await commonWorker.enqueue({ | ||
| id: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`, | ||
| job: "v3.resumeBatchRun", | ||
| payload: { | ||
| batchRunId, | ||
| }, | ||
| { | ||
| tx, | ||
| runAt, | ||
| jobKey: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`, | ||
| } | ||
| ); | ||
| availableAt: runAt, | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐛 Correctness Issue
Transaction parameter (tx) accepted but unused.
The method accepts a tx parameter but no longer uses it in the new implementation, which could break transaction integrity in calling code.
Current Code (Diff):
+ return await commonWorker.enqueue({
+ id: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`,
+ job: "v3.resumeBatchRun",
+ payload: {
batchRunId,
},
+ availableAt: runAt,
+ tx, // Add the transaction parameter
+ });📝 Committable suggestion
‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀
| return await commonWorker.enqueue({ | |
| id: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`, | |
| job: "v3.resumeBatchRun", | |
| payload: { | |
| batchRunId, | |
| }, | |
| { | |
| tx, | |
| runAt, | |
| jobKey: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`, | |
| } | |
| ); | |
| availableAt: runAt, | |
| }); | |
| return await commonWorker.enqueue({ | |
| id: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`, | |
| job: "v3.resumeBatchRun", | |
| payload: { | |
| batchRunId, | |
| }, | |
| availableAt: runAt, | |
| tx, // Add the transaction parameter | |
| }); |
🔄 Dependencies Affected
apps/webapp/app/v3/services/batchTriggerV3.server.ts
Function: Unknown function calling ResumeBatchRunService.enqueue
Issue: Caller passes a transaction that's no longer used, potentially breaking transaction integrity
Suggestion: Ensure the transaction is properly used in the ResumeBatchRunService.enqueue method
apps/webapp/app/v3/services/resumeDependentParents.server.ts
Function: Unknown function calling ResumeBatchRunService.enqueue
Issue: Caller passes a transaction that's no longer used, potentially breaking transaction integrity
Suggestion: Ensure the transaction is properly used in the ResumeBatchRunService.enqueue method
apps/webapp/app/v3/services/finalizeTaskRun.server.ts
Function: Unknown function calling ResumeBatchRunService.enqueue
Issue: Caller passes a transaction that's no longer used, potentially breaking transaction integrity
Suggestion: Ensure the transaction is properly used in the ResumeBatchRunService.enqueue method
apps/webapp/app/v3/services/createCheckpoint.server.ts
Function: Unknown function calling ResumeBatchRunService.enqueue
Issue: Caller passes a transaction that's no longer used, potentially breaking transaction integrity
Suggestion: Ensure the transaction is properly used in the ResumeBatchRunService.enqueue method
| const registerNextService = new RegisterNextTaskScheduleInstanceService(tx); | ||
|
|
||
| //create the instances (links to environments) | ||
| const scheduleRecord = await this._prisma.taskSchedule.create({ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐛 Correctness Issue
Transaction Removed - Database Operations No Longer Atomic.
The transaction wrapper has been removed, which means database operations are no longer atomic and could lead to inconsistent data if any operation fails.
Current Code (Diff):
+ return await $transaction(
+ this._prisma,
+ "UpsertTaskSchedule.upsertNewSchedule",
+ async (tx) => {
+ const scheduleRecord = await tx.taskSchedule.create({
data: {
projectId,
friendlyId: generateFriendlyId("sched"),
taskIdentifier: options.taskIdentifier,
deduplicationKey,
userProvidedDeduplicationKey:
options.deduplicationKey !== undefined && options.deduplicationKey !== "",
generatorExpression: options.cron,
generatorDescription: cronstrue.toString(options.cron),
timezone: options.timezone ?? "UTC",
externalId: options.externalId ? options.externalId : undefined,
},
- });
+ });📝 Committable suggestion
‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀
| const scheduleRecord = await this._prisma.taskSchedule.create({ | |
| return await $transaction( | |
| this._prisma, | |
| "UpsertTaskSchedule.upsertNewSchedule", | |
| async (tx) => { | |
| const scheduleRecord = await tx.taskSchedule.create({ | |
| data: { | |
| projectId, | |
| friendlyId: generateFriendlyId("sched"), | |
| taskIdentifier: options.taskIdentifier, | |
| deduplicationKey, | |
| userProvidedDeduplicationKey: | |
| options.deduplicationKey !== undefined && options.deduplicationKey !== "", | |
| generatorExpression: options.cron, | |
| generatorDescription: cronstrue.toString(options.cron), | |
| timezone: options.timezone ?? "UTC", | |
| externalId: options.externalId ? options.externalId : undefined, | |
| }, | |
| }); |
| }, | ||
| //create the instances (links to environments) | ||
| for (const environmentId of options.environments) { | ||
| const instance = await this._prisma.taskScheduleInstance.create({ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐛 Correctness Issue
Non-Transactional Database Operations.
Database operations should use the transaction object to maintain atomicity with the schedule creation.
Current Code (Diff):
- const instance = await this._prisma.taskScheduleInstance.create({
+ const instance = await tx.taskScheduleInstance.create({📝 Committable suggestion
‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀
| const instance = await this._prisma.taskScheduleInstance.create({ | |
| const instance = await tx.taskScheduleInstance.create({ |
|
|
||
| await registerNextService.call(instance.id); | ||
| } | ||
| await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐛 Correctness Issue
Missing Error Handling for Schedule Registration.
The call to scheduleEngine.registerNextTaskScheduleInstance lacks error handling, which could leave the system in an inconsistent state if registration fails.
Current Code (Diff):
- await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
+ try {
+ await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
+ } catch (error) {
+ console.error(`Failed to register next task schedule instance: ${error.message}`);
+ throw new Error(`Failed to register schedule instance: ${error.message}`);
+ }📝 Committable suggestion
‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀
| await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id }); | |
| try { | |
| await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id }); | |
| } catch (error) { | |
| console.error(`Failed to register next task schedule instance: ${error.message}`); | |
| throw new Error(`Failed to register schedule instance: ${error.message}`); | |
| } |
| return { scheduleRecord }; | ||
| } | ||
| ); | ||
| return { scheduleRecord }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐛 Correctness Issue
Transaction Closure Missing.
The transaction closure is missing, which should wrap up all database operations to ensure atomicity.
Current Code (Diff):
- return { scheduleRecord };
+ return { scheduleRecord };
+ }
+ );📝 Committable suggestion
‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀
| return { scheduleRecord }; | |
| return { scheduleRecord }; | |
| } | |
| ); |
| const scheduleRecord = await this._prisma.taskSchedule.update({ | ||
| where: { | ||
| id: existingSchedule.id, | ||
| }, | ||
| data: { | ||
| generatorExpression: options.cron, | ||
| generatorDescription: cronstrue.toString(options.cron), | ||
| timezone: options.timezone ?? "UTC", | ||
| externalId: options.externalId ? options.externalId : null, | ||
| }, | ||
| }); | ||
|
|
||
| const scheduleHasChanged = | ||
| scheduleRecord.generatorExpression !== existingSchedule.generatorExpression || | ||
| scheduleRecord.timezone !== existingSchedule.timezone; | ||
| const scheduleHasChanged = | ||
| scheduleRecord.generatorExpression !== existingSchedule.generatorExpression || | ||
| scheduleRecord.timezone !== existingSchedule.timezone; | ||
|
|
||
| // create the new instances | ||
| const newInstances: InstanceWithEnvironment[] = []; | ||
| const updatingInstances: InstanceWithEnvironment[] = []; | ||
| // create the new instances | ||
| const newInstances: InstanceWithEnvironment[] = []; | ||
| const updatingInstances: InstanceWithEnvironment[] = []; | ||
|
|
||
| for (const environmentId of options.environments) { | ||
| const existingInstance = existingInstances.find((i) => i.environmentId === environmentId); | ||
| for (const environmentId of options.environments) { | ||
| const existingInstance = existingInstances.find((i) => i.environmentId === environmentId); | ||
|
|
||
| if (existingInstance) { | ||
| // Update the existing instance | ||
| updatingInstances.push(existingInstance); | ||
| } else { | ||
| // Create a new instance | ||
| const instance = await tx.taskScheduleInstance.create({ | ||
| data: { | ||
| taskScheduleId: scheduleRecord.id, | ||
| environmentId, | ||
| }, | ||
| if (existingInstance) { | ||
| // Update the existing instance | ||
| updatingInstances.push(existingInstance); | ||
| } else { | ||
| // Create a new instance | ||
| const instance = await this._prisma.taskScheduleInstance.create({ | ||
| data: { | ||
| taskScheduleId: scheduleRecord.id, | ||
| environmentId, | ||
| }, | ||
| include: { | ||
| environment: { | ||
| include: { | ||
| environment: { | ||
| orgMember: { | ||
| include: { | ||
| orgMember: { | ||
| include: { | ||
| user: true, | ||
| }, | ||
| }, | ||
| user: true, | ||
| }, | ||
| }, | ||
| }, | ||
| }); | ||
|
|
||
| newInstances.push(instance); | ||
| } | ||
| } | ||
| }, | ||
| }, | ||
| }); | ||
|
|
||
| // find the instances that need to be removed | ||
| const instancesToDeleted = existingInstances.filter( | ||
| (i) => !options.environments.includes(i.environmentId) | ||
| ); | ||
| newInstances.push(instance); | ||
| } | ||
| } | ||
|
|
||
| // delete the instances no longer selected | ||
| for (const instance of instancesToDeleted) { | ||
| await tx.taskScheduleInstance.delete({ | ||
| where: { | ||
| id: instance.id, | ||
| }, | ||
| }); | ||
| } | ||
| // find the instances that need to be removed | ||
| const instancesToDeleted = existingInstances.filter( | ||
| (i) => !options.environments.includes(i.environmentId) | ||
| ); | ||
|
|
||
| const registerService = new RegisterNextTaskScheduleInstanceService(tx); | ||
| // delete the instances no longer selected | ||
| for (const instance of instancesToDeleted) { | ||
| await this._prisma.taskScheduleInstance.delete({ | ||
| where: { | ||
| id: instance.id, | ||
| }, | ||
| }); | ||
| } | ||
|
|
||
| for (const instance of newInstances) { | ||
| await registerService.call(instance.id); | ||
| } | ||
| for (const instance of newInstances) { | ||
| // Register the new task schedule instances | ||
| await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id }); | ||
| } | ||
|
|
||
| if (scheduleHasChanged) { | ||
| for (const instance of updatingInstances) { | ||
| await registerService.call(instance.id); | ||
| } | ||
| } | ||
| if (scheduleHasChanged) { | ||
| for (const instance of updatingInstances) { | ||
| // Update the existing task schedule instances | ||
| await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id }); | ||
| } | ||
| } | ||
|
|
||
| return { scheduleRecord }; | ||
| }, | ||
| { timeout: 10_000 } | ||
| ); | ||
| return { scheduleRecord }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐛 Correctness Issue
Removed Database Transaction Protection.
The code now performs multiple database operations without transaction protection, which could lead to data inconsistency if any operation fails.
Current Code (Diff):
- const scheduleRecord = await this._prisma.taskSchedule.update({
- where: {
- id: existingSchedule.id,
- },
- data: {
- generatorExpression: options.cron,
- generatorDescription: cronstrue.toString(options.cron),
- timezone: options.timezone ?? "UTC",
- externalId: options.externalId ? options.externalId : null,
- },
- });
-
- const scheduleHasChanged =
- scheduleRecord.generatorExpression !== existingSchedule.generatorExpression ||
- scheduleRecord.timezone !== existingSchedule.timezone;
-
- // create the new instances
- const newInstances: InstanceWithEnvironment[] = [];
- const updatingInstances: InstanceWithEnvironment[] = [];
-
- for (const environmentId of options.environments) {
- const existingInstance = existingInstances.find((i) => i.environmentId === environmentId);
-
- if (existingInstance) {
- // Update the existing instance
- updatingInstances.push(existingInstance);
- } else {
- // Create a new instance
- const instance = await this._prisma.taskScheduleInstance.create({
- data: {
- taskScheduleId: scheduleRecord.id,
- environmentId,
- },
- include: {
- environment: {
include: {
- orgMember: {
include: {
- user: true,
},
},
},
- },
- },
- });
-
- newInstances.push(instance);
- }
- }
-
- // find the instances that need to be removed
- const instancesToDeleted = existingInstances.filter(
- (i) => !options.environments.includes(i.environmentId)
- );
-
- // delete the instances no longer selected
- for (const instance of instancesToDeleted) {
- await this._prisma.taskScheduleInstance.delete({
- where: {
- id: instance.id,
- },
- });
- }
-
- for (const instance of newInstances) {
- // Register the new task schedule instances
- await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
- }
-
- if (scheduleHasChanged) {
- for (const instance of updatingInstances) {
- // Update the existing task schedule instances
- await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
- }
- }
-
- return { scheduleRecord };
+ return await $transaction(
+ this._prisma,
+ async (tx) => {
+ const scheduleRecord = await tx.taskSchedule.update({
+ where: {
+ id: existingSchedule.id,
+ },
+ data: {
+ generatorExpression: options.cron,
+ generatorDescription: cronstrue.toString(options.cron),
+ timezone: options.timezone ?? "UTC",
+ externalId: options.externalId ? options.externalId : null,
+ },
+ });
+
+ const scheduleHasChanged =
+ scheduleRecord.generatorExpression !== existingSchedule.generatorExpression ||
+ scheduleRecord.timezone !== existingSchedule.timezone;
+
+ // create the new instances
+ const newInstances: InstanceWithEnvironment[] = [];
+ const updatingInstances: InstanceWithEnvironment[] = [];
+
+ for (const environmentId of options.environments) {
+ const existingInstance = existingInstances.find((i) => i.environmentId === environmentId);
+
+ if (existingInstance) {
+ // Update the existing instance
+ updatingInstances.push(existingInstance);
+ } else {
+ // Create a new instance
+ const instance = await tx.taskScheduleInstance.create({
+ data: {
+ taskScheduleId: scheduleRecord.id,
+ environmentId,
+ },
+ include: {
+ environment: {
+ include: {
+ orgMember: {
+ include: {
+ user: true,
+ },
+ },
+ },
+ },
+ },
+ });
+
+ newInstances.push(instance);
+ }
+ }
+
+ // find the instances that need to be removed
+ const instancesToDeleted = existingInstances.filter(
+ (i) => !options.environments.includes(i.environmentId)
+ );
+
+ // delete the instances no longer selected
+ for (const instance of instancesToDeleted) {
+ await tx.taskScheduleInstance.delete({
+ where: {
+ id: instance.id,
+ },
+ });
+ }
+
+ for (const instance of newInstances) {
+ // Register the new task schedule instances
+ await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
+ }
+
+ if (scheduleHasChanged) {
+ for (const instance of updatingInstances) {
+ // Update the existing task schedule instances
+ await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
+ }
+ }
+
+ return { scheduleRecord };
+ },
+ { timeout: 10_000 }
+ );📝 Committable suggestion
‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀
| const scheduleRecord = await this._prisma.taskSchedule.update({ | |
| where: { | |
| id: existingSchedule.id, | |
| }, | |
| data: { | |
| generatorExpression: options.cron, | |
| generatorDescription: cronstrue.toString(options.cron), | |
| timezone: options.timezone ?? "UTC", | |
| externalId: options.externalId ? options.externalId : null, | |
| }, | |
| }); | |
| const scheduleHasChanged = | |
| scheduleRecord.generatorExpression !== existingSchedule.generatorExpression || | |
| scheduleRecord.timezone !== existingSchedule.timezone; | |
| const scheduleHasChanged = | |
| scheduleRecord.generatorExpression !== existingSchedule.generatorExpression || | |
| scheduleRecord.timezone !== existingSchedule.timezone; | |
| // create the new instances | |
| const newInstances: InstanceWithEnvironment[] = []; | |
| const updatingInstances: InstanceWithEnvironment[] = []; | |
| // create the new instances | |
| const newInstances: InstanceWithEnvironment[] = []; | |
| const updatingInstances: InstanceWithEnvironment[] = []; | |
| for (const environmentId of options.environments) { | |
| const existingInstance = existingInstances.find((i) => i.environmentId === environmentId); | |
| for (const environmentId of options.environments) { | |
| const existingInstance = existingInstances.find((i) => i.environmentId === environmentId); | |
| if (existingInstance) { | |
| // Update the existing instance | |
| updatingInstances.push(existingInstance); | |
| } else { | |
| // Create a new instance | |
| const instance = await tx.taskScheduleInstance.create({ | |
| data: { | |
| taskScheduleId: scheduleRecord.id, | |
| environmentId, | |
| }, | |
| if (existingInstance) { | |
| // Update the existing instance | |
| updatingInstances.push(existingInstance); | |
| } else { | |
| // Create a new instance | |
| const instance = await this._prisma.taskScheduleInstance.create({ | |
| data: { | |
| taskScheduleId: scheduleRecord.id, | |
| environmentId, | |
| }, | |
| include: { | |
| environment: { | |
| include: { | |
| environment: { | |
| orgMember: { | |
| include: { | |
| orgMember: { | |
| include: { | |
| user: true, | |
| }, | |
| }, | |
| user: true, | |
| }, | |
| }, | |
| }, | |
| }); | |
| newInstances.push(instance); | |
| } | |
| } | |
| }, | |
| }, | |
| }); | |
| // find the instances that need to be removed | |
| const instancesToDeleted = existingInstances.filter( | |
| (i) => !options.environments.includes(i.environmentId) | |
| ); | |
| newInstances.push(instance); | |
| } | |
| } | |
| // delete the instances no longer selected | |
| for (const instance of instancesToDeleted) { | |
| await tx.taskScheduleInstance.delete({ | |
| where: { | |
| id: instance.id, | |
| }, | |
| }); | |
| } | |
| // find the instances that need to be removed | |
| const instancesToDeleted = existingInstances.filter( | |
| (i) => !options.environments.includes(i.environmentId) | |
| ); | |
| const registerService = new RegisterNextTaskScheduleInstanceService(tx); | |
| // delete the instances no longer selected | |
| for (const instance of instancesToDeleted) { | |
| await this._prisma.taskScheduleInstance.delete({ | |
| where: { | |
| id: instance.id, | |
| }, | |
| }); | |
| } | |
| for (const instance of newInstances) { | |
| await registerService.call(instance.id); | |
| } | |
| for (const instance of newInstances) { | |
| // Register the new task schedule instances | |
| await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id }); | |
| } | |
| if (scheduleHasChanged) { | |
| for (const instance of updatingInstances) { | |
| await registerService.call(instance.id); | |
| } | |
| } | |
| if (scheduleHasChanged) { | |
| for (const instance of updatingInstances) { | |
| // Update the existing task schedule instances | |
| await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id }); | |
| } | |
| } | |
| return { scheduleRecord }; | |
| }, | |
| { timeout: 10_000 } | |
| ); | |
| return { scheduleRecord }; | |
| return await $transaction( | |
| this._prisma, | |
| async (tx) => { | |
| const scheduleRecord = await tx.taskSchedule.update({ | |
| where: { | |
| id: existingSchedule.id, | |
| }, | |
| data: { | |
| generatorExpression: options.cron, | |
| generatorDescription: cronstrue.toString(options.cron), | |
| timezone: options.timezone ?? "UTC", | |
| externalId: options.externalId ? options.externalId : null, | |
| }, | |
| }); | |
| const scheduleHasChanged = | |
| scheduleRecord.generatorExpression !== existingSchedule.generatorExpression || | |
| scheduleRecord.timezone !== existingSchedule.timezone; | |
| // create the new instances | |
| const newInstances: InstanceWithEnvironment[] = []; | |
| const updatingInstances: InstanceWithEnvironment[] = []; | |
| for (const environmentId of options.environments) { | |
| const existingInstance = existingInstances.find((i) => i.environmentId === environmentId); | |
| if (existingInstance) { | |
| // Update the existing instance | |
| updatingInstances.push(existingInstance); | |
| } else { | |
| // Create a new instance | |
| const instance = await tx.taskScheduleInstance.create({ | |
| data: { | |
| taskScheduleId: scheduleRecord.id, | |
| environmentId, | |
| }, | |
| include: { | |
| environment: { | |
| include: { | |
| orgMember: { | |
| include: { | |
| user: true, | |
| }, | |
| }, | |
| }, | |
| }, | |
| }, | |
| }); | |
| newInstances.push(instance); | |
| } | |
| } | |
| // find the instances that need to be removed | |
| const instancesToDeleted = existingInstances.filter( | |
| (i) => !options.environments.includes(i.environmentId) | |
| ); | |
| // delete the instances no longer selected | |
| for (const instance of instancesToDeleted) { | |
| await tx.taskScheduleInstance.delete({ | |
| where: { | |
| id: instance.id, | |
| }, | |
| }); | |
| } | |
| for (const instance of newInstances) { | |
| // Register the new task schedule instances | |
| await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id }); | |
| } | |
| if (scheduleHasChanged) { | |
| for (const instance of updatingInstances) { | |
| // Update the existing task schedule instances | |
| await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id }); | |
| } | |
| } | |
| return { scheduleRecord }; | |
| }, | |
| { timeout: 10_000 } | |
| ); |
| organizationId: z.string(), | ||
| projectId: z.string(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐛 Correctness Issue
Breaking Change: New Required Fields Added to Schema.
Adding required organizationId and projectId fields to TaskActivityQueryParams will break existing code that uses this schema without these fields.
Current Code (Diff):
- organizationId: z.string(),
- projectId: z.string(),
+ organizationId: z.string().optional(),
+ projectId: z.string().optional(),📝 Committable suggestion
‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀
| organizationId: z.string(), | |
| projectId: z.string(), | |
| organizationId: z.string().optional(), | |
| projectId: z.string().optional(), |
| span.setAttribute("nested", true); | ||
| // We're already inside a lock with the same resources, just run the routine | ||
| return routine(currentContext.signal); | ||
| return routine(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐛 Correctness Issue
Removed AbortSignal parameter breaks cancellation propagation.
Removing the AbortSignal parameter could prevent proper cancellation propagation in nested contexts, potentially causing operations to continue running when they should be cancelled.
Current Code (Diff):
- return routine();
+ return routine(currentContext.signal);📝 Committable suggestion
‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀
| return routine(); | |
| return routine(currentContext.signal); |
| [runId], | ||
| 5000, | ||
| async (signal) => { | ||
| async () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐛 Correctness Issue
Removed critical timeout parameter from lock function.
Removing the 5000ms timeout parameter could cause indefinite hanging if the lock can't be acquired, potentially leading to deadlocks in production.
Current Code (Diff):
- async () => {
+ 5000,
+ async (signal) => {📝 Committable suggestion
‼️ IMPORTANT
Trust, but verify! 🕵️ Please review this suggestion with the care of a code archaeologist - check that it perfectly replaces the highlighted code, preserves all lines, maintains proper indentation, and won't break anything in production. Your future self will thank you! 🚀
| async () => { | |
| 5000, | |
| async (signal) => { |
This PR contains changes from a range of commits from the original repository.
Commit Range:
3f2b748..ea6f66bFiles Changed: 105 (66 programming files)
Programming Ratio: 62.9%
Commits included:
... and 5 more commits