
Conversation

@fcollman (Collaborator) commented Nov 26, 2025

Note

Adds configurable Celery worker auto-shutdown (by task count and idle timeout), hardens beat schedule registration, and introduces explicit database/cache shutdown routines.

  • Celery Worker:
    • Adds auto-shutdown after worker_autoshutdown_max_tasks with optional delay and consumer cancellation; forces process exit if needed.
    • Adds idle-timeout shutdown via background monitor started on worker_ready.
    • New config flags in config.py: CELERY_WORKER_AUTOSHUTDOWN_ENABLED, CELERY_WORKER_AUTOSHUTDOWN_MAX_TASKS, CELERY_WORKER_AUTOSHUTDOWN_DELAY_SECONDS (a config sketch follows this list).
  • Periodic Tasks (Beat):
    • Avoids duplicate registration of cleanup and configured tasks; normalizes beat_schedules/BEAT_SCHEDULES.
    • Explicitly triggers setup_periodic_tasks in create_celery after config load; expanded debug logging.
  • Database Management:
    • DatabaseConnectionManager.shutdown() to close sessions, dispose engines, and clear caches; cleanup() delegates to it.
    • DynamicMaterializationCache.shutdown() to close sessions, dispose engines, and clear clients.
  • Task Utilities:
    • check_if_task_is_running made robust to no/filtered workers and aggregates across matching workers.
  • Workflows:
    • remove_expired_databases: logs databases slated for deletion and refines removal checks.
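
A minimal sketch of how these flags might be read in config.py, assuming the usual environment-variable pattern for this config module; the helper and defaults below are illustrative, and only the flag names come from the PR:

```python
import os


def _env_bool(name: str, default: str = "false") -> bool:
    # Illustrative helper: treat common truthy strings as True.
    return os.environ.get(name, default).lower() in ("true", "1", "yes")


# Assumed wiring for the new flags; the defaults are placeholders.
CELERY_WORKER_AUTOSHUTDOWN_ENABLED = _env_bool("CELERY_WORKER_AUTOSHUTDOWN_ENABLED")
CELERY_WORKER_AUTOSHUTDOWN_MAX_TASKS = int(
    os.environ.get("CELERY_WORKER_AUTOSHUTDOWN_MAX_TASKS", "100")
)
CELERY_WORKER_AUTOSHUTDOWN_DELAY_SECONDS = int(
    os.environ.get("CELERY_WORKER_AUTOSHUTDOWN_DELAY_SECONDS", "30")
)
```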

Written by Cursor Bugbot for commit 7ce830b.

Copilot AI (Contributor) left a comment


Pull request overview

This PR introduces a worker auto-shutdown mechanism for Celery workers, allowing them to gracefully terminate after processing a configurable number of tasks. This is useful for environments like Kubernetes where worker pods need to be recycled periodically to prevent memory leaks or apply configuration updates.

Key Changes

  • Added three new configuration options to control worker auto-shutdown behavior (enabled flag, max tasks, and delay)
  • Implemented a task post-run signal handler that tracks task execution count and triggers shutdown when the threshold is reached (see the sketch after this list)
  • Integrated shutdown configuration into the Celery app creation process
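
A rough sketch of that mechanism, assuming a task_postrun handler in celery_worker.py. Only the Celery signal, the control API, and the worker_autoshutdown_* config keys are taken from the PR; the handler body and defaults are illustrative:

```python
import threading

from celery import current_app
from celery.signals import task_postrun

_task_execution_count = 0
_shutdown_requested = False


@task_postrun.connect
def _auto_shutdown_handler(sender=None, **kwargs):
    """Count finished tasks and request a warm shutdown once the limit is reached."""
    global _task_execution_count, _shutdown_requested

    conf = current_app.conf
    if not conf.get("worker_autoshutdown_enabled") or _shutdown_requested:
        return

    _task_execution_count += 1
    if _task_execution_count < conf.get("worker_autoshutdown_max_tasks", 100):
        return

    _shutdown_requested = True
    # The worker node name travels on the task's request context.
    hostname = getattr(getattr(sender, "request", None), "hostname", None)
    if hostname is None:
        return
    delay = conf.get("worker_autoshutdown_delay_seconds", 0)
    # Ask only this worker to shut down gracefully after the optional delay.
    threading.Timer(
        delay, current_app.control.shutdown, kwargs={"destination": [hostname]}
    ).start()
```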

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

File descriptions:

  • materializationengine/config.py: Adds three new configuration variables for controlling worker auto-shutdown
  • materializationengine/celery_worker.py: Implements the auto-shutdown mechanism with task counting, signal handling, and graceful termination logic


fcollman and others added 3 commits December 10, 2025 23:27
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

cursor bot left a comment



        observed_count,
    )
    try:
        os.kill(os.getpid(), signal.SIGTERM)

Bug: SIGTERM targets child process, not main worker with prefork

The auto-shutdown feature intends to terminate the worker after a configurable number of tasks, but with Celery's prefork pool (which this project uses), the task_postrun signal fires in the child process, not the main worker process. Calling os.kill(os.getpid(), signal.SIGTERM) from that context terminates only the child process, which the main worker will simply replace with a new one. The worker continues running indefinitely, defeating the feature's purpose. To properly shut down the entire worker, the code would need to signal the parent/main process rather than the current child process.
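
One hedged sketch of the direction this points at: have the child address the shutdown to the whole worker node rather than killing itself, either via Celery remote control scoped to this node or by signalling the parent (main) worker process. The helper name is illustrative:

```python
import os
import signal

from celery import current_app


def _request_worker_shutdown(worker_hostname=None):
    """Illustrative helper: shut down the whole worker, not just this prefork child."""
    if worker_hostname:
        # Warm shutdown addressed only to this worker node.
        current_app.control.shutdown(destination=[worker_hostname])
    else:
        # Fallback: signal the parent (main worker) process instead of the child.
        os.kill(os.getppid(), signal.SIGTERM)
```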




_task_execution_count = 0
_shutdown_requested = False

Bug: Task count is per-child, not per-worker with prefork

The module-level globals _task_execution_count and _shutdown_requested are process-local. With Celery's prefork pool (used in this project with concurrency up to 4), each child process maintains its own independent counter due to fork semantics. This means if worker_autoshutdown_max_tasks is set to 10 and concurrency is 4, each child would need to run 10 tasks individually before triggering shutdown - potentially 40 total tasks before any action, rather than the expected 10. The counting doesn't aggregate across worker children as the configuration name suggests.
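
One way to aggregate the count across prefork children, sketched under the assumption that celery_worker.py is imported in the main process before the pool forks (Unix fork semantics), so a shared counter is inherited by every child:

```python
import multiprocessing

# Created in the main worker process at import time, before the prefork pool
# forks its children, so every child sees the same shared counter.
_shared_task_count = multiprocessing.Value("i", 0)


def _increment_and_check(max_tasks: int) -> bool:
    """Return True once the aggregate count across all children reaches max_tasks."""
    with _shared_task_count.get_lock():
        _shared_task_count.value += 1
        return _shared_task_count.value >= max_tasks
```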



        if active_workers:
            worker_hostname = list(active_workers.keys())[0]
    except Exception:
        pass

Bug: Incorrect worker hostname attribute access on task sender

The sender parameter in task_postrun is the Task instance, which doesn't have a hostname attribute directly. The worker hostname is available at sender.request.hostname, not sender.hostname. The hasattr(sender, 'hostname') check will return False, always falling through to the fallback logic which calls inspect.active(). This fallback makes a network call to query all workers and picks the first one from the dictionary, which may not be the current worker, potentially canceling the consumer on a different worker.
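
A short sketch of reading the node name from the task's request context instead of the nonexistent sender.hostname (the handler name is illustrative):

```python
from celery.signals import task_postrun


@task_postrun.connect
def _auto_shutdown_handler(sender=None, **kwargs):
    # sender is the Task instance; the worker node name lives on its request
    # context, not as a direct attribute of the task.
    request = getattr(sender, "request", None)
    worker_hostname = getattr(request, "hostname", None)
    if worker_hostname is None:
        # Only then fall back to an inspect() round-trip, and even then prefer
        # matching the current node over taking an arbitrary first key.
        return
    # ... continue the shutdown path with worker_hostname ...
```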


    # This prevents the worker from picking up new tasks during the shutdown delay
    try:
        # Get queue name from config or use default
        queue_name = celery.conf.get("task_default_queue") or celery.conf.get("task_routes", {}).get("*", {}).get("queue", "celery")

Bug: Queue name fallback logic fails with string task_routes

The fallback logic for determining queue_name assumes task_routes is a dict with pattern matching keys, but in this codebase task_routes is set to a string "materializationengine.task_router.TaskRouter" at line 167. If task_default_queue is not set or returns a falsy value, the expression celery.conf.get("task_routes", {}).get("*", {}) will raise AttributeError because strings don't have a .get() method. While this is caught by the outer try-except block, it causes the consumer cancellation feature to silently fail, allowing the worker to potentially accept new tasks during the shutdown delay period.
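
A possible hardening of that fallback, sketched as a helper that tolerates task_routes being the router class path string this project uses:

```python
def _resolve_queue_name(conf) -> str:
    """Pick a queue for cancel_consumer without assuming task_routes is a dict."""
    queue_name = conf.get("task_default_queue") or "celery"
    routes = conf.get("task_routes")
    # task_routes may be a dict, a list of patterns, or (here) a string naming
    # a router class; only a dict supports the "*" pattern lookup.
    if isinstance(routes, dict):
        queue_name = routes.get("*", {}).get("queue", queue_name)
    return queue_name
```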


    inspect = current_app.control.inspect()
    active_workers = inspect.active() if inspect else {}
    if active_workers:
        worker_hostname = list(active_workers.keys())[0]

Bug: Consumer cancellation may target wrong worker in cluster

The code uses inspect.active() which returns all active workers across the entire Celery cluster, then selects the first worker with list(active_workers.keys())[0]. In a multi-worker environment, this could pick an arbitrary worker's hostname instead of the current worker, causing cancel_consumer to stop task consumption on a different worker than the one that reached its task limit or idle timeout.
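
A minimal sketch of scoping the broadcast to the current node, assuming the caller already knows its own hostname (for example from the task request, as noted above):

```python
from celery import current_app


def _cancel_consumer_on_this_worker(queue_name: str, worker_hostname: str) -> None:
    """Stop only the current worker from consuming the given queue."""
    # destination restricts the broadcast to one node instead of the whole cluster.
    current_app.control.cancel_consumer(queue_name, destination=[worker_hostname])
```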



        return

    _task_execution_count += 1
    _last_task_time = time.time()  # Update last task time

Bug: Idle timeout doesn't track task completions when autoshutdown disabled

The _last_task_time variable is only updated at line 64 inside _auto_shutdown_handler, but this code path requires worker_autoshutdown_enabled to be True (lines 52-53). If a user enables only idle timeout without enabling autoshutdown, _last_task_time is never updated and remains None. The _monitor_idle_timeout function then falls back to using _worker_start_time, causing the worker to shut down after the configured timeout from startup—regardless of how many tasks are actively being processed.
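
A small sketch of decoupling the two features, assuming a dedicated task_postrun handler that records activity unconditionally so the idle monitor still works when worker_autoshutdown_enabled is off:

```python
import time

from celery.signals import task_postrun

_last_task_time = None


@task_postrun.connect
def _track_last_task_time(sender=None, **kwargs):
    """Update the idle-timeout clock for every completed task, regardless of flags."""
    global _last_task_time
    _last_task_time = time.time()
```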



# Return the result ID - the chain will execute asynchronously
# The worker will track all tasks in the chain
celery_logger.info(f"Workflow chain started with root task ID: {result.id}")
return True

Bug: Locked workflow returns before chain finishes

update_database_workflow (a LockedTask) now launches run_update_database_workflow.apply_async() and immediately returns True, so the lock is released while the long-running chain still executes. This can allow multiple overlapping “update database” runs, causing concurrent writes and duplicated work that the lock previously prevented.
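
One hedged way to keep the lock held for the chain's lifetime is to release it from a completion callback instead of on return; the lock key and release task below are illustrative, not the project's actual API:

```python
# Release the lock only when the launched workflow actually finishes (or fails),
# rather than when the wrapper task returns.
result = run_update_database_workflow.apply_async(
    link=release_update_lock.si(lock_key),        # assumed lock-release task
    link_error=release_update_lock.si(lock_key),  # also release on failure
)
```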



    if not annotation_chunks:
        celery_logger.info("No annotation chunks to process")
        return fin.si()

Bug: Generator emptiness check always evaluates as truthy

The check if not annotation_chunks will never be True because generate_chunked_model_ids returns a generator (via chunk_ids which uses yield). In Python, generator objects are always truthy, even when they would yield zero items. This means the early return path for "no annotation chunks to process" is dead code and will never execute. The same issue affects update_root_ids_workflow when lookup_all_root_ids is true. To properly check for empty generators, the generator would need to be converted to a list first or use a peek pattern.
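
A hedged sketch of the suggested peek pattern; the helper name is illustrative, while generate_chunked_model_ids and fin come from the workflow under review:

```python
from itertools import chain as iter_chain


def _peek(generator):
    """Return (is_empty, iterator) without losing the first yielded item."""
    iterator = iter(generator)
    try:
        first = next(iterator)
    except StopIteration:
        return True, iter(())
    # Re-attach the consumed item so downstream consumers see the full stream.
    return False, iter_chain([first], iterator)
```

The workflow could then call `is_empty, annotation_chunks = _peek(generate_chunked_model_ids(...))` and take the early `fin.si()` return only when `is_empty` is true.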





    if _task_execution_count < max_tasks:
        return

    _shutdown_requested = True

Bug: Race condition in global shutdown state without synchronization

The global variables _task_execution_count and _shutdown_requested are accessed and modified without synchronization (e.g., a threading lock). When using Celery with eventlet, gevent, or threads pools where multiple tasks execute concurrently, two tasks could both pass the if _shutdown_requested: check before either sets it to True. This could cause multiple shutdown threads to be spawned and _task_execution_count to be incremented incorrectly. While the worker still shuts down, the logged task count may be wrong and duplicate shutdown attempts occur.
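
A small sketch of guarding the shared state with a lock so only one task triggers the shutdown path (variable names follow the PR; the helper is illustrative):

```python
import threading

_state_lock = threading.Lock()
_task_execution_count = 0
_shutdown_requested = False


def _record_task_and_check(max_tasks: int) -> bool:
    """Atomically bump the counter; return True for exactly one caller at the limit."""
    global _task_execution_count, _shutdown_requested
    with _state_lock:
        _task_execution_count += 1
        if _shutdown_requested or _task_execution_count < max_tasks:
            return False
        _shutdown_requested = True
        return True
```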


