@@ -117,7 +117,6 @@ def clean_registries(self, timestamp: Optional[float] = None) -> None:
117117 logger .exception (f"Job { self .name } : error while executing failure callback" )
118118 raise
119119
120-
121120 else :
122121 logger .warning (
123122 f"{ self .__class__ .__name__ } cleanup: Moving job to { self .failed_job_registry .key } "
@@ -189,24 +188,24 @@ def get_all_jobs(self) -> List[JobModel]:
189188 return JobModel .get_many (job_names , connection = self .connection )
190189
191190 def create_and_enqueue_job (
192- self ,
193- func : FunctionReferenceType ,
194- args : Union [Tuple , List , None ] = None ,
195- kwargs : Optional [Dict ] = None ,
196- timeout : Optional [int ] = None ,
197- result_ttl : Optional [int ] = None ,
198- job_info_ttl : Optional [int ] = None ,
199- description : Optional [str ] = None ,
200- name : Optional [str ] = None ,
201- at_front : bool = False ,
202- meta : Optional [Dict ] = None ,
203- on_success : Optional [Callback ] = None ,
204- on_failure : Optional [Callback ] = None ,
205- on_stopped : Optional [Callback ] = None ,
206- task_type : Optional [str ] = None ,
207- scheduled_task_id : Optional [int ] = None ,
208- when : Optional [datetime ] = None ,
209- pipeline : Optional [ConnectionType ] = None ,
191+ self ,
192+ func : FunctionReferenceType ,
193+ args : Union [Tuple , List , None ] = None ,
194+ kwargs : Optional [Dict ] = None ,
195+ timeout : Optional [int ] = None ,
196+ result_ttl : Optional [int ] = None ,
197+ job_info_ttl : Optional [int ] = None ,
198+ description : Optional [str ] = None ,
199+ name : Optional [str ] = None ,
200+ at_front : bool = False ,
201+ meta : Optional [Dict ] = None ,
202+ on_success : Optional [Callback ] = None ,
203+ on_failure : Optional [Callback ] = None ,
204+ on_stopped : Optional [Callback ] = None ,
205+ task_type : Optional [str ] = None ,
206+ scheduled_task_id : Optional [int ] = None ,
207+ when : Optional [datetime ] = None ,
208+ pipeline : Optional [ConnectionType ] = None ,
210209 ) -> JobModel :
211210 """Creates a job to represent the delayed function call and enqueues it.
212211 :param when: When to schedule the job (None to enqueue immediately)
@@ -258,23 +257,37 @@ def create_and_enqueue_job(
258257 def job_handle_success (self , job : JobModel , result : Any , result_ttl : int , connection : ConnectionType ):
259258 """Saves and cleanup job after successful execution"""
260259 job .after_execution (
261- result_ttl , JobStatus .FINISHED ,
260+ result_ttl ,
261+ JobStatus .FINISHED ,
262262 prev_registry = self .active_job_registry ,
263- new_registry = self .finished_job_registry , connection = connection )
264- Result .create (connection , job_name = job .name , worker_name = job .worker_name , _type = ResultType .SUCCESSFUL ,
265- return_value = result , ttl = result_ttl )
263+ new_registry = self .finished_job_registry ,
264+ connection = connection ,
265+ )
266+ Result .create (
267+ connection ,
268+ job_name = job .name ,
269+ worker_name = job .worker_name ,
270+ _type = ResultType .SUCCESSFUL ,
271+ return_value = result ,
272+ ttl = result_ttl ,
273+ )
266274
267275 def job_handle_failure (self , status : JobStatus , job : JobModel , exc_string : str , connection : ConnectionType ):
268276 # Does not set job status since the job might be stopped
269277 job .after_execution (
270- SCHEDULER_CONFIG .DEFAULT_FAILURE_TTL , status ,
278+ SCHEDULER_CONFIG .DEFAULT_FAILURE_TTL ,
279+ status ,
271280 prev_registry = self .active_job_registry ,
272281 new_registry = self .failed_job_registry ,
273- connection = connection )
282+ connection = connection ,
283+ )
274284 Result .create (
275- connection , job .name , job .worker_name ,
276- ResultType .FAILED , SCHEDULER_CONFIG .DEFAULT_FAILURE_TTL ,
277- exc_string = exc_string
285+ connection ,
286+ job .name ,
287+ job .worker_name ,
288+ ResultType .FAILED ,
289+ SCHEDULER_CONFIG .DEFAULT_FAILURE_TTL ,
290+ exc_string = exc_string ,
278291 )
279292
280293 def run_job (self , job : JobModel ) -> JobModel :
@@ -299,7 +312,7 @@ def run_job(self, job: JobModel) -> JobModel:
299312 return job
300313
301314 def enqueue_job (
302- self , job_model : JobModel , connection : Optional [ConnectionType ] = None , at_front : bool = False
315+ self , job_model : JobModel , connection : Optional [ConnectionType ] = None , at_front : bool = False
303316 ) -> JobModel :
304317 """Enqueues a job for delayed execution without checking dependencies.
305318
@@ -350,10 +363,10 @@ def run_sync(self, job: JobModel) -> JobModel:
350363
351364 @classmethod
352365 def dequeue_any (
353- cls ,
354- queues : List [Self ],
355- timeout : Optional [int ],
356- connection : Optional [ConnectionType ] = None ,
366+ cls ,
367+ queues : List [Self ],
368+ timeout : Optional [int ],
369+ connection : Optional [ConnectionType ] = None ,
357370 ) -> Tuple [Optional [JobModel ], Optional [Self ]]:
358371 """Class method returning a Job instance at the front of the given set of Queues, where the order of the queues
359372 is important.
@@ -439,8 +452,9 @@ def cancel_job(self, job_name: str) -> None:
439452 if new_status == JobStatus .CANCELED :
440453 self .canceled_job_registry .add (pipe , job_name , 0 )
441454 else :
442- self .finished_job_registry .add (pipe , job_name ,
443- current_timestamp () + SCHEDULER_CONFIG .DEFAULT_FAILURE_TTL )
455+ self .finished_job_registry .add (
456+ pipe , job_name , current_timestamp () + SCHEDULER_CONFIG .DEFAULT_FAILURE_TTL
457+ )
444458 pipe .execute ()
445459 break
446460 except WatchError :
0 commit comments