From dd36d2393ab9f07b2c85c64a6327d291e63e4548 Mon Sep 17 00:00:00 2001 From: Vincent Hatakeyama Date: Fri, 24 Jan 2025 11:32:02 +0100 Subject: [PATCH 01/12] [FIX] queue_job: indicate that run_job need a read/write connection Without this fix, if db_replica_host is set, Odoo might pass a readonly database cursor and the FOR UPDATE in the method would fail. --- queue_job/controllers/main.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index fce3049fa..effc1a728 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -74,7 +74,13 @@ def _enqueue_dependent_jobs(self, env, job): else: break - @http.route("/queue_job/runjob", type="http", auth="none", save_session=False) + @http.route( + "/queue_job/runjob", + type="http", + auth="none", + save_session=False, + readonly=False, + ) def runjob(self, db, job_uuid, **kw): http.request.session.db = db env = http.request.env(user=SUPERUSER_ID) From e095b069591ade60e47fbd1594e70aabf6a4a3cd Mon Sep 17 00:00:00 2001 From: Florent Xicluna Date: Fri, 17 Jan 2025 18:40:27 +0100 Subject: [PATCH 02/12] [IMP] queue_job: perform_enqueued_jobs should filter the context --- queue_job/tests/common.py | 8 ++++++++ test_queue_job/tests/test_delay_mocks.py | 16 ++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/queue_job/tests/common.py b/queue_job/tests/common.py index ccd63cff1..434eadfc3 100644 --- a/queue_job/tests/common.py +++ b/queue_job/tests/common.py @@ -255,6 +255,7 @@ def _add_job(self, *args, **kwargs): if not job.identity_key or all( j.identity_key != job.identity_key for j in self.enqueued_jobs ): + self._prepare_context(job) self.enqueued_jobs.append(job) patcher = mock.patch.object(job, "store") @@ -273,6 +274,13 @@ def _add_job(self, *args, **kwargs): ) return job + def _prepare_context(self, job): + # pylint: disable=context-overridden + job_model = job.job_model.with_context({}) + field_records = 
job_model._fields["records"] + # Filter the context to simulate store/load of the job + job.recordset = field_records.convert_to_write(job.recordset, job_model) + def __enter__(self): return self diff --git a/test_queue_job/tests/test_delay_mocks.py b/test_queue_job/tests/test_delay_mocks.py index e9e071fd0..b910ea15e 100644 --- a/test_queue_job/tests/test_delay_mocks.py +++ b/test_queue_job/tests/test_delay_mocks.py @@ -269,6 +269,22 @@ def test_trap_jobs_perform(self): self.assertEqual(logs[2].message, "test_trap_jobs_perform graph 3") self.assertEqual(logs[3].message, "test_trap_jobs_perform graph 1") + def test_trap_jobs_prepare_context(self): + # pylint: disable=context-overridden + with trap_jobs() as trap: + model1 = self.env["test.queue.job"].with_context({"config_key": 42}) + model2 = self.env["test.queue.job"].with_context( + {"config_key": 42, "lang": "it_IT"} + ) + model1.with_delay().testing_method("0", "K", return_context=1) + model2.with_delay().testing_method("0", "K", return_context=1) + + [job1, job2] = trap.enqueued_jobs + trap.perform_enqueued_jobs() + + self.assertEqual(job1.result, {"job_uuid": mock.ANY}) + self.assertEqual(job2.result, {"job_uuid": mock.ANY, "lang": "it_IT"}) + def test_mock_with_delay(self): with mock_with_delay() as (delayable_cls, delayable): self.env["test.queue.job"].button_that_uses_with_delay() From 1cde603a3ceb6b5bb0d811970ea6ce96ad092daa Mon Sep 17 00:00:00 2001 From: Florent Xicluna Date: Tue, 4 Feb 2025 22:12:28 +0100 Subject: [PATCH 03/12] [IMP] queue_job: explain context in docstring --- queue_job/README.rst | 5 +++-- test_queue_job/tests/test_delay_mocks.py | 6 ++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/queue_job/README.rst b/queue_job/README.rst index f5a0ba797..d92e1c2bd 100644 --- a/queue_job/README.rst +++ b/queue_job/README.rst @@ -466,8 +466,9 @@ allow-list. The default allow-list is ("tz", "lang", "allowed_company_ids", "force_company", "active_test"). 
It can be customized in -``Base._job_prepare_context_before_enqueue_keys``. **Bypass jobs on -running Odoo** +``Base._job_prepare_context_before_enqueue_keys``. + +**Bypass jobs on running Odoo** When you are developing (ie: connector modules) you might want to bypass the queue job and run your code immediately. diff --git a/test_queue_job/tests/test_delay_mocks.py b/test_queue_job/tests/test_delay_mocks.py index b910ea15e..ebc8dab3d 100644 --- a/test_queue_job/tests/test_delay_mocks.py +++ b/test_queue_job/tests/test_delay_mocks.py @@ -270,6 +270,12 @@ def test_trap_jobs_perform(self): self.assertEqual(logs[3].message, "test_trap_jobs_perform graph 1") def test_trap_jobs_prepare_context(self): + """Context is transferred to the job according to an allow-list. + + Default allow-list is: + ("tz", "lang", "allowed_company_ids", "force_company", "active_test") + It can be customized in ``Base._job_prepare_context_before_enqueue_keys``. + """ # pylint: disable=context-overridden with trap_jobs() as trap: model1 = self.env["test.queue.job"].with_context({"config_key": 42}) From b493d54716aa680646f01f4df403c90b2be5e007 Mon Sep 17 00:00:00 2001 From: Florent Xicluna Date: Mon, 2 Jun 2025 13:50:28 +0200 Subject: [PATCH 04/12] [IMP] queue_job: use __slots__ for ChannelJob It decreases memory footprint when there's thousands of jobs to prioritize. 
--- queue_job/jobrunner/channels.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index 6e33a7318..a88e1b92f 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -173,6 +173,8 @@ class ChannelJob: """ + __slots__ = ("db_name", "channel", "uuid", "seq", "date_created", "priority", "eta") + def __init__(self, db_name, channel, uuid, seq, date_created, priority, eta): self.db_name = db_name self.channel = channel From 5e5536219179a7c54b462ef2a9660a0b5d789a70 Mon Sep 17 00:00:00 2001 From: Florent Xicluna Date: Tue, 3 Jun 2025 15:08:57 +0200 Subject: [PATCH 05/12] [FIX] queue_job: missing slot for __weakref__ --- queue_job/jobrunner/channels.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index a88e1b92f..fb9aaced6 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -173,7 +173,16 @@ class ChannelJob: """ - __slots__ = ("db_name", "channel", "uuid", "seq", "date_created", "priority", "eta") + __slots__ = ( + "db_name", + "channel", + "uuid", + "seq", + "date_created", + "priority", + "eta", + "__weakref__", + ) def __init__(self, db_name, channel, uuid, seq, date_created, priority, eta): self.db_name = db_name From 72df40033739485f588ec95b6015dbbce1863ff1 Mon Sep 17 00:00:00 2001 From: Florent Xicluna Date: Mon, 2 Jun 2025 09:45:50 +0200 Subject: [PATCH 06/12] [REF] queue_job: remove deprecated and not used methods --- queue_job/job.py | 71 ------------------------------------------------ 1 file changed, 71 deletions(-) diff --git a/queue_job/job.py b/queue_job/job.py index a473be5cd..594f1948a 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -9,7 +9,6 @@ import uuid import weakref from datetime import datetime, timedelta -from functools import total_ordering from random import randint import odoo @@ -104,7 +103,6 @@ def 
identity_exact_hasher(job_): return hasher -@total_ordering class Job: """A Job is a task to execute. It is the in-memory representation of a job. @@ -367,65 +365,6 @@ def job_record_with_same_identity_key(self): ) return existing - # TODO to deprecate (not called anymore) - @classmethod - def enqueue( - cls, - func, - args=None, - kwargs=None, - priority=None, - eta=None, - max_retries=None, - description=None, - channel=None, - identity_key=None, - ): - """Create a Job and enqueue it in the queue. Return the job uuid. - - This expects the arguments specific to the job to be already extracted - from the ones to pass to the job function. - - If the identity key is the same than the one in a pending job, - no job is created and the existing job is returned - - """ - new_job = cls( - func=func, - args=args, - kwargs=kwargs, - priority=priority, - eta=eta, - max_retries=max_retries, - description=description, - channel=channel, - identity_key=identity_key, - ) - return new_job._enqueue_job() - - # TODO to deprecate (not called anymore) - def _enqueue_job(self): - if self.identity_key: - existing = self.job_record_with_same_identity_key() - if existing: - _logger.debug( - "a job has not been enqueued due to having " - "the same identity key (%s) than job %s", - self.identity_key, - existing.uuid, - ) - return Job._load_from_db_record(existing) - self.store() - _logger.debug( - "enqueued %s:%s(*%r, **%r) with uuid: %s", - self.recordset, - self.method_name, - self.args, - self.kwargs, - self.uuid, - ) - return self - @staticmethod def db_record_from_uuid(env, job_uuid): # TODO remove in 15.0 or 16.0 @@ -749,16 +688,6 @@ def __eq__(self, other): def __hash__(self): return self.uuid.__hash__() - def sorting_key(self): - return self.eta, self.priority, self.date_created, self.seq - - def __lt__(self, other): - if self.eta and not other.eta: - return True - elif not self.eta and other.eta: - return False - return self.sorting_key() < other.sorting_key() - def 
db_record(self):
        return self.db_records_from_uuids(self.env, [self.uuid])

From 56ea1c37a7b68afeff235b1ff18321a6514162a2 Mon Sep 17 00:00:00 2001 From: Pierre Verkest Date: Fri, 14 Mar 2025 00:18:38 +0100 Subject: =?UTF-8?q?[FIX]=C2=A0queue=5Fjob:=20job=20runner=20?= =?UTF-8?q?open=20pipe=20that=20are=20never=20closed=20properly?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit

this makes a lot of open file descriptors, such that we hit the limit while instantiating the jobrunner from queue_job_cron_jobrunner in the current channel implementation

https://github.com/OCA/queue/pull/750 --- queue_job/jobrunner/runner.py | 11 ++++++ queue_job/tests/test_runner_runner.py | 49 +++++++++++++++++++++++++++ 2 files changed, 60 insertions(+)

diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 48bf51020..f84531f49 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -435,6 +435,17 @@ def __init__( self._stop = False self._stop_pipe = os.pipe() + def __del__(self): + # pylint: disable=except-pass + try: + os.close(self._stop_pipe[0]) + except OSError: + pass + try: + os.close(self._stop_pipe[1]) + except OSError: + pass + @classmethod def from_environ_or_config(cls): scheme = os.environ.get("ODOO_QUEUE_JOB_SCHEME") or queue_job_config.get( diff --git a/queue_job/tests/test_runner_runner.py b/queue_job/tests/test_runner_runner.py index c6486e27e..131ce6322 100644 --- a/queue_job/tests/test_runner_runner.py +++ b/queue_job/tests/test_runner_runner.py @@ -3,8 +3,57 @@ # pylint: disable=odoo-addons-relative-import # we are testing, we want to test as we were an external consumer of the API +import os + +from odoo.tests import BaseCase, tagged + from odoo.addons.queue_job.jobrunner import runner from .common import load_doctests load_tests = load_doctests(runner) + + +@tagged("-at_install", "post_install") +class TestRunner(BaseCase): + @classmethod + def 
_is_open_file_descriptor(cls, fd): + try: + os.fstat(fd) + return True + except OSError: + return False + + def test_runner_file_descriptor(self): + a_runner = runner.QueueJobRunner.from_environ_or_config() + + read_fd, write_fd = a_runner._stop_pipe + self.assertTrue(self._is_open_file_descriptor(read_fd)) + self.assertTrue(self._is_open_file_descriptor(write_fd)) + + del a_runner + + self.assertFalse(self._is_open_file_descriptor(read_fd)) + self.assertFalse(self._is_open_file_descriptor(write_fd)) + + def test_runner_file_closed_read_descriptor(self): + a_runner = runner.QueueJobRunner.from_environ_or_config() + + read_fd, write_fd = a_runner._stop_pipe + os.close(read_fd) + + del a_runner + + self.assertFalse(self._is_open_file_descriptor(read_fd)) + self.assertFalse(self._is_open_file_descriptor(write_fd)) + + def test_runner_file_closed_write_descriptor(self): + a_runner = runner.QueueJobRunner.from_environ_or_config() + + read_fd, write_fd = a_runner._stop_pipe + os.close(write_fd) + + del a_runner + + self.assertFalse(self._is_open_file_descriptor(read_fd)) + self.assertFalse(self._is_open_file_descriptor(write_fd)) From 74037be632e9b4db346afb8cc84b803e4aec60d4 Mon Sep 17 00:00:00 2001 From: Florent Xicluna Date: Mon, 26 May 2025 15:24:51 +0200 Subject: [PATCH 08/12] [IMP] queue_job: add Priority to Group-By and search --- queue_job/models/queue_job.py | 2 +- queue_job/views/queue_job_views.xml | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 55ee7e526..ad82d57b4 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -91,7 +91,7 @@ class QueueJob(models.Model): func_string = fields.Char(string="Task", readonly=True) state = fields.Selection(STATES, readonly=True, required=True, index=True) - priority = fields.Integer() + priority = fields.Integer(group_operator=False) exc_name = fields.Char(string="Exception", readonly=True) exc_message = 
fields.Char(string="Exception Message", readonly=True, tracking=True) exc_info = fields.Text(string="Exception Info", readonly=True) diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index be12b4294..ddbb4c5e1 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -167,6 +167,7 @@ /> + @@ -211,6 +212,7 @@ + @@ -283,6 +285,11 @@ string="State" context="{'group_by': 'state'}" /> + Date: Tue, 3 Jun 2025 08:58:27 +0200 Subject: [PATCH 09/12] [IMP] queue_job: more efficient ChannelJob sorting --- queue_job/jobrunner/channels.py | 64 +++++++++++++++++++-------------- 1 file changed, 38 insertions(+), 26 deletions(-) diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index fb9aaced6..cb0a23f01 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -2,6 +2,7 @@ # Copyright 2015-2016 Camptocamp SA # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) import logging +from collections import namedtuple from functools import total_ordering from heapq import heappop, heappush from weakref import WeakValueDictionary @@ -10,6 +11,7 @@ from ..job import CANCELLED, DONE, ENQUEUED, FAILED, PENDING, STARTED, WAIT_DEPENDENCIES NOT_DONE = (WAIT_DEPENDENCIES, PENDING, ENQUEUED, STARTED, FAILED) +JobSortingKey = namedtuple("SortingKey", "eta priority date_created seq") _logger = logging.getLogger(__name__) @@ -108,7 +110,7 @@ class ChannelJob: job that are necessary to prioritise them. 
Channel jobs are comparable according to the following rules: - * jobs with an eta come before all other jobs + * jobs with an eta cannot be compared with jobs without * then jobs with a smaller eta come first * then jobs with a smaller priority come first * then jobs with a smaller creation time come first @@ -135,14 +137,18 @@ class ChannelJob: >>> j3 < j1 True - j4 and j5 comes even before j3, because they have an eta + j4 and j5 have an eta, they cannot be compared with j3 >>> j4 = ChannelJob(None, None, 4, ... seq=0, date_created=4, priority=9, eta=9) >>> j5 = ChannelJob(None, None, 5, ... seq=0, date_created=5, priority=9, eta=9) - >>> j4 < j5 < j3 + >>> j4 < j5 True + >>> j4 < j3 + Traceback (most recent call last): + ... + TypeError: '<' not supported between instances of 'int' and 'NoneType' j6 has same date_created and priority as j5 but a smaller eta @@ -153,7 +159,7 @@ class ChannelJob: Here is the complete suite: - >>> j6 < j4 < j5 < j3 < j1 < j2 + >>> j6 < j4 < j5 and j3 < j1 < j2 True j0 has the same properties as j1 but they are not considered @@ -173,25 +179,13 @@ class ChannelJob: """ - __slots__ = ( - "db_name", - "channel", - "uuid", - "seq", - "date_created", - "priority", - "eta", - "__weakref__", - ) + __slots__ = ("db_name", "channel", "uuid", "_sorting_key", "__weakref__") def __init__(self, db_name, channel, uuid, seq, date_created, priority, eta): self.db_name = db_name self.channel = channel self.uuid = uuid - self.seq = seq - self.date_created = date_created - self.priority = priority - self.eta = eta + self._sorting_key = JobSortingKey(eta, priority, date_created, seq) def __repr__(self): return "" % self.uuid @@ -202,18 +196,36 @@ def __eq__(self, other): def __hash__(self): return id(self) + def set_no_eta(self): + self._sorting_key = JobSortingKey(None, *self._sorting_key[1:]) + + @property + def seq(self): + return self._sorting_key.seq + + @property + def date_created(self): + return self._sorting_key.date_created + + @property + 
def priority(self): + return self._sorting_key.priority + + @property + def eta(self): + return self._sorting_key.eta + def sorting_key(self): - return self.eta, self.priority, self.date_created, self.seq + # DEPRECATED + return self._sorting_key def sorting_key_ignoring_eta(self): - return self.priority, self.date_created, self.seq + return self._sorting_key[1:] def __lt__(self, other): - if self.eta and not other.eta: - return True - elif not self.eta and other.eta: - return False - return self.sorting_key() < other.sorting_key() + # Do not compare job where ETA is set with job where it is not + # If one job 'eta' is set, and the other is None, it raises TypeError + return self._sorting_key < other._sorting_key class ChannelQueue: @@ -323,7 +335,7 @@ def remove(self, job): def pop(self, now): while self._eta_queue and self._eta_queue[0].eta <= now: eta_job = self._eta_queue.pop() - eta_job.eta = None + eta_job.set_no_eta() self._queue.add(eta_job) if self.sequential and self._eta_queue and self._queue: eta_job = self._eta_queue[0] From e44df055e107388f4d2c99ee2e70e4140ba829cd Mon Sep 17 00:00:00 2001 From: Florent Xicluna Date: Wed, 11 Jun 2025 11:52:51 +0200 Subject: [PATCH 10/12] [IMP] queue_job: filter for retried jobs --- queue_job/views/queue_job_views.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index ddbb4c5e1..ba9de108b 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -254,6 +254,12 @@ domain="[('state', '=', 'cancelled')]" /> + + Date: Wed, 11 Jun 2025 12:13:08 +0200 Subject: [PATCH 11/12] [IMP] queue_job: set the columns optional in list view --- queue_job/views/queue_job_views.xml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index ba9de108b..bee17b2b6 100644 --- a/queue_job/views/queue_job_views.xml +++ 
b/queue_job/views/queue_job_views.xml @@ -156,7 +156,7 @@ decoration-muted="state == 'done'" > - + - - + + - - - - + + + + From d589140c54afc272666cfefc8ee8c584f46fb2b8 Mon Sep 17 00:00:00 2001 From: Florent Xicluna Date: Wed, 25 Jun 2025 09:24:52 +0200 Subject: [PATCH 12/12] [IMP] queue_job: add index for efficient autovacuum --- queue_job/models/queue_job.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index ad82d57b4..42ddf8c93 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -6,7 +6,7 @@ from datetime import datetime, timedelta from odoo import _, api, exceptions, fields, models -from odoo.tools import config, html_escape +from odoo.tools import config, html_escape, index_exists from odoo.addons.base_sparse_field.models.fields import Serialized @@ -130,16 +130,21 @@ class QueueJob(models.Model): worker_pid = fields.Integer(readonly=True) def init(self): - self._cr.execute( - "SELECT indexname FROM pg_indexes WHERE indexname = %s ", - ("queue_job_identity_key_state_partial_index",), - ) - if not self._cr.fetchone(): + index_1 = "queue_job_identity_key_state_partial_index" + index_2 = "queue_job_channel_date_done_date_created_index" + if not index_exists(self._cr, index_1): + # Used by Job.job_record_with_same_identity_key self._cr.execute( "CREATE INDEX queue_job_identity_key_state_partial_index " "ON queue_job (identity_key) WHERE state in ('pending', " "'enqueued', 'wait_dependencies') AND identity_key IS NOT NULL;" ) + if not index_exists(self._cr, index_2): + # Used by .autovacuum + self._cr.execute( + "CREATE INDEX queue_job_channel_date_done_date_created_index " + "ON queue_job (channel, date_done, date_created);" + ) @api.depends("records") def _compute_record_ids(self): @@ -405,6 +410,7 @@ def autovacuum(self): ("date_cancelled", "<=", deadline), ("channel", "=", channel.complete_name), ], + order="date_done, 
date_created", limit=1000, ) if jobs: