 from _tilebox.grpc.error import InternalServerError
 from tilebox.datasets.sync.dataset import DatasetClient
 from tilebox.workflows.cache import JobCache
-from tilebox.workflows.data import ComputedTask, NextTaskToRun, Task, TaskLease
+from tilebox.workflows.data import ComputedTask, Idling, NextTaskToRun, Task, TaskLease
 from tilebox.workflows.interceptors import Interceptor, InterceptorType
 from tilebox.workflows.observability.logging import get_logger
 from tilebox.workflows.observability.tracing import WorkflowTracer
 from tilebox.workflows.task import FutureTask, RunnerContext, TaskMeta
 from tilebox.workflows.task import Task as TaskInstance
 
-# In seconds
+# The time we give a task to finish its execution when a runner shutdown is requested, before we forcefully stop it
 _SHUTDOWN_GRACE_PERIOD = timedelta(seconds=2)
-_POLL_INTERVAL = timedelta(seconds=5)
-_JITTER_INTERVAL = timedelta(seconds=5)
+
+# Retry configuration for failed requests to the workflows API
 _INITIAL_RETRY_BACKOFF = timedelta(seconds=5)
 _MAX_RETRY_BACKOFF = timedelta(hours=1)  # 1 hour
 
+# A maximum idling duration, as a safeguard against excessively long sleep times in case the suggested idling
+# duration is ever too long. 5 minutes should be plenty of time to wait.
+_MAX_IDLING_DURATION = timedelta(minutes=5)
+# A minimum idling duration, as a safeguard against excessively short sleep times in case the suggested idling
+# duration is ever too short.
+_MIN_IDLING_DURATION = timedelta(milliseconds=1)
+
+# Fallback polling interval and jitter in case the workflows API fails to respond with a suggested idling duration
+_FALLBACK_POLL_INTERVAL = timedelta(seconds=5)
+_FALLBACK_JITTER_INTERVAL = timedelta(seconds=5)
+
 WrappedFnReturnT = TypeVar("WrappedFnReturnT")
 
 
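A minimal standalone sketch (not part of the diff) of how the new safeguard constants clamp a server-suggested idling duration; the clamp_idling_duration helper is hypothetical, but the constants are copied from the hunk above:

from datetime import timedelta

_MAX_IDLING_DURATION = timedelta(minutes=5)
_MIN_IDLING_DURATION = timedelta(milliseconds=1)

def clamp_idling_duration(suggested: timedelta) -> timedelta:
    # min() caps overly long suggestions, max() lifts overly short ones
    return max(min(suggested, _MAX_IDLING_DURATION), _MIN_IDLING_DURATION)

assert clamp_idling_duration(timedelta(hours=2)) == timedelta(minutes=5)
assert clamp_idling_duration(timedelta(0)) == timedelta(milliseconds=1)
assert clamp_idling_duration(timedelta(seconds=10)) == timedelta(seconds=10)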
@@ -96,14 +107,14 @@ def _extend_lease_while_task_is_running(
 
             break
 
-        logger.info(f"Extending task lease for {task_id=}, {task_lease=}")
+        logger.debug(f"Extending task lease for {task_id=}, {task_lease=}")
         try:
             # The first time we call the function, we pass the argument we received
             # After that, we call it with the result of the previous call
             task_lease = service.extend_task_lease(task_id, 2 * task_lease.lease)
             if task_lease.lease == 0:
                 # The server did not grant a lease extension, so there is no point in asking for one again
-                logger.info(f"task lease extension not granted for task {task_id}")
+                logger.debug(f"task lease extension not granted for task {task_id}")
                 # even though we failed to extend the lease, let's still wait until the task is done,
                 # otherwise we might end up with a mismatch between the task currently being executed and the task
                 # that we extend leases for (and the runner can anyway only execute one task at a time)
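For context, a minimal sketch of the doubling behavior in this hunk: each extension request asks for twice the current lease, and a returned lease of 0 means the server declined. The TaskLease dataclass and service stub here are hypothetical stand-ins, not the real tilebox API:

from dataclasses import dataclass

@dataclass
class TaskLease:
    lease: int  # remaining lease duration, in seconds

class FakeService:
    def extend_task_lease(self, task_id: str, requested: int) -> TaskLease:
        # grant extensions up to 60 seconds, then decline by returning a lease of 0
        return TaskLease(lease=requested if requested <= 60 else 0)

service, lease = FakeService(), TaskLease(lease=10)
while lease.lease > 0:  # requests 20, 40, then 80 seconds; the 80s request is declined
    lease = service.extend_task_lease("task-1", 2 * lease.lease)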
@@ -331,41 +342,59 @@ def run_all(self) -> None:
         """
         self._run(stop_when_idling=True)
 
-    def _run(self, stop_when_idling: bool = True) -> None:
+    def _run(self, stop_when_idling: bool = True) -> None:  # noqa: C901
         """
         Run the task runner forever. This will poll for new tasks and execute them as they come in.
         If no tasks are available, it will sleep for a short time and then try again.
         """
-        task: Task | None = None
+        work: Task | Idling | None = None
 
         # capture interrupt signals and delay them by a grace period in order to shut down gracefully
         with _GracefulShutdown(_SHUTDOWN_GRACE_PERIOD, self._service) as shutdown_context:
             while True:
-                if task is None:  # if we don't have a task right now, let's try to work-steal one
-                    if shutdown_context.is_shutting_down():
+                if not isinstance(work, Task):  # if we don't have a task right now, let's try to work-steal one
+                    if shutdown_context.is_shutting_down():  # unless we received an interrupt, then we shut down
                         return
                     try:
-                        task = self._service.next_task(task_to_run=self.tasks_to_run, computed_task=None)
+                        work = self._service.next_task(task_to_run=self.tasks_to_run, computed_task=None)
                     except InternalServerError as e:
                         # No need to retry here, since the task runner will sleep for a while and then request this again anyway.
                         self.logger.error(f"Failed to get next task with error {e}")
 
-                if task is not None:  # we have a task to execute
+                if isinstance(work, Task):  # we received a task to execute
+                    task = work
                     if task.retry_count > 0:
                         self.logger.debug(f"Retrying task {task.id} that failed {task.retry_count} times")
-                    task = self._execute(task, shutdown_context)  # submitting the task gives us the next one
-                else:  # if we didn't get a task, let's sleep for a bit and try work-stealing again
-                    self.logger.debug("No task to run")
+                    work = self._execute(task, shutdown_context)  # submitting the task gives us the next work item
+                elif isinstance(work, Idling):  # we received an idling response, so let's sleep for a bit
+                    self.logger.debug("No task to run, idling")
                     if stop_when_idling:  # if stop_when_idling is set, we can just return
                         return
+
                     # now sleep for a bit and then try again, unless we receive an interrupt
-                    shutdown_context.sleep(
-                        _POLL_INTERVAL.total_seconds() + random.uniform(0, _JITTER_INTERVAL.total_seconds())  # noqa: S311
-                    )
+                    idling_duration = work.suggested_idling_duration
+                    idling_duration = min(idling_duration, _MAX_IDLING_DURATION)
+                    idling_duration = max(idling_duration, _MIN_IDLING_DURATION)
+                    shutdown_context.sleep(idling_duration.total_seconds())
                     if shutdown_context.is_shutting_down():
                         return
+                else:  # work is None
+                    # we received neither an idling response nor a task. This only happens if we didn't request
+                    # a task to run, indicating that we are shutting down.
+                    if shutdown_context.is_shutting_down():
+                        return
+
+                    fallback_interval = _FALLBACK_POLL_INTERVAL.total_seconds() + random.uniform(  # noqa: S311
+                        0, _FALLBACK_JITTER_INTERVAL.total_seconds()
+                    )
+                    self.logger.debug(
+                        f"Didn't receive a task to run, nor an idling response, but the runner is not shutting down. "
+                        f"Falling back to a default idling period of {fallback_interval:.2f}s"
+                    )
+
+                    shutdown_context.sleep(fallback_interval)
 
-    def _execute(self, task: Task, shutdown_context: _GracefulShutdown) -> Task | None:
+    def _execute(self, task: Task, shutdown_context: _GracefulShutdown) -> Task | Idling | None:
         try:
             return self._try_execute(task, shutdown_context)
         except Exception as e:
@@ -380,7 +409,7 @@ def _execute(self, task: Task, shutdown_context: _GracefulShutdown) -> Task | No
             task_failed_retry(task, e)
             return None
 
-    def _try_execute(self, task: Task, shutdown_context: _GracefulShutdown) -> Task | None:
+    def _try_execute(self, task: Task, shutdown_context: _GracefulShutdown) -> Task | Idling | None:
         if task.job is None:
             raise ValueError(f"Task {task.id} has no job associated with it.")