UN-2798 [FIX] Fix WorkflowFileExecution stuck in EXECUTING status due to database connection errors #1524
base: main
Conversation
- Add centralized database retry mechanism with connection pool refresh
- Implement retry logic for Django ORM and Celery SQLAlchemy operations
- Apply @db_retry decorators to critical workflow chain components
- Handle stale connections, database unavailability, and transient errors
- Configure exponential backoff with customizable retry parameters
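For orientation, the sketch below shows roughly what such a decorator can look like: exponential backoff around a Django ORM call, with stale connections closed before each retry. The name db_retry and the parameter defaults mirror the description above; the body itself is an illustrative assumption, not the code under review.

```python
import functools
import logging
import time

from django.db import InterfaceError, OperationalError, close_old_connections

logger = logging.getLogger(__name__)


def db_retry(max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 30.0):
    """Retry a DB-touching callable on transient connection errors (sketch)."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except (OperationalError, InterfaceError) as exc:
                    if attempt == max_retries:
                        raise
                    # Drop stale connections so the next attempt opens a fresh one.
                    close_old_connections()
                    delay = min(base_delay * (2**attempt), max_delay)
                    logger.warning("DB error %s; retrying in %.1fs", exc, delay)
                    time.sleep(delay)

        return wrapper

    return decorator
```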
Summary by CodeRabbit
Walkthrough: Adds centralized DB error classification and retry utilities and applies them across the workflow execution chain.
Sequence Diagram(s)

sequenceDiagram
autonumber
actor Env as Environment
participant App as Celery App
participant Patch as celery_db_retry.patch_celery_database_backend
participant Cfg as CeleryConfig
participant DBB as Celery DatabaseBackend
participant DB as PostgreSQL/PgBouncer
Env->>Cfg: Read CELERY_* env vars
Note right of Cfg: Set transport options and\nconditional built-in retry attrs
App->>Patch: Invoke after Celery app creation
Patch->>DBB: Patch _store_result/_get_task_meta_for/get_result...
Note over DBB: Methods wrapped with\ncustom backoff or builtin retry
App->>DBB: Store/get task result
DBB->>DB: Execute SQL
alt Connection/transient error
DBB-->>DBB: Classify error (utils.db_constants)
DBB-->>DBB: Backoff + optional pool dispose
DBB->>DB: Retry until success/exhausted
else Success
DBB-->>App: Return result/meta
end
sequenceDiagram
autonumber
participant Caller as App code
participant Wrap as utils.db_retry.db_retry
participant ORM as Django ORM
participant DB as PostgreSQL/PgBouncer
Caller->>Wrap: Call decorated function
Wrap->>ORM: Execute query/operation
ORM->>DB: SQL
alt Error occurs
Wrap-->>Wrap: Classify error & decide retry
opt Requires pool refresh
Wrap-->>ORM: close_old_connections()
end
Wrap->>DB: Retry with exponential backoff
alt Exhausted
Wrap-->>Caller: Raise last error
else Success
Wrap-->>Caller: Return result
end
else Success
Wrap-->>Caller: Return result
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Pre-merge checks: 2 passed, 1 warning
Actionable comments posted: 12
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
backend/workflow_manager/file_execution/models.py (1)
Lines 21-57: Handle rare IntegrityError race on get_or_create

Under concurrency, get_or_create can raise IntegrityError due to unique constraints. Retrying with a fetch is a common pattern and aligns with the retry theme of this PR. Apply this diff within the function:
-        execution_file, is_created = self.get_or_create(**lookup_fields)
+        try:
+            execution_file, is_created = self.get_or_create(**lookup_fields)
+        except Exception as e:
+            # If the exception is an IntegrityError (unique race), fetch existing
+            from django.db import IntegrityError
+            if isinstance(e, IntegrityError):
+                execution_file = self.get(**lookup_fields)
+                is_created = False
+            else:
+                raise

Note: If you prefer explicit imports, add from django.db import IntegrityError at module top.

backend/account_v2/organization.py (1)
Lines 24-38: Make create_organization idempotent across retries

If the first insert commits but a connection error fires post-commit, a retry will raise IntegrityError. Prefer get_or_create to guarantee idempotency under retries.

-    def create_organization(
+    def create_organization(
         name: str, display_name: str, organization_id: str
     ) -> Organization:
-        try:
-            organization: Organization = Organization(
-                name=name,
-                display_name=display_name,
-                organization_id=organization_id,
-            )
-            organization.save()
-        except IntegrityError as error:
-            Logger.info(f"[Duplicate Id] Failed to create Organization Error: {error}")
-            raise error
-        return organization
+        organization, created = Organization.objects.get_or_create(  # type: ignore
+            organization_id=organization_id,
+            defaults={"name": name, "display_name": display_name},
+        )
+        if not created:
+            Logger.info(
+                f"[Duplicate Id] Organization already exists: organization_id={organization_id}"
+            )
+        return organization

backend/workflow_manager/workflow_v2/models/execution.py (2)
Lines 242-276: Fix attempts double-increment under retries

When increment_attempt=True, a post-commit connection error can cause the decorator to re-run and increment attempts twice. Use an optimistic, idempotent DB update.

         if error:
             self.error_message = error[:EXECUTION_ERROR_LENGTH]
         if increment_attempt:
-            self.attempts += 1
+            # Idempotent increment guarded by current value
+            updated = type(self).objects.filter(
+                pk=self.pk, attempts=self.attempts
+            ).update(attempts=models.F("attempts") + 1)
+            if updated:
+                self.attempts += 1
         self.save()
Lines 286-304: Guard against null workflow in cache handler

workflow is nullable; dereferencing self.workflow.id will crash and retries won't help. Use workflow_id and early-return when absent.

-    def _handle_execution_cache(self):
-        if not ExecutionCacheUtils.is_execution_exists(
-            workflow_id=self.workflow.id, execution_id=self.id
-        ):
+    def _handle_execution_cache(self):
+        if not self.workflow_id:
+            logger.debug("Skipping execution cache update: workflow is None")
+            return
+        if not ExecutionCacheUtils.is_execution_exists(
+            workflow_id=self.workflow_id, execution_id=self.id
+        ):
             execution_cache = ExecutionCache(
-                workflow_id=self.workflow.id,
+                workflow_id=self.workflow_id,
                 execution_id=self.id,
                 total_files=self.total_files,
                 status=self.status,
             )
             ExecutionCacheUtils.create_execution(
                 execution=execution_cache,
             )
         else:
             ExecutionCacheUtils.update_status(
-                workflow_id=self.workflow.id, execution_id=self.id, status=self.status
+                workflow_id=self.workflow_id, execution_id=self.id, status=self.status
             )
🧹 Nitpick comments (20)
backend/usage_v2/helper.py (1)
Lines 59-66: Remove unreachable exception handler

QuerySet.aggregate(...) does not raise DoesNotExist; it returns a dict with None values when no rows match. This handler won't execute. Apply this diff:
-        except Usage.DoesNotExist:
-            # Handle the case where no usage data is found for the given run_id
-            logger.warning(f"Usage data not found for the specified run_id: {run_id}")
-            return {}

Optional: if callers prefer zeros, normalize None to 0 before returning.

backend/workflow_manager/file_execution/models.py (2)
Lines 59-69: Limit save to updated fields

Minor write optimization: specify update_fields in save() since we know the exact fields being set. Apply this diff:
-        execution_file.file_name = file_hash.file_name
-        execution_file.file_size = file_hash.file_size
-        execution_file.mime_type = file_hash.mime_type
-        execution_file.provider_file_uuid = file_hash.provider_file_uuid
-        execution_file.fs_metadata = file_hash.fs_metadata
-        execution_file.save()
+        execution_file.file_name = file_hash.file_name
+        execution_file.file_size = file_hash.file_size
+        execution_file.mime_type = file_hash.mime_type
+        execution_file.provider_file_uuid = file_hash.provider_file_uuid
+        execution_file.fs_metadata = file_hash.fs_metadata
+        execution_file.save(
+            update_fields=[
+                "file_name",
+                "file_size",
+                "mime_type",
+                "provider_file_uuid",
+                "fs_metadata",
+            ]
+        )
Lines 124-156: Use update_fields to reduce row write size; confirm enum assignment

As with the other model, restrict writes to touched columns. Also confirm the ExecutionStatus type: if it's a TextChoices (str), assigning the enum is fine; if it's a plain Enum, use .value. Current code assigns the enum directly. Apply this diff:
-        self.status = status
+        self.status = status  # if plain Enum, use: status.value
@@
-        self.execution_error = execution_error
-        self.save()
+        self.execution_error = execution_error
+        update_fields = ["status", "execution_error"]
+        if self.execution_time is not None:
+            update_fields.append("execution_time")
+        self.save(update_fields=update_fields)

If ExecutionStatus is a plain Enum in this app, change to self.status = status.value.

backend/workflow_manager/workflow_v2/models/execution.py (1)
Lines 198-227: Optional: add retry to aggregated_usage_cost

This property performs .exists() and .aggregate(); consider wrapping it with @db_retry to harden reads against transient errors. A rough sketch follows.
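The snippet below is illustrative only: the Usage model and run_id field appear elsewhere in this PR, while the cost_in_dollars field and the exact db_retry signature are assumptions.

```python
from django.db.models import Sum


@db_retry()  # the queryset is evaluated inside this function, so retries cover the real DB calls
def aggregated_usage_cost(run_id: str):
    usages = Usage.objects.filter(run_id=run_id)
    if not usages.exists():
        return None
    return usages.aggregate(total=Sum("cost_in_dollars"))["total"]
```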
backend/utils/db_retry.py (5)
Lines 200-205: Log full trace when retries exhausted

Use logger.exception to keep stack traces.

-            logger.error(
+            logger.exception(
                 LogMessages.format_message(
                     LogMessages.MAX_RETRIES_EXCEEDED, total=max_retries + 1, error=e
                 )
             )
Lines 171-179: Broad except during pool refresh — annotate or narrow

If you keep catching all exceptions here, add a BLE001 ignore or narrow if possible.

-        except Exception as refresh_error:
+        except Exception as refresh_error:  # noqa: BLE001
             logger.warning(
                 LogMessages.format_message(
                     LogMessages.POOL_REFRESH_FAILED, error=refresh_error
                 )
             )
Lines 374-381: Log full trace when context retries exhausted

-            logger.error(
+            logger.exception(
                 LogMessages.format_message(
                     LogMessages.MAX_RETRIES_EXCEEDED,
                     total=final_max_retries + 1,
                     error=e,
                 )
             )
Lines 158-159: Optional: add jitter to backoff to avoid herd effects

-            delay = min(base_delay * (2**retry_count), max_delay)
+            delay = min(base_delay * (2**retry_count), max_delay)
+            # Optional jitter
+            # delay *= 1 + random.uniform(0, 0.25)

Add at top if you enable jitter:

-import logging
+import logging
+import random

Also applies to: 332-333
Lines 114-209: Operational advice: emit metrics for retries

Expose counters/gauges (attempts, succeeded_on_retry, exhausted) and error-type labels to observe impact and tune defaults. One possible shape is sketched below.
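As a dependency-free illustration of that advice (the class and method names here are made up, and a real deployment would more likely use Prometheus or StatsD counters):

```python
import threading
from collections import Counter


class RetryMetrics:
    """In-process counters for DB retry outcomes, keyed by (outcome, error_type)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._counts: Counter = Counter()

    def record(self, outcome: str, error_type: str) -> None:
        # outcome is expected to be "attempt", "succeeded_on_retry", or "exhausted"
        with self._lock:
            self._counts[(outcome, error_type)] += 1

    def snapshot(self) -> dict:
        with self._lock:
            return dict(self._counts)


metrics = RetryMetrics()
# Hypothetical hook points inside the retry loop:
# metrics.record("attempt", error_type.name)
# metrics.record("succeeded_on_retry", error_type.name)
# metrics.record("exhausted", error_type.name)
```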
backend/backend/celery_db_retry.py (8)
Lines 162-169: Emit stack traces on terminal failure

Use logger.exception so we retain the traceback when retries are exhausted. Apply this diff:

-                logger.error(
+                logger.exception(
                     LogMessages.format_message(
                         LogMessages.MAX_RETRIES_EXCEEDED,
                         total=current_max_retries + 1,
                         error=e,
                     )
                 )
Lines 187-203: Remove unused parameter and improve error logging in engine disposal helper

func is unused; also prefer logger.exception to retain the traceback. Apply this diff:

-def _dispose_sqlalchemy_engine(func):
+def _dispose_sqlalchemy_engine():
@@
-    except Exception as e:
-        logger.warning(f"Could not dispose SQLAlchemy engine: {e}")
+    except Exception:
+        logger.exception("Could not dispose SQLAlchemy engine")

And update the call site:

-    _dispose_sqlalchemy_engine(func)
+    _dispose_sqlalchemy_engine()
Lines 314-335: Config sources for "built-in" vs "custom" retry are inconsistent

The built-in branch reads generic DB_RETRY_* via get_retry_settings(), while the custom decorator uses CELERY_DB_RETRY_* via get_celery_retry_settings(). This is confusing in ops/docs. Consider standardizing on the CELERY_DB_RETRY_* variables for Celery in both paths (and keep DB_RETRY_* for Django ORM), or clearly document the split. If standardizing, change this block to get_celery_retry_settings() and rename the envs accordingly.

Would you like a follow-up patch to unify the variable names and sample.env?
Lines 114-121: Add jitter and consistent delay logging for refresh branch

- Add small random jitter to avoid stampedes when multiple workers retry simultaneously.
- When needs_refresh is True, we don't log the retry delay; include it for parity.

Example diff:

+                    import random
+                    jitter = random.uniform(0, current_base_delay * 0.1)
-                    delay = min(
-                        current_base_delay * (2**retry_count), current_max_delay
-                    )
+                    delay = min(
+                        current_base_delay * (2**retry_count) + jitter,
+                        current_max_delay,
+                    )
@@
-                    try:
+                    try:
                         _dispose_sqlalchemy_engine()
                         logger.info(
                             "SQLAlchemy connection pool disposed successfully"
                         )
                     except Exception as refresh_error:
                         logger.warning(
                             LogMessages.format_message(
                                 LogMessages.POOL_REFRESH_FAILED,
                                 error=refresh_error,
                             )
                         )
+                    logger.warning(
+                        LogMessages.format_message(
+                            LogMessages.CONNECTION_ERROR_RETRY,
+                            attempt=retry_count,
+                            total=current_max_retries + 1,
+                            error=e,
+                            delay=delay,
+                        )
+                    )

Also applies to: 149-159
Lines 299-303: Use logger.exception when patching fails

Preserve the traceback to aid diagnosis in ops. Apply this diff:

-    except Exception as e:
-        logger.error(f"Error patching Celery database backend: {e}")
+    except Exception:
+        logger.exception("Error patching Celery database backend")
Lines 338-341: Use logger.exception for built-in retry configuration errors

Keeps the traceback for debugging misconfigurations. Apply this diff:

-    except Exception as e:
-        logger.error(f"Error configuring Celery built-in retry: {e}")
+    except Exception:
+        logger.exception("Error configuring Celery built-in retry")
Lines 354-369: Minor: centralize env keys for engine options

"CELERY_DB_CONNECT_TIMEOUT" and "CELERY_DB_ECHO_SQL" are magic strings here; consider adding constants to RetryConfiguration for discoverability and typed conversion.
Lines 179-181: Remove unreachable code after the retry loop

The return func(*args, **kwargs) is not reachable; it can mask future refactors. Apply this diff:

-    # This should never be reached, but included for completeness
-    return func(*args, **kwargs)
+    return None  # unreachable by design

Or remove the block entirely.
backend/backend/celery_config.py (1)
Lines 1-1: Use logging instead of prints in config

Printing from config code is noisy in production. Prefer a module logger.
Apply this diff:
+import logging
 import os
@@
+logger = logging.getLogger(__name__)
@@
-        print(
-            f"[Celery Config] Using built-in retry: max_retries={result_backend_max_retries}, "
-        )
-        print(
-            f"base_sleep={result_backend_base_sleep_between_retries_ms}ms, max_sleep={result_backend_max_sleep_between_retries_ms}ms"
-        )
+        logger.info(
+            "[Celery Config] Using built-in retry: max_retries=%s, base_sleep=%sms, max_sleep=%sms",
+            result_backend_max_retries,
+            result_backend_base_sleep_between_retries_ms,
+            result_backend_max_sleep_between_retries_ms,
+        )
@@
-        print("[Celery Config] Using custom retry system (patching enabled)")
+        logger.info("[Celery Config] Using custom retry system (patching enabled)")

Also applies to: 60-66, 67-68
backend/utils/db_constants.py (2)
Lines 128-131: PEP 604 style for optional type

Adopt str | None per Ruff RUF013 and modern typing. Apply this diff:

-    def classify_error(
-        cls, error: Exception, error_message: str = None
-    ) -> tuple[DatabaseErrorType, bool]:
+    def classify_error(
+        cls, error: Exception, error_message: str | None = None
+    ) -> tuple[DatabaseErrorType, bool]:
Lines 171-195: Centralize Celery engine-related env keys

Since celery_db_retry.get_celery_db_engine_options() reads CELERY_DB_CONNECT_TIMEOUT and CELERY_DB_ECHO_SQL, consider adding constants for these here to keep all Celery DB config discoverable in one place.

Also applies to: 280-296
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge Base: Disabled due to Reviews > Disable Knowledge Base setting
📒 Files selected for processing (16)
- backend/account_v2/organization.py (2 hunks)
- backend/backend/celery_config.py (2 hunks)
- backend/backend/celery_db_retry.py (1 hunks)
- backend/backend/celery_service.py (2 hunks)
- backend/backend/settings/base.py (1 hunks)
- backend/backend/workers/file_processing/file_processing.py (2 hunks)
- backend/backend/workers/file_processing_callback/file_processing_callback.py (2 hunks)
- backend/sample.env (1 hunks)
- backend/usage_v2/helper.py (3 hunks)
- backend/utils/db_constants.py (1 hunks)
- backend/utils/db_retry.py (1 hunks)
- backend/utils/models/organization_mixin.py (2 hunks)
- backend/utils/user_context.py (2 hunks)
- backend/workflow_manager/file_execution/models.py (5 hunks)
- backend/workflow_manager/workflow_v2/execution.py (2 hunks)
- backend/workflow_manager/workflow_v2/models/execution.py (5 hunks)
🧰 Additional context used
🪛 Ruff (0.12.2)
backend/backend/celery_db_retry.py
- 74-74: Consider moving this statement to an else block (TRY300)
- 135-135: Do not catch blind exception: Exception (BLE001)
- 162-168: Use logging.exception instead of logging.error; replace with exception (TRY400)
- 187-187: Unused function argument: func (ARG001)
- 201-201: Do not catch blind exception: Exception (BLE001)
- 301-301: Do not catch blind exception: Exception (BLE001)
- 302-302: Use logging.exception instead of logging.error; replace with exception (TRY400)
- 338-338: Do not catch blind exception: Exception (BLE001)
- 339-339: Use logging.exception instead of logging.error; replace with exception (TRY400)

backend/utils/db_retry.py
- 174-174: Do not catch blind exception: Exception (BLE001)
- 200-204: Use logging.exception instead of logging.error; replace with exception (TRY400)
- 314-314: Consider moving this statement to an else block (TRY300)
- 348-348: Do not catch blind exception: Exception (BLE001)
- 374-380: Use logging.exception instead of logging.error; replace with exception (TRY400)

backend/utils/db_constants.py
- 129-129: PEP 484 prohibits implicit Optional; convert to T | None (RUF013)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (17)
backend/backend/celery_service.py (1)
Lines 10-10: Import looks good

Importing from a single module keeps the patch centralized. No issues.
backend/backend/workers/file_processing/file_processing.py (1)
Lines 10-10: Import is fine

Consistent with other workers; OK.
backend/backend/workers/file_processing_callback/file_processing_callback.py (1)
Lines 10-10: Import is fine

Consistent and clear.
backend/utils/user_context.py (2)
Lines 20-21: Verify retry classification doesn't include ProgrammingError

@db_retry() here is good, but ProgrammingError is explicitly handled below as a non-retryable path. Ensure the retry classifier excludes ProgrammingError, otherwise this decorator could mask setup issues and loop unnecessarily. A sketch of such a check follows.
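For illustration only, one way such an exclusion can look; this is not the PR's classifier, and the pattern list is an assumption:

```python
from django.db import InterfaceError, OperationalError, ProgrammingError


def is_retryable(error: Exception) -> bool:
    """Connection-level failures are retryable; schema/SQL bugs are not."""
    if isinstance(error, ProgrammingError):
        # Missing table/column or bad SQL: retrying would only mask setup issues.
        return False
    if isinstance(error, (OperationalError, InterfaceError)):
        message = str(error).lower()
        return any(
            pattern in message
            for pattern in ("connection", "server closed", "could not connect", "timeout")
        )
    return False
```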
Lines 5-5: Decorator import OK

Centralizing retry is consistent with the PR's goals.
backend/workflow_manager/workflow_v2/execution.py (2)
Lines 395-396: Decorator order is correct

@db_retry() applied under @staticmethod is appropriate (the decorated function becomes the static target).
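A small sketch to make the ordering concrete (the db_retry stub and the names below are placeholders, not the PR's code):

```python
def db_retry(**_options):
    """Stand-in for the PR's decorator, shown only to illustrate ordering."""

    def decorator(func):
        return func

    return decorator


class WorkflowExecutionHelper:
    @staticmethod
    @db_retry()  # db_retry wraps the plain function first,
    def update_execution_err(execution_id: str, err_msg: str) -> None:
        # then @staticmethod wraps the already-retrying callable, so retries still
        # apply when calling WorkflowExecutionHelper.update_execution_err(...).
        ...
```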
Lines 395-401: The repository contains two definitions of WorkflowExecution, one in the backend and one in the unstract service. It's unclear which model is used by update_execution_err. Please verify:

- That the WorkflowExecution imported in backend/workflow_manager/workflow_v2/execution.py is a Django model with a TextField for status using TextChoices.
- That ExecutionStatus.ERROR.value is compatible with the model's field (e.g., "ERROR").
- Whether tightening save(update_fields=[...]) and updating the return type to WorkflowExecution | None is safe given how callers handle None.

Confirm these points to ensure no breaking changes. A small TextChoices illustration follows.
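For reference, a minimal TextChoices sketch (the member set below is assumed; only the ERROR value is implied by this review):

```python
from django.db import models


class ExecutionStatus(models.TextChoices):
    PENDING = "PENDING", "Pending"
    EXECUTING = "EXECUTING", "Executing"
    ERROR = "ERROR", "Error"


# TextChoices members are str subclasses, so assigning either the member or its
# .value stores the same "ERROR" text in a CharField/TextField:
assert ExecutionStatus.ERROR == "ERROR"
assert ExecutionStatus.ERROR.value == "ERROR"
```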
backend/usage_v2/helper.py (2)
Lines 17-18: LGTM: retry around aggregation

Wrapping read-only aggregation with @db_retry() is safe and helps during transient drops.

Lines 69-70: LGTM: retry around aggregation

Same here; read-only and idempotent.
backend/sample.env (1)
Lines 29-39: No changes needed in sample.env

The environment variables in sample.env already match the settings in backend/backend/settings/base.py (DB_CONNECTION_RETRY_* and CELERY_DB_RETRY_*), and there is no implementation of the new names (DB_RETRY_* or CELERY_USE_BUILTIN_RETRY).

backend/workflow_manager/file_execution/models.py (2)
Lines 228-252: LGTM: targeted updates with update_fields

The selective save(update_fields=...) is good practice; keep it.

Lines 20-21: Decorator usage is appropriate

Applying @db_retry() on short, idempotent ORM operations is aligned with the PR's goal and should help prevent stuck states.

Also applies to: 58-59, 124-125, 228-229
backend/utils/models/organization_mixin.py (1)
Lines 21-25: Retrying save() looks fine; minimal side effects

Wrapping save() here is reasonable; the body is idempotent. No further concerns.
backend/account_v2/organization.py (1)
Lines 16-21: LGTM: get by org_id under retry

Idempotent read; safe with @db_retry.
backend/workflow_manager/workflow_v2/models/execution.py (1)
Lines 45-66: LGTM: manager cleanup under retry

count()/delete() are evaluated DB ops; decorating here is effective.
backend/utils/db_retry.py (1)
Lines 71-75: Confirm env var names in docs match settings

Docs mention DB_RETRY_*; base.py currently defines DB_CONNECTION_RETRY_* too. Ensure RetryConfiguration reads the intended names.
backend/utils/db_constants.py (1)
Lines 301-319: Nice: consistent, reusable log templates

Centralized messages will make ops logs coherent across Django and Celery paths.
from backend.celery_db_retry import get_celery_db_engine_options, should_use_builtin_retry
Fix import path for retry helpers.
The file resides under backend/backend/, so the import should reference backend.backend.celery_db_retry. The current path will fail at runtime.
Apply this diff:
-from backend.celery_db_retry import get_celery_db_engine_options, should_use_builtin_retry
+from backend.backend.celery_db_retry import (
+ get_celery_db_engine_options,
+ should_use_builtin_retry,
+)
🤖 Prompt for AI Agents
In backend/backend/celery_config.py around lines 6 to 7, the import path for the
retry helpers is incorrect; update the import to reference the actual module
location by changing the import to use backend.backend.celery_db_retry and
import get_celery_db_engine_options and should_use_builtin_retry from that
module so the runtime can resolve the symbols.
# Database backend engine options for PgBouncer compatibility
result_backend_transport_options = get_celery_db_engine_options()
💡 Verification agent
🧩 Analysis chain
Confirm Celery picks up engine options for SQLAlchemy backend.
result_backend_transport_options is correct for Celery's SQLAlchemy backend; please verify on your Celery version that these are passed to create_engine. Run:
If your version differs, we may need to adjust the config key or option names.
🏁 Script executed:
#!/bin/bash
# Inspect Celery DatabaseBackend to confirm it uses `result_backend_transport_options`
python - <<'PY'
import inspect
from celery.backends import database
print("DatabaseBackend.create_backend method:\n")
print(inspect.getsource(database.DatabaseBackend.__init__))
PY
Length of output: 118
🏁 Script executed:
#!/bin/bash
# Locate Celery dependency and its pinned version in the repository
grep -R "celery" -n requirements*.txt pyproject.toml setup.py
Length of output: 255
🏁 Script executed:
#!/bin/bash
# Install the pinned Celery version and inspect DatabaseBackend for transport options handling
pip install celery==5.3.4 >/dev/null 2>&1
python - <<'PY'
import inspect
from celery.backends.database import DatabaseBackend
print("Signature of DatabaseBackend.__init__:\n")
print(inspect.signature(DatabaseBackend.__init__), "\n")
print("Source snippet around engine creation in DatabaseBackend:\n")
src = inspect.getsource(DatabaseBackend)
for line in src.splitlines():
if "create_engine" in line or "transport_options" in line:
print(line)
PY
Length of output: 437
🏁 Script executed:
#!/bin/bash
# Install Celery and SQLAlchemy to enable DatabaseBackend inspection
pip install celery==5.3.4 sqlalchemy >/dev/null 2>&1
python - <<'PY'
import inspect
from celery.backends.database import DatabaseBackend
print("Signature of DatabaseBackend.__init__:\n", inspect.signature(DatabaseBackend.__init__), "\n")
source = inspect.getsource(DatabaseBackend)
print("Lines containing engine creation or transport options:\n")
for line in source.splitlines():
if "create_engine" in line or "transport_options" in line:
print(line)
PY
Length of output: 157
🏁 Script executed:
#!/bin/bash
# Print the source of DatabaseBackend.__init__ to inspect engine_options usage
python - <<'PY'
import inspect
from celery.backends.database import DatabaseBackend
print(inspect.getsource(DatabaseBackend.__init__))
PY
Length of output: 1243
Use database_engine_options instead of result_backend_transport_options in backend/backend/celery_config.py. Celery's SQLAlchemy result backend reads engine options from conf.database_engine_options (not result_backend_transport_options) and passes them into create_engine via SessionManager.session_factory (docs.celeryq.dev).
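A hedged illustration of that setting (the values below are placeholders, not taken from this PR's config):

```python
class CeleryConfig:
    result_backend = "db+postgresql://user:password@localhost/celery_results"

    # For the SQLAlchemy result backend, Celery forwards this dict to
    # create_engine() via database_engine_options, not
    # result_backend_transport_options.
    database_engine_options = {
        "pool_pre_ping": True,                    # helps detect stale PgBouncer connections
        "connect_args": {"connect_timeout": 10},  # psycopg2 connect timeout, in seconds
    }
```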
🤖 Prompt for AI Agents
In backend/backend/celery_config.py around lines 41 to 43, the code sets Celery
SQLAlchemy engine options using result_backend_transport_options, but Celery
expects these under database_engine_options; change the assignment to set
conf.database_engine_options (i.e., assign get_celery_db_engine_options() to
database_engine_options) so the SQLAlchemy result backend receives the engine
options passed into create_engine via SessionManager.session_factory.
# Patch Celery database backend to add connection retry logic
patch_celery_database_backend()
💡 Verification agent
🧩 Analysis chain
Patch after Celery config is loaded and gate behind config flag
Patching before app.config_from_object(...)
risks missing backend URL/engine options that come from config. Also, multiple modules call the patch; ensure it’s gated and idempotent.
Suggested change:
- Move the patch after config_from_object.
- Only patch when built-in retry is disabled.
Apply this diff:
-from backend.celery_db_retry import patch_celery_database_backend
+from backend.celery_db_retry import patch_celery_database_backend, should_use_builtin_retry
@@
-# Patch Celery database backend to add connection retry logic
-patch_celery_database_backend()
+# Register custom tasks
TaskRegistry()
# Load task modules from all registered Django app configs.
app.config_from_object("backend.celery_config.CeleryConfig")
+if not should_use_builtin_retry():
+ # Patch Celery database backend to add connection retry logic
+ patch_celery_database_backend()
app.autodiscover_tasks()
Additionally, please confirm patch_celery_database_backend() is safe to call multiple times across processes (idempotent). If it's not, add a module-level "already_patched" guard inside that function.
🏁 Script executed:
#!/bin/bash
# Locate celery_service.py
fd --type f 'celery_service\.py'
echo -e "\n--- celery_service.py snippet (first 200 lines) ---"
sed -n '1,200p' backend/backend/celery_service.py
echo -e "\n--- patch_celery_database_backend definition in celery_db_retry.py ---"
rg -nP '^def patch_celery_database_backend' -C3 backend/backend/celery_db_retry.py || true
echo -e "\n--- Check for idempotency guard in celery_db_retry.py ---"
rg -nP 'already_patched' -C3 backend/backend/celery_db_retry.py || true
echo -e "\n--- should_use_builtin_retry definition in celery_db_retry.py ---"
rg -nP '^def should_use_builtin_retry' -C3 backend/backend/celery_db_retry.py || true
echo -e "\n--- Usages of patch_celery_database_backend across repository ---"
rg -nR 'patch_celery_database_backend' -C1 . || true
Length of output: 2319
🏁 Script executed:
#!/bin/bash
# Print the beginning of celery_db_retry.py including the patch function
sed -n '1,300p' backend/backend/celery_db_retry.py
echo -e "\n--- Check for idempotency guard usage ---"
grep -R "_celery_backend_patched" -n backend/backend/celery_db_retry.py -C5 || true
echo -e "\n--- should_use_builtin_retry function body ---"
sed -n '1,200p' backend/backend/celery_db_retry.py | sed -n '/def should_use_builtin_retry/,/^$/p' || true
echo -e "\n--- Repository-wide usages of patch_celery_database_backend ---"
rg -n 'patch_celery_database_backend' -C1 . || true
Length of output: 17185
Relocate patch_celery_database_backend call
Move the call to immediately after app.config_from_object("backend.celery_config.CeleryConfig") in backend/backend/celery_service.py. patch_celery_database_backend already switches on should_use_builtin_retry() and uses _celery_backend_patched for idempotency—drop the extra conditional wrapper and no additional guard is required.
🤖 Prompt for AI Agents
In backend/backend/celery_service.py around lines 28-30, move the call to
patch_celery_database_backend() so it runs immediately after
app.config_from_object("backend.celery_config.CeleryConfig"); remove the
surrounding conditional wrapper currently guarding the call (since
patch_celery_database_backend already checks should_use_builtin_retry() and uses
_celery_backend_patched for idempotency). Ensure there is no extra guard or
duplicate checks and keep only a direct call to patch_celery_database_backend()
right after app.config_from_object(...) so the patching logic self-controls when
to apply.
# Database connection retry settings
DB_CONNECTION_RETRY_COUNT = int(os.environ.get("DB_CONNECTION_RETRY_COUNT", "3"))
DB_CONNECTION_RETRY_DELAY = int(os.environ.get("DB_CONNECTION_RETRY_DELAY", "1"))
DB_CONNECTION_RETRY_MAX_DELAY = int(os.environ.get("DB_CONNECTION_RETRY_MAX_DELAY", "30"))
🛠️ Refactor suggestion
Use float for retry delays; support both DB_RETRY_* and DB_CONNECTION_RETRY_* env names.
Current ints prevent sub-second backoff and differ from db_retry.py docs (DB_RETRY_*). Parse floats and fall back to either naming for consistency.
-# Database connection retry settings
-DB_CONNECTION_RETRY_COUNT = int(os.environ.get("DB_CONNECTION_RETRY_COUNT", "3"))
-DB_CONNECTION_RETRY_DELAY = int(os.environ.get("DB_CONNECTION_RETRY_DELAY", "1"))
-DB_CONNECTION_RETRY_MAX_DELAY = int(os.environ.get("DB_CONNECTION_RETRY_MAX_DELAY", "30"))
+# Database connection retry settings (support legacy and new names)
+DB_RETRY_MAX_RETRIES = int(os.environ.get("DB_RETRY_MAX_RETRIES", os.environ.get("DB_CONNECTION_RETRY_COUNT", "3")))
+DB_RETRY_BASE_DELAY = float(os.environ.get("DB_RETRY_BASE_DELAY", os.environ.get("DB_CONNECTION_RETRY_DELAY", "1.0")))
+DB_RETRY_MAX_DELAY = float(os.environ.get("DB_RETRY_MAX_DELAY", os.environ.get("DB_CONNECTION_RETRY_MAX_DELAY", "30.0")))
🤖 Prompt for AI Agents
In backend/backend/settings/base.py around lines 118 to 122, the retry delay
constants are parsed as ints and only read DB_CONNECTION_RETRY_* env names;
change parsing to float so sub-second backoff is allowed and accept either
DB_RETRY_* or DB_CONNECTION_RETRY_* env names for backward-compatibility.
Specifically, for each setting (COUNT, DELAY, MAX_DELAY) check os.environ for
the shorter DB_RETRY_* first then fall back to DB_CONNECTION_RETRY_*, and parse
numeric values with float(...) (use int only for COUNT if desired) providing the
same default strings as before.
# Patch Celery database backend to add connection retry logic
patch_celery_database_backend()
💡 Verification agent
🧩 Analysis chain
Defer patch until after Celery config and gate with flag
Mirror the change from other Celery modules for consistency and to ensure engine options are loaded.
Apply this diff:
-from backend.celery_db_retry import patch_celery_database_backend
+from backend.celery_db_retry import patch_celery_database_backend, should_use_builtin_retry
@@
-# Patch Celery database backend to add connection retry logic
-patch_celery_database_backend()
-
-# Load task modules from all registered Django app configs.
-app.config_from_object(CeleryConfig)
+# Load task modules from all registered Django app configs.
+app.config_from_object(CeleryConfig)
+if not should_use_builtin_retry():
+ # Patch Celery database backend to add connection retry logic
+ patch_celery_database_backend()
Confirm patch_celery_database_backend() is idempotent.
🏁 Script executed:
#!/bin/bash
set -e
# Search for the function definition of patch_celery_database_backend
rg -nP "def\s+patch_celery_database_backend" -C5 --glob "*.py"
# Search for any internal guard or idempotency checks within the function
rg -nP "patch_celery_database_backend\(" -C5 --glob "*.py"
Length of output: 5204
🏁 Script executed:
#!/bin/bash
set -e
# Print lines around the function definition to inspect idempotency guard
sed -n '180,260p' backend/backend/celery_db_retry.py
# Confirm usage of the _celery_backend_patched flag inside the function
rg -nP '_celery_backend_patched' -C5 backend/backend/celery_db_retry.py
Length of output: 5249
Defer and gate patch invocation
The patch is idempotent (it guards against double patching via the _celery_backend_patched flag and a retry-marker), but it should be applied after loading the Celery config and only when custom retry is enabled; mirror the other modules:
-from backend.celery_db_retry import patch_celery_database_backend
+from backend.celery_db_retry import patch_celery_database_backend, should_use_builtin_retry
@@
-# Patch Celery database backend to add connection retry logic
-patch_celery_database_backend()
-
-# Load task modules from all registered Django app configs.
-app.config_from_object(CeleryConfig)
+# Load task modules from all registered Django app configs.
+app.config_from_object(CeleryConfig)
+if not should_use_builtin_retry():
+ # Patch Celery database backend to add connection retry logic
+ patch_celery_database_backend()
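For context, the kind of module-level guard referenced above usually looks roughly like this sketch (the flag name _celery_backend_patched comes from this thread; the body is an assumption, not the PR's implementation):

```python
import logging

logger = logging.getLogger(__name__)

_celery_backend_patched = False  # module-level guard so repeated calls are no-ops


def patch_celery_database_backend() -> None:
    """Illustrative idempotent patcher: wrap backend methods at most once."""
    global _celery_backend_patched
    if _celery_backend_patched:
        logger.debug("Celery database backend already patched; skipping")
        return
    try:
        from celery.backends.database import DatabaseBackend  # noqa: F401

        # ... wrap DatabaseBackend._store_result / _get_task_meta_for here ...
        _celery_backend_patched = True
        logger.info("Celery database backend patched with retry logic")
    except ImportError:
        logger.warning("Celery database backend not available; nothing to patch")
```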
🤖 Prompt for AI Agents
In backend/backend/workers/file_processing_callback/file_processing_callback.py
around lines 29 to 31, the unconditional call to patch_celery_database_backend()
must be deferred and gated: remove the direct invocation and instead call
patch_celery_database_backend() only after Celery configuration has been loaded
and only when the custom retry feature/flag is enabled (matching how other
modules do it). Keep the existing idempotency guards in the patch function, but
add a check against the app/settings flag or init sequence and move the call
into the Celery init/boot path so the patch runs once and only when configured.
@muhammad-ali-e Sounds valid. Please check.
# Patch Celery database backend to add connection retry logic
patch_celery_database_backend()
💡 Verification agent
🧩 Analysis chain
Align patch ordering with Celery app configuration
Same concern as in celery_service: patch after app.config_from_object(CeleryConfig)
and gate behind the built-in retry flag to avoid redundant/double patching.
Apply this diff:
-from backend.celery_db_retry import patch_celery_database_backend
+from backend.celery_db_retry import patch_celery_database_backend, should_use_builtin_retry
@@
-# Patch Celery database backend to add connection retry logic
-patch_celery_database_backend()
-
-# Load task modules from all registered Django app configs.
-app.config_from_object(CeleryConfig)
+# Load task modules from all registered Django app configs.
+app.config_from_object(CeleryConfig)
+if not should_use_builtin_retry():
+ # Patch Celery database backend to add connection retry logic
+ patch_celery_database_backend()
Please also confirm the patch is idempotent across worker modules.
🏁 Script executed:
#!/bin/bash
# Search for usage of patch_celery_database_backend
rg -n "patch_celery_database_backend" -C3
# Search for Celery app configuration ordering
rg -n "config_from_object" -C3
# Locate definition of should_use_builtin_retry
rg -n "def should_use_builtin_retry" -C3
Length of output: 7106
Gate and reorder Celery DB retry patch
Move the patch_celery_database_backend() call to immediately after app.config_from_object(CeleryConfig) and wrap it in if not should_use_builtin_retry(): in all three worker entry points and in celery_service.py; the built-in flag check prevents redundant patching and _celery_backend_patched makes the patch idempotent.
Applies to:
- backend/backend/celery_service.py
- backend/backend/workers/file_processing/file_processing.py
- backend/backend/workers/file_processing_callback/file_processing_callback.py
🤖 Prompt for AI Agents
In backend/backend/workers/file_processing/file_processing.py around lines 29 to
31, move the call to patch_celery_database_backend() so it occurs immediately
after app.config_from_object(CeleryConfig) and wrap that call in if not
should_use_builtin_retry(): to avoid redundant patching; also ensure the patch
function remains idempotent by relying on the existing _celery_backend_patched
guard (do not call patch_celery_database_backend() anywhere before
config_from_object in this module).
        except Exception as e:
            error_type, needs_refresh, use_extended = _classify_database_error(e)

            if not DatabaseErrorPatterns.is_retryable_error(error_type):
                logger.debug(
                    LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=e)
                )
                raise
🛠️ Refactor suggestion
Catch only DB connection errors, not all Exceptions.
Avoid broad except; limit to OperationalError and InterfaceError.
- except Exception as e:
+ except (OperationalError, InterfaceError) as e:
error_type, needs_refresh, use_extended = _classify_database_error(e)
if not DatabaseErrorPatterns.is_retryable_error(error_type):
logger.debug(
LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=e)
)
raise
🤖 Prompt for AI Agents
In backend/utils/db_retry.py around lines 141-149, the code currently uses a
broad "except Exception as e" which should be narrowed to only DB
connection-related errors; replace it with "except (OperationalError,
InterfaceError) as e" and add/import those exceptions from the DB library used
in the project (e.g., from sqlalchemy.exc import OperationalError,
InterfaceError) so non-database exceptions continue to propagate unchanged; keep
the existing handling (classify, log non-retryable, raise) inside that narrowed
except block and do not swallow other exceptions.
        except Exception as e:
            error_type, needs_refresh, use_extended = _classify_database_error(e)

            if not DatabaseErrorPatterns.is_retryable_error(error_type):
                logger.debug(
                    LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=e)
                )
                raise
🛠️ Refactor suggestion
Same here: don’t catch all Exceptions in context manager.
Limit to DB connection errors.
- except Exception as e:
+ except (OperationalError, InterfaceError) as e:
error_type, needs_refresh, use_extended = _classify_database_error(e)
if not DatabaseErrorPatterns.is_retryable_error(error_type):
logger.debug(
LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=e)
)
raise
🤖 Prompt for AI Agents
backend/utils/db_retry.py around lines 315 to 323: the context manager currently
catches all Exceptions; change the except clause to only catch database
connection/operational errors (for example sqlalchemy.exc.OperationalError,
sqlalchemy.exc.InterfaceError and related DBAPI errors such as
psycopg2.OperationalError/DatabaseError), import those exception classes at the
top of the file, keep the existing error classification/handling logic inside
that specific except, and allow any other exceptions to propagate (do not
swallow or convert non-DB exceptions).
    @db_retry()  # Add retry for connection drops during queryset organization filtering
    def get_queryset(self):
        organization = UserContext.get_organization()
        return super().get_queryset().filter(organization=organization)
🛠️ Refactor suggestion
Remove @db_retry from get_queryset — it won’t help due to lazy evaluation.
get_queryset only builds a QuerySet; DB work happens later. The decorator gives a false sense of protection. Apply retries at evaluation sites (e.g., .count(), .exists(), .get(), .update(), .delete()) or provide manager helpers that perform evaluated operations; a sketch of such a helper follows the diff below.
- @db_retry() # Add retry for connection drops during queryset organization filtering
- def get_queryset(self):
+ def get_queryset(self):
organization = UserContext.get_organization()
return super().get_queryset().filter(organization=organization)
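If retry coverage is still wanted for organization-scoped reads, one option is a manager helper that runs the evaluated operation under the decorator. The helper names below are hypothetical, and the imports assume the PR's db_retry and the existing UserContext:

```python
from django.db import models

# Assumed imports, per the PR's file layout:
# from utils.db_retry import db_retry
# from utils.user_context import UserContext


class OrganizationScopedManager(models.Manager):
    def get_queryset(self):
        organization = UserContext.get_organization()
        return super().get_queryset().filter(organization=organization)

    @db_retry()  # the query executes inside this method, so the retry actually covers it
    def count_for_current_org(self) -> int:
        return self.get_queryset().count()

    @db_retry()
    def get_for_current_org(self, **filters):
        return self.get_queryset().get(**filters)
```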
Actionable comments posted: 2
♻️ Duplicate comments (3)
backend/utils/db_retry.py (2)
Lines 211-218: Narrow exception scope to DB connection errors only

Catching Exception swallows non-DB bugs. Limit to Django DB connection exceptions as previously requested.

-        except Exception as e:
+        except (OperationalError, InterfaceError) as e:
             error_type, needs_refresh, use_extended = _classify_database_error(e)
             if not DatabaseErrorPatterns.is_retryable_error(error_type):
                 logger.debug(
                     LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=e)
                 )
                 raise
Lines 388-395: Also narrow the context manager's except

Same rationale as above; don't intercept unrelated exceptions.

-        except Exception as e:
+        except (OperationalError, InterfaceError) as e:

backend/backend/celery_db_retry.py (1)
Lines 31-35: Broaden detection to all SQLAlchemy errors, not just OperationalError

Catching only OperationalError misses InterfaceError/DBAPIError subclasses.

-        from sqlalchemy.exc import OperationalError as SQLAlchemyOperationalError
-
-        return isinstance(exception, SQLAlchemyOperationalError)
+        from sqlalchemy.exc import SQLAlchemyError
+        return isinstance(exception, SQLAlchemyError)
🧹 Nitpick comments (6)
backend/utils/db_retry.py (3)
Lines 420-426: Log failures with stack traces

Use logger.exception for MAX_RETRIES_EXCEEDED to keep tracebacks in logs (helps RCA).

-            logger.error(
+            logger.exception(
                 LogMessages.format_message(
                     LogMessages.MAX_RETRIES_EXCEEDED, total=max_retries + 1, error=e
                 )
             )

-            logger.error(
+            logger.exception(
                 LogMessages.format_message(
                     LogMessages.MAX_RETRIES_EXCEEDED,
                     total=final_max_retries + 1,
                     error=e,
                 )
             )

Also applies to: 246-250
Lines 253-255: Remove unreachable fallback

The loop exits via return/raise; this tail call won't run.

-    # This should never be reached, but included for completeness
-    return operation()
Lines 135-140: Optionally include traceback on refresh failures

Use logger.exception to retain the stack for failed pool refresh attempts.

-            logger.warning(
+            logger.exception(
                 LogMessages.format_message(
                     LogMessages.POOL_REFRESH_FAILED, error=refresh_error
                 )
             )

backend/backend/celery_db_retry.py (3)
Lines 61-63: Remove unused parameter and fix call sites

func isn't used in _handle_pool_refresh; drop it and simplify callers.

-def _handle_pool_refresh(
-    func: Callable, error: Exception, retry_count: int, total_retries: int
-) -> None:
+def _handle_pool_refresh(error: Exception, retry_count: int, total_retries: int) -> None:

-            _handle_pool_refresh(
-                func, e, retry_count, current_settings["max_retries"] + 1
-            )
+            _handle_pool_refresh(
+                e, retry_count, current_settings["max_retries"] + 1
+            )

Also applies to: 192-197
Lines 209-215: Emit exceptions with tracebacks

Prefer logger.exception for failure logs.

-            logger.error(
+            logger.exception(
                 LogMessages.format_message(
                     LogMessages.MAX_RETRIES_EXCEEDED,
                     total=current_settings["max_retries"] + 1,
                     error=e,
                 )
             )

-        logger.error(f"Error patching Celery database backend: {e}")
+        logger.exception(f"Error patching Celery database backend: {e}")

-        logger.error(f"Error configuring Celery built-in retry: {e}")
+        logger.exception(f"Error configuring Celery built-in retry: {e}")

Also applies to: 341-341, 378-378
Lines 226-242: Avoid blind except and narrow where possible

Consider catching SQLAlchemyError during dispose to reduce noise; keep a broad fallback only if necessary.

-    except Exception as e:
+    except Exception as e:
         logger.warning(f"Could not dispose SQLAlchemy engine: {e}")

If SQLAlchemy is guaranteed, prefer:

-    except Exception as e:
+    except SQLAlchemyError as e:  # from sqlalchemy.exc import SQLAlchemyError
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge Base: Disabled due to Reviews > Disable Knowledge Base setting
📒 Files selected for processing (2)
- backend/backend/celery_db_retry.py (1 hunks)
- backend/utils/db_retry.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.12.2)
backend/backend/celery_db_retry.py
- 61-61: Unused function argument: func (ARG001)
- 75-75: Do not catch blind exception: Exception (BLE001)
- 159-159: Consider moving this statement to an else block (TRY300)
- 209-215: Use logging.exception instead of logging.error; replace with exception (TRY400)
- 240-240: Do not catch blind exception: Exception (BLE001)
- 340-340: Do not catch blind exception: Exception (BLE001)
- 341-341: Use logging.exception instead of logging.error; replace with exception (TRY400)
- 377-377: Do not catch blind exception: Exception (BLE001)
- 378-378: Use logging.exception instead of logging.error; replace with exception (TRY400)

backend/utils/db_retry.py
- 135-135: Do not catch blind exception: Exception (BLE001)
- 246-250: Use logging.exception instead of logging.error; replace with exception (TRY400)
- 387-387: Consider moving this statement to an else block (TRY300)
- 420-426: Use logging.exception instead of logging.error; replace with exception (TRY400)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (1)
backend/backend/celery_db_retry.py (1)
Lines 350-377: Built-in retry config timing is correct

patch_celery_database_backend() runs immediately after each Celery(...) instantiation and before app.config_from_object("backend.celery_config.CeleryConfig"), and the config class pulls the env vars at load time.
    # Context manager usage
    with db_retry_context(max_retries=3):
        model.save()
        other_model.delete()
🛠️ Refactor suggestion
Doc example promises unsupported behavior.
The “with db_retry_context(...):” example implies automatic re-execution of the block, which Python context managers cannot do. Please remove or rewrite to avoid misuse.
- # Context manager usage
- with db_retry_context(max_retries=3):
- model.save()
- other_model.delete()
+ # Prefer decorator or direct call for retries.
+ # Context manager is for connection refresh only (no automatic re-run).
Also applies to: 365-370
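One hedged alternative to the context-manager API (the name retry_block is hypothetical, not part of this PR): take the block as a callable so it really can be re-invoked on failure.

```python
import time

from django.db import InterfaceError, OperationalError, close_old_connections


def retry_block(operation, max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 30.0):
    """Re-run `operation` (a zero-argument callable) on transient DB errors."""
    for attempt in range(max_retries + 1):
        try:
            return operation()
        except (OperationalError, InterfaceError):
            if attempt == max_retries:
                raise
            close_old_connections()  # refresh stale connections before the next attempt
            time.sleep(min(base_delay * (2**attempt), max_delay))


# Usage: wrap the statements that previously sat inside the with-block.
# retry_block(lambda: (model.save(), other_model.delete()))
```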
🤖 Prompt for AI Agents
In backend/utils/db_retry.py around lines 21-25 (and similarly 365-370), the doc
example shows "with db_retry_context(...):" implying the context manager will
automatically re-run the block on failure — which is incorrect; update the docs
to remove that example and replace it with one of two correct patterns: either
show using a retrying wrapper/decorator that re-invokes the callable on failure,
or show an explicit retry loop inside the with-block (e.g., performing retries
around the operation within the context rather than expecting re-execution by
the context manager). Ensure the text clearly states that the context manager
only manages transactional/cleanup behavior and does not retry the enclosed
block, and provide a short correct usage example pattern (decorator/wrapper or
explicit loop) in place of the misleading "with" example.
backend/utils/db_retry.py
Outdated
    while retry_count <= final_max_retries:
        try:
            yield
            return  # Success - exit the retry loop
        except Exception as e:
💡 Verification agent
❓ Verification inconclusive
Context manager retrier is broken (multiple yields).
A @contextmanager must yield exactly once. The while/yield loop will raise “generator didn't stop after throw” on the first exception and cannot re-run the user block. Either remove this API or change semantics.
Minimal safe fix (no retry, just refresh/log) to avoid runtime error if it’s invoked:
- retry_count = 0
-
- while retry_count <= final_max_retries:
- try:
- yield
- return # Success - exit the retry loop
- except Exception as e:
- error_type, needs_refresh, use_extended = _classify_database_error(e)
-
- if not DatabaseErrorPatterns.is_retryable_error(error_type):
- logger.debug(
- LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=e)
- )
- raise
-
- # Update settings for extended retry if needed
- extended_settings = _update_retry_settings_for_extended(
- retry_count, use_extended
- )
- if extended_settings:
- final_max_retries = extended_settings["max_retries"]
- final_base_delay = extended_settings["base_delay"]
- final_max_delay = extended_settings["max_delay"]
-
- if retry_count < final_max_retries:
- retry_count, delay = _handle_context_retry_logic(
- retry_count,
- final_max_retries,
- final_base_delay,
- final_max_delay,
- final_force_refresh,
- e,
- error_type,
- needs_refresh,
- )
- time.sleep(delay)
- continue
- else:
- logger.error(
- LogMessages.format_message(
- LogMessages.MAX_RETRIES_EXCEEDED,
- total=final_max_retries + 1,
- error=e,
- )
- )
- raise
+ try:
+ yield
+ except (OperationalError, InterfaceError) as e:
+ error_type, needs_refresh, _ = _classify_database_error(e)
+ if needs_refresh and final_force_refresh:
+ _handle_connection_pool_refresh(e, 1, 1)
+ if not DatabaseErrorPatterns.is_retryable_error(error_type):
+ logger.debug(
+ LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=e)
+ )
+ raise
And update docs to steer users to the decorator or retry_database_operation for true retries.
Would you like me to replace the context manager with a small “retry_block(fn)” helper to truly re-run a provided callable?
Also applies to: 389-419
Refactor broken contextmanager retrier in backend/utils/db_retry.py
The @contextmanager at lines 384-388 yields inside a retry loop, causing a "generator didn't stop after throw" error on any exception and preventing retries. Remove or replace this API with either a contextmanager that yields exactly once (e.g. for refresh/log only) or convert it to a retry decorator/function (such as retry_database_operation or a retry_block(fn) helper).
🧰 Tools
🪛 Ruff (0.12.2)
387-387: Consider moving this statement to an else block (TRY300)
🤖 Prompt for AI Agents
In backend/utils/db_retry.py around lines 384–388 the @contextmanager currently
yields inside the retry loop which causes "generator didn't stop after throw"
and prevents retries; replace this broken contextmanager with a retry function
or decorator that invokes the operation inside the retry loop (e.g. implement
retry_database_operation(fn, *args, **kwargs) or a @retry_database_operation
decorator) so the loop calls the target callable and handles exceptions/backs
off and returns its result, or alternatively change the contextmanager to yield
exactly once and perform all retry logic outside the generator (do not yield
inside the retry loop and remove the return-after-yield pattern so exceptions
are caught and retried by the calling wrapper).
Actionable comments posted: 0
♻️ Duplicate comments (4)
backend/backend/celery_db_retry.py (1)
28-35: Broaden SQLAlchemy exception detection to cover all retryable DB errors. Catching only OperationalError misses InterfaceError/DBAPIError and friends. Use SQLAlchemyError as the umbrella type.
-def _is_sqlalchemy_error(exception: Exception) -> bool:
-    """Check if exception is a SQLAlchemy error."""
-    try:
-        from sqlalchemy.exc import OperationalError as SQLAlchemyOperationalError
-
-        return isinstance(exception, SQLAlchemyOperationalError)
-    except ImportError:
-        return False
+def _is_sqlalchemy_error(exception: Exception) -> bool:
+    """Check if exception is a SQLAlchemy error."""
+    try:
+        from sqlalchemy.exc import SQLAlchemyError
+        return isinstance(exception, SQLAlchemyError)
+    except ImportError:
+        # SQLAlchemy not installed; treat as non-SQLAlchemy error.
+        return False
backend/utils/db_retry.py (3)
21-25
: Fix docs: context manager does not re-execute the block.A @contextmanager yields once; it cannot re-run the with-block on failure. Update examples to avoid implying auto-retry.
- # Context manager usage - with db_retry_context(max_retries=3): - model.save() - other_model.delete() + # Prefer decorator or retry_database_operation for retries. + # The context manager does NOT re-run the block; it only refreshes connections on certain errors.- Example: - with db_retry_context(max_retries=5, force_refresh=True): - model.save() - other_model.delete() - MyModel.objects.filter(...).update(...) + Example: + # Use decorator or retry_database_operation for automatic retries. + # db_retry_context is for connection hygiene only; it will still re-raise the error.Also applies to: 398-403
207-214
: Catch only DB connection errors in the core retry loop.Limit to OperationalError/InterfaceError; let other exceptions propagate immediately.
- except Exception as e: + except (OperationalError, InterfaceError) as e: error_type, needs_refresh, use_extended = _classify_database_error(e)
377-451
: Context manager retries are invalid; yield exactly once and re-raise.Current loop yields multiple times, which breaks contextlib semantics (“generator didn't stop after throw”). Replace with a single-yield context that optionally refreshes connections, then re-raises.
@contextmanager def db_retry_context( @@ - retry_count = 0 - - while retry_count <= current_settings["max_retries"]: - try: - yield - return # Success - exit the retry loop - except Exception as e: - error_type, needs_refresh, use_extended = _classify_database_error(e) - - if not DatabaseErrorPatterns.is_retryable_error(error_type): - _handle_context_non_retryable_error(e) - - # Update settings for extended retry if needed - current_settings = _update_context_retry_settings( - retry_count, use_extended, current_settings - ) - - if retry_count < current_settings["max_retries"]: - retry_count, delay = _handle_context_retry_logic( - retry_count, - current_settings["max_retries"], - current_settings["base_delay"], - current_settings["max_delay"], - current_settings["force_refresh"], - e, - error_type, - needs_refresh, - ) - time.sleep(delay) - continue - else: - _handle_context_max_retries_exceeded( - e, current_settings["max_retries"] + 1 - ) + try: + yield + except (OperationalError, InterfaceError) as e: + error_type, needs_refresh, _ = _classify_database_error(e) + if not DatabaseErrorPatterns.is_retryable_error(error_type): + _handle_context_non_retryable_error(e) + # Hygiene only: optionally refresh pool, then re-raise. + if needs_refresh and current_settings["force_refresh"]: + _handle_connection_pool_refresh(e, 1, 1) + _handle_context_max_retries_exceeded(e, 1)
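A standalone repro of that failure mode in plain Python (no project code involved): contextlib throws the block's exception into the generator, and a second yield violates the protocol.

from contextlib import contextmanager


@contextmanager
def naive_retry(attempts: int = 3):
    # Broken on purpose: a generator-based context manager may only yield once.
    for _ in range(attempts):
        try:
            yield
            return
        except ValueError:
            continue  # loops back to a second yield


try:
    with naive_retry():
        raise ValueError("boom")
except RuntimeError as exc:
    print(exc)  # "generator didn't stop after throw()"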
🧹 Nitpick comments (7)
backend/backend/celery_db_retry.py (5)
207-214
: Avoid broad exception catch or explicitly suppress the linter.The wrapper currently catches Exception, then re-raises non-SQLAlchemy errors. Either narrow the except to SQLAlchemyError, or keep the guard and add an inline noqa to document intent.
Minimal change (keep behavior, fix BLE001):
- except Exception as e: + except Exception as e: # noqa: BLE001 - non-SQLAlchemy errors are re-raised immediately if not _is_sqlalchemy_error(e): _handle_non_sqlalchemy_error(e)Alternative (narrowed catch):
+ try: + from sqlalchemy.exc import SQLAlchemyError + except Exception: + SQLAlchemyError = tuple() # type: ignore[assignment] ... - except Exception as e: + except SQLAlchemyError as e: # classified below ...
351-356
: Mark all patched methods to prevent re-patching, not just _store_result.- # Mark as patched to prevent double patching - DatabaseBackend._store_result._retry_patched = True + # Mark as patched to prevent double patching + DatabaseBackend._store_result._retry_patched = True + try: + DatabaseBackend._get_task_meta_for._retry_patched = True + DatabaseBackend.get_result._retry_patched = True + if hasattr(DatabaseBackend, "delete_result"): + DatabaseBackend.delete_result._retry_patched = True + except Exception: + pass _celery_backend_patched = True
360-360
: Use logger.exception to preserve tracebacks on failures.- logger.error(f"Error patching Celery database backend: {e}") + logger.exception("Error patching Celery database backend")- logger.error(f"Error configuring Celery built-in retry: {e}") + logger.exception("Error configuring Celery built-in retry")Also applies to: 396-397
126-131
: Add jitter to backoff to reduce thundering herd.- delay = min( - current_settings["base_delay"] * (2**retry_count), - current_settings["max_delay"], - ) + delay = min( + current_settings["base_delay"] * (2**retry_count), + current_settings["max_delay"], + ) + # add +/-20% jitter + try: + import random + jitter = 0.2 * delay + delay = max(0.0, min(current_settings["max_delay"], delay + random.uniform(-jitter, jitter))) + except Exception: + pass
414-427
: Expose pool tuning via env and add pool_timeout for faster failover.return { # Connection health checking "pool_pre_ping": True, # Test connections before use - # Minimal pooling (let PgBouncer handle the real pooling) - "pool_size": 5, # Small pool since PgBouncer handles real pooling - "max_overflow": 0, # No overflow, rely on PgBouncer + # Minimal pooling (let PgBouncer handle real pooling) + "pool_size": RetryConfiguration.get_setting_value("CELERY_DB_POOL_SIZE", 5), + "max_overflow": RetryConfiguration.get_setting_value("CELERY_DB_MAX_OVERFLOW", 0), + "pool_timeout": RetryConfiguration.get_setting_value("CELERY_DB_POOL_TIMEOUT", 10), "pool_recycle": 3600, # Recycle connections every hourbackend/utils/db_retry.py (2)
246-250
: Preserve traceback when max retries are exceeded.- logger.error( + logger.exception( LogMessages.format_message( LogMessages.MAX_RETRIES_EXCEEDED, total=max_retries + 1, error=e ) )
229-233
: Add jitter to backoff to spread retries under load.- delay = min(base_delay * (2**retry_count), max_delay) + delay = min(base_delay * (2**retry_count), max_delay) + # add +/-20% jitter + try: + import random + jitter = 0.2 * delay + delay = max(0.0, min(max_delay, delay + random.uniform(-jitter, jitter))) + except Exception: + passAdd import (top of file):
import random # optional: for jitter
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge Base: Disabled due to Reviews > Disable Knowledge Base setting
📒 Files selected for processing (2)
backend/backend/celery_db_retry.py
(1 hunks)backend/utils/db_retry.py
(1 hunks)
🧰 Additional context used
🪛 Ruff (0.12.2)
backend/utils/db_retry.py
135-135: Do not catch blind exception: Exception
(BLE001)
246-250: Use logging.exception
instead of logging.error
Replace with exception
(TRY400)
422-422: Consider moving this statement to an else
block
(TRY300)
423-423: Do not catch blind exception: Exception
(BLE001)
backend/backend/celery_db_retry.py
73-73: Do not catch blind exception: Exception
(BLE001)
210-210: Consider moving this statement to an else
block
(TRY300)
211-211: Do not catch blind exception: Exception
(BLE001)
259-259: Do not catch blind exception: Exception
(BLE001)
359-359: Do not catch blind exception: Exception
(BLE001)
360-360: Use logging.exception
instead of logging.error
Replace with exception
(TRY400)
396-396: Do not catch blind exception: Exception
(BLE001)
397-397: Use logging.exception
instead of logging.error
Replace with exception
(TRY400)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
Actionable comments posted: 4
♻️ Duplicate comments (5)
backend/utils/db_retry.py (4)
21-25
: Fix misleading context manager example (context managers don't auto-retry).This implies automatic re-execution of the block, which a contextmanager cannot do. Replace with a non-misleading note.
- # Context manager usage - with db_retry_context(max_retries=3): - model.save() - other_model.delete() + # Prefer decorator or direct call for retries. + # Context manager is for connection refresh only (no automatic re-run).
400-405
: Correct db_retry_context example to avoid implying auto-retry.Same issue as the top-level example; please replace or remove.
- with db_retry_context(max_retries=5, force_refresh=True): - model.save() - other_model.delete() - MyModel.objects.filter(...).update(...) + # Context manager provides connection refresh/logging only; it does not re-run the block. + # Prefer @db_retry() or retry_database_operation(...) for retries.
207-215
: Catch only DB connection errors in _execute_with_retry; log with stack; remove unreachable code.Avoid broad except; propagate non-DB errors unchanged. Also switch to logging.exception and drop the dead return.
- while retry_count <= max_retries: + while retry_count <= max_retries: try: logger.debug(LogMessages.EXECUTING_WITH_RETRY) return operation() - except Exception as e: + except (OperationalError, InterfaceError) as e: error_type, needs_refresh, use_extended = _classify_database_error(e) @@ - else: - logger.error( + else: + logger.exception( LogMessages.format_message( LogMessages.MAX_RETRIES_EXCEEDED, total=max_retries + 1, error=e ) ) raise - - # This should never be reached, but included for completeness - return operation()Also applies to: 246-255
419-453
: Context manager retrier is broken (multiple yields). Make it yield once and stop retrying here.A @contextmanager must yield exactly once. Current while/yield loop will raise “generator didn't stop after throw” and never re-run the block. Convert to a single-yield manager that optionally refreshes the pool and re-raises, and keep retries to the decorator/wrapper.
- retry_count = 0 - - while retry_count <= current_settings["max_retries"]: - try: - yield - return # Success - exit the retry loop - except Exception as e: - error_type, needs_refresh, use_extended = _classify_database_error(e) - - if not DatabaseErrorPatterns.is_retryable_error(error_type): - _handle_context_non_retryable_error(e) - - # Update settings for extended retry if needed - current_settings = _update_context_retry_settings( - retry_count, use_extended, current_settings - ) - - if retry_count < current_settings["max_retries"]: - retry_count, delay = _handle_context_retry_logic( - retry_count, - current_settings["max_retries"], - current_settings["base_delay"], - current_settings["max_delay"], - current_settings["force_refresh"], - e, - error_type, - needs_refresh, - ) - time.sleep(delay) - continue - else: - _handle_context_max_retries_exceeded( - e, current_settings["max_retries"] + 1 - ) + try: + yield + except (OperationalError, InterfaceError) as e: + error_type, needs_refresh, _ = _classify_database_error(e) + if needs_refresh and current_settings["force_refresh"]: + # single refresh attempt; no retry loop here + _handle_connection_pool_refresh(e, retry_count=1, max_retries=1) + if not DatabaseErrorPatterns.is_retryable_error(error_type): + _handle_context_non_retryable_error(e) + # retry policy belongs to decorator/_execute_with_retry; re-raise + raisebackend/backend/celery_db_retry.py (1)
28-35
: Broaden SQLAlchemy exception check to avoid missing retryable errors.Use SQLAlchemyError (or DBAPIError) instead of just OperationalError so InterfaceError/DBAPIError subclasses are also retried.
-def _is_sqlalchemy_error(exception: Exception) -> bool: - """Check if exception is a SQLAlchemy error.""" - try: - from sqlalchemy.exc import OperationalError as SQLAlchemyOperationalError - - return isinstance(exception, SQLAlchemyOperationalError) +def _is_sqlalchemy_error(exception: Exception) -> bool: + """Check if exception is a SQLAlchemy error (inclusive).""" + try: + from sqlalchemy.exc import SQLAlchemyError + return isinstance(exception, SQLAlchemyError) except ImportError: return False
🧹 Nitpick comments (5)
backend/utils/db_retry.py (2)
135-140
: Preserve traceback on pool refresh failure.Use logging.exception to retain stack; optional to keep broad catch here since refresh errors are unexpected.
- except Exception as refresh_error: - logger.warning( + except Exception as refresh_error: + logger.exception( LogMessages.format_message( LogMessages.POOL_REFRESH_FAILED, error=refresh_error ) )
230-244
: Optional: add jitter to backoff to reduce thundering herd.Small randomized jitter helps when many workers retry simultaneously.
+import random @@ - delay = min(base_delay * (2**retry_count), max_delay) + delay = min(base_delay * (2**retry_count), max_delay) + # 10% jitter + delay += random.uniform(0, 0.1 * delay)Also applies to: 33-34
backend/backend/celery_db_retry.py (3)
380-382
: Preferlogger.exception
to capture stack trace.Improves debuggability and satisfies TRY400.
- except Exception as e: - logger.error(f"Error patching Celery database backend: {e}") + except Exception: + logger.exception("Error patching Celery database backend")
417-419
: Preferlogger.exception
for configuration errors.Captures stack trace and aligns with linter hint.
- except Exception as e: - logger.error(f"Error configuring Celery built-in retry: {e}") + except Exception: + logger.exception("Error configuring Celery built-in retry")
422-449
: PgBouncer optimization (optional): allow NullPool mode.For PgBouncer in transaction mode, SQLAlchemy’s NullPool avoids pool pinning. Consider a toggle to use NullPool.
def get_celery_db_engine_options(): @@ - return { + opts = { # Connection health checking "pool_pre_ping": True, # Test connections before use # Minimal pooling (let PgBouncer handle the real pooling) "pool_size": 5, # Small pool since PgBouncer handles real pooling "max_overflow": 0, # No overflow, rely on PgBouncer "pool_recycle": 3600, # Recycle connections every hour @@ "echo": RetryConfiguration.get_setting_value("CELERY_DB_ECHO_SQL", False), - } + } + # Optional: NullPool for PgBouncer transaction mode + if RetryConfiguration.get_setting_value("CELERY_DB_USE_NULLPOOL", False): + try: + from sqlalchemy.pool import NullPool + opts.pop("pool_size", None) + opts.pop("max_overflow", None) + opts["poolclass"] = NullPool + except Exception: + logger.warning("Requested NullPool but SQLAlchemy not available; continuing with default pool options") + return opts
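For illustration, a NullPool-based engine could be constructed as below; the DSN and timeout are placeholders, and whether the project adopts such a toggle is left open.

from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

engine = create_engine(
    "postgresql+psycopg2://user:pass@pgbouncer:6432/celery_results",  # placeholder DSN
    poolclass=NullPool,  # each checkout opens a fresh connection; PgBouncer owns pooling
    connect_args={"connect_timeout": 30},  # fail fast if PgBouncer is unreachable
)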
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge Base: Disabled due to Reviews > Disable Knowledge Base setting
📒 Files selected for processing (2)
backend/backend/celery_db_retry.py
(1 hunks)backend/utils/db_retry.py
(1 hunks)
🧰 Additional context used
🪛 Ruff (0.12.2)
backend/backend/celery_db_retry.py
75-75: Do not catch blind exception: Exception
(BLE001)
248-248: Consider moving this statement to an else
block
(TRY300)
249-249: Do not catch blind exception: Exception
(BLE001)
280-280: Do not catch blind exception: Exception
(BLE001)
380-380: Do not catch blind exception: Exception
(BLE001)
381-381: Use logging.exception
instead of logging.error
Replace with exception
(TRY400)
417-417: Do not catch blind exception: Exception
(BLE001)
418-418: Use logging.exception
instead of logging.error
Replace with exception
(TRY400)
backend/utils/db_retry.py
135-135: Do not catch blind exception: Exception
(BLE001)
246-250: Use logging.exception
instead of logging.error
Replace with exception
(TRY400)
424-424: Consider moving this statement to an else
block
(TRY300)
425-425: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (6)
backend/utils/db_retry.py (5)
85-111: Error classification reads clean and conservative. Good guard on non-Django DB exceptions and extended retry toggle for DATABASE_UNAVAILABLE.
257-315: Decorator path LGTM. Separation of defaults and execution via _execute_with_retry is tidy.
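For context, a call site for the decorator could look roughly like the following; the function name, status value and keyword arguments are illustrative and not lifted from the PR.

from utils.db_retry import db_retry
from workflow_manager.file_execution.models import WorkflowFileExecution


@db_retry(max_retries=3, base_delay=1.0)  # kwargs assumed to mirror the DB_RETRY_* defaults
def mark_file_execution_completed(file_execution_id: str) -> None:
    # The whole body is re-run on retryable connection errors, so it should stay idempotent.
    WorkflowFileExecution.objects.filter(id=file_execution_id).update(status="COMPLETED")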
455-501
: retry_database_operation wrapper LGTM.Consistent with decorator path; good defaults handling.
504-521
: quick_retry helper LGTM.Thin, predictable wrapper over retry_database_operation.
379-405
: Resolve: no external usages ofdb_retry_context
found; auto-retry semantics unaffected. Search across the codebase only returned the inline examples indb_retry.py
—no callers rely on this context manager’s retry behavior.backend/backend/celery_db_retry.py (1)
304-312: Ensure patch runs before config loading – confirm in celery_service.py that patch_celery_database_backend() is invoked before app.config_from_object(CeleryConfig), so that any CELERY_* retry env vars are applied by the Celery config.
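A sketch of the intended ordering; module paths and the app name are assumptions, and only the relative order of the two calls matters.

from celery import Celery

from backend.celery_config import CeleryConfig
from backend.celery_db_retry import patch_celery_database_backend

app = Celery("backend")

# Patch the SQLAlchemy result backend first so the CELERY_* retry settings
# read by CeleryConfig apply to the patched backend.
patch_celery_database_backend()
app.config_from_object(CeleryConfig)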
backend/backend/celery_db_retry.py
Outdated
def _handle_non_sqlalchemy_error(error: BaseException) -> None:
    """Handle non-SQLAlchemy errors by logging and re-raising."""
    logger.debug(LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=error))
    raise
|
Bug: bare raise here will fail outside the except scope. Re-raise the captured exception explicitly.
def _handle_non_sqlalchemy_error(error: BaseException) -> None:
"""Handle non-SQLAlchemy errors by logging and re-raising."""
logger.debug(LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=error))
- raise
+ raise error
🤖 Prompt for AI Agents
In backend/backend/celery_db_retry.py around lines 108 to 112, the function
_handle_non_sqlalchemy_error uses a bare "raise" outside an except scope which
will fail; change it to explicitly re-raise the captured exception (e.g., raise
error) after logging so the original exception is propagated, and ensure the
function signature/type hints remain correct.
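The difference is easy to reproduce outside the project: a bare raise re-raises whatever exception is currently being handled, so it fails when no handler is active.

def bad_handler(error: BaseException) -> None:
    raise  # only valid while an exception is being handled


def good_handler(error: BaseException) -> None:
    raise error  # explicitly re-raises the captured exception


try:
    bad_handler(ValueError("boom"))
except RuntimeError as exc:
    print(exc)  # "No active exception to reraise"

try:
    good_handler(ValueError("boom"))
except ValueError as exc:
    print(exc)  # "boom"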
backend/backend/celery_db_retry.py
Outdated
def _handle_non_retryable_error(error: BaseException) -> None:
    """Handle non-retryable errors by logging and re-raising."""
    logger.debug(LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=error))
    raise
|
Bug: bare raise outside except scope. Same issue; re-raise the provided error explicitly.
def _handle_non_retryable_error(error: BaseException) -> None:
"""Handle non-retryable errors by logging and re-raising."""
logger.debug(LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=error))
- raise
+ raise error
🤖 Prompt for AI Agents
In backend/backend/celery_db_retry.py around lines 114 to 118, the function
_handle_non_retryable_error currently uses a bare raise outside of an except
block which is invalid; change it to explicitly re-raise the provided error by
raising the passed-in error (e.g., raise error) after logging so the original
exception is propagated correctly.
backend/backend/celery_db_retry.py
Outdated
def _handle_max_retries_exceeded(error: BaseException, total_retries: int) -> None:
    """Handle the case when max retries are exceeded."""
    logger.error(
        LogMessages.format_message(
            LogMessages.MAX_RETRIES_EXCEEDED,
            total=total_retries,
            error=error,
        )
    )
    raise
|
Bug: bare raise outside except scope. This will raise RuntimeError: No active exception to reraise. Re-raise the original error.
def _handle_max_retries_exceeded(error: BaseException, total_retries: int) -> None:
"""Handle the case when max retries are exceeded."""
logger.error(
LogMessages.format_message(
LogMessages.MAX_RETRIES_EXCEEDED,
total=total_retries,
error=error,
)
)
- raise
+ raise error
🤖 Prompt for AI Agents
In backend/backend/celery_db_retry.py around lines 149 to 159, the function uses
a bare `raise` outside an except block which triggers "No active exception to
reraise"; replace the bare `raise` with re-raising the original exception
instance (e.g., `raise error`) so the original error is propagated, and keep the
logging call intact.
def decorator(func: Callable) -> Callable:
    @wraps(func)
    def wrapper(*args, **kwargs) -> Any:
        logger.debug(
            LogMessages.format_message(
                LogMessages.CELERY_OPERATION_START, operation=func.__name__
            )
        )
        retry_count = 0
        while retry_count <= max_retries:
            try:
                result = func(*args, **kwargs)
                _log_success(retry_count)
                return result
            except Exception as e:
                retry_info = _process_celery_exception(
                    e, retry_count, max_retries, base_delay, max_delay
                )
                if retry_info:
                    retry_count, delay = retry_info
                    time.sleep(delay)
                    continue

        # This should never be reached, but included for completeness
        return func(*args, **kwargs)

    return wrapper
🛠️ Refactor suggestion
Retry loop can terminate early and includes unreachable fallback; simplify and rely on classifier to stop.
- The while retry_count <= max_retries cap conflicts with dynamic "extended" settings returned by _get_retry_settings_for_error, causing early termination.
- The fallback call at Line 259 is effectively unreachable and unsafe if reached.
- Also address TRY300 by moving success handling to else:.
- def wrapper(*args, **kwargs) -> Any:
+ def wrapper(*args, **kwargs) -> Any:
logger.debug(
LogMessages.format_message(
LogMessages.CELERY_OPERATION_START, operation=func.__name__
)
)
retry_count = 0
- while retry_count <= max_retries:
- try:
- result = func(*args, **kwargs)
- _log_success(retry_count)
- return result
- except Exception as e:
- retry_info = _process_celery_exception(
- e, retry_count, max_retries, base_delay, max_delay
- )
- if retry_info:
- retry_count, delay = retry_info
- time.sleep(delay)
- continue
-
- # This should never be reached, but included for completeness
- return func(*args, **kwargs)
+ while True:
+ try:
+ result = func(*args, **kwargs)
+ except Exception as e:
+ retry_info = _process_celery_exception(
+ e, retry_count, max_retries, base_delay, max_delay
+ )
+ # If we get here, error is retryable and we have retry budget left
+ retry_count, delay = retry_info # type: ignore[misc]
+ time.sleep(delay)
+ continue
+ else:
+ _log_success(retry_count)
+ return result
Committable suggestion skipped: line range outside the PR's diff.
🧰 Tools
🪛 Ruff (0.12.2)
248-248: Consider moving this statement to an else
block
(TRY300)
249-249: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
In backend/backend/celery_db_retry.py around lines 235 to 261, the retry loop
uses a capped while retry_count <= max_retries which can prematurely stop when
dynamic retry settings from _get_retry_settings_for_error should control
continuation; remove the fixed cap and rewrite the loop as an infinite loop
(while True), move the success logging into a while/try/else (log and return in
the else branch after no exception), on exception call _process_celery_exception
and if it returns retry_info unpack and apply retry_count and delay, sleep and
continue, otherwise re-raise the exception (do not call the function again);
also remove the unreachable fallback return at the end of the wrapper; ensure
retry_count is updated from retry_info rather than relying on the loop
condition.
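The loop shape being asked for, reduced to a generic sketch; the classify callable stands in for _process_celery_exception, and the signatures are illustrative.

import time
from typing import Any, Callable


def run_with_retry(operation: Callable[[], Any],
                   classify: Callable[[Exception, int], tuple[bool, float]]) -> Any:
    retry_count = 0
    while True:
        try:
            result = operation()
        except Exception as exc:
            should_retry, delay = classify(exc, retry_count)
            if not should_retry:
                raise
            retry_count += 1
            time.sleep(delay)
            continue
        else:
            # Success handling lives in else, which also satisfies Ruff's TRY300.
            return result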
backend/sample.env
Outdated
# Example:
# CELERY_BACKEND_DB_NAME=unstract_celery_db

# Database connection retry settings (for handling connection drops with PgBouncer)
PgBouncer is not part of the OSS, is this still required then?
backend/backend/celery_config.py
Outdated
    print(
        f"[Celery Config] Using built-in retry: max_retries={result_backend_max_retries}, "
    )
    print(
        f"base_sleep={result_backend_base_sleep_between_retries_ms}ms, max_sleep={result_backend_max_sleep_between_retries_ms}ms"
    )
else:
    # Custom retry is handled by patch_celery_database_backend()
    print("[Celery Config] Using custom retry system (patching enabled)")
@muhammad-ali-e use a logger instead of print statements
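A sketch of that change; the placeholder values stand in for the CeleryConfig attributes quoted above.

import logging

logger = logging.getLogger(__name__)

result_backend_max_retries = 10                      # placeholder
result_backend_base_sleep_between_retries_ms = 10    # placeholder
result_backend_max_sleep_between_retries_ms = 10000  # placeholder

logger.info(
    "[Celery Config] Using built-in retry: max_retries=%s, base_sleep=%sms, max_sleep=%sms",
    result_backend_max_retries,
    result_backend_base_sleep_between_retries_ms,
    result_backend_max_sleep_between_retries_ms,
)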
Actionable comments posted: 1
♻️ Duplicate comments (5)
backend/backend/celery_db_retry.py (2)
28-35
: Broaden detection to all SQLAlchemy errors, not just OperationalError.This regresses the earlier fix; use SQLAlchemyError to include InterfaceError/DBAPIError subclasses.
def _is_sqlalchemy_error(exception: Exception) -> bool: """Check if exception is a SQLAlchemy error.""" try: - from sqlalchemy.exc import OperationalError as SQLAlchemyOperationalError - - return isinstance(exception, SQLAlchemyOperationalError) + from sqlalchemy.exc import SQLAlchemyError + return isinstance(exception, SQLAlchemyError) except ImportError: return False
234-261
: Retry loop can exit early and then re-executes the operation once more; make loop sentinelless and drop fallback.Current cap (retry_count <= max_retries) fights dynamic settings; the fallback “return func()” risks a duplicate call. Use try/else within an infinite loop.
- def wrapper(*args, **kwargs) -> Any: + def wrapper(*args, **kwargs) -> Any: logger.debug( LogMessages.format_message( LogMessages.CELERY_OPERATION_START, operation=func.__name__ ) ) retry_count = 0 - while retry_count <= max_retries: - try: - result = func(*args, **kwargs) - _log_success(retry_count) - return result - except Exception as e: - retry_info = _process_celery_exception( - e, retry_count, max_retries, base_delay, max_delay - ) - if retry_info: - retry_count, delay = retry_info - time.sleep(delay) - continue - - # This should never be reached, but included for completeness - return func(*args, **kwargs) + while True: + try: + result = func(*args, **kwargs) + except Exception as e: + retry_info = _process_celery_exception( + e, retry_count, max_retries, base_delay, max_delay + ) + # Not retryable: _process_celery_exception raises. + retry_count, delay = retry_info # type: ignore[misc] + time.sleep(delay) + continue + else: + _log_success(retry_count) + return resultbackend/utils/db_retry.py (3)
207-218
: Catch only DB connection errors; let other exceptions propagate.Avoids masking non-DB bugs and satisfies BLE001.
- except Exception as e: + except (OperationalError, InterfaceError) as e: error_type, needs_refresh, use_extended = _classify_database_error(e) if not DatabaseErrorPatterns.is_retryable_error(error_type): logger.debug( LogMessages.format_message(LogMessages.NON_RETRYABLE_ERROR, error=e) ) raise
21-25
: Docs imply unsupported behavior; clarify context manager semantics.The context manager does not (and cannot) auto-retry the block.
- # Context manager usage - with db_retry_context(max_retries=3): - model.save() - other_model.delete() + # Context manager usage (no automatic re-run) + with db_retry_context(): + model.save() # Errors will be logged; connection may be refreshed; exception re-raised. @@ - Example: - with db_retry_context(max_retries=5, force_refresh=True): - model.save() - other_model.delete() - MyModel.objects.filter(...).update(...) + Example: + with db_retry_context(force_refresh=True): + model.save()Also applies to: 396-401
417-449
: Context manager retrier is invalid (yields inside a loop). Yield exactly once; do not attempt retries here.Current code will fail with “generator didn't stop after throw” and cannot re-run the block.
@contextmanager def db_retry_context( @@ ): - """Context manager for retrying database operations on connection errors. + """Context manager for connection refresh/logging on DB connection errors. @@ - retry_count = 0 - - while retry_count <= current_settings["max_retries"]: - try: - yield - return # Success - exit the retry loop - except Exception as e: - error_type, needs_refresh, use_extended = _classify_database_error(e) - - if not DatabaseErrorPatterns.is_retryable_error(error_type): - _log_context_non_retryable_error(e) - raise - - # Update settings for extended retry if needed - current_settings = _update_context_retry_settings( - retry_count, use_extended, current_settings - ) - - if retry_count < current_settings["max_retries"]: - retry_count, delay = _handle_context_retry_logic( - retry_count, - current_settings["max_retries"], - current_settings["base_delay"], - current_settings["max_delay"], - current_settings["force_refresh"], - e, - error_type, - needs_refresh, - ) - time.sleep(delay) - continue - else: - _log_context_max_retries_exceeded(e, current_settings["max_retries"] + 1) - raise + try: + yield + except (OperationalError, InterfaceError) as e: + error_type, needs_refresh, _ = _classify_database_error(e) + if needs_refresh and current_settings["force_refresh"]: + _handle_connection_pool_refresh(e, 1, current_settings["max_retries"]) + if not DatabaseErrorPatterns.is_retryable_error(error_type): + _log_context_non_retryable_error(e) + raise
🧹 Nitpick comments (4)
backend/backend/celery_db_retry.py (2)
380-381
: Preferlogger.exception
for stack traces when handling unexpected failures.Improves debuggability without changing behavior.
- logger.error(f"Error patching Celery database backend: {e}") + logger.exception("Error patching Celery database backend") @@ - logger.error(f"Error configuring Celery built-in retry: {e}") + logger.exception("Error configuring Celery built-in retry")Also applies to: 416-418
433-447
: Add pool_timeout to avoid unbounded waits when PgBouncer is saturated.Small safety knob, sourced from centralized config.
return { # Connection health checking "pool_pre_ping": True, # Test connections before use # Minimal pooling (let PgBouncer handle the real pooling) "pool_size": 5, # Small pool since PgBouncer handles real pooling "max_overflow": 0, # No overflow, rely on PgBouncer "pool_recycle": 3600, # Recycle connections every hour + "pool_timeout": RetryConfiguration.get_setting_value( + "CELERY_DB_POOL_TIMEOUT", 30 + ), # Connection timeouts using centralized configuration "connect_args": { "connect_timeout": RetryConfiguration.get_setting_value( "CELERY_DB_CONNECT_TIMEOUT", 30 ), },backend/utils/db_retry.py (2)
253-255
: Remove unreachable fallback call.The loop already returns on success or raises on failure.
- # This should never be reached, but included for completeness - return operation() + # Unreachable
324-330
: Log with stack trace when retries are exhausted.Use exception-level logging to capture traceback.
- logger.error( + logger.exception( LogMessages.format_message( LogMessages.MAX_RETRIES_EXCEEDED, total=total_retries, error=error, ) )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge Base: Disabled due to Reviews > Disable Knowledge Base setting
📒 Files selected for processing (2)
backend/backend/celery_db_retry.py
(1 hunks)backend/utils/db_retry.py
(1 hunks)
🧰 Additional context used
🪛 Ruff (0.12.2)
backend/backend/celery_db_retry.py
75-75: Do not catch blind exception: Exception
(BLE001)
247-247: Consider moving this statement to an else
block
(TRY300)
248-248: Do not catch blind exception: Exception
(BLE001)
279-279: Do not catch blind exception: Exception
(BLE001)
379-379: Do not catch blind exception: Exception
(BLE001)
380-380: Use logging.exception
instead of logging.error
Replace with exception
(TRY400)
416-416: Do not catch blind exception: Exception
(BLE001)
417-417: Use logging.exception
instead of logging.error
Replace with exception
(TRY400)
backend/utils/db_retry.py
135-135: Do not catch blind exception: Exception
(BLE001)
246-250: Use logging.exception
instead of logging.error
Replace with exception
(TRY400)
420-420: Consider moving this statement to an else
block
(TRY300)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
if not _is_sqlalchemy_error(error):
    _log_non_sqlalchemy_error(error)
    raise
|
Fix invalid bare raise usage (outside except scope). These will raise "No active exception to reraise"; re-raise the caught error explicitly.
- if not _is_sqlalchemy_error(error):
- _log_non_sqlalchemy_error(error)
- raise
+ if not _is_sqlalchemy_error(error):
+ _log_non_sqlalchemy_error(error)
+ raise error
@@
- if not DatabaseErrorPatterns.is_retryable_error(error_type):
- _log_non_retryable_error(error)
- raise
+ if not DatabaseErrorPatterns.is_retryable_error(error_type):
+ _log_non_retryable_error(error)
+ raise error
@@
- else:
- _log_max_retries_exceeded(error, current_settings["max_retries"] + 1)
- raise
+ else:
+ _log_max_retries_exceeded(error, current_settings["max_retries"] + 1)
+ raise error
Also applies to: 178-181, 192-194
🤖 Prompt for AI Agents
In backend/backend/celery_db_retry.py around lines 171-174 (and similarly at
178-181 and 192-194), the code uses bare `raise` outside of an except block
which causes "No active exception to reraise"; change those bare `raise`
statements to explicitly re-raise the caught exception (e.g., `raise error`) so
the original exception is propagated correctly, and ensure the variable name
matches the caught exception in that scope.
Actionable comments posted: 0
♻️ Duplicate comments (5)
backend/backend/celery_db_retry.py (3)
28-35
: Broaden SQLAlchemy exception detection to avoid missing retryable errors.Catching only OperationalError will skip InterfaceError/DBAPIError subclasses and others. Use SQLAlchemyError as the base.
-def _is_sqlalchemy_error(exception: Exception) -> bool: +def _is_sqlalchemy_error(exception: BaseException) -> bool: """Check if exception is a SQLAlchemy error.""" try: - from sqlalchemy.exc import OperationalError as SQLAlchemyOperationalError - - return isinstance(exception, SQLAlchemyOperationalError) + from sqlalchemy.exc import SQLAlchemyError + return isinstance(exception, SQLAlchemyError) except ImportError: return False
171-174
: Fix invalid bareraise
usages (outside except scope).These currently trigger “No active exception to reraise”. Re-raise the captured error explicitly.
- if not _is_sqlalchemy_error(error): - _log_non_sqlalchemy_error(error) - raise + if not _is_sqlalchemy_error(error): + _log_non_sqlalchemy_error(error) + raise error @@ - if not DatabaseErrorPatterns.is_retryable_error(error_type): - _log_non_retryable_error(error) - raise + if not DatabaseErrorPatterns.is_retryable_error(error_type): + _log_non_retryable_error(error) + raise error @@ - else: - _log_max_retries_exceeded(error, current_settings["max_retries"] + 1) - raise + else: + _log_max_retries_exceeded(error, current_settings["max_retries"] + 1) + raise errorAlso applies to: 178-181, 192-194
243-258
: Rewrite retry loop to honor dynamic retry settings and remove unreachable fallback.The capped while condition can terminate early when extended settings are returned; also the fallback call is unreachable. Move success logging to try/else and make the truthiness check explicit to avoid (0, 0) bugs.
- retry_count = 0 - while retry_count <= max_retries: - try: - result = func(*args, **kwargs) - _log_success(retry_count) - return result - except Exception as e: - retry_info = _process_celery_exception( - e, retry_count, max_retries, base_delay, max_delay - ) - if retry_info: - retry_count, delay = retry_info - time.sleep(delay) - continue - - # This should never be reached, but included for completeness - return func(*args, **kwargs) + retry_count = 0 + while True: + try: + result = func(*args, **kwargs) + except Exception as e: # noqa: BLE001 - unify retry classification path + retry_info = _process_celery_exception( + e, retry_count, max_retries, base_delay, max_delay + ) + # _process_celery_exception either raises or returns a tuple + if retry_info is not None: + retry_count, delay = retry_info + time.sleep(delay) + continue + raise + else: + _log_success(retry_count) + return resultbackend/backend/celery_config.py (2)
7-8
: Fix import path for retry helpers; current path will fail at runtime.-from backend.celery_db_retry import get_celery_db_engine_options, should_use_builtin_retry +from backend.backend.celery_db_retry import ( + get_celery_db_engine_options, + should_use_builtin_retry, +)#!/bin/bash # Verify module path resolution matches repo layout fd -a 'celery_db_retry.py' python - <<'PY' import importlib mods = [ "backend.celery_db_retry", "backend.backend.celery_db_retry", ] for m in mods: try: importlib.import_module(m) print("OK:", m) except Exception as e: print("FAIL:", m, "->", e) PY
44-46
: Usedatabase_engine_options
for SQLAlchemy backend;result_backend_transport_options
is ignored.- # Database backend engine options for PgBouncer compatibility - result_backend_transport_options = get_celery_db_engine_options() + # SQLAlchemy engine options for DatabaseBackend (passed to create_engine) + database_engine_options = get_celery_db_engine_options()#!/bin/bash # Introspect Celery's DatabaseBackend to confirm it reads database_engine_options python - <<'PY' import inspect from celery.backends.database import DatabaseBackend src = inspect.getsource(DatabaseBackend.__init__) print("database_engine_options" in src, "create_engine" in src) print("\n--- Snippet ---") for line in src.splitlines(): if "database_engine_options" in line or "create_engine" in line: print(line) PY
🧹 Nitpick comments (5)
backend/backend/celery_db_retry.py (5)
279-280
: Prefer logger.exception and document broad catch.Use logger.exception for traceback and explicitly justify the broad catch for dispose() variability.
- except Exception as e: - logger.warning(f"Could not dispose SQLAlchemy engine: {e}") + except Exception: # noqa: BLE001 - engine.dispose may raise varying exception types + logger.exception("Could not dispose SQLAlchemy engine")
377-381
: Log tracebacks on patch failure and annotate broad catch.- except Exception as e: - logger.error(f"Error patching Celery database backend: {e}") + except Exception: # noqa: BLE001 + logger.exception("Error patching Celery database backend")
416-419
: Log tracebacks on built-in retry configuration failure and annotate broad catch.- except Exception as e: - logger.error(f"Error configuring Celery built-in retry: {e}") + except Exception: # noqa: BLE001 + logger.exception("Error configuring Celery built-in retry")
60-82
: Optional: throttle pool refresh logs to reduce noise under flapping failures.Consider downgrading the first warning to info after N consecutive attempts or coalescing repeated messages.
118-145
: Backoff computation OK; consider adding jitter to avoid thundering herd.Add decorrelated jitter to spread load when many workers retry concurrently.
Example:
- delay = min( - current_settings["base_delay"] * (2**retry_count), - current_settings["max_delay"], - ) + import random + exp = current_settings["base_delay"] * (2**retry_count) + jittered = random.uniform(exp / 2.0, exp) # decorrelated jitter + delay = min(jittered, current_settings["max_delay"])
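Another common variant is full jitter, where the delay is drawn uniformly between zero and the exponential ceiling; a small helper makes the idea concrete (names and defaults are illustrative).

import random


def backoff_delay(retry_count: int, base_delay: float = 1.0, max_delay: float = 30.0) -> float:
    """Exponential backoff with full jitter: concurrent workers pick uncorrelated delays."""
    ceiling = min(base_delay * (2 ** retry_count), max_delay)
    return random.uniform(0, ceiling)


# attempt 0 -> up to 1s, attempt 3 -> up to 8s, attempt 10 -> capped at 30s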
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge Base: Disabled due to Reviews > Disable Knowledge Base setting
📒 Files selected for processing (3)
backend/backend/celery_config.py
(2 hunks)backend/backend/celery_db_retry.py
(1 hunks)backend/sample.env
(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- backend/sample.env
🧰 Additional context used
🪛 Ruff (0.12.2)
backend/backend/celery_db_retry.py
75-75: Do not catch blind exception: Exception
(BLE001)
247-247: Consider moving this statement to an else
block
(TRY300)
248-248: Do not catch blind exception: Exception
(BLE001)
279-279: Do not catch blind exception: Exception
(BLE001)
379-379: Do not catch blind exception: Exception
(BLE001)
380-380: Use logging.exception
instead of logging.error
Replace with exception
(TRY400)
416-416: Do not catch blind exception: Exception
(BLE001)
417-417: Use logging.exception
instead of logging.error
Replace with exception
(TRY400)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (2)
backend/backend/celery_db_retry.py (1)
314-356: Patch guards look good and avoid double-wrapping. Nice use of wrapped and a marker attribute to prevent re-patching.
Also applies to: 358-373
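The general shape of that guard, shown as a standalone sketch rather than the actual implementation:

import functools


def patch_store_result(backend_cls, run_with_retry):
    original = backend_cls._store_result
    if getattr(original, "_retry_patched", False):
        return  # already patched; never wrap the wrapper

    @functools.wraps(original)
    def wrapped(self, *args, **kwargs):
        return run_with_retry(lambda: original(self, *args, **kwargs))

    wrapped._retry_patched = True
    backend_cls._store_result = wrapped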
backend/backend/celery_config.py (1)
63-70
: Config logging is clear; good swap between built-in vs custom paths.
Let's try to keep it simple and maybe revisit the approach?
What
Why
How
- db_constants.py with centralized error patterns, configuration, and logging templates
- db_retry.py with decorator and context manager for Django ORM retry logic with connection pool refresh
- celery_db_retry.py for Celery SQLAlchemy backend retry with engine disposal
- @db_retry() decorator applied to critical workflow chain methods in workflow execution modules

Can this PR break any existing features. If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)
Database Migrations
Env Config
- DB_RETRY_MAX_RETRIES (default: 3)
- DB_RETRY_BASE_DELAY (default: 1.0 seconds)
- DB_RETRY_MAX_DELAY (default: 30.0 seconds)
- CELERY_USE_BUILTIN_RETRY (default: False)
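A sketch of how these settings map to environment variables with their documented defaults; the PR centralizes this in db_constants.py, so the parsing below is only illustrative.

import os

DB_RETRY_MAX_RETRIES = int(os.environ.get("DB_RETRY_MAX_RETRIES", "3"))
DB_RETRY_BASE_DELAY = float(os.environ.get("DB_RETRY_BASE_DELAY", "1.0"))
DB_RETRY_MAX_DELAY = float(os.environ.get("DB_RETRY_MAX_DELAY", "30.0"))
CELERY_USE_BUILTIN_RETRY = os.environ.get("CELERY_USE_BUILTIN_RETRY", "False").lower() in ("1", "true", "yes")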
Relevant Docs
db_constants.py
Related Issues or PRs
Dependencies Versions
Notes on Testing
Screenshots
Checklist
I have read and understood the Contribution Guidelines.