When using SpecCluster with grouped workers (e.g., JobQueueCluster from dask-jobqueue with processes > 1, cf. dask/dask-jobqueue#498), dead workers are not properly removed from self.workers or self.worker_spec. This causes the adaptive system to incorrectly believe workers are still "requested" when they have actually died, preventing scale-up.
Example
Apologies, this isn't a full reproducer:
# Using SLURMCluster (subclass of JobQueueCluster -> SpecCluster)
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    cores=8,
    processes=8,  # creates grouped workers
    memory="32GB",
    walltime="01:00:00",
)
cluster.adapt(minimum=0, maximum=10)
# When jobs die (timeout, killed, etc.):
# - cluster.observed drops to 0 (scheduler sees no workers)
# - cluster.requested stays high (dead jobs still in self.workers)
# - Adaptive won't scale up new workers
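To make that last point concrete, here is a rough sketch of how I understand SpecCluster derives requested: each name in self.workers is expanded by its spec's "group" suffixes. This is paraphrased from my reading, not verbatim from distributed, and the real code may differ in detail, but it explains why dead jobs left in self.workers keep requested at the full count:

# Rough sketch (not the actual distributed code) of how `requested` appears
# to be computed: names in `workers` are expanded by the spec's group suffixes.
def requested(workers, worker_spec):
    out = set()
    for name in workers:
        spec = worker_spec.get(name, {})
        if "group" in spec:
            out.update(str(name) + suffix for suffix in spec["group"])
        else:
            out.add(name)
    return out

# With 4 dead jobs of 8 processes each still present in `workers`,
# this reports 32 names even though the scheduler observes 0.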
Log from Adaptive showing the issue:
target=32 len(self.plan)=32 len(self.observed)=15 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=14 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=12 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=11 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=10 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=9 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=8 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=7 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=6 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=5 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=4 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=3 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=2 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=1 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=0 len(self.requested)=32
Context
Setup: JobQueueCluster creates grouped workers when processes > 1:
# In JobQueueCluster.__init__
if "processes" in self._job_kwargs and self._job_kwargs["processes"] > 1:
    worker["group"] = ["-" + str(i) for i in range(self._job_kwargs["processes"])]
This creates a worker_spec entry like:
python"cluster-0": {
"cls": SLURMJob,
"options": {...},
"group": ["-0", "-1", "-2", "-3", "-4", "-5", "-6", "-7"]
}
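For illustration, the per-process names the scheduler ends up reporting for such an entry can be derived directly from the spec (a small standalone snippet, not code from distributed):

# Standalone illustration: expand a grouped worker_spec entry into the
# per-process names the scheduler actually reports.
spec_name = "cluster-0"
spec = {"group": ["-" + str(i) for i in range(8)]}

expanded = [spec_name + suffix for suffix in spec["group"]]
print(expanded)
# ['cluster-0-0', 'cluster-0-1', ..., 'cluster-0-7']
# Note: self.workers and self.worker_spec are keyed by "cluster-0" only.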
The Bug: When workers die, _update_worker_status receives expanded names ("cluster-0-0") but tries to look them up in self.workers, which is keyed by job names ("cluster-0"):
# Current broken implementation
def _update_worker_status(self, op, msg):
    if op == "remove":
        name = self.scheduler_info["workers"][msg]["name"]  # "cluster-0-0"

        def f():
            if name in self.workers:  # self.workers has "cluster-0", not "cluster-0-0"
                # This never executes for grouped workers!
                del self.workers[name]
Result:
- self.workers keeps dead job objects
- self.worker_spec keeps their specifications
- Adaptive never scales back up
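For what it's worth, one possible direction (a sketch only, not a tested patch; spec_name_for is a hypothetical helper, not part of distributed) would be to translate the expanded name reported by the scheduler back to the spec/job name before touching self.workers, and presumably only drop the spec once every process in the group is gone:

# Hypothetical helper (not part of distributed): map an expanded worker name
# such as "cluster-0-0" back to the spec name "cluster-0" used as the key in
# self.workers / self.worker_spec.
def spec_name_for(worker_name, worker_spec):
    if worker_name in worker_spec:  # ungrouped workers match directly
        return worker_name
    for spec_name, spec in worker_spec.items():
        for suffix in spec.get("group", ()):
            if worker_name == str(spec_name) + suffix:
                return spec_name
    return None


worker_spec = {"cluster-0": {"group": ["-" + str(i) for i in range(8)]}}
assert spec_name_for("cluster-0-3", worker_spec) == "cluster-0"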