diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index c895d9caf..028409c67 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -799,6 +799,62 @@ class ChannelManager: >>> cm.notify(db, 'S', 'S3', 3, 0, 10, None, 'done') >>> pp(list(cm.get_jobs_to_run(now=105))) [] + + Test handling of duplicate job UUIDs across different databases. + This can happen when a database is duplicated/cloned and both databases + notify the same job runner with the same job UUID. + + >>> cm = ChannelManager() + >>> cm.simple_configure('root:4,D:2') + + Create a job in db1 + + >>> cm.notify('db1', 'D', 'dup-uuid-1', 1, 0, 10, None, 'pending') + >>> job = cm._jobs_by_uuid.get('dup-uuid-1') + >>> job.db_name + 'db1' + >>> len(cm._jobs_by_uuid) + 1 + + Same UUID notified from a different database (db2) - simulates cloned DB. + The job should be recreated for the new database. + + >>> cm.notify('db2', 'D', 'dup-uuid-1', 1, 0, 10, None, 'pending') + >>> job = cm._jobs_by_uuid.get('dup-uuid-1') + >>> job.db_name + 'db2' + >>> len(cm._jobs_by_uuid) + 1 + + Verify job can be run from the new database + + >>> pp(list(cm.get_jobs_to_run(now=100))) + [] + + Test multiple database switches for the same UUID + + >>> cm.notify('db3', 'D', 'dup-uuid-1', 1, 0, 10, None, 'pending') + >>> job = cm._jobs_by_uuid.get('dup-uuid-1') + >>> job.db_name + 'db3' + + Create another job with different UUID in db1 and verify it coexists + + >>> cm.notify('db1', 'D', 'other-uuid', 2, 0, 10, None, 'pending') + >>> len(cm._jobs_by_uuid) + 2 + >>> cm._jobs_by_uuid.get('dup-uuid-1').db_name + 'db3' + >>> cm._jobs_by_uuid.get('other-uuid').db_name + 'db1' + + Both jobs should be available to run + + >>> jobs = list(cm.get_jobs_to_run(now=100)) + >>> len(jobs) + 2 + >>> sorted([j.uuid for j in jobs]) + ['dup-uuid-1', 'other-uuid'] """ def __init__(self): @@ -1029,22 +1085,31 @@ def notify( channel = self.get_channel_by_name(channel_name, parent_fallback=True) job = self._jobs_by_uuid.get(uuid) if job: - # db_name is invariant - assert job.db_name == db_name - # date_created is invariant - assert job.date_created == date_created - # if one of the job properties that influence - # scheduling order has changed, we remove the job - # from the queues and create a new job object - if ( - seq != job.seq - or priority != job.priority - or eta != job.eta - or channel != job.channel - ): - _logger.debug("job %s properties changed, rescheduling it", uuid) + # if db_name differs, this is likely a cloned database + if job.db_name != db_name: + _logger.warning( + "job %s exists in multiple databases (%s and %s). recreating it", + uuid, + job.db_name, + db_name, + ) self.remove_job(uuid) job = None + else: + # date_created is invariant + assert job.date_created == date_created + # if one of the job properties that influence + # scheduling order has changed, we remove the job + # from the queues and create a new job object + if ( + seq != job.seq + or priority != job.priority + or eta != job.eta + or channel != job.channel + ): + _logger.debug("job %s properties changed, rescheduling it", uuid) + self.remove_job(uuid) + job = None if not job: job = ChannelJob(db_name, channel, uuid, seq, date_created, priority, eta) self._jobs_by_uuid[uuid] = job