add worker shutdown options #210
base: master
Conversation
Pull request overview
This PR introduces a worker auto-shutdown mechanism for Celery workers, allowing them to gracefully terminate after processing a configurable number of tasks. This is useful for environments like Kubernetes where worker pods need to be recycled periodically to prevent memory leaks or apply configuration updates.
Key Changes
- Added three new configuration options to control worker auto-shutdown behavior (enabled flag, max tasks, and delay)
- Implemented a task post-run signal handler that tracks task execution count and triggers shutdown when the threshold is reached
- Integrated shutdown configuration into the Celery app creation process
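For orientation, here is a minimal sketch of the mechanism described above: a `task_postrun` counter that requests a delayed shutdown once a threshold is reached. The constant names are placeholders mirroring the new config options, not the PR's exact code.

```python
import os
import signal
import threading

from celery.signals import task_postrun

AUTOSHUTDOWN_ENABLED = True      # stands in for CELERY_WORKER_AUTOSHUTDOWN_ENABLED
AUTOSHUTDOWN_MAX_TASKS = 100     # stands in for CELERY_WORKER_AUTOSHUTDOWN_MAX_TASKS
AUTOSHUTDOWN_DELAY_SECONDS = 30  # stands in for CELERY_WORKER_AUTOSHUTDOWN_DELAY_SECONDS

_task_count = 0


@task_postrun.connect
def _count_and_maybe_shutdown(sender=None, **kwargs):
    """Count finished tasks and request a delayed shutdown at the threshold."""
    global _task_count
    if not AUTOSHUTDOWN_ENABLED:
        return
    _task_count += 1
    if _task_count == AUTOSHUTDOWN_MAX_TASKS:
        # Grace period before terminating this process.
        threading.Timer(
            AUTOSHUTDOWN_DELAY_SECONDS,
            lambda: os.kill(os.getpid(), signal.SIGTERM),
        ).start()
```

As the review comments below point out, both signaling the current process and keeping per-process counters behave differently under the prefork pool.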
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| materializationengine/config.py | Adds three new configuration variables for controlling worker auto-shutdown |
| materializationengine/celery_worker.py | Implements the auto-shutdown mechanism with task counting, signal handling, and graceful termination logic |
Force-pushed 4521543 to 2f35817
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Force-pushed 2f35817 to d533b40
```python
    observed_count,
)
try:
    os.kill(os.getpid(), signal.SIGTERM)
```
Bug: SIGTERM targets child process, not main worker with prefork
The auto-shutdown feature intends to terminate the worker after a configurable number of tasks, but with Celery's prefork pool (which this project uses), the task_postrun signal fires in the child process, not the main worker process. Calling os.kill(os.getpid(), signal.SIGTERM) from that context terminates only the child process, which the main worker will simply replace with a new one. The worker continues running indefinitely, defeating the feature's purpose. To properly shut down the entire worker, the code would need to signal the parent/main process rather than the current child process.
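One possible direction, sketched under the assumption that the handler runs in a prefork child whose parent is the main worker process; `request_worker_shutdown` is a hypothetical helper, not code from this PR.

```python
# Sketch: shut down the whole worker rather than just the current prefork child.
# Assumes a prefork child whose parent is the main worker process; verify this
# holds for the pool in use before relying on it.
import os
import signal

from celery import current_app


def request_worker_shutdown(worker_hostname=None):
    if worker_hostname:
        # Ask the named worker node to warm-shut down via the control channel.
        current_app.control.shutdown(destination=[worker_hostname])
    else:
        # Fallback: signal the parent (main) worker process directly.
        os.kill(os.getppid(), signal.SIGTERM)
```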
```python
_task_execution_count = 0
_shutdown_requested = False
```
Bug: Task count is per-child, not per-worker with prefork
The module-level globals _task_execution_count and _shutdown_requested are process-local. With Celery's prefork pool (used in this project with concurrency up to 4), each child process maintains its own independent counter due to fork semantics. This means if worker_autoshutdown_max_tasks is set to 10 and concurrency is 4, each child would need to run 10 tasks individually before triggering shutdown - potentially 40 total tasks before any action, rather than the expected 10. The counting doesn't aggregate across worker children as the configuration name suggests.
Additional Locations (1)
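One alternative is to count in the main process, since `task_received` is dispatched in the consumer (main) process under prefork. The sketch below uses placeholder names, not the PR's code.

```python
# Sketch: count tasks in the main worker process so the threshold applies per
# worker, not per prefork child. task_received fires in the consumer (main)
# process, unlike task_postrun, which fires in the child.
import os
import signal
import threading

from celery.signals import task_received

MAX_TASKS = 100  # placeholder for worker_autoshutdown_max_tasks

_received_count = 0
_count_lock = threading.Lock()


@task_received.connect
def _count_in_main_process(sender=None, request=None, **kwargs):
    global _received_count
    with _count_lock:
        _received_count += 1
        if _received_count == MAX_TASKS:
            # Running in the main process, so SIGTERM triggers a warm shutdown
            # of the whole worker; already-accepted tasks are allowed to finish.
            os.kill(os.getpid(), signal.SIGTERM)
```

Note that Celery's built-in `worker_max_tasks_per_child` setting recycles individual pool children after a fixed number of tasks; it mitigates memory leaks but does not stop the worker itself, which is why a worker-level counter is needed here.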
```python
    if active_workers:
        worker_hostname = list(active_workers.keys())[0]
except Exception:
    pass
```
Bug: Incorrect worker hostname attribute access on task sender
The sender parameter in task_postrun is the Task instance, which doesn't have a hostname attribute directly. The worker hostname is available at sender.request.hostname, not sender.hostname. The hasattr(sender, 'hostname') check will return False, always falling through to the fallback logic which calls inspect.active(). This fallback makes a network call to query all workers and picks the first one from the dictionary, which may not be the current worker, potentially canceling the consumer on a different worker.
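A sketch of reading the hostname from the task request instead; the handler and names are illustrative.

```python
# Sketch: read the current worker's node name from the task request instead of
# relying on a (nonexistent) sender.hostname attribute or inspect.active().
from celery.signals import task_postrun


@task_postrun.connect
def _on_task_postrun(sender=None, **kwargs):
    request = getattr(sender, "request", None)
    worker_hostname = getattr(request, "hostname", None)
    if worker_hostname is None:
        # No reliable hostname; skip rather than guessing another worker.
        return
    # ... use worker_hostname, e.g. as the destination for cancel_consumer ...
```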
```python
# This prevents the worker from picking up new tasks during the shutdown delay
try:
    # Get queue name from config or use default
    queue_name = celery.conf.get("task_default_queue") or celery.conf.get("task_routes", {}).get("*", {}).get("queue", "celery")
```
Bug: Queue name fallback logic fails with string task_routes
The fallback logic for determining queue_name assumes task_routes is a dict with pattern matching keys, but in this codebase task_routes is set to a string "materializationengine.task_router.TaskRouter" at line 167. If task_default_queue is not set or returns a falsy value, the expression celery.conf.get("task_routes", {}).get("*", {}) will raise AttributeError because strings don't have a .get() method. While this is caught by the outer try-except block, it causes the consumer cancellation feature to silently fail, allowing the worker to potentially accept new tasks during the shutdown delay period.
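A sketch of a fallback that tolerates a string-valued `task_routes`, assuming `celery` is the app instance as in the excerpt above.

```python
# Sketch: only treat task_routes as a mapping when it actually is one;
# otherwise fall back to the default "celery" queue.
queue_name = celery.conf.get("task_default_queue")
if not queue_name:
    routes = celery.conf.get("task_routes") or {}
    if isinstance(routes, dict):
        queue_name = routes.get("*", {}).get("queue")
    queue_name = queue_name or "celery"
```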
```python
inspect = current_app.control.inspect()
active_workers = inspect.active() if inspect else {}
if active_workers:
    worker_hostname = list(active_workers.keys())[0]
```
Bug: Consumer cancellation may target wrong worker in cluster
The code uses inspect.active() which returns all active workers across the entire Celery cluster, then selects the first worker with list(active_workers.keys())[0]. In a multi-worker environment, this could pick an arbitrary worker's hostname instead of the current worker, causing cancel_consumer to stop task consumption on a different worker than the one that reached its task limit or idle timeout.
Additional Locations (1)
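A sketch of restricting the broadcast to the current worker via `destination`; the helper name is hypothetical.

```python
# Sketch: cancel the consumer only on the current worker by passing its node
# name as the destination, instead of picking an arbitrary worker from
# inspect.active().
from celery import current_app


def stop_consuming(queue_name: str, worker_hostname: str) -> None:
    # destination limits the broadcast command to the named worker node.
    current_app.control.cancel_consumer(queue_name, destination=[worker_hostname])
```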
```python
    return

_task_execution_count += 1
_last_task_time = time.time()  # Update last task time
```
Bug: Idle timeout doesn't track task completions when autoshutdown disabled
The _last_task_time variable is only updated at line 64 inside _auto_shutdown_handler, but this code path requires worker_autoshutdown_enabled to be True (lines 52-53). If a user enables only idle timeout without enabling autoshutdown, _last_task_time is never updated and remains None. The _monitor_idle_timeout function then falls back to using _worker_start_time, causing the worker to shut down after the configured timeout from startup—regardless of how many tasks are actively being processed.
Additional Locations (1)
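A minimal sketch of decoupling the timestamp update from the max-tasks flag; names are illustrative, not the PR's exact globals.

```python
# Sketch: record task completion time independently of the max-tasks flag so
# the idle timeout measures time since the last task, not time since startup.
import time

from celery.signals import task_postrun

_last_task_time = None


@task_postrun.connect
def _touch_last_task_time(sender=None, **kwargs):
    global _last_task_time
    # Always update; the idle monitor decides separately whether to act on it.
    _last_task_time = time.time()
```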
```python
# Return the result ID - the chain will execute asynchronously
# The worker will track all tasks in the chain
celery_logger.info(f"Workflow chain started with root task ID: {result.id}")
return True
```
Bug: Locked workflow returns before chain finishes
update_database_workflow (a LockedTask) now launches run_update_database_workflow.apply_async() and immediately returns True, so the lock is released while the long-running chain still executes. This can allow multiple overlapping “update database” runs, causing concurrent writes and duplicated work that the lock previously prevented.
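One way to keep the exclusion guarantee is to release the lock only from a task appended to the end of the chain. The sketch below uses a hypothetical Redis lock, not the project's LockedTask implementation; an error callback would also be needed so the lock is freed if the chain fails.

```python
# Sketch: hold the lock for the lifetime of the chain by releasing it in a
# final task, rather than when the launching task returns. redis_client and
# the lock key are hypothetical stand-ins for the project's lock backend.
import redis
from celery import chain, shared_task

redis_client = redis.Redis()  # hypothetical lock backend; adjust to the project's setup


@shared_task
def release_workflow_lock(prev_result=None, lock_key=None):
    redis_client.delete(lock_key)  # hypothetical lock release
    return prev_result


def start_locked_workflow(workflow_chain, lock_key):
    # Append the release step so the lock is freed only after the chain finishes.
    return chain(workflow_chain, release_workflow_lock.s(lock_key=lock_key)).apply_async()
```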
```python
if not annotation_chunks:
    celery_logger.info("No annotation chunks to process")
    return fin.si()
```
Bug: Generator emptiness check always evaluates as truthy
The check if not annotation_chunks will never be True because generate_chunked_model_ids returns a generator (via chunk_ids which uses yield). In Python, generator objects are always truthy, even when they would yield zero items. This means the early return path for "no annotation chunks to process" is dead code and will never execute. The same issue affects update_root_ids_workflow when lookup_all_root_ids is true. To properly check for empty generators, the generator would need to be converted to a list first or use a peek pattern.
Additional Locations (1)
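A small sketch of the peek pattern mentioned above; the `peek` helper is illustrative.

```python
# Sketch: peek at the generator to detect emptiness without consuming it.
import itertools


def peek(iterable):
    """Return (first_item, rebuilt_iterator), or (None, None) if empty."""
    iterator = iter(iterable)
    try:
        first = next(iterator)
    except StopIteration:
        return None, None
    return first, itertools.chain([first], iterator)


# Usage inside the workflow (annotation_chunks comes from generate_chunked_model_ids):
# first, annotation_chunks = peek(annotation_chunks)
# if first is None:
#     celery_logger.info("No annotation chunks to process")
#     return fin.si()
```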
```python
if _task_execution_count < max_tasks:
    return

_shutdown_requested = True
```
Bug: Race condition in global shutdown state without synchronization
The global variables _task_execution_count and _shutdown_requested are accessed and modified without synchronization (e.g., a threading lock). When using Celery with eventlet, gevent, or threads pools where multiple tasks execute concurrently, two tasks could both pass the if _shutdown_requested: check before either sets it to True. This could cause multiple shutdown threads to be spawned and _task_execution_count to be incremented incorrectly. While the worker still shuts down, the logged task count may be wrong and duplicate shutdown attempts occur.
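A sketch of guarding the shared state with a lock; function and variable names are illustrative.

```python
# Sketch: guard the shared counters with a lock so only one task can flip the
# shutdown flag, even under eventlet/gevent/threads pools.
import threading

_state_lock = threading.Lock()
_task_execution_count = 0
_shutdown_requested = False


def _record_task_and_check(max_tasks: int) -> bool:
    """Return True exactly once, when the threshold is first reached."""
    global _task_execution_count, _shutdown_requested
    with _state_lock:
        if _shutdown_requested:
            return False
        _task_execution_count += 1
        if _task_execution_count >= max_tasks:
            _shutdown_requested = True
            return True
    return False
```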
Note
Adds configurable Celery worker auto-shutdown (by task count and idle timeout), hardens beat schedule registration, and introduces explicit database/cache shutdown routines.
- Worker auto-shutdown after `worker_autoshutdown_max_tasks`, with an optional delay and consumer cancellation; forces process exit if needed. Shutdown/idle-timeout hooks registered on `worker_ready`.
- `config.py`: `CELERY_WORKER_AUTOSHUTDOWN_ENABLED`, `CELERY_WORKER_AUTOSHUTDOWN_MAX_TASKS`, `CELERY_WORKER_AUTOSHUTDOWN_DELAY_SECONDS`.
- Beat schedule registration hardened via `beat_schedules`/`BEAT_SCHEDULES`; `setup_periodic_tasks` runs in `create_celery` after config load; expanded debug logging.
- `DatabaseConnectionManager.shutdown()` to close sessions, dispose engines, and clear caches; `cleanup()` delegates to it.
- `DynamicMaterializationCache.shutdown()` to close sessions, dispose engines, and clear clients.
- `check_if_task_is_running` made robust to no/filtered workers and aggregates across matching workers.
- `remove_expired_databases`: logs databases slated for deletion and refines removal checks.

Written by Cursor Bugbot for commit 7ce830b. This will update automatically on new commits.