Describe the issue:

The Dask scheduler allows resource over-allocation when scheduling tasks with custom resource requirements. The scheduler's `valid_workers()` method only checks whether a worker's total declared resources meet a task's requirements; it ignores the resources the worker is currently using. This leads to severe over-allocation (450% in our tests) and inconsistent resource accounting between the scheduler and the worker.

The worker layer correctly enforces resource limits, but this creates a discrepancy: the scheduler reports tasks as "processing" while they are actually queued on the worker due to resource constraints.
Minimal Complete Verifiable Example:
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time

import dask
from distributed import Client, LocalCluster


def simple_task(task_id, duration=15):
    """Simple task that requires MEM resources."""
    print(f"Task {task_id} starting...")
    time.sleep(duration)
    print(f"Task {task_id} completed!")
    return f"Result from task {task_id}"


def test_resource_bug():
    print("=" * 60)
    print("DASK RESOURCE SCHEDULING BUG REPRODUCTION")
    print("=" * 60)
    print(f"Dask version: {dask.__version__}")

    # Create a local cluster with one worker that has limited MEM resources
    cluster = LocalCluster(
        n_workers=1,
        threads_per_worker=20,  # High thread count to avoid thread-limit interference
        memory_limit=None,      # Disable the memory limit to focus on custom resources
        resources={"MEM": 10},  # Worker declares 10 MEM resources
        dashboard_address=None,
        silence_logs=False,
    )
    client = Client(cluster)

    try:
        print("\nCluster Info:")
        print(f"Workers: {len(client.scheduler_info()['workers'])}")

        # Helper to read resource accounting from the scheduler's point of view
        def get_worker_resources(dask_scheduler):
            worker_info = {}
            for addr, worker in dask_scheduler.workers.items():
                worker_info[addr] = {
                    "declared_resources": dict(worker.resources),
                    "used_resources": dict(worker.used_resources),
                    "processing_count": len(worker.processing),
                }
            return worker_info

        # Check initial resources
        print("\nInitial Worker Resources:")
        initial_resources = client.run_on_scheduler(get_worker_resources)
        for addr, info in initial_resources.items():
            print(f"  Worker: declared_MEM={info['declared_resources'].get('MEM', 0)}, "
                  f"used_MEM={info['used_resources'].get('MEM', 0)}, "
                  f"tasks={info['processing_count']}")

        print("\nSubmitting 15 tasks, each requiring 3 MEM resources...")
        print("  Worker declares: 10 MEM")
        print("  Total requirement: 15 * 3 = 45 MEM")
        print("  Expected (strict): only 3 tasks should run (3 * 3 = 9 <= 10 MEM)")
        print("  Let's see how many actually run simultaneously...")
        print("  Worker has 20 threads, so the thread count is NOT a limiting factor")

        # Submit 15 tasks, each requiring 3 MEM: 45 MEM total vs. 10 MEM available
        futures = []
        for i in range(1, 16):
            future = client.submit(
                simple_task,
                task_id=i,
                duration=15,  # Long enough to observe resource allocation
                resources={"MEM": 3},
                pure=False,
            )
            futures.append(future)
            print(f"  Submitted task {i} (requires 3 MEM)")

        # Wait until at least some tasks are marked as processing
        print("\nWaiting for tasks to start...")
        max_wait = 10
        wait_time = 0
        while wait_time < max_wait:
            current_check = client.run_on_scheduler(get_worker_resources)
            running_tasks = sum(info["processing_count"] for info in current_check.values())
            if running_tasks > 0:
                print(f"Tasks started! {running_tasks} tasks now running.")
                break
            time.sleep(0.5)
            wait_time += 0.5
        if wait_time >= max_wait:
            print("Warning: tasks may not have started yet")

        # Check resource usage after submission
        print("\nResource Usage After Task Submission:")
        current_resources = client.run_on_scheduler(get_worker_resources)
        for addr, info in current_resources.items():
            declared = info["declared_resources"].get("MEM", 0)
            used = info["used_resources"].get("MEM", 0)
            tasks = info["processing_count"]
            utilization = (used / declared * 100) if declared > 0 else 0
            print(f"  Worker: declared_MEM={declared}, used_MEM={used}, processing_tasks={tasks}")
            print(f"  Resource utilization: {utilization:.1f}%")
            # With 20 threads, all 15 tasks could run if there were no resource limit
            print("  NOTE: Worker has 20 threads, so the thread limit is NOT a constraint")
            if used > declared:
                print("  *** RESOURCE ACCOUNTING BUG DETECTED!")
                print(f"  Scheduler reserved {used} MEM for {tasks} tasks")
                print(f"  but the worker only declared {declared} MEM resources!")
                print("  This shows the scheduler doesn't check available resources!")
                print(f"  Overallocation: {used - declared} MEM over the limit")
            elif tasks < 15:
                print(f"  PARTIAL SCHEDULING: {tasks} tasks running, {15 - tasks} waiting")
            else:
                print(f"  FULL OVERCOMMIT: all {tasks} tasks scheduled!")

        # Compare the task logs ("Task N starting...") with the scheduler's
        # processing count via the per-future statuses
        print("\nTask Statuses:")
        for i, future in enumerate(futures, 1):
            print(f"  Task {i}: {future.status}")

        # Cancel tasks to complete the demonstration
        print("\nCancelling tasks to complete demonstration...")
        for future in futures:
            future.cancel()
        time.sleep(2)  # Wait for cancellation to complete

        # Final resource check
        print("\nFinal Resource Usage (after cancellation):")
        final_resources = client.run_on_scheduler(get_worker_resources)
        for addr, info in final_resources.items():
            print(f"  Worker: declared_MEM={info['declared_resources'].get('MEM', 0)}, "
                  f"used_MEM={info['used_resources'].get('MEM', 0)}, "
                  f"tasks={info['processing_count']}")
    finally:
        print("\nClosing cluster...")
        client.close()
        cluster.close()

    print("\n" + "=" * 60)
    print("BUG REPORT SUMMARY:")
    print("- Issue: Dask scheduler doesn't check current resource usage")
    print("- Location: distributed/scheduler.py, valid_workers() method")
    print("- Line ~3279-3281: if supplied >= required:")
    print("- Fix needed: if (supplied - used_resources) >= required:")
    print("- Version affected: dask==2025.5.1 (possibly others)")
    print("=" * 60)


if __name__ == "__main__":
    test_resource_bug()
```
Expected Output (showing the bug):

```
Resource Usage After Task Submission:
  Worker: declared_MEM=10, used_MEM=45, processing_tasks=15
  Resource utilization: 450.0%
  *** RESOURCE ACCOUNTING BUG DETECTED!
  Scheduler reserved 45 MEM for 15 tasks
  but the worker only declared 10 MEM resources!
  Overallocation: 35 MEM over the limit
```

But in the worker logs, only 3 tasks actually start, since the worker admits at most ⌊10 / 3⌋ = 3 concurrent 3-MEM tasks:

```
Task 1 starting...
Task 2 starting...
Task 3 starting...
```
Root Cause Analysis:

This is a dual-layer architecture problem with inconsistent resource management between the scheduler and the worker:

1. Scheduler Layer Bug (distributed/scheduler.py:3279-3281)

The `valid_workers()` method in the scheduler has a critical flaw:

```python
# Current buggy code
for addr, supplied in dr.items():
    if supplied >= required:  # 🚨 Only checks total declared, not available
        sw.add(addr)
```

Should be fixed to:

```python
for addr, supplied in dr.items():
    ws = self.workers[addr]
    available = supplied - ws.used_resources.get(resource, 0)
    if available >= required:
        sw.add(addr)
```
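For context, `valid_workers()` builds a candidate set per restricted resource and intersects them. A standalone sketch of the corrected selection logic (hypothetical, simplified names; not the actual scheduler code):

```python
# Hypothetical sketch of the proposed fix: a worker is a valid candidate
# only if its available amount (declared minus currently used) covers
# the requirement, for every restricted resource.
def valid_workers_for(restrictions, declared, used):
    """restrictions: {resource: required}
    declared:     {resource: {addr: supplied}}
    used:         {addr: {resource: in_use}}
    """
    candidates = None
    for resource, required in restrictions.items():
        satisfying = {
            addr
            for addr, supplied in declared.get(resource, {}).items()
            if supplied - used.get(addr, {}).get(resource, 0) >= required
        }
        candidates = satisfying if candidates is None else candidates & satisfying
    return candidates or set()


# Reproducer numbers: a worker declaring 10 MEM with three 3-MEM tasks
# running (9 in use) has only 1 MEM available, so a 4th task must wait.
print(valid_workers_for({"MEM": 3}, {"MEM": {"w1": 10}}, {"w1": {"MEM": 9}}))  # set()
```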
2. Worker Layer (Working Correctly)

The worker correctly enforces resource limits via `distributed/worker_state_machine.py`:

```python
def _resource_restrictions_satisfied(self, ts: TaskState) -> bool:
    if not ts.resource_restrictions:
        return True
    return all(
        self.available_resources[resource] >= needed
        for resource, needed in ts.resource_restrictions.items()
    )
```
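This enforcement can be observed directly by querying both sides. A minimal cross-check, reusing the `client` from the reproducer; the worker-side attribute names (`dask_worker.state.available_resources`, `dask_worker.state.executing_count`) are assumptions based on the worker state machine and may differ between versions:

```python
def scheduler_view(dask_scheduler):
    # Scheduler-side ledger: what the scheduler believes is reserved
    return {
        addr: {"used": dict(ws.used_resources), "processing": len(ws.processing)}
        for addr, ws in dask_scheduler.workers.items()
    }


def worker_view(dask_worker):
    # Worker-side ledger: what is actually free / executing (assumed attrs)
    return {
        "available": dict(dask_worker.state.available_resources),
        "executing": dask_worker.state.executing_count,
    }


print(client.run_on_scheduler(scheduler_view))  # e.g. used MEM=45, processing=15
print(client.run(worker_view))                  # e.g. available MEM=1, executing=3
```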
3. The Discrepancy

- Scheduler says: 15 tasks processing, 45 MEM used (450% over-allocation)
- Worker reality: only 3 tasks executing, 9 MEM actually in use
- Result: resource accounting is completely unreliable (the arithmetic sketch below makes the two ledgers explicit)
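The mismatch is a timing difference between two ledgers: the scheduler charges `used_resources` when it assigns a task, while the worker only charges `available_resources` when a task actually starts executing. A toy model of the arithmetic (illustrative only):

```python
# Two ledgers, two charge points: the scheduler debits at assignment
# time, the worker at execution time, so the two diverge as soon as
# more tasks are assigned than the worker can admit.
declared = 10   # MEM the worker declares
per_task = 3    # MEM each task requires
assigned = 15   # tasks the scheduler assigns to the worker

scheduler_used = assigned * per_task       # 45 -> 450% "utilization"
worker_admitted = declared // per_task     # 3 tasks pass the worker check
worker_used = worker_admitted * per_task   # 9 MEM actually in use

print(scheduler_used, worker_used)  # 45 9
```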
Anything else we need to know?:

This bug has significant implications:

- Resource accounting is unreliable: `used_resources` reports false data
- Monitoring tools report incorrect resource utilization
- The scheduler makes poor decisions based on a wrong resource state
- It can lead to resource starvation or inefficient scheduling

The worker's protection mechanism prevents catastrophic over-allocation, but the scheduler's incorrect accounting leaves the system in an inconsistent state. A client-side throttling workaround is sketched below.
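For reference, one mitigation until the scheduler check is fixed: cap in-flight submissions so the declared resources are never oversubscribed. A sketch using `distributed.as_completed`, reusing `simple_task` and `client` from the reproducer (the cap of 3 assumes the example's 10 declared MEM and 3 MEM per task):

```python
from distributed import as_completed

PER_TASK_MEM = 3
DECLARED_MEM = 10
max_in_flight = DECLARED_MEM // PER_TASK_MEM  # at most 3 concurrent 3-MEM tasks


def submit(task_id):
    return client.submit(simple_task, task_id=task_id,
                         resources={"MEM": PER_TASK_MEM}, pure=False)


pending = list(range(1, 16))
first = [submit(pending.pop(0)) for _ in range(min(max_in_flight, len(pending)))]
ac = as_completed(first)
for _finished in ac:
    if pending:
        ac.add(submit(pending.pop(0)))  # refill: keep the window full
```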
Environment:
- Dask version: 2025.5.1
- Python version: 3.12
- Operating System: Rocky Linux 8.8
- Install method (conda, pip, source): pip