SpecCluster fails to remove grouped workers when they die, breaking adaptive scaling #9102

@alisterburt

Description

When using SpecCluster with grouped workers (e.g., JobQueueCluster from dask-jobqueue with processes > 1, cf. dask/dask-jobqueue#498), dead workers are not removed from self.workers or self.worker_spec. The adaptive system therefore believes the dead workers are still "requested" when they have actually died, which prevents it from scaling up.
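
For context, SpecCluster derives its plan/requested sets from self.worker_spec and self.workers by expanding grouped specs into per-process names, roughly as sketched below (a simplified paraphrase, not the exact implementation):

def expand_grouped_names(names, worker_spec):
    # Expand a grouped spec name such as "cluster-0" with group
    # ["-0", ..., "-7"] into "cluster-0-0" ... "cluster-0-7".
    out = set()
    for name in names:
        spec = worker_spec.get(name, {})
        if "group" in spec:
            out.update(str(name) + suffix for suffix in spec["group"])
        else:
            out.add(name)
    return out

# plan      ~ expand_grouped_names(self.worker_spec, self.worker_spec)
# requested ~ expand_grouped_names(self.workers, self.worker_spec)
# While dead jobs linger in those dicts, both sets stay at full size.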

Example

Apologies, this is not a full reproducer:

# Using SLURMCluster (subclass of JobQueueCluster -> SpecCluster)
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    cores=8,
    processes=8,  # Creates grouped workers
    memory="32GB",
    walltime="01:00:00"
)
cluster.adapt(minimum=0, maximum=10)

# When jobs die (timeout, killed, etc.):
# - cluster.observed drops to 0 (scheduler sees no workers)
# - cluster.requested stays high (dead jobs still in self.workers)
# - Adaptive won't scale up new workers

Log from Adaptive showing the issue:

target=32 len(self.plan)=32 len(self.observed)=15 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=14 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=12 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=11 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=10 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=9 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=8 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=7 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=6 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=5 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=4 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=3 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=2 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=1 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=0 len(self.requested)=32
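
The scale-up decision compares target against len(self.plan), roughly as sketched below (a simplification of the adaptive recommendation logic, not the actual AdaptiveCore code). As long as the dead jobs keep plan at 32, the loop keeps concluding that nothing needs to change, even though observed has dropped to 0:

def recommend(target, plan):
    # Simplified sketch of the adaptive recommendation step (assumed shape,
    # not the real implementation).
    if target == len(plan):
        return {"status": "same"}   # the case hit above: 32 == 32, do nothing
    if target > len(plan):
        return {"status": "up", "n": target}
    return {"status": "down"}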

Context

Setup: JobQueueCluster creates grouped workers when processes > 1:

# In JobQueueCluster.__init__
if "processes" in self._job_kwargs and self._job_kwargs["processes"] > 1:
    worker["group"] = ["-" + str(i) for i in range(self._job_kwargs["processes"])]

This creates a worker_spec entry like:

python"cluster-0": {
    "cls": SLURMJob, 
    "options": {...},
    "group": ["-0", "-1", "-2", "-3", "-4", "-5", "-6", "-7"]
}
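
For illustration, the names the scheduler actually sees for that spec are the job name plus each group suffix:

# Names the scheduler registers for the spec above (illustrative):
expanded = ["cluster-0" + suffix for suffix in ["-0", "-1", "-2", "-3", "-4", "-5", "-6", "-7"]]
# -> ["cluster-0-0", "cluster-0-1", ..., "cluster-0-7"]
# self.workers and self.worker_spec, however, are keyed by the job name "cluster-0".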

The bug: when workers die, _update_worker_status receives the expanded names ("cluster-0-0") but looks them up in self.workers, which is keyed by job names ("cluster-0"):

# Current broken implementation
def _update_worker_status(self, op, msg):
    if op == "remove":
        name = self.scheduler_info["workers"][msg]["name"]  # "cluster-0-0"
        def f():
            if name in self.workers:  # self.workers has "cluster-0", not "cluster-0-0"
                # This never executes for grouped workers!
                del self.workers[name]

Result:

  • self.workers keeps dead job objects
  • self.worker_spec keeps their specifications
  • adaptive doesn't scale up
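
One possible direction for a fix (a sketch only, untested; _spec_name_for is a hypothetical helper, not part of distributed): map the expanded scheduler name back to its job-level spec name before the lookup, and only drop the job once every member of its group has disappeared.

def _spec_name_for(self, name):
    # Hypothetical helper: map "cluster-0-0" back to its spec name "cluster-0".
    for spec_name, spec in self.worker_spec.items():
        if "group" in spec and any(
            name == str(spec_name) + suffix for suffix in spec["group"]
        ):
            return spec_name
    return name

# The removal callback in _update_worker_status could then look up
# self.workers[self._spec_name_for(name)] and delete the job (and its
# worker_spec entry) only after all of its group members are gone from
# self.scheduler_info["workers"].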
