From 4f7d68752d9f3115918b96c2ebc343885bdd8d19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Thu, 11 Dec 2025 13:56:53 +0100 Subject: [PATCH] [FIX] handle race conditions that could lead to job running twice Do not wait for locks and start jobs that are not in the expected state. --- queue_job/controllers/main.py | 23 +++++++++++++++++------ queue_job/exception.py | 4 ++++ queue_job/job.py | 24 +++++++++++++----------- 3 files changed, 34 insertions(+), 17 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 6365e6efbc..c6e4371113 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -16,7 +16,7 @@ from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY from ..delay import chain, group -from ..exception import FailedJobError, RetryableJobError +from ..exception import FailedJobError, RetryableJobError, UnexpectedJobStateError from ..job import ENQUEUED, Job _logger = logging.getLogger(__name__) @@ -29,13 +29,16 @@ class RunJobController(http.Controller): def _try_perform_job(self, env, job): """Try to perform the job.""" + + # Here we expect that the job is in 'enqueued' state and protected + # from concurrent start by the SELECT FOR UPDATE on the job record + # that was done at the beginning of the runjob method. job.set_started() job.store() env.cr.commit() - job.lock() + job.lock() _logger.debug("%s started", job) - job.perform() # Triggers any stored computed fields before calling 'set_done' # so that will be part of the 'exec_time' @@ -94,15 +97,19 @@ def retry_postpone(job, message, seconds=None): job.set_pending(reset_retry=False) job.store() - # ensure the job to run is in the correct state and lock the record + # Ensure the job to run is in the correct state and lock the record.
+ # Don't wait for the lock, as if something has the job locked at this + # point this is an abnormal condition (job starting twice for instance), + # so it's better to let recovery mechanisms do their work if needed. env.cr.execute( - "SELECT state FROM queue_job WHERE uuid=%s AND state=%s FOR UPDATE", + "SELECT state FROM queue_job WHERE uuid=%s AND state=%s " + "FOR UPDATE SKIP LOCKED", (job_uuid, ENQUEUED), ) if not env.cr.fetchone(): _logger.warning( "was requested to run job %s, but it does not exist, " - "or is not in state %s", + "or is not in state %s, or is being handled by another worker", job_uuid, ENQUEUED, ) @@ -123,6 +130,10 @@ def retry_postpone(job, message, seconds=None): _logger.debug("%s OperationalError, postponed", job) raise RetryableJobError(err.pgerror, seconds=PG_RETRY) from err + except UnexpectedJobStateError as err: + _logger.warning("%s did not run (%s)", job, err) + return "" + except RetryableJobError as err: # delay the job later, requeue retry_postpone(job, str(err), seconds=err.seconds) diff --git a/queue_job/exception.py b/queue_job/exception.py index c04bc8f0cf..81635b4a7e 100644 --- a/queue_job/exception.py +++ b/queue_job/exception.py @@ -14,6 +14,10 @@ class NoSuchJobError(JobError): """The job does not exist.""" +class UnexpectedJobStateError(JobError): + """Unexpected job state.""" + + class FailedJobError(JobError): """A job had an error having to be resolved.""" diff --git a/queue_job/job.py b/queue_job/job.py index 6cfe12f232..c2f163a730 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -13,7 +13,12 @@ import odoo -from .exception import FailedJobError, NoSuchJobError, RetryableJobError +from .exception import ( + FailedJobError, + NoSuchJobError, + RetryableJobError, + UnexpectedJobStateError, +) WAIT_DEPENDENCIES = "wait_dependencies" PENDING = "pending" @@ -243,11 +248,11 @@ def add_lock_record(self): def lock(self): """ - Lock row of job that is being performed + Lock a job that is being performed.
- If a job cannot be locked, - it means that the job wasn't started, - a RetryableJobError is thrown. + If a job cannot be locked, it means that the job was not in 'started' + state, or is already started and locked by another worker. In that case + an UnexpectedJobStateError is thrown. """ self.env.cr.execute( """ @@ -265,16 +270,14 @@ def lock(self): uuid = %s AND state='started' ) - FOR UPDATE; + FOR UPDATE SKIP LOCKED; """, [self.uuid], ) # 1 job should be locked if 1 != len(self.env.cr.fetchall()): - raise RetryableJobError( - f"Trying to lock job that wasn't started, uuid: {self.uuid}" - ) + raise UnexpectedJobStateError("not in 'started' state or already locked") @classmethod def _load_from_db_record(cls, job_db_record): @@ -856,8 +859,7 @@ def related_action(self): funcname = record._default_related_action if not isinstance(funcname, str): raise ValueError( - "related_action must be the name of the " - "method on queue.job as string" + "related_action must be the name of the method on queue.job as string" ) action = getattr(record, funcname) action_kwargs = self.job_config.related_action_kwargs