Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions queue_job/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ a job, is transferred to the job according to an allow-list.

The default allow-list is `("tz", "lang", "allowed_company_ids", "force_company", "active_test")`. It can
be customized in ``Base._job_prepare_context_before_enqueue_keys``.

**Bypass jobs on running Odoo**

When you are developing (ie: connector modules) you might want
Expand Down
8 changes: 7 additions & 1 deletion queue_job/controllers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,13 @@ def _enqueue_dependent_jobs(self, env, job):
else:
break

@http.route("/queue_job/runjob", type="http", auth="none", save_session=False)
@http.route(
"/queue_job/runjob",
type="http",
auth="none",
save_session=False,
readonly=False,
)
def runjob(self, db, job_uuid, **kw):
http.request.session.db = db
env = http.request.env(user=SUPERUSER_ID)
Expand Down
71 changes: 0 additions & 71 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import uuid
import weakref
from datetime import datetime, timedelta
from functools import total_ordering
from random import randint

import odoo
Expand Down Expand Up @@ -104,7 +103,6 @@ def identity_exact_hasher(job_):
return hasher


@total_ordering
class Job:
"""A Job is a task to execute. It is the in-memory representation of a job.

Expand Down Expand Up @@ -367,65 +365,6 @@ def job_record_with_same_identity_key(self):
)
return existing

# TODO to deprecate (not called anymore)
@classmethod
def enqueue(
cls,
func,
args=None,
kwargs=None,
priority=None,
eta=None,
max_retries=None,
description=None,
channel=None,
identity_key=None,
):
"""Create a Job and enqueue it in the queue. Return the job uuid.

This expects the arguments specific to the job to be already extracted
from the ones to pass to the job function.

If the identity key is the same than the one in a pending job,
no job is created and the existing job is returned

"""
new_job = cls(
func=func,
args=args,
kwargs=kwargs,
priority=priority,
eta=eta,
max_retries=max_retries,
description=description,
channel=channel,
identity_key=identity_key,
)
return new_job._enqueue_job()

# TODO to deprecate (not called anymore)
def _enqueue_job(self):
if self.identity_key:
existing = self.job_record_with_same_identity_key()
if existing:
_logger.debug(
"a job has not been enqueued due to having "
"the same identity key (%s) than job %s",
self.identity_key,
existing.uuid,
)
return Job._load_from_db_record(existing)
self.store()
_logger.debug(
"enqueued %s:%s(*%r, **%r) with uuid: %s",
self.recordset,
self.method_name,
self.args,
self.kwargs,
self.uuid,
)
return self

@staticmethod
def db_record_from_uuid(env, job_uuid):
# TODO remove in 15.0 or 16.0
Expand Down Expand Up @@ -749,16 +688,6 @@ def __eq__(self, other):
def __hash__(self):
return self.uuid.__hash__()

def sorting_key(self):
return self.eta, self.priority, self.date_created, self.seq

def __lt__(self, other):
if self.eta and not other.eta:
return True
elif not self.eta and other.eta:
return False
return self.sorting_key() < other.sorting_key()

def db_record(self):
return self.db_records_from_uuids(self.env, [self.uuid])

Expand Down
55 changes: 39 additions & 16 deletions queue_job/jobrunner/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright 2015-2016 Camptocamp SA
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
import logging
from collections import namedtuple
from functools import total_ordering
from heapq import heappop, heappush
from weakref import WeakValueDictionary
Expand All @@ -10,6 +11,7 @@
from ..job import CANCELLED, DONE, ENQUEUED, FAILED, PENDING, STARTED, WAIT_DEPENDENCIES

NOT_DONE = (WAIT_DEPENDENCIES, PENDING, ENQUEUED, STARTED, FAILED)
JobSortingKey = namedtuple("SortingKey", "eta priority date_created seq")

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -108,7 +110,7 @@
job that are necessary to prioritise them.

Channel jobs are comparable according to the following rules:
* jobs with an eta come before all other jobs
* jobs with an eta cannot be compared with jobs without
* then jobs with a smaller eta come first
* then jobs with a smaller priority come first
* then jobs with a smaller creation time come first
Expand All @@ -135,14 +137,18 @@
>>> j3 < j1
True

j4 and j5 comes even before j3, because they have an eta
j4 and j5 have an eta, they cannot be compared with j3

>>> j4 = ChannelJob(None, None, 4,
... seq=0, date_created=4, priority=9, eta=9)
>>> j5 = ChannelJob(None, None, 5,
... seq=0, date_created=5, priority=9, eta=9)
>>> j4 < j5 < j3
>>> j4 < j5
True
>>> j4 < j3
Traceback (most recent call last):
...
TypeError: '<' not supported between instances of 'int' and 'NoneType'

j6 has same date_created and priority as j5 but a smaller eta

Expand All @@ -153,7 +159,7 @@

Here is the complete suite:

>>> j6 < j4 < j5 < j3 < j1 < j2
>>> j6 < j4 < j5 and j3 < j1 < j2
True

j0 has the same properties as j1 but they are not considered
Expand All @@ -173,14 +179,13 @@

"""

__slots__ = ("db_name", "channel", "uuid", "_sorting_key", "__weakref__")

def __init__(self, db_name, channel, uuid, seq, date_created, priority, eta):
self.db_name = db_name
self.channel = channel
self.uuid = uuid
self.seq = seq
self.date_created = date_created
self.priority = priority
self.eta = eta
self._sorting_key = JobSortingKey(eta, priority, date_created, seq)

def __repr__(self):
return "<ChannelJob %s>" % self.uuid
Expand All @@ -191,18 +196,36 @@
def __hash__(self):
return id(self)

def set_no_eta(self):
self._sorting_key = JobSortingKey(None, *self._sorting_key[1:])

@property
def seq(self):
return self._sorting_key.seq

@property
def date_created(self):
return self._sorting_key.date_created

@property
def priority(self):
return self._sorting_key.priority

@property
def eta(self):
return self._sorting_key.eta

def sorting_key(self):
return self.eta, self.priority, self.date_created, self.seq
# DEPRECATED
return self._sorting_key

Check warning on line 220 in queue_job/jobrunner/channels.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/channels.py#L220

Added line #L220 was not covered by tests

def sorting_key_ignoring_eta(self):
return self.priority, self.date_created, self.seq
return self._sorting_key[1:]

def __lt__(self, other):
if self.eta and not other.eta:
return True
elif not self.eta and other.eta:
return False
return self.sorting_key() < other.sorting_key()
# Do not compare job where ETA is set with job where it is not
# If one job 'eta' is set, and the other is None, it raises TypeError
return self._sorting_key < other._sorting_key


class ChannelQueue:
Expand Down Expand Up @@ -312,7 +335,7 @@
def pop(self, now):
while self._eta_queue and self._eta_queue[0].eta <= now:
eta_job = self._eta_queue.pop()
eta_job.eta = None
eta_job.set_no_eta()
self._queue.add(eta_job)
if self.sequential and self._eta_queue and self._queue:
eta_job = self._eta_queue[0]
Expand Down
11 changes: 11 additions & 0 deletions queue_job/jobrunner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,17 @@ def __init__(
self._stop = False
self._stop_pipe = os.pipe()

def __del__(self):
# pylint: disable=except-pass
try:
os.close(self._stop_pipe[0])
except OSError:
pass
try:
os.close(self._stop_pipe[1])
except OSError:
pass

@classmethod
def from_environ_or_config(cls):
scheme = os.environ.get("ODOO_QUEUE_JOB_SCHEME") or queue_job_config.get(
Expand Down
20 changes: 13 additions & 7 deletions queue_job/models/queue_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from datetime import datetime, timedelta

from odoo import _, api, exceptions, fields, models
from odoo.tools import config, html_escape
from odoo.tools import config, html_escape, index_exists

from odoo.addons.base_sparse_field.models.fields import Serialized

Expand Down Expand Up @@ -91,7 +91,7 @@ class QueueJob(models.Model):
func_string = fields.Char(string="Task", readonly=True)

state = fields.Selection(STATES, readonly=True, required=True, index=True)
priority = fields.Integer()
priority = fields.Integer(group_operator=False)
exc_name = fields.Char(string="Exception", readonly=True)
exc_message = fields.Char(string="Exception Message", readonly=True, tracking=True)
exc_info = fields.Text(string="Exception Info", readonly=True)
Expand Down Expand Up @@ -130,16 +130,21 @@ class QueueJob(models.Model):
worker_pid = fields.Integer(readonly=True)

def init(self):
self._cr.execute(
"SELECT indexname FROM pg_indexes WHERE indexname = %s ",
("queue_job_identity_key_state_partial_index",),
)
if not self._cr.fetchone():
index_1 = "queue_job_identity_key_state_partial_index"
index_2 = "queue_job_channel_date_done_date_created_index"
if not index_exists(self._cr, index_1):
# Used by Job.job_record_with_same_identity_key
self._cr.execute(
"CREATE INDEX queue_job_identity_key_state_partial_index "
"ON queue_job (identity_key) WHERE state in ('pending', "
"'enqueued', 'wait_dependencies') AND identity_key IS NOT NULL;"
)
if not index_exists(self._cr, index_2):
# Used by <queue.job>.autovacuum
self._cr.execute(
"CREATE INDEX queue_job_channel_date_done_date_created_index "
"ON queue_job (channel, date_done, date_created);"
)

@api.depends("records")
def _compute_record_ids(self):
Expand Down Expand Up @@ -408,6 +413,7 @@ def autovacuum(self):
("date_cancelled", "<=", deadline),
("channel", "=", channel.complete_name),
],
order="date_done, date_created",
limit=1000,
)
if jobs:
Expand Down
8 changes: 8 additions & 0 deletions queue_job/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ def _add_job(self, *args, **kwargs):
if not job.identity_key or all(
j.identity_key != job.identity_key for j in self.enqueued_jobs
):
self._prepare_context(job)
self.enqueued_jobs.append(job)

patcher = mock.patch.object(job, "store")
Expand All @@ -274,6 +275,13 @@ def _add_job(self, *args, **kwargs):
)
return job

def _prepare_context(self, job):
# pylint: disable=context-overridden
job_model = job.job_model.with_context({})
field_records = job_model._fields["records"]
# Filter the context to simulate store/load of the job
job.recordset = field_records.convert_to_write(job.recordset, job_model)

def __enter__(self):
return self

Expand Down
49 changes: 49 additions & 0 deletions queue_job/tests/test_runner_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,57 @@

# pylint: disable=odoo-addons-relative-import
# we are testing, we want to test as we were an external consumer of the API
import os

from odoo.tests import BaseCase, tagged

from odoo.addons.queue_job.jobrunner import runner

from .common import load_doctests

load_tests = load_doctests(runner)


@tagged("-at_install", "post_install")
class TestRunner(BaseCase):
@classmethod
def _is_open_file_descriptor(cls, fd):
try:
os.fstat(fd)
return True
except OSError:
return False

def test_runner_file_descriptor(self):
a_runner = runner.QueueJobRunner.from_environ_or_config()

read_fd, write_fd = a_runner._stop_pipe
self.assertTrue(self._is_open_file_descriptor(read_fd))
self.assertTrue(self._is_open_file_descriptor(write_fd))

del a_runner

self.assertFalse(self._is_open_file_descriptor(read_fd))
self.assertFalse(self._is_open_file_descriptor(write_fd))

def test_runner_file_closed_read_descriptor(self):
a_runner = runner.QueueJobRunner.from_environ_or_config()

read_fd, write_fd = a_runner._stop_pipe
os.close(read_fd)

del a_runner

self.assertFalse(self._is_open_file_descriptor(read_fd))
self.assertFalse(self._is_open_file_descriptor(write_fd))

def test_runner_file_closed_write_descriptor(self):
a_runner = runner.QueueJobRunner.from_environ_or_config()

read_fd, write_fd = a_runner._stop_pipe
os.close(write_fd)

del a_runner

self.assertFalse(self._is_open_file_descriptor(read_fd))
self.assertFalse(self._is_open_file_descriptor(write_fd))
Loading