Resource scheduling bug: Scheduler allows over-allocation by ignoring worker's current resource usage #9108

@g199209

Description

Describe the issue:

The Dask scheduler allows resource over-allocation when scheduling tasks with custom resource requirements. Its valid_workers() method only checks whether a worker's total declared resources meet a task's requirements and ignores the worker's currently used resources. This leads to severe over-allocation (up to 450% in our tests) and inconsistent resource accounting between the scheduler and the worker.

The worker layer correctly enforces resource limits, which produces a discrepancy: the scheduler reports tasks as "processing" while they are actually queued on the worker waiting for resources.
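
For context, the snippet below is a minimal sketch of the custom-resources API involved (the same mechanism used in the reproduction script): the worker declares an abstract "MEM" pool and each task reserves a share of it at submit time. The resource name and amounts are illustrative.

from distributed import LocalCluster, Client

# The single worker declares an abstract pool of 10 "MEM" units.
cluster = LocalCluster(n_workers=1, resources={"MEM": 10})
client = Client(cluster)

# Each task reserves 3 "MEM" units at submission time. The scheduler should
# only place a task on a worker whose *remaining* MEM covers the request;
# the bug is that it compares against the worker's total declared MEM instead.
future = client.submit(sum, [1, 2, 3], resources={"MEM": 3})
print(future.result())

client.close()
cluster.close()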

Minimal Complete Verifiable Example:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import dask
from distributed import LocalCluster, Client

def simple_task(task_id, duration=15):
    """Simple task that requires MEM resources"""
    print(f"Task {task_id} starting...")
    time.sleep(duration)
    print(f"Task {task_id} completed!")
    return f"Result from task {task_id}"

def test_resource_bug():
    print("=" * 60)
    print("DASK RESOURCE SCHEDULING BUG REPRODUCTION")
    print("=" * 60)
    print(f"Dask version: {dask.__version__}")
    
    # Create a local cluster with one worker having limited MEM resources
    cluster = LocalCluster(
        n_workers=1,
        threads_per_worker=20,  # High thread count to avoid thread limit interference
        memory_limit=None,  # Disable memory limit to focus on custom resources
        resources={"MEM": 10},  # Worker declares 10 MEM resources
        dashboard_address=None,
        silence_logs=False
    )
    
    client = Client(cluster)
    
    try:
        print(f"\nCluster Info:")
        print(f"Workers: {len(client.scheduler_info()['workers'])}")
        
        # Helper function to get resource info from scheduler
        def get_worker_resources(dask_scheduler):
            scheduler = dask_scheduler
            worker_info = {}
            for addr, worker in scheduler.workers.items():
                worker_info[addr] = {
                    'declared_resources': dict(worker.resources),
                    'used_resources': dict(worker.used_resources),
                    'processing_count': len(worker.processing)
                }
            return worker_info
        
        # Check initial resources
        print(f"\nInitial Worker Resources:")
        initial_resources = client.run_on_scheduler(get_worker_resources)
        for addr, info in initial_resources.items():
            print(f"  Worker: declared_MEM={info['declared_resources'].get('MEM', 0)}, "
                  f"used_MEM={info['used_resources'].get('MEM', 0)}, "
                  f"tasks={info['processing_count']}")
        
        print("\nSubmitting 15 tasks, each requiring 3 MEM resources...")
        print("   Worker declares: 10 MEM")
        print("   Total requirement: 15 * 3 = 45 MEM") 
        print("   Expected (strict): Only 3 tasks should run (3*3=9 <= 10 MEM)")
        print("   Let's see how many actually run simultaneously...")
        print("   Worker has 20 threads, so thread count is NOT a limiting factor")
        
        # Submit 15 tasks, each requiring 3 MEM resources
        # Total requirement: 15 * 3 = 45 MEM
        # Available: 10 MEM
        # Let's see how many actually run
        futures = []
        for i in range(1, 16):
            future = client.submit(
                simple_task, 
                task_id=i, 
                duration=15,  # Long enough to observe resource allocation
                resources={'MEM': 3},
                pure=False
            )
            futures.append(future)
            print(f"  Submitted task {i} (requires 3 MEM)")
        
        # Wait for tasks to start executing
        print("\nWaiting for tasks to start...")
        
        # Wait until at least some tasks are running
        max_wait = 10
        wait_time = 0
        while wait_time < max_wait:
            current_check = client.run_on_scheduler(get_worker_resources)
            running_tasks = sum(info['processing_count'] for info in current_check.values())
            if running_tasks > 0:
                print(f"Tasks started! {running_tasks} tasks now running.")
                break
            time.sleep(0.5)
            wait_time += 0.5
        
        if wait_time >= max_wait:
            print("Warning: Tasks may not have started yet")
        
        # Check resource usage after submission
        print(f"\nResource Usage After Task Submission:")
        current_resources = client.run_on_scheduler(get_worker_resources)
        for addr, info in current_resources.items():
            declared = info['declared_resources'].get('MEM', 0)
            used = info['used_resources'].get('MEM', 0)
            tasks = info['processing_count']
            overallocation = (used / declared * 100) if declared > 0 else 0
            
            print(f"  Worker: declared_MEM={declared}, used_MEM={used}, processing_tasks={tasks}")
            print(f"  Resource utilization: {overallocation:.1f}%")
            
            # Now with 20 threads, we should see all 15 tasks actually running if there's no resource limit
            print(f"  NOTE: Worker has 20 threads, so thread limit should NOT be a constraint")
            
            if used > declared:
                print(f"  *** RESOURCE ACCOUNTING BUG DETECTED!")
                print(f"      Scheduler reserved {used} MEM for {tasks} tasks")
                print(f"      but worker only declared {declared} MEM resources!")
                print(f"      This shows scheduler doesn't check available resources!")
                print(f"      Overallocation: {used - declared} MEM over limit")
            elif tasks < 15:
                print(f"  PARTIAL SCHEDULING: {tasks} tasks running, {15-tasks} waiting")
            else:
                print(f"  FULL OVERCOMMIT: All {tasks} tasks scheduled!")
                
        # Count how many tasks are actually running by checking logs
        # We'll compare this with scheduler's processing count
        
        # Check task statuses
        print(f"\nTask Statuses:")
        for i, future in enumerate(futures, 1):
            print(f"  Task {i}: {future.status}")
        
        # Cancel tasks to complete demonstration
        print(f"\nCancelling tasks to complete demonstration...")
        for future in futures:
            future.cancel()
        
        time.sleep(2)  # Wait for cancellation to complete
        
        # Final resource check
        print(f"\nFinal Resource Usage (after cancellation):")
        final_resources = client.run_on_scheduler(get_worker_resources)
        for addr, info in final_resources.items():
            print(f"  Worker: declared_MEM={info['declared_resources'].get('MEM', 0)}, "
                  f"used_MEM={info['used_resources'].get('MEM', 0)}, "
                  f"tasks={info['processing_count']}")
                  
    finally:
        print(f"\nClosing cluster...")
        client.close()
        cluster.close()

    print(f"\n" + "=" * 60)
    print("BUG REPORT SUMMARY:")
    print("- Issue: Dask scheduler doesn't check current resource usage")
    print("- Location: distributed/scheduler.py, valid_workers() method")
    print("- Line ~3279-3281: if supplied >= required:")
    print("- Fix needed: if (supplied - used_resources) >= required:")
    print("- Version affected: dask==2025.5.1 (possibly others)")
    print("=" * 60)

if __name__ == "__main__":
    test_resource_bug()

Output from the script above (demonstrating the bug):

Resource Usage After Task Submission:
  Worker: declared_MEM=10, used_MEM=45, processing_tasks=15
  Resource utilization: 450.0%
  *** RESOURCE ACCOUNTING BUG DETECTED!
      Scheduler reserved 45 MEM for 15 tasks
      but worker only declared 10 MEM resources!
      Overallocation: 35 MEM over limit

# But in the logs, only 3 tasks actually start:
Task 1 starting...
Task 2 starting...
Task 3 starting...

Root Cause Analysis:

The problem spans two layers, with inconsistent resource accounting between the scheduler and the worker:

1. Scheduler Layer Bug (distributed/scheduler.py:3279-3281)

The valid_workers() method in the scheduler has a critical flaw:

# Current buggy code: `dr` maps worker address -> declared amount of this
# resource, and `required` is the amount the task requests.
for addr, supplied in dr.items():
    if supplied >= required:  # 🚨 Only checks total declared, not available
        sw.add(addr)

Should be fixed to:

# `resource` and `required` come from the enclosing loop over
# ts.resource_restrictions.items()
for addr, supplied in dr.items():
    ws = self.workers[addr]
    available = supplied - ws.used_resources.get(resource, 0)
    if available >= required:
        sw.add(addr)
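
Whatever form the patch takes, the invariant it should restore is that the resources the scheduler reserves on a worker never exceed what that worker declared. A small check along those lines, using the same WorkerState attributes (resources, used_resources) as the reproduction script above, could be run on the scheduler to verify a fix; treat it as a sketch rather than an official API:

def check_no_overallocation(dask_scheduler):
    """Return per-worker violations where reserved resources exceed declared ones."""
    violations = {}
    for addr, ws in dask_scheduler.workers.items():
        for resource, declared in ws.resources.items():
            used = ws.used_resources.get(resource, 0)
            if used > declared:
                violations.setdefault(addr, {})[resource] = (used, declared)
    return violations

# Usage with the client from the reproduction script:
#   assert client.run_on_scheduler(check_no_overallocation) == {}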

2. Worker Layer (Working Correctly)

The worker correctly enforces resource limits via distributed/worker_state_machine.py:

def _resource_restrictions_satisfied(self, ts: TaskState) -> bool:
    if not ts.resource_restrictions:
        return True
    return all(
        self.available_resources[resource] >= needed
        for resource, needed in ts.resource_restrictions.items()
    )

3. The Discrepancy

  • Scheduler says: 15 tasks processing, 45 MEM used (450% over-allocation)
  • Worker reality: Only 3 tasks executing, 9 MEM actually used
  • Result: Resource accounting is completely unreliable
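
To observe the discrepancy directly, the scheduler's per-worker accounting can be compared with the worker's own bookkeeping. The sketch below assumes the worker object passed to client.run exposes its state machine as dask_worker.state with available_resources and total_resources dicts (the same structure the _resource_restrictions_satisfied check above reads); the exact attribute paths are an assumption and may need adjusting:

def scheduler_view(dask_scheduler):
    # Scheduler-side accounting: what valid_workers() believes is reserved.
    return {
        addr: {"declared": dict(ws.resources), "used": dict(ws.used_resources)}
        for addr, ws in dask_scheduler.workers.items()
    }

def worker_view(dask_worker):
    # Worker-side accounting: what the worker state machine actually enforces.
    state = dask_worker.state  # assumed attribute path
    return {
        "total": dict(state.total_resources),
        "available": dict(state.available_resources),
    }

# Usage with the client from the reproduction script:
#   client.run_on_scheduler(scheduler_view)  # reports used MEM == 45
#   client.run(worker_view)                  # reports available MEM == 1 (10 - 3*3)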

Anything else we need to know?:

This bug has significant implications:

  1. Resource accounting is unreliable: the scheduler's used_resources does not reflect what is actually running
  2. Monitoring tools report incorrect resource utilization
  3. Scheduler makes poor decisions based on wrong resource state
  4. Can lead to resource starvation or inefficient scheduling

The worker's own enforcement prevents catastrophic over-allocation at execution time, but the scheduler's incorrect accounting leaves the two layers with inconsistent views of the cluster's resource state.

Environment:

  • Dask version: 2025.5.1
  • Python version: 3.12
  • Operating System: Rocky Linux 8.8
  • Install method (conda, pip, source): pip
