Skip to content

Conversation

muhammad-ali-e
Copy link
Contributor

What

  • Implemented centralized database retry mechanism with connection pool refresh for Django ORM operations
  • Added automatic retry logic for Celery database backend operations
  • Applied retry decorators to critical workflow chain components to prevent stuck executions

Why

  • WorkflowFileExecution records were getting stuck in "EXECUTING" status when database connections were dropped
  • Database connection errors in workflow chains were causing permanent execution failures without recovery
  • Lack of unified retry mechanism across Django ORM and Celery SQLAlchemy operations

How

  • Created db_constants.py with centralized error patterns, configuration, and logging templates
  • Implemented db_retry.py with decorator and context manager for Django ORM retry logic with connection pool refresh
  • Added celery_db_retry.py for Celery SQLAlchemy backend retry with engine disposal
  • Applied @db_retry() decorator to critical workflow chain methods in workflow execution modules

Can this PR break any existing features. If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)

  • No - The retry mechanism is defensive and only activates on connection errors
  • Existing functionality continues to work normally when database is healthy
  • Retry delays are configurable via environment variables with sensible defaults

Database Migrations

  • None required

Env Config

  • Optional: DB_RETRY_MAX_RETRIES (default: 3)
  • Optional: DB_RETRY_BASE_DELAY (default: 1.0 seconds)
  • Optional: DB_RETRY_MAX_DELAY (default: 30.0 seconds)
  • Optional: CELERY_USE_BUILTIN_RETRY (default: False)

Relevant Docs

  • Added comprehensive docstrings in all retry modules
  • Centralized configuration documentation in db_constants.py

Related Issues or PRs

  • Jira Ticket: UN-2798

Dependencies Versions

  • No new dependencies added

Notes on Testing

  • Test with database connection interruptions during workflow execution
  • Verify WorkflowFileExecution records complete successfully after temporary database outages
  • Monitor retry logs to ensure proper backoff behavior

Screenshots

  • N/A

Checklist

I have read and understood the Contribution Guidelines.

- Add centralized database retry mechanism with connection pool refresh
- Implement retry logic for Django ORM and Celery SQLAlchemy operations
- Apply @db_retry decorators to critical workflow chain components
- Handle stale connections, database unavailability, and transient errors
- Configure exponential backoff with customizable retry parameters
Copy link
Contributor

coderabbitai bot commented Sep 9, 2025

Summary by CodeRabbit

  • New Features

    • Automatic retry for database operations across organization, usage, and workflow actions to reduce transient failures.
    • Celery result backend gains hybrid, configurable retry with exponential backoff; background workers patched for greater resilience and stability.
  • Documentation

    • Added sample environment variables documenting DB/Celery retry and connection pool options.
  • Chores

    • Introduced environment-driven settings for retry counts/delays, connection timeouts, and SQL echo.
    • Enhanced logging of configured retry behavior for visibility during runtime.

Walkthrough

Adds centralized DB error classification and retry utilities, applies a @db_retry() decorator across multiple Django DB entrypoints, introduces a hybrid Celery SQLAlchemy retry/patching module, updates Celery config and worker startup to apply the patch, and adds environment-driven retry and PgBouncer settings plus sample.env documentation.

Changes

Cohort / File(s) Summary
Core retry utilities
backend/utils/db_constants.py, backend/utils/db_retry.py
New modules providing error classification enums/patterns, RetryConfiguration and LogMessages, and reusable retry utilities (decorator, context manager, helpers) for Django ORM and Celery usage.
Celery retry infrastructure
backend/backend/celery_db_retry.py
New hybrid retry implementation supporting Celery built-in retry or custom exponential backoff; provides patching for Celery DatabaseBackend methods, engine options for PgBouncer, and helpers to dispose/refresh SQLAlchemy engines.
Celery integration points
backend/backend/celery_config.py, backend/backend/celery_service.py, backend/backend/workers/file_processing/file_processing.py, backend/backend/workers/file_processing_callback/file_processing_callback.py
Add result_backend_transport_options to CeleryConfig, conditionally configure built-in retry attributes, and call patch_celery_database_backend() after Celery app creation to enable DB retry behavior.
Django retry application (decorators)
backend/account_v2/organization.py, backend/usage_v2/helper.py, backend/utils/models/organization_mixin.py, backend/utils/user_context.py, backend/workflow_manager/file_execution/models.py, backend/workflow_manager/workflow_v2/execution.py, backend/workflow_manager/workflow_v2/models/execution.py
Applied @db_retry() to numerous model/manager/static methods to add automatic retry semantics for transient DB errors; signatures unchanged.
Settings and documentation
backend/backend/settings/base.py, backend/sample.env
Added environment-driven settings for DB/Celery retry policies, connect timeout, and SQL echo; updated sample.env with commented examples documenting the new variables.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Env as Environment
  participant App as Celery App
  participant Patch as celery_db_retry.patch_celery_database_backend
  participant Cfg as CeleryConfig
  participant DBB as Celery DatabaseBackend
  participant DB as PostgreSQL/PgBouncer

  Env->>Cfg: Read CELERY_* env vars
  Note right of Cfg: Set transport options and\nconditional built-in retry attrs
  App->>Patch: Invoke after Celery app creation
  Patch->>DBB: Patch _store_result/_get_task_meta_for/get_result...
  Note over DBB: Methods wrapped with\ncustom backoff or builtin retry

  App->>DBB: Store/get task result
  DBB->>DB: Execute SQL
  alt Connection/transient error
    DBB-->>DBB: Classify error (utils.db_constants)
    DBB-->>DBB: Backoff + optional pool dispose
    DBB->>DB: Retry until success/exhausted
  else Success
    DBB-->>App: Return result/meta
  end
Loading
sequenceDiagram
  autonumber
  participant Caller as App code
  participant Wrap as utils.db_retry.db_retry
  participant ORM as Django ORM
  participant DB as PostgreSQL/PgBouncer

  Caller->>Wrap: Call decorated function
  Wrap->>ORM: Execute query/operation
  ORM->>DB: SQL
  alt Error occurs
    Wrap-->>Wrap: Classify error & decide retry
    opt Requires pool refresh
      Wrap-->>ORM: close_old_connections()
    end
    Wrap->>DB: Retry with exponential backoff
    alt Exhausted
      Wrap-->>Caller: Raise last error
    else Success
      Wrap-->>Caller: Return result
    end
  else Success
    Wrap-->>Caller: Return result
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Pre-merge checks (2 passed, 1 warning)

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 76.47% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Title Check ✅ Passed The title concisely and accurately reflects the main goal of the changeset by specifying the fix of WorkflowFileExecution stuck in EXECUTING due to database connection errors, aligning with the PR’s primary objective without unnecessary detail.
Description Check ✅ Passed The description uses all required template sections—What, Why, How, risk assessment, migrations, env config, docs, related issues, dependencies, testing notes, screenshots, and checklist—and each section is populated with relevant details, satisfying the repository’s PR description guidelines.
✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch UN-2798-fix-database-retry-workflow-execution-stuck-issue

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 12

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
backend/workflow_manager/file_execution/models.py (1)

21-57: Handle rare IntegrityError race on get_or_create

Under concurrency, get_or_create can raise IntegrityError due to unique constraints. Retrying with a fetch is a common pattern and aligns with the retry theme of this PR.

Apply this diff within the function:

-        execution_file, is_created = self.get_or_create(**lookup_fields)
+        try:
+            execution_file, is_created = self.get_or_create(**lookup_fields)
+        except Exception as e:
+            # If the exception is an IntegrityError (unique race), fetch existing
+            from django.db import IntegrityError
+            if isinstance(e, IntegrityError):
+                execution_file = self.get(**lookup_fields)
+                is_created = False
+            else:
+                raise

Note: If you prefer explicit imports, add from django.db import IntegrityError at module top.

backend/account_v2/organization.py (1)

24-38: Make create_organization idempotent across retries.

If the first insert commits but a connection error fires post-commit, a retry will raise IntegrityError. Prefer get_or_create to guarantee idempotency under retries.

-    def create_organization(
+    def create_organization(
         name: str, display_name: str, organization_id: str
     ) -> Organization:
-        try:
-            organization: Organization = Organization(
-                name=name,
-                display_name=display_name,
-                organization_id=organization_id,
-            )
-            organization.save()
-        except IntegrityError as error:
-            Logger.info(f"[Duplicate Id] Failed to create Organization Error: {error}")
-            raise error
-        return organization
+        organization, created = Organization.objects.get_or_create(  # type: ignore
+            organization_id=organization_id,
+            defaults={"name": name, "display_name": display_name},
+        )
+        if not created:
+            Logger.info(
+                f"[Duplicate Id] Organization already exists: organization_id={organization_id}"
+            )
+        return organization
backend/workflow_manager/workflow_v2/models/execution.py (2)

242-276: Fix attempts double-increment under retries.

When increment_attempt=True, a post-commit connection error can cause the decorator to re-run and increment attempts twice. Use an optimistic, idempotent DB update.

         if error:
             self.error_message = error[:EXECUTION_ERROR_LENGTH]
         if increment_attempt:
-            self.attempts += 1
+            # Idempotent increment guarded by current value
+            updated = type(self).objects.filter(
+                pk=self.pk, attempts=self.attempts
+            ).update(attempts=models.F("attempts") + 1)
+            if updated:
+                self.attempts += 1
 
         self.save()

286-304: Guard against null workflow in cache handler.

workflow is nullable; dereferencing self.workflow.id will crash and retries won’t help. Use workflow_id and early-return when absent.

-    def _handle_execution_cache(self):
-        if not ExecutionCacheUtils.is_execution_exists(
-            workflow_id=self.workflow.id, execution_id=self.id
-        ):
+    def _handle_execution_cache(self):
+        if not self.workflow_id:
+            logger.debug("Skipping execution cache update: workflow is None")
+            return
+        if not ExecutionCacheUtils.is_execution_exists(
+            workflow_id=self.workflow_id, execution_id=self.id
+        ):
             execution_cache = ExecutionCache(
-                workflow_id=self.workflow.id,
+                workflow_id=self.workflow_id,
                 execution_id=self.id,
                 total_files=self.total_files,
                 status=self.status,
             )
             ExecutionCacheUtils.create_execution(
                 execution=execution_cache,
             )
         else:
             ExecutionCacheUtils.update_status(
-                workflow_id=self.workflow.id, execution_id=self.id, status=self.status
+                workflow_id=self.workflow_id, execution_id=self.id, status=self.status
             )
🧹 Nitpick comments (20)
backend/usage_v2/helper.py (1)

59-66: Remove unreachable exception handler

QuerySet.aggregate(...) does not raise DoesNotExist; it returns a dict with None values when no rows match. This handler won’t execute.

Apply this diff:

-        except Usage.DoesNotExist:
-            # Handle the case where no usage data is found for the given run_id
-            logger.warning(f"Usage data not found for the specified run_id: {run_id}")
-            return {}

Optional: if callers prefer zeros, normalize None to 0 before returning.

backend/workflow_manager/file_execution/models.py (2)

59-69: Limit save to updated fields

Minor write optimization: specify update_fields in save() since we know the exact fields being set.

Apply this diff:

-        execution_file.file_name = file_hash.file_name
-        execution_file.file_size = file_hash.file_size
-        execution_file.mime_type = file_hash.mime_type
-        execution_file.provider_file_uuid = file_hash.provider_file_uuid
-        execution_file.fs_metadata = file_hash.fs_metadata
-        execution_file.save()
+        execution_file.file_name = file_hash.file_name
+        execution_file.file_size = file_hash.file_size
+        execution_file.mime_type = file_hash.mime_type
+        execution_file.provider_file_uuid = file_hash.provider_file_uuid
+        execution_file.fs_metadata = file_hash.fs_metadata
+        execution_file.save(
+            update_fields=[
+                "file_name",
+                "file_size",
+                "mime_type",
+                "provider_file_uuid",
+                "fs_metadata",
+            ]
+        )

124-156: Use update_fields to reduce row write size; confirm enum assignment

As with the other model, restrict writes to touched columns. Also confirm ExecutionStatus type: if it’s a TextChoices (str), assigning the enum is fine; if it’s a plain Enum, use .value. Current code assigns the enum directly.

Apply this diff:

-        self.status = status
+        self.status = status  # if plain Enum, use: status.value
@@
-        self.execution_error = execution_error
-        self.save()
+        self.execution_error = execution_error
+        update_fields = ["status", "execution_error"]
+        if self.execution_time is not None:
+            update_fields.append("execution_time")
+        self.save(update_fields=update_fields)

If ExecutionStatus is a plain Enum in this app, change to self.status = status.value.

backend/workflow_manager/workflow_v2/models/execution.py (1)

198-227: Optional: add retry to aggregated_usage_cost.

This property performs .exists() and .aggregate(); consider wrapping with @db_retry to harden reads against transient errors.

backend/utils/db_retry.py (5)

200-205: Log full trace when retries exhausted.

Use logger.exception to keep stack traces.

-                logger.error(
+                logger.exception(
                     LogMessages.format_message(
                         LogMessages.MAX_RETRIES_EXCEEDED, total=max_retries + 1, error=e
                     )
                 )

171-179: Broad except during pool refresh — annotate or narrow.

If you keep catching all exceptions here, add a BLE001 ignore or narrow if possible.

-                    except Exception as refresh_error:
+                    except Exception as refresh_error:  # noqa: BLE001
                         logger.warning(
                             LogMessages.format_message(
                                 LogMessages.POOL_REFRESH_FAILED, error=refresh_error
                             )
                         )

374-381: Log full trace when context retries exhausted.

-                logger.error(
+                logger.exception(
                     LogMessages.format_message(
                         LogMessages.MAX_RETRIES_EXCEEDED,
                         total=final_max_retries + 1,
                         error=e,
                     )
                 )

158-159: Optional: add jitter to backoff to avoid herd effects.

-                delay = min(base_delay * (2**retry_count), max_delay)
+                delay = min(base_delay * (2**retry_count), max_delay)
+                # Optional jitter
+                # delay *= 1 + random.uniform(0, 0.25)

Add at top if you enable jitter:

-import logging
+import logging
+import random

Also applies to: 332-333


114-209: Operational advice: emit metrics for retries.

Expose counters/gauges (attempts, succeeded_on_retry, exhausted) and error-type labels to observe impact and tune defaults.

backend/backend/celery_db_retry.py (8)

162-169: Emit stack traces on terminal failure.

Use logger.exception so we retain the traceback when retries are exhausted.

Apply this diff:

-                            logger.error(
+                            logger.exception(
                                 LogMessages.format_message(
                                     LogMessages.MAX_RETRIES_EXCEEDED,
                                     total=current_max_retries + 1,
                                     error=e,
                                 )
                             )

187-203: Remove unused parameter and improve error logging in engine disposal helper.

func is unused; also prefer logger.exception to retain TB.

Apply this diff:

-def _dispose_sqlalchemy_engine(func):
+def _dispose_sqlalchemy_engine():
@@
-    except Exception as e:
-        logger.warning(f"Could not dispose SQLAlchemy engine: {e}")
+    except Exception:
+        logger.exception("Could not dispose SQLAlchemy engine")

And update the call site:

-                                    _dispose_sqlalchemy_engine(func)
+                                    _dispose_sqlalchemy_engine()

314-335: Config sources for “built-in” vs “custom” retry are inconsistent.

Built-in branch reads generic DB_RETRY_* via get_retry_settings(), while custom decorator uses CELERY_DB_RETRY_* via get_celery_retry_settings(). This is confusing in ops/docs.

Consider standardizing on the CELERY_DB_RETRY_* variables for Celery in both paths (and keep DB_RETRY_* for Django ORM), or clearly document the split. If standardizing, change this block to get_celery_retry_settings() and rename the envs accordingly.

Would you like a follow-up patch to unify the variable names and sample.env?


114-121: Add jitter and consistent delay logging for refresh branch.

  • Add small random jitter to avoid stampedes when multiple workers retry simultaneously.
  • When needs_refresh is True, we don’t log the retry delay; include it for parity.

Example diff:

+                            import random
+                            jitter = random.uniform(0, current_base_delay * 0.1)
-                            delay = min(
-                                current_base_delay * (2**retry_count), current_max_delay
-                            )
+                            delay = min(
+                                current_base_delay * (2**retry_count) + jitter,
+                                current_max_delay,
+                            )
@@
-                                try:
+                                try:
                                     _dispose_sqlalchemy_engine()
                                     logger.info(
                                         "SQLAlchemy connection pool disposed successfully"
                                     )
                                 except Exception as refresh_error:
                                     logger.warning(
                                         LogMessages.format_message(
                                             LogMessages.POOL_REFRESH_FAILED,
                                             error=refresh_error,
                                         )
                                     )
+                                logger.warning(
+                                    LogMessages.format_message(
+                                        LogMessages.CONNECTION_ERROR_RETRY,
+                                        attempt=retry_count,
+                                        total=current_max_retries + 1,
+                                        error=e,
+                                        delay=delay,
+                                    )
+                                )

Also applies to: 149-159


299-303: Use logger.exception when patching fails.

Preserve traceback to aid diagnosis in ops.

Apply this diff:

-    except Exception as e:
-        logger.error(f"Error patching Celery database backend: {e}")
+    except Exception:
+        logger.exception("Error patching Celery database backend")

338-341: Use logger.exception for built-in retry configuration errors.

Keeps TB for debugging misconfigurations.

Apply this diff:

-    except Exception as e:
-        logger.error(f"Error configuring Celery built-in retry: {e}")
+    except Exception:
+        logger.exception("Error configuring Celery built-in retry")

354-369: Minor: centralize env keys for engine options.

"CELERY_DB_CONNECT_TIMEOUT" and "CELERY_DB_ECHO_SQL" are magic strings here; consider adding constants to RetryConfiguration for discoverability and typed conversion.


179-181: Remove unreachable code after the retry loop.

The return func(*args, **kwargs) is not reachable; it can mask future refactors.

Apply this diff:

-            # This should never be reached, but included for completeness
-            return func(*args, **kwargs)
+            return None  # unreachable by design

Or remove the block entirely.

backend/backend/celery_config.py (1)

1-1: Use logging instead of prints in config.

Printing from config code is noisy in production. Prefer module logger.

Apply this diff:

+import logging
 import os
@@
+logger = logging.getLogger(__name__)
@@
-        print(
-            f"[Celery Config] Using built-in retry: max_retries={result_backend_max_retries}, "
-        )
-        print(
-            f"base_sleep={result_backend_base_sleep_between_retries_ms}ms, max_sleep={result_backend_max_sleep_between_retries_ms}ms"
-        )
+        logger.info(
+            "[Celery Config] Using built-in retry: max_retries=%s, base_sleep=%sms, max_sleep=%sms",
+            result_backend_max_retries,
+            result_backend_base_sleep_between_retries_ms,
+            result_backend_max_sleep_between_retries_ms,
+        )
@@
-        print("[Celery Config] Using custom retry system (patching enabled)")
+        logger.info("[Celery Config] Using custom retry system (patching enabled)")

Also applies to: 60-66, 67-68

backend/utils/db_constants.py (2)

128-131: PEP 604 style for optional type.

Adopt str | None per Ruff RUF013 and modern typing.

Apply this diff:

-    def classify_error(
-        cls, error: Exception, error_message: str = None
-    ) -> tuple[DatabaseErrorType, bool]:
+    def classify_error(
+        cls, error: Exception, error_message: str | None = None
+    ) -> tuple[DatabaseErrorType, bool]:

171-195: Centralize Celery engine-related env keys.

Since celery_db_retry.get_celery_db_engine_options() reads CELERY_DB_CONNECT_TIMEOUT and CELERY_DB_ECHO_SQL, consider adding constants for these here to keep all Celery DB config discoverable in one place.

Also applies to: 280-296

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to Reviews > Disable Cache setting

Knowledge Base: Disabled due to Reviews > Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between 4957c58 and 86cff59.

📒 Files selected for processing (16)
  • backend/account_v2/organization.py (2 hunks)
  • backend/backend/celery_config.py (2 hunks)
  • backend/backend/celery_db_retry.py (1 hunks)
  • backend/backend/celery_service.py (2 hunks)
  • backend/backend/settings/base.py (1 hunks)
  • backend/backend/workers/file_processing/file_processing.py (2 hunks)
  • backend/backend/workers/file_processing_callback/file_processing_callback.py (2 hunks)
  • backend/sample.env (1 hunks)
  • backend/usage_v2/helper.py (3 hunks)
  • backend/utils/db_constants.py (1 hunks)
  • backend/utils/db_retry.py (1 hunks)
  • backend/utils/models/organization_mixin.py (2 hunks)
  • backend/utils/user_context.py (2 hunks)
  • backend/workflow_manager/file_execution/models.py (5 hunks)
  • backend/workflow_manager/workflow_v2/execution.py (2 hunks)
  • backend/workflow_manager/workflow_v2/models/execution.py (5 hunks)
🧰 Additional context used
🪛 Ruff (0.12.2)
backend/backend/celery_db_retry.py

74-74: Consider moving this statement to an else block

(TRY300)


135-135: Do not catch blind exception: Exception

(BLE001)


162-168: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


187-187: Unused function argument: func

(ARG001)


201-201: Do not catch blind exception: Exception

(BLE001)


301-301: Do not catch blind exception: Exception

(BLE001)


302-302: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


338-338: Do not catch blind exception: Exception

(BLE001)


339-339: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

backend/utils/db_retry.py

174-174: Do not catch blind exception: Exception

(BLE001)


200-204: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


314-314: Consider moving this statement to an else block

(TRY300)


348-348: Do not catch blind exception: Exception

(BLE001)


374-380: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

backend/utils/db_constants.py

129-129: PEP 484 prohibits implicit Optional

Convert to T | None

(RUF013)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (17)
backend/backend/celery_service.py (1)

10-10: Import looks good

Importing from a single module keeps the patch centralized. No issues.

backend/backend/workers/file_processing/file_processing.py (1)

10-10: Import is fine

Consistent with other workers; OK.

backend/backend/workers/file_processing_callback/file_processing_callback.py (1)

10-10: Import is fine

Consistent and clear.

backend/utils/user_context.py (2)

20-21: Verify retry classification doesn’t include ProgrammingError

@db_retry() here is good, but ProgrammingError is explicitly handled below as a non-retryable path. Ensure the retry classifier excludes ProgrammingError, otherwise this decorator could mask setup issues and loop unnecessarily.


5-5: Decorator import OK

Centralizing retry is consistent with the PR’s goals.

backend/workflow_manager/workflow_v2/execution.py (2)

395-396: Decorator order is correct

@db_retry() applied under @staticmethod is appropriate (decorated function becomes the static target).


395-401: The repository contains two definitions of WorkflowExecution—one in the backend and one in the unstract service. It’s unclear which model is used by update_execution_err. Please verify:

  • That the WorkflowExecution imported in backend/workflow_manager/workflow_v2/execution.py is a Django model with a TextField for status using TextChoices.
  • That ExecutionStatus.ERROR.value is compatible with the model’s field (e.g., "ERROR").
  • Whether tightening save(update_fields=[...]) and updating the return type to WorkflowExecution | None is safe given how callers handle None.

Confirm these points to ensure no breaking changes.

backend/usage_v2/helper.py (2)

17-18: LGTM: retry around aggregation

Wrapping read-only aggregation with @db_retry() is safe and helps during transient drops.


69-70: LGTM: retry around aggregation

Same here; read-only and idempotent.

backend/sample.env (1)

29-39: No changes needed in sample.env
The environment variables in sample.env already match the settings in backend/backend/settings/base.py (DB_CONNECTION_RETRY_* and CELERY_DB_RETRY_) and there is no implementation of the new names (DB_RETRY_ or CELERY_USE_BUILTIN_RETRY).

backend/workflow_manager/file_execution/models.py (2)

228-252: LGTM: targeted updates with update_fields

The selective save(update_fields=...) is good practice; keep it.


20-21: Decorator usage is appropriate

Applying @db_retry() on short, idempotent ORM operations is aligned with the PR’s goal and should help prevent stuck states.

Also applies to: 58-59, 124-125, 228-229

backend/utils/models/organization_mixin.py (1)

21-25: Retrying save() looks fine; minimal side effects.

Wrapping save() here is reasonable; the body is idempotent. No further concerns.

backend/account_v2/organization.py (1)

16-21: LGTM: get by org_id under retry.

Idempotent read; safe with @db_retry.

backend/workflow_manager/workflow_v2/models/execution.py (1)

45-66: LGTM: manager cleanup under retry.

count()/delete() are evaluated DB ops; decorating here is effective.

backend/utils/db_retry.py (1)

71-75: Confirm env var names in docs match settings.

Docs mention DB_RETRY_; base.py currently defines DB_CONNECTION_RETRY_ too. Ensure RetryConfiguration reads the intended names.

backend/utils/db_constants.py (1)

301-319: Nice: consistent, reusable log templates.

Centralized messages will make ops logs coherent across Django and Celery paths.

Comment on lines +6 to +7
from backend.celery_db_retry import get_celery_db_engine_options, should_use_builtin_retry

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix import path for retry helpers.

File resides under backend/backend/, so the import should reference backend.backend.celery_db_retry. Current path will fail at runtime.

Apply this diff:

-from backend.celery_db_retry import get_celery_db_engine_options, should_use_builtin_retry
+from backend.backend.celery_db_retry import (
+    get_celery_db_engine_options,
+    should_use_builtin_retry,
+)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
from backend.celery_db_retry import get_celery_db_engine_options, should_use_builtin_retry
from backend.backend.celery_db_retry import (
get_celery_db_engine_options,
should_use_builtin_retry,
)
🤖 Prompt for AI Agents
In backend/backend/celery_config.py around lines 6 to 7, the import path for the
retry helpers is incorrect; update the import to reference the actual module
location by changing the import to use backend.backend.celery_db_retry and
import get_celery_db_engine_options and should_use_builtin_retry from that
module so the runtime can resolve the symbols.

Comment on lines +41 to +43
# Database backend engine options for PgBouncer compatibility
result_backend_transport_options = get_celery_db_engine_options()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Confirm Celery picks up engine options for SQLAlchemy backend.

result_backend_transport_options is correct for Celery’s SQLAlchemy backend; please verify on your Celery version that these are passed to create_engine.

Run:

If your version differs, we may need to adjust the config key or option names.


🏁 Script executed:

#!/bin/bash
# Inspect Celery DatabaseBackend to confirm it uses `result_backend_transport_options`
python - <<'PY'
import inspect
from celery.backends import database
print("DatabaseBackend.create_backend method:\n")
print(inspect.getsource(database.DatabaseBackend.__init__))
PY

Length of output: 118


🏁 Script executed:

#!/bin/bash
# Locate Celery dependency and its pinned version in the repository
grep -R "celery" -n requirements*.txt pyproject.toml setup.py

Length of output: 255


🏁 Script executed:

#!/bin/bash
# Install the pinned Celery version and inspect DatabaseBackend for transport options handling
pip install celery==5.3.4 >/dev/null 2>&1
python - <<'PY'
import inspect
from celery.backends.database import DatabaseBackend

print("Signature of DatabaseBackend.__init__:\n")
print(inspect.signature(DatabaseBackend.__init__), "\n")
print("Source snippet around engine creation in DatabaseBackend:\n")
src = inspect.getsource(DatabaseBackend)
for line in src.splitlines():
    if "create_engine" in line or "transport_options" in line:
        print(line)
PY

Length of output: 437


🏁 Script executed:

#!/bin/bash
# Install Celery and SQLAlchemy to enable DatabaseBackend inspection
pip install celery==5.3.4 sqlalchemy >/dev/null 2>&1
python - <<'PY'
import inspect
from celery.backends.database import DatabaseBackend

print("Signature of DatabaseBackend.__init__:\n", inspect.signature(DatabaseBackend.__init__), "\n")
source = inspect.getsource(DatabaseBackend)
print("Lines containing engine creation or transport options:\n")
for line in source.splitlines():
    if "create_engine" in line or "transport_options" in line:
        print(line)
PY

Length of output: 157


🏁 Script executed:

#!/bin/bash
# Print the source of DatabaseBackend.__init__ to inspect engine_options usage
python - <<'PY'
import inspect
from celery.backends.database import DatabaseBackend
print(inspect.getsource(DatabaseBackend.__init__))
PY

Length of output: 1243


Use database_engine_options instead of result_backend_transport_options in backend/backend/celery_config.py
Celery’s SQLAlchemy result backend reads engine options from conf.database_engine_options (not result_backend_transport_options) and passes them into create_engine via SessionManager.session_factory (docs.celeryq.dev)

🤖 Prompt for AI Agents
In backend/backend/celery_config.py around lines 41 to 43, the code sets Celery
SQLAlchemy engine options using result_backend_transport_options, but Celery
expects these under database_engine_options; change the assignment to set
conf.database_engine_options (i.e., assign get_celery_db_engine_options() to
database_engine_options) so the SQLAlchemy result backend receives the engine
options passed into create_engine via SessionManager.session_factory.

Comment on lines +28 to +30
# Patch Celery database backend to add connection retry logic
patch_celery_database_backend()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Patch after Celery config is loaded and gate behind config flag

Patching before app.config_from_object(...) risks missing backend URL/engine options that come from config. Also, multiple modules call the patch; ensure it’s gated and idempotent.

Suggested change:

  • Move patch after config_from_object.
  • Only patch when built-in retry is disabled.

Apply this diff:

-from backend.celery_db_retry import patch_celery_database_backend
+from backend.celery_db_retry import patch_celery_database_backend, should_use_builtin_retry
@@
-# Patch Celery database backend to add connection retry logic
-patch_celery_database_backend()
+# Register custom tasks
 TaskRegistry()
 
 # Load task modules from all registered Django app configs.
 app.config_from_object("backend.celery_config.CeleryConfig")
+if not should_use_builtin_retry():
+    # Patch Celery database backend to add connection retry logic
+    patch_celery_database_backend()
 app.autodiscover_tasks()

Additionally, please confirm patch_celery_database_backend() is safe to call multiple times across processes (idempotent). If it’s not, add a module-level “already_patched” guard inside that function.


🏁 Script executed:

#!/bin/bash
# Locate celery_service.py
fd --type f 'celery_service\.py'
echo -e "\n--- celery_service.py snippet (first 200 lines) ---"
sed -n '1,200p' backend/backend/celery_service.py

echo -e "\n--- patch_celery_database_backend definition in celery_db_retry.py ---"
rg -nP '^def patch_celery_database_backend' -C3 backend/backend/celery_db_retry.py || true

echo -e "\n--- Check for idempotency guard in celery_db_retry.py ---"
rg -nP 'already_patched' -C3 backend/backend/celery_db_retry.py || true

echo -e "\n--- should_use_builtin_retry definition in celery_db_retry.py ---"
rg -nP '^def should_use_builtin_retry' -C3 backend/backend/celery_db_retry.py || true

echo -e "\n--- Usages of patch_celery_database_backend across repository ---"
rg -nR 'patch_celery_database_backend' -C1 . || true

Length of output: 2319


🏁 Script executed:

#!/bin/bash
# Print the beginning of celery_db_retry.py including the patch function
sed -n '1,300p' backend/backend/celery_db_retry.py

echo -e "\n--- Check for idempotency guard usage ---"
grep -R "_celery_backend_patched" -n backend/backend/celery_db_retry.py -C5 || true

echo -e "\n--- should_use_builtin_retry function body ---"
sed -n '1,200p' backend/backend/celery_db_retry.py | sed -n '/def should_use_builtin_retry/,/^$/p' || true

echo -e "\n--- Repository-wide usages of patch_celery_database_backend ---"
rg -n 'patch_celery_database_backend' -C1 . || true

Length of output: 17185


Relocate patch_celery_database_backend call
Move the call to immediately after app.config_from_object("backend.celery_config.CeleryConfig") in backend/backend/celery_service.py. patch_celery_database_backend already switches on should_use_builtin_retry() and uses _celery_backend_patched for idempotency—drop the extra conditional wrapper and no additional guard is required.

🤖 Prompt for AI Agents
In backend/backend/celery_service.py around lines 28-30, move the call to
patch_celery_database_backend() so it runs immediately after
app.config_from_object("backend.celery_config.CeleryConfig"); remove the
surrounding conditional wrapper currently guarding the call (since
patch_celery_database_backend already checks should_use_builtin_retry() and uses
_celery_backend_patched for idempotency). Ensure there is no extra guard or
duplicate checks and keep only a direct call to patch_celery_database_backend()
right after app.config_from_object(...) so the patching logic self-controls when
to apply.

Comment on lines +118 to +122
# Database connection retry settings
DB_CONNECTION_RETRY_COUNT = int(os.environ.get("DB_CONNECTION_RETRY_COUNT", "3"))
DB_CONNECTION_RETRY_DELAY = int(os.environ.get("DB_CONNECTION_RETRY_DELAY", "1"))
DB_CONNECTION_RETRY_MAX_DELAY = int(os.environ.get("DB_CONNECTION_RETRY_MAX_DELAY", "30"))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use float for retry delays; support both DB_RETRY_ and DB_CONNECTION_RETRY_ env names.**

Current ints prevent sub-second backoff and differ from db_retry.py docs (DB_RETRY_*). Parse floats and fall back to either naming for consistency.

-# Database connection retry settings
-DB_CONNECTION_RETRY_COUNT = int(os.environ.get("DB_CONNECTION_RETRY_COUNT", "3"))
-DB_CONNECTION_RETRY_DELAY = int(os.environ.get("DB_CONNECTION_RETRY_DELAY", "1"))
-DB_CONNECTION_RETRY_MAX_DELAY = int(os.environ.get("DB_CONNECTION_RETRY_MAX_DELAY", "30"))
+# Database connection retry settings (support legacy and new names)
+DB_RETRY_MAX_RETRIES = int(os.environ.get("DB_RETRY_MAX_RETRIES", os.environ.get("DB_CONNECTION_RETRY_COUNT", "3")))
+DB_RETRY_BASE_DELAY = float(os.environ.get("DB_RETRY_BASE_DELAY", os.environ.get("DB_CONNECTION_RETRY_DELAY", "1.0")))
+DB_RETRY_MAX_DELAY = float(os.environ.get("DB_RETRY_MAX_DELAY", os.environ.get("DB_CONNECTION_RETRY_MAX_DELAY", "30.0")))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Database connection retry settings
DB_CONNECTION_RETRY_COUNT = int(os.environ.get("DB_CONNECTION_RETRY_COUNT", "3"))
DB_CONNECTION_RETRY_DELAY = int(os.environ.get("DB_CONNECTION_RETRY_DELAY", "1"))
DB_CONNECTION_RETRY_MAX_DELAY = int(os.environ.get("DB_CONNECTION_RETRY_MAX_DELAY", "30"))
# Database connection retry settings (support legacy and new names)
DB_RETRY_MAX_RETRIES = int(
os.environ.get(
"DB_RETRY_MAX_RETRIES",
os.environ.get("DB_CONNECTION_RETRY_COUNT", "3"),
)
)
DB_RETRY_BASE_DELAY = float(
os.environ.get(
"DB_RETRY_BASE_DELAY",
os.environ.get("DB_CONNECTION_RETRY_DELAY", "1.0"),
)
)
DB_RETRY_MAX_DELAY = float(
os.environ.get(
"DB_RETRY_MAX_DELAY",
os.environ.get("DB_CONNECTION_RETRY_MAX_DELAY", "30.0"),
)
)
🤖 Prompt for AI Agents
In backend/backend/settings/base.py around lines 118 to 122, the retry delay
constants are parsed as ints and only read DB_CONNECTION_RETRY_* env names;
change parsing to float so sub-second backoff is allowed and accept either
DB_RETRY_* or DB_CONNECTION_RETRY_* env names for backward-compatibility.
Specifically, for each setting (COUNT, DELAY, MAX_DELAY) check os.environ for
the shorter DB_RETRY_* first then fall back to DB_CONNECTION_RETRY_*, and parse
numeric values with float(...) (use int only for COUNT if desired) providing the
same default strings as before.

Comment on lines +29 to +31
# Patch Celery database backend to add connection retry logic
patch_celery_database_backend()

Copy link
Contributor

@coderabbitai coderabbitai bot Sep 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Defer patch until after Celery config and gate with flag

Mirror the change from other Celery modules for consistency and to ensure engine options are loaded.

Apply this diff:

-from backend.celery_db_retry import patch_celery_database_backend
+from backend.celery_db_retry import patch_celery_database_backend, should_use_builtin_retry
@@
-# Patch Celery database backend to add connection retry logic
-patch_celery_database_backend()
-
-# Load task modules from all registered Django app configs.
-app.config_from_object(CeleryConfig)
+# Load task modules from all registered Django app configs.
+app.config_from_object(CeleryConfig)
+if not should_use_builtin_retry():
+    # Patch Celery database backend to add connection retry logic
+    patch_celery_database_backend()

Confirm patch_celery_database_backend() is idempotent.


🏁 Script executed:

#!/bin/bash
set -e
# Search for the function definition of patch_celery_database_backend
rg -nP "def\s+patch_celery_database_backend" -C5 --glob "*.py"
# Search for any internal guard or idempotency checks within the function
rg -nP "patch_celery_database_backend\(" -C5 --glob "*.py"

Length of output: 5204


🏁 Script executed:

#!/bin/bash
set -e
# Print lines around the function definition to inspect idempotency guard
sed -n '180,260p' backend/backend/celery_db_retry.py
# Confirm usage of the _celery_backend_patched flag inside the function
rg -nP '_celery_backend_patched' -C5 backend/backend/celery_db_retry.py

Length of output: 5249


Defer and gate patch invocation
Patch is idempotent (guards against double patching via _celery_backend_patched flag and retry-marker) but should be applied after loading Celery config and only when custom retry is enabled—mirror other modules:

-from backend.celery_db_retry import patch_celery_database_backend
+from backend.celery_db_retry import patch_celery_database_backend, should_use_builtin_retry
@@
-# Patch Celery database backend to add connection retry logic
-patch_celery_database_backend()
-
-# Load task modules from all registered Django app configs.
-app.config_from_object(CeleryConfig)
+# Load task modules from all registered Django app configs.
+app.config_from_object(CeleryConfig)
+if not should_use_builtin_retry():
+    # Patch Celery database backend to add connection retry logic
+    patch_celery_database_backend()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Patch Celery database backend to add connection retry logic
patch_celery_database_backend()
from backend.celery_db_retry import patch_celery_database_backend, should_use_builtin_retry
@@
-# Patch Celery database backend to add connection retry logic
-patch_celery_database_backend()
-
-# Load task modules from all registered Django app configs.
# Load task modules from all registered Django app configs.
app.config_from_object(CeleryConfig)
if not should_use_builtin_retry():
# Patch Celery database backend to add connection retry logic
patch_celery_database_backend()
🤖 Prompt for AI Agents
In backend/backend/workers/file_processing_callback/file_processing_callback.py
around lines 29 to 31, the unconditional call to patch_celery_database_backend()
must be deferred and gated: remove the direct invocation and instead call
patch_celery_database_backend() only after Celery configuration has been loaded
and only when the custom retry feature/flag is enabled (matching how other
modules do it). Keep the existing idempotency guards in the patch function, but
add a check against the app/settings flag or init sequence and move the call
into the Celery init/boot path so the patch runs once and only when configured.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@muhammad-ali-e Sounds valid. Please check.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

Comment on lines +29 to +31
# Patch Celery database backend to add connection retry logic
patch_celery_database_backend()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Align patch ordering with Celery app configuration

Same concern as in celery_service: patch after app.config_from_object(CeleryConfig) and gate behind the built-in retry flag to avoid redundant/double patching.

Apply this diff:

-from backend.celery_db_retry import patch_celery_database_backend
+from backend.celery_db_retry import patch_celery_database_backend, should_use_builtin_retry
@@
-# Patch Celery database backend to add connection retry logic
-patch_celery_database_backend()
-
-# Load task modules from all registered Django app configs.
-app.config_from_object(CeleryConfig)
+# Load task modules from all registered Django app configs.
+app.config_from_object(CeleryConfig)
+if not should_use_builtin_retry():
+    # Patch Celery database backend to add connection retry logic
+    patch_celery_database_backend()

Please also confirm the patch is idempotent across worker modules.


🏁 Script executed:

#!/bin/bash
# Search for usage of patch_celery_database_backend
rg -n "patch_celery_database_backend" -C3

# Search for Celery app configuration ordering
rg -n "config_from_object" -C3

# Locate definition of should_use_builtin_retry
rg -n "def should_use_builtin_retry" -C3

Length of output: 7106


Gate and reorder Celery DB retry patch
Move the patch_celery_database_backend() call to immediately after app.config_from_object(CeleryConfig) and wrap it in if not should_use_builtin_retry(): in all three worker entry‐points and in celery_service.py; the built-in flag check prevents redundant patching and _celery_backend_patched makes the patch idempotent.

Applies to:

  • backend/backend/celery_service.py
  • backend/backend/workers/file_processing/file_processing.py
  • backend/backend/workers/file_processing_callback/file_processing_callback.py
🤖 Prompt for AI Agents
In backend/backend/workers/file_processing/file_processing.py around lines 29 to
31, move the call to patch_celery_database_backend() so it occurs immediately
after app.config_from_object(CeleryConfig) and wrap that call in if not
should_use_builtin_retry(): to avoid redundant patching; also ensure the patch
function remains idempotent by relying on the existing _celery_backend_patched
guard (do not call patch_celery_database_backend() anywhere before
config_from_object in this module).

Comment on lines +141 to +149
except Exception as e:
error_type, needs_refresh, use_extended = _classify_database_error(e)

if not DatabaseErrorPatterns.is_retryable_error(error_type):
logger.debug(
LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=e)
)
raise

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Catch only DB connection errors, not all Exceptions.

Avoid broad except; limit to OperationalError and InterfaceError.

-        except Exception as e:
+        except (OperationalError, InterfaceError) as e:
             error_type, needs_refresh, use_extended = _classify_database_error(e)
 
             if not DatabaseErrorPatterns.is_retryable_error(error_type):
                 logger.debug(
                     LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=e)
                 )
                 raise
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
except Exception as e:
error_type, needs_refresh, use_extended = _classify_database_error(e)
if not DatabaseErrorPatterns.is_retryable_error(error_type):
logger.debug(
LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=e)
)
raise
except (OperationalError, InterfaceError) as e:
error_type, needs_refresh, use_extended = _classify_database_error(e)
if not DatabaseErrorPatterns.is_retryable_error(error_type):
logger.debug(
LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=e)
)
raise
🤖 Prompt for AI Agents
In backend/utils/db_retry.py around lines 141-149, the code currently uses a
broad "except Exception as e" which should be narrowed to only DB
connection-related errors; replace it with "except (OperationalError,
InterfaceError) as e" and add/import those exceptions from the DB library used
in the project (e.g., from sqlalchemy.exc import OperationalError,
InterfaceError) so non-database exceptions continue to propagate unchanged; keep
the existing handling (classify, log non-retryable, raise) inside that narrowed
except block and do not swallow other exceptions.

Comment on lines 315 to 323
except Exception as e:
error_type, needs_refresh, use_extended = _classify_database_error(e)

if not DatabaseErrorPatterns.is_retryable_error(error_type):
logger.debug(
LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=e)
)
raise

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Same here: don’t catch all Exceptions in context manager.

Limit to DB connection errors.

-        except Exception as e:
+        except (OperationalError, InterfaceError) as e:
             error_type, needs_refresh, use_extended = _classify_database_error(e)
 
             if not DatabaseErrorPatterns.is_retryable_error(error_type):
                 logger.debug(
                     LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=e)
                 )
                 raise
🤖 Prompt for AI Agents
backend/utils/db_retry.py around lines 315 to 323: the context manager currently
catches all Exceptions; change the except clause to only catch database
connection/operational errors (for example sqlalchemy.exc.OperationalError,
sqlalchemy.exc.InterfaceError and related DBAPI errors such as
psycopg2.OperationalError/DatabaseError), import those exception classes at the
top of the file, keep the existing error classification/handling logic inside
that specific except, and allow any other exceptions to propagate (do not
swallow or convert non-DB exceptions).

Comment on lines +29 to 32
@db_retry() # Add retry for connection drops during queryset organization filtering
def get_queryset(self):
organization = UserContext.get_organization()
return super().get_queryset().filter(organization=organization)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Remove @db_retry from get_queryset — it won’t help due to lazy evaluation.

get_queryset only builds a QuerySet; DB work happens later. The decorator gives a false sense of protection. Apply retries at evaluation sites (e.g., .count(), .exists(), .get(), .update(), .delete()) or provide manager helpers that perform evaluated operations.

-    @db_retry()  # Add retry for connection drops during queryset organization filtering
-    def get_queryset(self):
+    def get_queryset(self):
         organization = UserContext.get_organization()
         return super().get_queryset().filter(organization=organization)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@db_retry() # Add retry for connection drops during queryset organization filtering
def get_queryset(self):
organization = UserContext.get_organization()
return super().get_queryset().filter(organization=organization)
def get_queryset(self):
organization = UserContext.get_organization()
return super().get_queryset().filter(organization=organization)

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (3)
backend/utils/db_retry.py (2)

211-218: Narrow exception scope to DB connection errors only.

Catching Exception swallows non-DB bugs. Limit to Django DB connection exceptions as previously requested.

-        except Exception as e:
+        except (OperationalError, InterfaceError) as e:
             error_type, needs_refresh, use_extended = _classify_database_error(e)

             if not DatabaseErrorPatterns.is_retryable_error(error_type):
                 logger.debug(
                     LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=e)
                 )
                 raise

388-395: Also narrow the context manager’s except.

Same rationale as above; don’t intercept unrelated exceptions.

-        except Exception as e:
+        except (OperationalError, InterfaceError) as e:
backend/backend/celery_db_retry.py (1)

31-35: Broaden detection to all SQLAlchemy errors, not just OperationalError.

Catching only OperationalError misses InterfaceError/DBAPIError subclasses.

-        from sqlalchemy.exc import OperationalError as SQLAlchemyOperationalError
-
-        return isinstance(exception, SQLAlchemyOperationalError)
+        from sqlalchemy.exc import SQLAlchemyError
+        return isinstance(exception, SQLAlchemyError)
🧹 Nitpick comments (6)
backend/utils/db_retry.py (3)

420-426: Log failures with stack traces.

Use logger.exception for MAX_RETRIES_EXCEEDED to keep tracebacks in logs (helps RCA).

-                logger.error(
+                logger.exception(
                     LogMessages.format_message(
                         LogMessages.MAX_RETRIES_EXCEEDED, total=max_retries + 1, error=e
                     )
                 )
-                logger.error(
+                logger.exception(
                     LogMessages.format_message(
                         LogMessages.MAX_RETRIES_EXCEEDED,
                         total=final_max_retries + 1,
                         error=e,
                     )
                 )

Also applies to: 246-250


253-255: Remove unreachable fallback.

The loop exits via return/raise; this tail call won’t run.

-    # This should never be reached, but included for completeness
-    return operation()

135-140: Optionally include traceback on refresh failures.

Use logger.exception to retain stack for failed pool refresh attempts.

-        logger.warning(
+        logger.exception(
             LogMessages.format_message(
                 LogMessages.POOL_REFRESH_FAILED, error=refresh_error
             )
         )
backend/backend/celery_db_retry.py (3)

61-63: Remove unused parameter and fix call sites.

func isn’t used in _handle_pool_refresh; drop it and simplify callers.

-def _handle_pool_refresh(
-    func: Callable, error: Exception, retry_count: int, total_retries: int
-) -> None:
+def _handle_pool_refresh(error: Exception, retry_count: int, total_retries: int) -> None:
-                            _handle_pool_refresh(
-                                func, e, retry_count, current_settings["max_retries"] + 1
-                            )
+                            _handle_pool_refresh(
+                                e, retry_count, current_settings["max_retries"] + 1
+                            )

Also applies to: 192-197


209-215: Emit exceptions with tracebacks.

Prefer logger.exception for failure logs.

-                        logger.error(
+                        logger.exception(
                             LogMessages.format_message(
                                 LogMessages.MAX_RETRIES_EXCEEDED,
                                 total=current_settings["max_retries"] + 1,
                                 error=e,
                             )
                         )
-        logger.error(f"Error patching Celery database backend: {e}")
+        logger.exception(f"Error patching Celery database backend: {e}")
-        logger.error(f"Error configuring Celery built-in retry: {e}")
+        logger.exception(f"Error configuring Celery built-in retry: {e}")

Also applies to: 341-341, 378-378


226-242: Avoid blind except and narrow where possible.

Consider catching SQLAlchemyError during dispose to reduce noise; keep a broad fallback only if necessary.

-    except Exception as e:
+    except Exception as e:
         logger.warning(f"Could not dispose SQLAlchemy engine: {e}")

If SQLAlchemy is guaranteed, prefer:

-    except Exception as e:
+    except SQLAlchemyError as e:  # from sqlalchemy.exc import SQLAlchemyError
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to Reviews > Disable Cache setting

Knowledge Base: Disabled due to Reviews > Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between 86cff59 and f0979aa.

📒 Files selected for processing (2)
  • backend/backend/celery_db_retry.py (1 hunks)
  • backend/utils/db_retry.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.12.2)
backend/backend/celery_db_retry.py

61-61: Unused function argument: func

(ARG001)


75-75: Do not catch blind exception: Exception

(BLE001)


159-159: Consider moving this statement to an else block

(TRY300)


209-215: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


240-240: Do not catch blind exception: Exception

(BLE001)


340-340: Do not catch blind exception: Exception

(BLE001)


341-341: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


377-377: Do not catch blind exception: Exception

(BLE001)


378-378: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

backend/utils/db_retry.py

135-135: Do not catch blind exception: Exception

(BLE001)


246-250: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


387-387: Consider moving this statement to an else block

(TRY300)


420-426: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (1)
backend/backend/celery_db_retry.py (1)

350-377: Built-in retry config timing is correctpatch_celery_database_backend() runs immediately after each Celery(...) instantiation and before app.config_from_object("backend.celery_config.CeleryConfig"), and the config class pulls the env vars at load time.

Comment on lines +21 to +25
# Context manager usage
with db_retry_context(max_retries=3):
model.save()
other_model.delete()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Doc example promises unsupported behavior.

The “with db_retry_context(...):” example implies automatic re-execution of the block, which Python context managers cannot do. Please remove or rewrite to avoid misuse.

-    # Context manager usage
-    with db_retry_context(max_retries=3):
-        model.save()
-        other_model.delete()
+    # Prefer decorator or direct call for retries.
+    # Context manager is for connection refresh only (no automatic re-run).

Also applies to: 365-370

🤖 Prompt for AI Agents
In backend/utils/db_retry.py around lines 21-25 (and similarly 365-370), the doc
example shows "with db_retry_context(...):" implying the context manager will
automatically re-run the block on failure — which is incorrect; update the docs
to remove that example and replace it with one of two correct patterns: either
show using a retrying wrapper/decorator that re-invokes the callable on failure,
or show an explicit retry loop inside the with-block (e.g., performing retries
around the operation within the context rather than expecting re-execution by
the context manager). Ensure the text clearly states that the context manager
only manages transactional/cleanup behavior and does not retry the enclosed
block, and provide a short correct usage example pattern (decorator/wrapper or
explicit loop) in place of the misleading "with" example.

Comment on lines 384 to 388
while retry_count <= final_max_retries:
try:
yield
return # Success - exit the retry loop
except Exception as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

❓ Verification inconclusive

Context manager retrier is broken (multiple yields).

A @contextmanager must yield exactly once. The while/yield loop will raise “generator didn't stop after throw” on the first exception and cannot re-run the user block. Either remove this API or change semantics.

Minimal safe fix (no retry, just refresh/log) to avoid runtime error if it’s invoked:

-    retry_count = 0
-
-    while retry_count <= final_max_retries:
-        try:
-            yield
-            return  # Success - exit the retry loop
-        except Exception as e:
-            error_type, needs_refresh, use_extended = _classify_database_error(e)
-
-            if not DatabaseErrorPatterns.is_retryable_error(error_type):
-                logger.debug(
-                    LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=e)
-                )
-                raise
-
-            # Update settings for extended retry if needed
-            extended_settings = _update_retry_settings_for_extended(
-                retry_count, use_extended
-            )
-            if extended_settings:
-                final_max_retries = extended_settings["max_retries"]
-                final_base_delay = extended_settings["base_delay"]
-                final_max_delay = extended_settings["max_delay"]
-
-            if retry_count < final_max_retries:
-                retry_count, delay = _handle_context_retry_logic(
-                    retry_count,
-                    final_max_retries,
-                    final_base_delay,
-                    final_max_delay,
-                    final_force_refresh,
-                    e,
-                    error_type,
-                    needs_refresh,
-                )
-                time.sleep(delay)
-                continue
-            else:
-                logger.error(
-                    LogMessages.format_message(
-                        LogMessages.MAX_RETRIES_EXCEEDED,
-                        total=final_max_retries + 1,
-                        error=e,
-                    )
-                )
-                raise
+    try:
+        yield
+    except (OperationalError, InterfaceError) as e:
+        error_type, needs_refresh, _ = _classify_database_error(e)
+        if needs_refresh and final_force_refresh:
+            _handle_connection_pool_refresh(e, 1, 1)
+        if not DatabaseErrorPatterns.is_retryable_error(error_type):
+            logger.debug(
+                LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=e)
+            )
+        raise

And update docs to steer users to the decorator or retry_database_operation for true retries.
Would you like me to replace the context manager with a small “retry_block(fn)” helper to truly re-run a provided callable?

Also applies to: 389-419


Refactor broken contextmanager retrier in backend/utils/db_retry.py
The @contextmanager at lines 384–388 yields inside a retry loop, causing a “generator didn’t stop after throw” error on any exception and preventing retries. Remove or replace this API with either a contextmanager that yields exactly once (e.g. for refresh/log only) or convert to a retry decorator/function (such as retry_database_operation or a retry_block(fn) helper).

🧰 Tools
🪛 Ruff (0.12.2)

387-387: Consider moving this statement to an else block

(TRY300)

🤖 Prompt for AI Agents
In backend/utils/db_retry.py around lines 384–388 the @contextmanager currently
yields inside the retry loop which causes "generator didn't stop after throw"
and prevents retries; replace this broken contextmanager with a retry function
or decorator that invokes the operation inside the retry loop (e.g. implement
retry_database_operation(fn, *args, **kwargs) or a @retry_database_operation
decorator) so the loop calls the target callable and handles exceptions/backs
off and returns its result, or alternatively change the contextmanager to yield
exactly once and perform all retry logic outside the generator (do not yield
inside the retry loop and remove the return-after-yield pattern so exceptions
are caught and retried by the calling wrapper).

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (4)
backend/backend/celery_db_retry.py (1)

28-35: Broaden SQLAlchemy exception detection to cover all retryable DB errors.

Catching only OperationalError misses InterfaceError/DBAPIError and friends. Use SQLAlchemyError as the umbrella type.

-def _is_sqlalchemy_error(exception: Exception) -> bool:
-    """Check if exception is a SQLAlchemy error."""
-    try:
-        from sqlalchemy.exc import OperationalError as SQLAlchemyOperationalError
-
-        return isinstance(exception, SQLAlchemyOperationalError)
-    except ImportError:
-        return False
+def _is_sqlalchemy_error(exception: Exception) -> bool:
+    """Check if exception is a SQLAlchemy error."""
+    try:
+        from sqlalchemy.exc import SQLAlchemyError
+        return isinstance(exception, SQLAlchemyError)
+    except ImportError:
+        # SQLAlchemy not installed; treat as non-SQLAlchemy error.
+        return False
backend/utils/db_retry.py (3)

21-25: Fix docs: context manager does not re-execute the block.

A @contextmanager yields once; it cannot re-run the with-block on failure. Update examples to avoid implying auto-retry.

-    # Context manager usage
-    with db_retry_context(max_retries=3):
-        model.save()
-        other_model.delete()
+    # Prefer decorator or retry_database_operation for retries.
+    # The context manager does NOT re-run the block; it only refreshes connections on certain errors.
-    Example:
-        with db_retry_context(max_retries=5, force_refresh=True):
-            model.save()
-            other_model.delete()
-            MyModel.objects.filter(...).update(...)
+    Example:
+        # Use decorator or retry_database_operation for automatic retries.
+        # db_retry_context is for connection hygiene only; it will still re-raise the error.

Also applies to: 398-403


207-214: Catch only DB connection errors in the core retry loop.

Limit to OperationalError/InterfaceError; let other exceptions propagate immediately.

-        except Exception as e:
+        except (OperationalError, InterfaceError) as e:
             error_type, needs_refresh, use_extended = _classify_database_error(e)

377-451: Context manager retries are invalid; yield exactly once and re-raise.

Current loop yields multiple times, which breaks contextlib semantics (“generator didn't stop after throw”). Replace with a single-yield context that optionally refreshes connections, then re-raises.

 @contextmanager
 def db_retry_context(
@@
-    retry_count = 0
-
-    while retry_count <= current_settings["max_retries"]:
-        try:
-            yield
-            return  # Success - exit the retry loop
-        except Exception as e:
-            error_type, needs_refresh, use_extended = _classify_database_error(e)
-
-            if not DatabaseErrorPatterns.is_retryable_error(error_type):
-                _handle_context_non_retryable_error(e)
-
-            # Update settings for extended retry if needed
-            current_settings = _update_context_retry_settings(
-                retry_count, use_extended, current_settings
-            )
-
-            if retry_count < current_settings["max_retries"]:
-                retry_count, delay = _handle_context_retry_logic(
-                    retry_count,
-                    current_settings["max_retries"],
-                    current_settings["base_delay"],
-                    current_settings["max_delay"],
-                    current_settings["force_refresh"],
-                    e,
-                    error_type,
-                    needs_refresh,
-                )
-                time.sleep(delay)
-                continue
-            else:
-                _handle_context_max_retries_exceeded(
-                    e, current_settings["max_retries"] + 1
-                )
+    try:
+        yield
+    except (OperationalError, InterfaceError) as e:
+        error_type, needs_refresh, _ = _classify_database_error(e)
+        if not DatabaseErrorPatterns.is_retryable_error(error_type):
+            _handle_context_non_retryable_error(e)
+        # Hygiene only: optionally refresh pool, then re-raise.
+        if needs_refresh and current_settings["force_refresh"]:
+            _handle_connection_pool_refresh(e, 1, 1)
+        _handle_context_max_retries_exceeded(e, 1)
🧹 Nitpick comments (7)
backend/backend/celery_db_retry.py (5)

207-214: Avoid broad exception catch or explicitly suppress the linter.

The wrapper currently catches Exception, then re-raises non-SQLAlchemy errors. Either narrow the except to SQLAlchemyError, or keep the guard and add an inline noqa to document intent.

Minimal change (keep behavior, fix BLE001):

-                except Exception as e:
+                except Exception as e:  # noqa: BLE001 - non-SQLAlchemy errors are re-raised immediately
                     if not _is_sqlalchemy_error(e):
                         _handle_non_sqlalchemy_error(e)

Alternative (narrowed catch):

+            try:
+                from sqlalchemy.exc import SQLAlchemyError
+            except Exception:
+                SQLAlchemyError = tuple()  # type: ignore[assignment]
...
-                except Exception as e:
+                except SQLAlchemyError as e:
                     # classified below
                     ...

351-356: Mark all patched methods to prevent re-patching, not just _store_result.

-        # Mark as patched to prevent double patching
-        DatabaseBackend._store_result._retry_patched = True
+        # Mark as patched to prevent double patching
+        DatabaseBackend._store_result._retry_patched = True
+        try:
+            DatabaseBackend._get_task_meta_for._retry_patched = True
+            DatabaseBackend.get_result._retry_patched = True
+            if hasattr(DatabaseBackend, "delete_result"):
+                DatabaseBackend.delete_result._retry_patched = True
+        except Exception:
+            pass
         _celery_backend_patched = True

360-360: Use logger.exception to preserve tracebacks on failures.

-        logger.error(f"Error patching Celery database backend: {e}")
+        logger.exception("Error patching Celery database backend")
-        logger.error(f"Error configuring Celery built-in retry: {e}")
+        logger.exception("Error configuring Celery built-in retry")

Also applies to: 396-397


126-131: Add jitter to backoff to reduce thundering herd.

-    delay = min(
-        current_settings["base_delay"] * (2**retry_count),
-        current_settings["max_delay"],
-    )
+    delay = min(
+        current_settings["base_delay"] * (2**retry_count),
+        current_settings["max_delay"],
+    )
+    # add +/-20% jitter
+    try:
+        import random
+        jitter = 0.2 * delay
+        delay = max(0.0, min(current_settings["max_delay"], delay + random.uniform(-jitter, jitter)))
+    except Exception:
+        pass

414-427: Expose pool tuning via env and add pool_timeout for faster failover.

     return {
         # Connection health checking
         "pool_pre_ping": True,  # Test connections before use
-        # Minimal pooling (let PgBouncer handle the real pooling)
-        "pool_size": 5,  # Small pool since PgBouncer handles real pooling
-        "max_overflow": 0,  # No overflow, rely on PgBouncer
+        # Minimal pooling (let PgBouncer handle real pooling)
+        "pool_size": RetryConfiguration.get_setting_value("CELERY_DB_POOL_SIZE", 5),
+        "max_overflow": RetryConfiguration.get_setting_value("CELERY_DB_MAX_OVERFLOW", 0),
+        "pool_timeout": RetryConfiguration.get_setting_value("CELERY_DB_POOL_TIMEOUT", 10),
         "pool_recycle": 3600,  # Recycle connections every hour
backend/utils/db_retry.py (2)

246-250: Preserve traceback when max retries are exceeded.

-                logger.error(
+                logger.exception(
                     LogMessages.format_message(
                         LogMessages.MAX_RETRIES_EXCEEDED, total=max_retries + 1, error=e
                     )
                 )

229-233: Add jitter to backoff to spread retries under load.

-                delay = min(base_delay * (2**retry_count), max_delay)
+                delay = min(base_delay * (2**retry_count), max_delay)
+                # add +/-20% jitter
+                try:
+                    import random
+                    jitter = 0.2 * delay
+                    delay = max(0.0, min(max_delay, delay + random.uniform(-jitter, jitter)))
+                except Exception:
+                    pass

Add import (top of file):

import random  # optional: for jitter
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to Reviews > Disable Cache setting

Knowledge Base: Disabled due to Reviews > Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between f0979aa and 78a8660.

📒 Files selected for processing (2)
  • backend/backend/celery_db_retry.py (1 hunks)
  • backend/utils/db_retry.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.12.2)
backend/utils/db_retry.py

135-135: Do not catch blind exception: Exception

(BLE001)


246-250: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


422-422: Consider moving this statement to an else block

(TRY300)


423-423: Do not catch blind exception: Exception

(BLE001)

backend/backend/celery_db_retry.py

73-73: Do not catch blind exception: Exception

(BLE001)


210-210: Consider moving this statement to an else block

(TRY300)


211-211: Do not catch blind exception: Exception

(BLE001)


259-259: Do not catch blind exception: Exception

(BLE001)


359-359: Do not catch blind exception: Exception

(BLE001)


360-360: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


396-396: Do not catch blind exception: Exception

(BLE001)


397-397: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

♻️ Duplicate comments (5)
backend/utils/db_retry.py (4)

21-25: Fix misleading context manager example (context managers don't auto-retry).

This implies automatic re-execution of the block, which a contextmanager cannot do. Replace with a non-misleading note.

-    # Context manager usage
-    with db_retry_context(max_retries=3):
-        model.save()
-        other_model.delete()
+    # Prefer decorator or direct call for retries.
+    # Context manager is for connection refresh only (no automatic re-run).

400-405: Correct db_retry_context example to avoid implying auto-retry.

Same issue as the top-level example; please replace or remove.

-        with db_retry_context(max_retries=5, force_refresh=True):
-            model.save()
-            other_model.delete()
-            MyModel.objects.filter(...).update(...)
+        # Context manager provides connection refresh/logging only; it does not re-run the block.
+        # Prefer @db_retry() or retry_database_operation(...) for retries.

207-215: Catch only DB connection errors in _execute_with_retry; log with stack; remove unreachable code.

Avoid broad except; propagate non-DB errors unchanged. Also switch to logging.exception and drop the dead return.

-    while retry_count <= max_retries:
+    while retry_count <= max_retries:
         try:
             logger.debug(LogMessages.EXECUTING_WITH_RETRY)
             return operation()
-        except Exception as e:
+        except (OperationalError, InterfaceError) as e:
             error_type, needs_refresh, use_extended = _classify_database_error(e)
@@
-            else:
-                logger.error(
+            else:
+                logger.exception(
                     LogMessages.format_message(
                         LogMessages.MAX_RETRIES_EXCEEDED, total=max_retries + 1, error=e
                     )
                 )
                 raise
-
-    # This should never be reached, but included for completeness
-    return operation()

Also applies to: 246-255


419-453: Context manager retrier is broken (multiple yields). Make it yield once and stop retrying here.

A @contextmanager must yield exactly once. Current while/yield loop will raise “generator didn't stop after throw” and never re-run the block. Convert to a single-yield manager that optionally refreshes the pool and re-raises, and keep retries to the decorator/wrapper.

-    retry_count = 0
-
-    while retry_count <= current_settings["max_retries"]:
-        try:
-            yield
-            return  # Success - exit the retry loop
-        except Exception as e:
-            error_type, needs_refresh, use_extended = _classify_database_error(e)
-
-            if not DatabaseErrorPatterns.is_retryable_error(error_type):
-                _handle_context_non_retryable_error(e)
-
-            # Update settings for extended retry if needed
-            current_settings = _update_context_retry_settings(
-                retry_count, use_extended, current_settings
-            )
-
-            if retry_count < current_settings["max_retries"]:
-                retry_count, delay = _handle_context_retry_logic(
-                    retry_count,
-                    current_settings["max_retries"],
-                    current_settings["base_delay"],
-                    current_settings["max_delay"],
-                    current_settings["force_refresh"],
-                    e,
-                    error_type,
-                    needs_refresh,
-                )
-                time.sleep(delay)
-                continue
-            else:
-                _handle_context_max_retries_exceeded(
-                    e, current_settings["max_retries"] + 1
-                )
+    try:
+        yield
+    except (OperationalError, InterfaceError) as e:
+        error_type, needs_refresh, _ = _classify_database_error(e)
+        if needs_refresh and current_settings["force_refresh"]:
+            # single refresh attempt; no retry loop here
+            _handle_connection_pool_refresh(e, retry_count=1, max_retries=1)
+        if not DatabaseErrorPatterns.is_retryable_error(error_type):
+            _handle_context_non_retryable_error(e)
+        # retry policy belongs to decorator/_execute_with_retry; re-raise
+        raise
backend/backend/celery_db_retry.py (1)

28-35: Broaden SQLAlchemy exception check to avoid missing retryable errors.

Use SQLAlchemyError (or DBAPIError) instead of just OperationalError so InterfaceError/DBAPIError subclasses are also retried.

-def _is_sqlalchemy_error(exception: Exception) -> bool:
-    """Check if exception is a SQLAlchemy error."""
-    try:
-        from sqlalchemy.exc import OperationalError as SQLAlchemyOperationalError
-
-        return isinstance(exception, SQLAlchemyOperationalError)
+def _is_sqlalchemy_error(exception: Exception) -> bool:
+    """Check if exception is a SQLAlchemy error (inclusive)."""
+    try:
+        from sqlalchemy.exc import SQLAlchemyError
+        return isinstance(exception, SQLAlchemyError)
     except ImportError:
         return False
🧹 Nitpick comments (5)
backend/utils/db_retry.py (2)

135-140: Preserve traceback on pool refresh failure.

Use logging.exception to retain stack; optional to keep broad catch here since refresh errors are unexpected.

-    except Exception as refresh_error:
-        logger.warning(
+    except Exception as refresh_error:
+        logger.exception(
             LogMessages.format_message(
                 LogMessages.POOL_REFRESH_FAILED, error=refresh_error
             )
         )

230-244: Optional: add jitter to backoff to reduce thundering herd.

Small randomized jitter helps when many workers retry simultaneously.

+import random
@@
-                delay = min(base_delay * (2**retry_count), max_delay)
+                delay = min(base_delay * (2**retry_count), max_delay)
+                # 10% jitter
+                delay += random.uniform(0, 0.1 * delay)

Also applies to: 33-34

backend/backend/celery_db_retry.py (3)

380-382: Prefer logger.exception to capture stack trace.

Improves debuggability and satisfies TRY400.

-    except Exception as e:
-        logger.error(f"Error patching Celery database backend: {e}")
+    except Exception:
+        logger.exception("Error patching Celery database backend")

417-419: Prefer logger.exception for configuration errors.

Captures stack trace and aligns with linter hint.

-    except Exception as e:
-        logger.error(f"Error configuring Celery built-in retry: {e}")
+    except Exception:
+        logger.exception("Error configuring Celery built-in retry")

422-449: PgBouncer optimization (optional): allow NullPool mode.

For PgBouncer in transaction mode, SQLAlchemy’s NullPool avoids pool pinning. Consider a toggle to use NullPool.

 def get_celery_db_engine_options():
@@
-    return {
+    opts = {
         # Connection health checking
         "pool_pre_ping": True,  # Test connections before use
         # Minimal pooling (let PgBouncer handle the real pooling)
         "pool_size": 5,  # Small pool since PgBouncer handles real pooling
         "max_overflow": 0,  # No overflow, rely on PgBouncer
         "pool_recycle": 3600,  # Recycle connections every hour
@@
         "echo": RetryConfiguration.get_setting_value("CELERY_DB_ECHO_SQL", False),
-    }
+    }
+    # Optional: NullPool for PgBouncer transaction mode
+    if RetryConfiguration.get_setting_value("CELERY_DB_USE_NULLPOOL", False):
+        try:
+            from sqlalchemy.pool import NullPool
+            opts.pop("pool_size", None)
+            opts.pop("max_overflow", None)
+            opts["poolclass"] = NullPool
+        except Exception:
+            logger.warning("Requested NullPool but SQLAlchemy not available; continuing with default pool options")
+    return opts
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to Reviews > Disable Cache setting

Knowledge Base: Disabled due to Reviews > Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between 78a8660 and 7c0d03c.

📒 Files selected for processing (2)
  • backend/backend/celery_db_retry.py (1 hunks)
  • backend/utils/db_retry.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.12.2)
backend/backend/celery_db_retry.py

75-75: Do not catch blind exception: Exception

(BLE001)


248-248: Consider moving this statement to an else block

(TRY300)


249-249: Do not catch blind exception: Exception

(BLE001)


280-280: Do not catch blind exception: Exception

(BLE001)


380-380: Do not catch blind exception: Exception

(BLE001)


381-381: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


417-417: Do not catch blind exception: Exception

(BLE001)


418-418: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

backend/utils/db_retry.py

135-135: Do not catch blind exception: Exception

(BLE001)


246-250: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


424-424: Consider moving this statement to an else block

(TRY300)


425-425: Do not catch blind exception: Exception

(BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (6)
backend/utils/db_retry.py (5)

85-111: Error classification reads clean and conservative.

Good guard on non-Django DB exceptions and extended retry toggle for DATABASE_UNAVAILABLE.


257-315: Decorator path LGTM.

Separation of defaults and execution via _execute_with_retry is tidy.


455-501: retry_database_operation wrapper LGTM.

Consistent with decorator path; good defaults handling.


504-521: quick_retry helper LGTM.

Thin, predictable wrapper over retry_database_operation.


379-405: Resolve: no external usages of db_retry_context found; auto-retry semantics unaffected. Search across the codebase only returned the inline examples in db_retry.py—no callers rely on this context manager’s retry behavior.

backend/backend/celery_db_retry.py (1)

304-312: Ensure patch runs before config loading – Confirm in celery_service.py that patch_celery_database_backend() is invoked before app.config_from_object(CeleryConfig) so that any CELERY_* retry env vars are applied by the Celery config.

Comment on lines 108 to 112
def _handle_non_sqlalchemy_error(error: BaseException) -> None:
"""Handle non-SQLAlchemy errors by logging and re-raising."""
logger.debug(LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=error))
raise

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Bug: bare raise here will fail outside the except scope.

Re-raise the captured exception explicitly.

 def _handle_non_sqlalchemy_error(error: BaseException) -> None:
     """Handle non-SQLAlchemy errors by logging and re-raising."""
     logger.debug(LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=error))
-    raise
+    raise error
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _handle_non_sqlalchemy_error(error: BaseException) -> None:
"""Handle non-SQLAlchemy errors by logging and re-raising."""
logger.debug(LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=error))
raise
def _handle_non_sqlalchemy_error(error: BaseException) -> None:
"""Handle non-SQLAlchemy errors by logging and re-raising."""
logger.debug(LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=error))
raise error
🤖 Prompt for AI Agents
In backend/backend/celery_db_retry.py around lines 108 to 112, the function
_handle_non_sqlalchemy_error uses a bare "raise" outside an except scope which
will fail; change it to explicitly re-raise the captured exception (e.g., raise
error) after logging so the original exception is propagated, and ensure the
function signature/type hints remain correct.

Comment on lines 114 to 118
def _handle_non_retryable_error(error: BaseException) -> None:
"""Handle non-retryable errors by logging and re-raising."""
logger.debug(LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=error))
raise

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Bug: bare raise outside except scope.

Same issue; re-raise the provided error explicitly.

 def _handle_non_retryable_error(error: BaseException) -> None:
     """Handle non-retryable errors by logging and re-raising."""
     logger.debug(LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=error))
-    raise
+    raise error
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _handle_non_retryable_error(error: BaseException) -> None:
"""Handle non-retryable errors by logging and re-raising."""
logger.debug(LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=error))
raise
def _handle_non_retryable_error(error: BaseException) -> None:
"""Handle non-retryable errors by logging and re-raising."""
logger.debug(LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=error))
raise error
🤖 Prompt for AI Agents
In backend/backend/celery_db_retry.py around lines 114 to 118, the function
_handle_non_retryable_error currently uses a bare raise outside of an except
block which is invalid; change it to explicitly re-raise the provided error by
raising the passed-in error (e.g., raise error) after logging so the original
exception is propagated correctly.

Comment on lines 149 to 159
def _handle_max_retries_exceeded(error: BaseException, total_retries: int) -> None:
"""Handle the case when max retries are exceeded."""
logger.error(
LogMessages.format_message(
LogMessages.MAX_RETRIES_EXCEEDED,
total=total_retries,
error=error,
)
)
raise

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Bug: bare raise outside except scope.

This will raise RuntimeError: No active exception to reraise. Re-raise the original error.

 def _handle_max_retries_exceeded(error: BaseException, total_retries: int) -> None:
     """Handle the case when max retries are exceeded."""
     logger.error(
         LogMessages.format_message(
             LogMessages.MAX_RETRIES_EXCEEDED,
             total=total_retries,
             error=error,
         )
     )
-    raise
+    raise error
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _handle_max_retries_exceeded(error: BaseException, total_retries: int) -> None:
"""Handle the case when max retries are exceeded."""
logger.error(
LogMessages.format_message(
LogMessages.MAX_RETRIES_EXCEEDED,
total=total_retries,
error=error,
)
)
raise
def _handle_max_retries_exceeded(error: BaseException, total_retries: int) -> None:
"""Handle the case when max retries are exceeded."""
logger.error(
LogMessages.format_message(
LogMessages.MAX_RETRIES_EXCEEDED,
total=total_retries,
error=error,
)
)
raise error
🤖 Prompt for AI Agents
In backend/backend/celery_db_retry.py around lines 149 to 159, the function uses
a bare `raise` outside an except block which triggers "No active exception to
reraise"; replace the bare `raise` with re-raising the original exception
instance (e.g., `raise error`) so the original error is propagated, and keep the
logging call intact.

Comment on lines +235 to +261
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
logger.debug(
LogMessages.format_message(
LogMessages.CELERY_OPERATION_START, operation=func.__name__
)
)
retry_count = 0
while retry_count <= max_retries:
try:
result = func(*args, **kwargs)
_log_success(retry_count)
return result
except Exception as e:
retry_info = _process_celery_exception(
e, retry_count, max_retries, base_delay, max_delay
)
if retry_info:
retry_count, delay = retry_info
time.sleep(delay)
continue

# This should never be reached, but included for completeness
return func(*args, **kwargs)

return wrapper
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Retry loop can terminate early and includes unreachable fallback; simplify and rely on classifier to stop.

  • The while retry_count <= max_retries cap conflicts with dynamic "extended" settings returned by _get_retry_settings_for_error, causing early termination.
  • The fallback call at Line 259 is effectively unreachable and unsafe if reached.
  • Also address TRY300 by moving success handling to else:.
-        def wrapper(*args, **kwargs) -> Any:
+        def wrapper(*args, **kwargs) -> Any:
             logger.debug(
                 LogMessages.format_message(
                     LogMessages.CELERY_OPERATION_START, operation=func.__name__
                 )
             )
             retry_count = 0
-            while retry_count <= max_retries:
-                try:
-                    result = func(*args, **kwargs)
-                    _log_success(retry_count)
-                    return result
-                except Exception as e:
-                    retry_info = _process_celery_exception(
-                        e, retry_count, max_retries, base_delay, max_delay
-                    )
-                    if retry_info:
-                        retry_count, delay = retry_info
-                        time.sleep(delay)
-                        continue
-
-            # This should never be reached, but included for completeness
-            return func(*args, **kwargs)
+            while True:
+                try:
+                    result = func(*args, **kwargs)
+                except Exception as e:
+                    retry_info = _process_celery_exception(
+                        e, retry_count, max_retries, base_delay, max_delay
+                    )
+                    # If we get here, error is retryable and we have retry budget left
+                    retry_count, delay = retry_info  # type: ignore[misc]
+                    time.sleep(delay)
+                    continue
+                else:
+                    _log_success(retry_count)
+                    return result

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff (0.12.2)

248-248: Consider moving this statement to an else block

(TRY300)


249-249: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
In backend/backend/celery_db_retry.py around lines 235 to 261, the retry loop
uses a capped while retry_count <= max_retries which can prematurely stop when
dynamic retry settings from _get_retry_settings_for_error should control
continuation; remove the fixed cap and rewrite the loop as an infinite loop
(while True), move the success logging into a while/try/else (log and return in
the else branch after no exception), on exception call _process_celery_exception
and if it returns retry_info unpack and apply retry_count and delay, sleep and
continue, otherwise re-raise the exception (do not call the function again);
also remove the unreachable fallback return at the end of the wrapper; ensure
retry_count is updated from retry_info rather than relying on the loop
condition.

# Example:
# CELERY_BACKEND_DB_NAME=unstract_celery_db

# Database connection retry settings (for handling connection drops with PgBouncer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PgBouncer is not part of the OSS, is this still required then?

Comment on lines 60 to 68
print(
f"[Celery Config] Using built-in retry: max_retries={result_backend_max_retries}, "
)
print(
f"base_sleep={result_backend_base_sleep_between_retries_ms}ms, max_sleep={result_backend_max_sleep_between_retries_ms}ms"
)
else:
# Custom retry is handled by patch_celery_database_backend()
print("[Celery Config] Using custom retry system (patching enabled)")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@muhammad-ali-e use a logger instead of print statements

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (5)
backend/backend/celery_db_retry.py (2)

28-35: Broaden detection to all SQLAlchemy errors, not just OperationalError.

This regresses the earlier fix; use SQLAlchemyError to include InterfaceError/DBAPIError subclasses.

 def _is_sqlalchemy_error(exception: Exception) -> bool:
     """Check if exception is a SQLAlchemy error."""
     try:
-        from sqlalchemy.exc import OperationalError as SQLAlchemyOperationalError
-
-        return isinstance(exception, SQLAlchemyOperationalError)
+        from sqlalchemy.exc import SQLAlchemyError
+        return isinstance(exception, SQLAlchemyError)
     except ImportError:
         return False

234-261: Retry loop can exit early and then re-executes the operation once more; make loop sentinelless and drop fallback.

Current cap (retry_count <= max_retries) fights dynamic settings; the fallback “return func()” risks a duplicate call. Use try/else within an infinite loop.

-        def wrapper(*args, **kwargs) -> Any:
+        def wrapper(*args, **kwargs) -> Any:
             logger.debug(
                 LogMessages.format_message(
                     LogMessages.CELERY_OPERATION_START, operation=func.__name__
                 )
             )
             retry_count = 0
-            while retry_count <= max_retries:
-                try:
-                    result = func(*args, **kwargs)
-                    _log_success(retry_count)
-                    return result
-                except Exception as e:
-                    retry_info = _process_celery_exception(
-                        e, retry_count, max_retries, base_delay, max_delay
-                    )
-                    if retry_info:
-                        retry_count, delay = retry_info
-                        time.sleep(delay)
-                        continue
-
-            # This should never be reached, but included for completeness
-            return func(*args, **kwargs)
+            while True:
+                try:
+                    result = func(*args, **kwargs)
+                except Exception as e:
+                    retry_info = _process_celery_exception(
+                        e, retry_count, max_retries, base_delay, max_delay
+                    )
+                    # Not retryable: _process_celery_exception raises.
+                    retry_count, delay = retry_info  # type: ignore[misc]
+                    time.sleep(delay)
+                    continue
+                else:
+                    _log_success(retry_count)
+                    return result
backend/utils/db_retry.py (3)

207-218: Catch only DB connection errors; let other exceptions propagate.

Avoids masking non-DB bugs and satisfies BLE001.

-        except Exception as e:
+        except (OperationalError, InterfaceError) as e:
             error_type, needs_refresh, use_extended = _classify_database_error(e)
 
             if not DatabaseErrorPatterns.is_retryable_error(error_type):
                 logger.debug(
                     LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=e)
                 )
                 raise

21-25: Docs imply unsupported behavior; clarify context manager semantics.

The context manager does not (and cannot) auto-retry the block.

-    # Context manager usage
-    with db_retry_context(max_retries=3):
-        model.save()
-        other_model.delete()
+    # Context manager usage (no automatic re-run)
+    with db_retry_context():
+        model.save()  # Errors will be logged; connection may be refreshed; exception re-raised.
@@
-    Example:
-        with db_retry_context(max_retries=5, force_refresh=True):
-            model.save()
-            other_model.delete()
-            MyModel.objects.filter(...).update(...)
+    Example:
+        with db_retry_context(force_refresh=True):
+            model.save()

Also applies to: 396-401


417-449: Context manager retrier is invalid (yields inside a loop). Yield exactly once; do not attempt retries here.

Current code will fail with “generator didn't stop after throw” and cannot re-run the block.

 @contextmanager
 def db_retry_context(
@@
 ):
-    """Context manager for retrying database operations on connection errors.
+    """Context manager for connection refresh/logging on DB connection errors.
@@
-    retry_count = 0
-
-    while retry_count <= current_settings["max_retries"]:
-        try:
-            yield
-            return  # Success - exit the retry loop
-        except Exception as e:
-            error_type, needs_refresh, use_extended = _classify_database_error(e)
-
-            if not DatabaseErrorPatterns.is_retryable_error(error_type):
-                _log_context_non_retryable_error(e)
-                raise
-
-            # Update settings for extended retry if needed
-            current_settings = _update_context_retry_settings(
-                retry_count, use_extended, current_settings
-            )
-
-            if retry_count < current_settings["max_retries"]:
-                retry_count, delay = _handle_context_retry_logic(
-                    retry_count,
-                    current_settings["max_retries"],
-                    current_settings["base_delay"],
-                    current_settings["max_delay"],
-                    current_settings["force_refresh"],
-                    e,
-                    error_type,
-                    needs_refresh,
-                )
-                time.sleep(delay)
-                continue
-            else:
-                _log_context_max_retries_exceeded(e, current_settings["max_retries"] + 1)
-                raise
+    try:
+        yield
+    except (OperationalError, InterfaceError) as e:
+        error_type, needs_refresh, _ = _classify_database_error(e)
+        if needs_refresh and current_settings["force_refresh"]:
+            _handle_connection_pool_refresh(e, 1, current_settings["max_retries"])
+        if not DatabaseErrorPatterns.is_retryable_error(error_type):
+            _log_context_non_retryable_error(e)
+        raise
🧹 Nitpick comments (4)
backend/backend/celery_db_retry.py (2)

380-381: Prefer logger.exception for stack traces when handling unexpected failures.

Improves debuggability without changing behavior.

-        logger.error(f"Error patching Celery database backend: {e}")
+        logger.exception("Error patching Celery database backend")
@@
-        logger.error(f"Error configuring Celery built-in retry: {e}")
+        logger.exception("Error configuring Celery built-in retry")

Also applies to: 416-418


433-447: Add pool_timeout to avoid unbounded waits when PgBouncer is saturated.

Small safety knob, sourced from centralized config.

     return {
         # Connection health checking
         "pool_pre_ping": True,  # Test connections before use
         # Minimal pooling (let PgBouncer handle the real pooling)
         "pool_size": 5,  # Small pool since PgBouncer handles real pooling
         "max_overflow": 0,  # No overflow, rely on PgBouncer
         "pool_recycle": 3600,  # Recycle connections every hour
+        "pool_timeout": RetryConfiguration.get_setting_value(
+            "CELERY_DB_POOL_TIMEOUT", 30
+        ),
         # Connection timeouts using centralized configuration
         "connect_args": {
             "connect_timeout": RetryConfiguration.get_setting_value(
                 "CELERY_DB_CONNECT_TIMEOUT", 30
             ),
         },
backend/utils/db_retry.py (2)

253-255: Remove unreachable fallback call.

The loop already returns on success or raises on failure.

-    # This should never be reached, but included for completeness
-    return operation()
+    # Unreachable

324-330: Log with stack trace when retries are exhausted.

Use exception-level logging to capture traceback.

-    logger.error(
+    logger.exception(
         LogMessages.format_message(
             LogMessages.MAX_RETRIES_EXCEEDED,
             total=total_retries,
             error=error,
         )
     )
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to Reviews > Disable Cache setting

Knowledge Base: Disabled due to Reviews > Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between 7c0d03c and 0eeae21.

📒 Files selected for processing (2)
  • backend/backend/celery_db_retry.py (1 hunks)
  • backend/utils/db_retry.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.12.2)
backend/backend/celery_db_retry.py

75-75: Do not catch blind exception: Exception

(BLE001)


247-247: Consider moving this statement to an else block

(TRY300)


248-248: Do not catch blind exception: Exception

(BLE001)


279-279: Do not catch blind exception: Exception

(BLE001)


379-379: Do not catch blind exception: Exception

(BLE001)


380-380: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


416-416: Do not catch blind exception: Exception

(BLE001)


417-417: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

backend/utils/db_retry.py

135-135: Do not catch blind exception: Exception

(BLE001)


246-250: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


420-420: Consider moving this statement to an else block

(TRY300)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build

Comment on lines +171 to +174
if not _is_sqlalchemy_error(error):
_log_non_sqlalchemy_error(error)
raise

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix invalid bare raise usage (outside except scope).

These will raise “No active exception to reraise”; re-raise the caught error explicitly.

-    if not _is_sqlalchemy_error(error):
-        _log_non_sqlalchemy_error(error)
-        raise
+    if not _is_sqlalchemy_error(error):
+        _log_non_sqlalchemy_error(error)
+        raise error
@@
-    if not DatabaseErrorPatterns.is_retryable_error(error_type):
-        _log_non_retryable_error(error)
-        raise
+    if not DatabaseErrorPatterns.is_retryable_error(error_type):
+        _log_non_retryable_error(error)
+        raise error
@@
-    else:
-        _log_max_retries_exceeded(error, current_settings["max_retries"] + 1)
-        raise
+    else:
+        _log_max_retries_exceeded(error, current_settings["max_retries"] + 1)
+        raise error

Also applies to: 178-181, 192-194

🤖 Prompt for AI Agents
In backend/backend/celery_db_retry.py around lines 171-174 (and similarly at
178-181 and 192-194), the code uses bare `raise` outside of an except block
which causes "No active exception to reraise"; change those bare `raise`
statements to explicitly re-raise the caught exception (e.g., `raise error`) so
the original exception is propagated correctly, and ensure the variable name
matches the caught exception in that scope.

Copy link
Contributor

filepath function $$\textcolor{#23d18b}{\tt{passed}}$$ SUBTOTAL
$$\textcolor{#23d18b}{\tt{runner/src/unstract/runner/clients/test\_docker.py}}$$ $$\textcolor{#23d18b}{\tt{test\_logs}}$$ $$\textcolor{#23d18b}{\tt{1}}$$ $$\textcolor{#23d18b}{\tt{1}}$$
$$\textcolor{#23d18b}{\tt{runner/src/unstract/runner/clients/test\_docker.py}}$$ $$\textcolor{#23d18b}{\tt{test\_cleanup}}$$ $$\textcolor{#23d18b}{\tt{1}}$$ $$\textcolor{#23d18b}{\tt{1}}$$
$$\textcolor{#23d18b}{\tt{runner/src/unstract/runner/clients/test\_docker.py}}$$ $$\textcolor{#23d18b}{\tt{test\_cleanup\_skip}}$$ $$\textcolor{#23d18b}{\tt{1}}$$ $$\textcolor{#23d18b}{\tt{1}}$$
$$\textcolor{#23d18b}{\tt{runner/src/unstract/runner/clients/test\_docker.py}}$$ $$\textcolor{#23d18b}{\tt{test\_client\_init}}$$ $$\textcolor{#23d18b}{\tt{1}}$$ $$\textcolor{#23d18b}{\tt{1}}$$
$$\textcolor{#23d18b}{\tt{runner/src/unstract/runner/clients/test\_docker.py}}$$ $$\textcolor{#23d18b}{\tt{test\_get\_image\_exists}}$$ $$\textcolor{#23d18b}{\tt{1}}$$ $$\textcolor{#23d18b}{\tt{1}}$$
$$\textcolor{#23d18b}{\tt{runner/src/unstract/runner/clients/test\_docker.py}}$$ $$\textcolor{#23d18b}{\tt{test\_get\_image}}$$ $$\textcolor{#23d18b}{\tt{1}}$$ $$\textcolor{#23d18b}{\tt{1}}$$
$$\textcolor{#23d18b}{\tt{runner/src/unstract/runner/clients/test\_docker.py}}$$ $$\textcolor{#23d18b}{\tt{test\_get\_container\_run\_config}}$$ $$\textcolor{#23d18b}{\tt{1}}$$ $$\textcolor{#23d18b}{\tt{1}}$$
$$\textcolor{#23d18b}{\tt{runner/src/unstract/runner/clients/test\_docker.py}}$$ $$\textcolor{#23d18b}{\tt{test\_get\_container\_run\_config\_without\_mount}}$$ $$\textcolor{#23d18b}{\tt{1}}$$ $$\textcolor{#23d18b}{\tt{1}}$$
$$\textcolor{#23d18b}{\tt{runner/src/unstract/runner/clients/test\_docker.py}}$$ $$\textcolor{#23d18b}{\tt{test\_run\_container}}$$ $$\textcolor{#23d18b}{\tt{1}}$$ $$\textcolor{#23d18b}{\tt{1}}$$
$$\textcolor{#23d18b}{\tt{runner/src/unstract/runner/clients/test\_docker.py}}$$ $$\textcolor{#23d18b}{\tt{test\_get\_image\_for\_sidecar}}$$ $$\textcolor{#23d18b}{\tt{1}}$$ $$\textcolor{#23d18b}{\tt{1}}$$
$$\textcolor{#23d18b}{\tt{runner/src/unstract/runner/clients/test\_docker.py}}$$ $$\textcolor{#23d18b}{\tt{test\_sidecar\_container}}$$ $$\textcolor{#23d18b}{\tt{1}}$$ $$\textcolor{#23d18b}{\tt{1}}$$
$$\textcolor{#23d18b}{\tt{TOTAL}}$$ $$\textcolor{#23d18b}{\tt{11}}$$ $$\textcolor{#23d18b}{\tt{11}}$$

Copy link

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (5)
backend/backend/celery_db_retry.py (3)

28-35: Broaden SQLAlchemy exception detection to avoid missing retryable errors.

Catching only OperationalError will skip InterfaceError/DBAPIError subclasses and others. Use SQLAlchemyError as the base.

-def _is_sqlalchemy_error(exception: Exception) -> bool:
+def _is_sqlalchemy_error(exception: BaseException) -> bool:
     """Check if exception is a SQLAlchemy error."""
     try:
-        from sqlalchemy.exc import OperationalError as SQLAlchemyOperationalError
-
-        return isinstance(exception, SQLAlchemyOperationalError)
+        from sqlalchemy.exc import SQLAlchemyError
+        return isinstance(exception, SQLAlchemyError)
     except ImportError:
         return False

171-174: Fix invalid bare raise usages (outside except scope).

These currently trigger “No active exception to reraise”. Re-raise the captured error explicitly.

-    if not _is_sqlalchemy_error(error):
-        _log_non_sqlalchemy_error(error)
-        raise
+    if not _is_sqlalchemy_error(error):
+        _log_non_sqlalchemy_error(error)
+        raise error
@@
-    if not DatabaseErrorPatterns.is_retryable_error(error_type):
-        _log_non_retryable_error(error)
-        raise
+    if not DatabaseErrorPatterns.is_retryable_error(error_type):
+        _log_non_retryable_error(error)
+        raise error
@@
-    else:
-        _log_max_retries_exceeded(error, current_settings["max_retries"] + 1)
-        raise
+    else:
+        _log_max_retries_exceeded(error, current_settings["max_retries"] + 1)
+        raise error

Also applies to: 178-181, 192-194


243-258: Rewrite retry loop to honor dynamic retry settings and remove unreachable fallback.

The capped while condition can terminate early when extended settings are returned; also the fallback call is unreachable. Move success logging to try/else and make the truthiness check explicit to avoid (0, 0) bugs.

-            retry_count = 0
-            while retry_count <= max_retries:
-                try:
-                    result = func(*args, **kwargs)
-                    _log_success(retry_count)
-                    return result
-                except Exception as e:
-                    retry_info = _process_celery_exception(
-                        e, retry_count, max_retries, base_delay, max_delay
-                    )
-                    if retry_info:
-                        retry_count, delay = retry_info
-                        time.sleep(delay)
-                        continue
-
-            # This should never be reached, but included for completeness
-            return func(*args, **kwargs)
+            retry_count = 0
+            while True:
+                try:
+                    result = func(*args, **kwargs)
+                except Exception as e:  # noqa: BLE001 - unify retry classification path
+                    retry_info = _process_celery_exception(
+                        e, retry_count, max_retries, base_delay, max_delay
+                    )
+                    # _process_celery_exception either raises or returns a tuple
+                    if retry_info is not None:
+                        retry_count, delay = retry_info
+                        time.sleep(delay)
+                        continue
+                    raise
+                else:
+                    _log_success(retry_count)
+                    return result
backend/backend/celery_config.py (2)

7-8: Fix import path for retry helpers; current path will fail at runtime.

-from backend.celery_db_retry import get_celery_db_engine_options, should_use_builtin_retry
+from backend.backend.celery_db_retry import (
+    get_celery_db_engine_options,
+    should_use_builtin_retry,
+)
#!/bin/bash
# Verify module path resolution matches repo layout
fd -a 'celery_db_retry.py'
python - <<'PY'
import importlib
mods = [
    "backend.celery_db_retry",
    "backend.backend.celery_db_retry",
]
for m in mods:
    try:
        importlib.import_module(m)
        print("OK:", m)
    except Exception as e:
        print("FAIL:", m, "->", e)
PY

44-46: Use database_engine_options for SQLAlchemy backend; result_backend_transport_options is ignored.

-    # Database backend engine options for PgBouncer compatibility
-    result_backend_transport_options = get_celery_db_engine_options()
+    # SQLAlchemy engine options for DatabaseBackend (passed to create_engine)
+    database_engine_options = get_celery_db_engine_options()
#!/bin/bash
# Introspect Celery's DatabaseBackend to confirm it reads database_engine_options
python - <<'PY'
import inspect
from celery.backends.database import DatabaseBackend
src = inspect.getsource(DatabaseBackend.__init__)
print("database_engine_options" in src, "create_engine" in src)
print("\n--- Snippet ---")
for line in src.splitlines():
    if "database_engine_options" in line or "create_engine" in line:
        print(line)
PY
🧹 Nitpick comments (5)
backend/backend/celery_db_retry.py (5)

279-280: Prefer logger.exception and document broad catch.

Use logger.exception for traceback and explicitly justify the broad catch for dispose() variability.

-    except Exception as e:
-        logger.warning(f"Could not dispose SQLAlchemy engine: {e}")
+    except Exception:  # noqa: BLE001 - engine.dispose may raise varying exception types
+        logger.exception("Could not dispose SQLAlchemy engine")

377-381: Log tracebacks on patch failure and annotate broad catch.

-    except Exception as e:
-        logger.error(f"Error patching Celery database backend: {e}")
+    except Exception:  # noqa: BLE001
+        logger.exception("Error patching Celery database backend")

416-419: Log tracebacks on built-in retry configuration failure and annotate broad catch.

-    except Exception as e:
-        logger.error(f"Error configuring Celery built-in retry: {e}")
+    except Exception:  # noqa: BLE001
+        logger.exception("Error configuring Celery built-in retry")

60-82: Optional: throttle pool refresh logs to reduce noise under flapping failures.

Consider downgrading the first warning to info after N consecutive attempts or coalescing repeated messages.


118-145: Backoff computation OK; consider adding jitter to avoid thundering herd.

Add decorrelated jitter to spread load when many workers retry concurrently.

Example:

-    delay = min(
-        current_settings["base_delay"] * (2**retry_count),
-        current_settings["max_delay"],
-    )
+    import random
+    exp = current_settings["base_delay"] * (2**retry_count)
+    jittered = random.uniform(exp / 2.0, exp)  # decorrelated jitter
+    delay = min(jittered, current_settings["max_delay"])
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to Reviews > Disable Cache setting

Knowledge Base: Disabled due to Reviews > Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between 0eeae21 and e606f1e.

📒 Files selected for processing (3)
  • backend/backend/celery_config.py (2 hunks)
  • backend/backend/celery_db_retry.py (1 hunks)
  • backend/sample.env (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • backend/sample.env
🧰 Additional context used
🪛 Ruff (0.12.2)
backend/backend/celery_db_retry.py

75-75: Do not catch blind exception: Exception

(BLE001)


247-247: Consider moving this statement to an else block

(TRY300)


248-248: Do not catch blind exception: Exception

(BLE001)


279-279: Do not catch blind exception: Exception

(BLE001)


379-379: Do not catch blind exception: Exception

(BLE001)


380-380: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


416-416: Do not catch blind exception: Exception

(BLE001)


417-417: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (2)
backend/backend/celery_db_retry.py (1)

314-356: Patch guards look good and avoid double-wrapping.

Nice use of wrapped and a marker attribute to prevent re-patching.

Also applies to: 358-373

backend/backend/celery_config.py (1)

63-70: Config logging is clear; good swap between built-in vs custom paths.

@hari-kuriakose
Copy link
Contributor

@muhammad-ali-e

  1. I was thinking the connection pool mechanism would automatically internally try to connect to db after failures. Ideally we should have only tweaked env settings for retry interval, retry count etc. Instead seemingly we are interfering directly with the connection pool lifecycle, which might over complicate things.

  2. Also I am hoping that if we continue to leave the retrying to the connection pool itself, then we need not sprinkle decorators in each and every call site but instead have it in a single central place.

Let's try to keep it simple and maybe revisit the approach?

@muhammad-ali-e muhammad-ali-e marked this pull request as draft September 10, 2025 11:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants