93 changes: 79 additions & 14 deletions queue_job/jobrunner/channels.py
@@ -799,6 +799,62 @@ class ChannelManager:
>>> cm.notify(db, 'S', 'S3', 3, 0, 10, None, 'done')
>>> pp(list(cm.get_jobs_to_run(now=105)))
[]

Test handling of duplicate job UUIDs across different databases.
This can happen when a database is duplicated/cloned and both databases
notify the same job runner with the same job UUID.

>>> cm = ChannelManager()
>>> cm.simple_configure('root:4,D:2')

Create a job in db1

>>> cm.notify('db1', 'D', 'dup-uuid-1', 1, 0, 10, None, 'pending')
>>> job = cm._jobs_by_uuid.get('dup-uuid-1')
>>> job.db_name
'db1'
>>> len(cm._jobs_by_uuid)
1

The same UUID is then notified from a different database (db2), simulating
a cloned database. The job should be recreated for the new database.

>>> cm.notify('db2', 'D', 'dup-uuid-1', 1, 0, 10, None, 'pending')
>>> job = cm._jobs_by_uuid.get('dup-uuid-1')
>>> job.db_name
'db2'
>>> len(cm._jobs_by_uuid)
1

Verify the job can be run from the new database

>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob dup-uuid-1>]

Test multiple database switches for the same UUID

>>> cm.notify('db3', 'D', 'dup-uuid-1', 1, 0, 10, None, 'pending')
>>> job = cm._jobs_by_uuid.get('dup-uuid-1')
>>> job.db_name
'db3'

Create another job with a different UUID in db1 and verify that both coexist

>>> cm.notify('db1', 'D', 'other-uuid', 2, 0, 10, None, 'pending')
>>> len(cm._jobs_by_uuid)
2
>>> cm._jobs_by_uuid.get('dup-uuid-1').db_name
'db3'
>>> cm._jobs_by_uuid.get('other-uuid').db_name
'db1'

Both jobs should be available to run

>>> jobs = list(cm.get_jobs_to_run(now=100))
>>> len(jobs)
2
>>> sorted([j.uuid for j in jobs])
['dup-uuid-1', 'other-uuid']
"""

def __init__(self):
@@ -1029,22 +1085,31 @@ def notify(
channel = self.get_channel_by_name(channel_name, parent_fallback=True)
job = self._jobs_by_uuid.get(uuid)
if job:
# db_name is invariant
assert job.db_name == db_name
# date_created is invariant
assert job.date_created == date_created
# if one of the job properties that influence
# scheduling order has changed, we remove the job
# from the queues and create a new job object
if (
seq != job.seq
or priority != job.priority
or eta != job.eta
or channel != job.channel
):
_logger.debug("job %s properties changed, rescheduling it", uuid)
# if db_name differs, this is likely a cloned database
if job.db_name != db_name:
_logger.warning(
"job %s exists in multiple databases (%s and %s). recreating it",
uuid,
job.db_name,
db_name,
)
self.remove_job(uuid)
Review comment (Member):
I'm not quite sure this is the correct thing to do. We still end up with two jobs with the same uuid in the queue, and that's going to be confusing, if only when reading the logs. I suspect it will also break things down the line.

If that case is important to you, perhaps you could have a little script that updates uuid and dbname in the jobs of the cloned database?
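
A minimal sketch of such a one-off script (not part of this PR), assuming the standard queue_job schema (a `queue_job` table with a `uuid` column) and a direct psycopg2 connection; the database name is illustrative:

```python
# Hypothetical one-off cleanup: regenerate job UUIDs in the *cloned*
# database so the job runner no longer sees duplicates. Run it against
# the clone only, never the original. Assumes the standard queue_job
# schema (table queue_job with a uuid column); verify before running.
import uuid

import psycopg2

conn = psycopg2.connect(dbname="production_copy")  # the clone
with conn, conn.cursor() as cr:
    cr.execute("SELECT id FROM queue_job")
    for (job_id,) in cr.fetchall():
        cr.execute(
            "UPDATE queue_job SET uuid = %s WHERE id = %s",
            (str(uuid.uuid4()), job_id),
        )
conn.close()
```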

job = None
else:
# date_created is invariant
assert job.date_created == date_created
# if one of the job properties that influence
# scheduling order has changed, we remove the job
# from the queues and create a new job object
if (
seq != job.seq
or priority != job.priority
or eta != job.eta
or channel != job.channel
):
_logger.debug("job %s properties changed, rescheduling it", uuid)
self.remove_job(uuid)
job = None
if not job:
job = ChannelJob(db_name, channel, uuid, seq, date_created, priority, eta)
self._jobs_by_uuid[uuid] = job