Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions queue_job/controllers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY

from ..delay import chain, group
from ..exception import FailedJobError, RetryableJobError
from ..exception import FailedJobError, RetryableJobError, UnexpectedJobStateError
from ..job import ENQUEUED, Job

_logger = logging.getLogger(__name__)
Expand All @@ -29,13 +29,16 @@
class RunJobController(http.Controller):
def _try_perform_job(self, env, job):
"""Try to perform the job."""

# Here we expect that the job is in 'enqueued' state and protected
# from concurrent start by the SELECT FOR UPDATE on the job record.
# that was done at the beginning of the runjob method.
job.set_started()
job.store()
env.cr.commit()
job.lock()

job.lock()
_logger.debug("%s started", job)

job.perform()
# Triggers any stored computed fields before calling 'set_done'
# so that will be part of the 'exec_time'
Expand Down Expand Up @@ -94,15 +97,19 @@ def retry_postpone(job, message, seconds=None):
job.set_pending(reset_retry=False)
job.store()

# ensure the job to run is in the correct state and lock the record
# Ensure the job to run is in the correct state and lock the record.
# Don't wait for the lock, as if something has the job locked at this
# point this is an abnormal condition (job starting twice for instance),
# so it's better to let recovery mechanisms do their work if needed.
env.cr.execute(
"SELECT state FROM queue_job WHERE uuid=%s AND state=%s FOR UPDATE",
"SELECT state FROM queue_job WHERE uuid=%s AND state=%s "
"FOR UPDATE SKIP LOCKED",
(job_uuid, ENQUEUED),
)
if not env.cr.fetchone():
_logger.warning(
"was requested to run job %s, but it does not exist, "
"or is not in state %s",
"or is not in state %s, or is being handled by antother worker",
job_uuid,
ENQUEUED,
)
Expand All @@ -123,6 +130,10 @@ def retry_postpone(job, message, seconds=None):
_logger.debug("%s OperationalError, postponed", job)
raise RetryableJobError(err.pgerror, seconds=PG_RETRY) from err

except UnexpectedJobStateError as err:
_logger.warning("%s did not run (%s)", job, err)
return ""

except RetryableJobError as err:
# delay the job later, requeue
retry_postpone(job, str(err), seconds=err.seconds)
Expand Down
4 changes: 4 additions & 0 deletions queue_job/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ class NoSuchJobError(JobError):
"""The job does not exist."""


class UnexpectedJobStateError(JobError):
"""Unexpected job state."""


class FailedJobError(JobError):
"""A job had an error having to be resolved."""

Expand Down
24 changes: 13 additions & 11 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@

import odoo

from .exception import FailedJobError, NoSuchJobError, RetryableJobError
from .exception import (
FailedJobError,
NoSuchJobError,
RetryableJobError,
UnexpectedJobStateError,
)

WAIT_DEPENDENCIES = "wait_dependencies"
PENDING = "pending"
Expand Down Expand Up @@ -243,11 +248,11 @@ def add_lock_record(self):

def lock(self):
"""
Lock row of job that is being performed
Lock a job that is being performed.

If a job cannot be locked,
it means that the job wasn't started,
a RetryableJobError is thrown.
If a job cannot be locked, it means that the job was not in 'started'
state, or is already started and locked by another worker. In that case
a UnexpectedJobStateError is thrown.
"""
self.env.cr.execute(
"""
Expand All @@ -265,16 +270,14 @@ def lock(self):
uuid = %s
AND state='started'
)
FOR UPDATE;
FOR UPDATE SKIP LOCKED;
""",
[self.uuid],
)

# 1 job should be locked
if 1 != len(self.env.cr.fetchall()):
raise RetryableJobError(
f"Trying to lock job that wasn't started, uuid: {self.uuid}"
)
raise UnexpectedJobStateError("not in 'started' state or already locked")

@classmethod
def _load_from_db_record(cls, job_db_record):
Expand Down Expand Up @@ -856,8 +859,7 @@ def related_action(self):
funcname = record._default_related_action
if not isinstance(funcname, str):
raise ValueError(
"related_action must be the name of the "
"method on queue.job as string"
"related_action must be the name of the method on queue.job as string"
)
action = getattr(record, funcname)
action_kwargs = self.job_config.related_action_kwargs
Expand Down