Move subtask queueing to workers #2631

Motivations

Mars currently handles all subtask scheduling within a single supervisor. When the number of subtasks or workers is large, this puts a huge load on the supervisor node. What's more, scheduling solely on the supervisor side introduces considerable latency between worker tasks. Moving subtask scheduling to workers can alleviate these issues.

Overall design

This design enables workers to schedule the subtasks submitted to them, while supervisors only act as batch assigners and coordinators. Subtasks created by TaskService are assigned and pushed to workers. Inside workers, subtasks are queued and executed according to the priorities assigned to them. Results are then fed back to supervisors to activate successors.

Subtask submission

When subtasks are generated, the supervisor assigns and pushes all ready subtasks to the corresponding workers. Unlike the previous design, the supervisor no longer decides how many subtasks to submit to workers based on global slot information, nor does it maintain queues of subtasks. Workers decide when to run subtasks based on their own state, leading to faster reaction and narrower gaps between executions.
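A rough sketch of the supervisor-side push under this design is shown below. It is only an illustration: SubtaskPriorityQueueActor is the worker-side queue introduced in the next section, and assign_subtasks together with the actor lookup are simplified assumptions, not the final API.

async def push_ready_subtasks(assigner, subtasks):
    # Batch-assign every ready subtask to a band; no global slot
    # information is consulted before submission.
    bands = await assigner.assign_subtasks(subtasks)
    for subtask, band in zip(subtasks, bands):
        # Push the subtask straight to the worker owning the band;
        # queueing and slot control happen inside the worker.
        queue_ref = await mo.actor_ref(
            SubtaskPriorityQueueActor.default_uid(), address=band[0]
        )
        await queue_ref.put(subtask.subtask_id, band[1], subtask.priority)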

Subtask queueing

Inside workers, we use queues with latches to order and control subtasks. The queue can be seen as the combination of a priority queue that decides the order of subtasks and a semaphore that decides the number of subtasks to output. The default value of the semaphore equals the number of slots of the given band. The basic API of the queue is shown below:

from typing import Tuple

import mars.oscar as mo


class SubtaskPriorityQueueActor(mo.StatelessActor):
    @mo.extensible
    def put(self, subtask_id: str, band_name: str, priority: Tuple):
        """
        Put a subtask ID into the queue.
        """

    @mo.extensible
    def update_priority(self, subtask_id: str, band_name: str, priority: Tuple):
        """
        Update priority of given subtask.
        """

    async def get(self, band_name: str):
        """
        Get an item from the queue and return the subtask ID
        and slot ID. Waits when the queue is empty or the
        value of the semaphore is zero.
        """

    def release_slot(self, subtask_id: str, errors: str = "raise"):
        """
        Release the slot occupied by the given subtask and
        increase the value of the semaphore.
        """

    @mo.extensible
    def remove(self, subtask_id: str):
        """
        Remove a subtask from the queue. If the subtask is
        occupying a slot, the slot is also released.
        """

More APIs can be added to implement operations like yield_slot.
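For illustration only, the queue semantics above can be modeled as a heap paired with an asyncio.Semaphore. This is a minimal sketch of the behavior, not the actual actor implementation:

import asyncio
import heapq
from typing import List, Tuple


class _BandQueueSketch:
    def __init__(self, n_slots: int):
        # Heap entries are (negated_priority, subtask_id); priorities
        # are negated so that higher priorities pop first.
        self._heap: List[Tuple[Tuple, str]] = []
        self._semaphore = asyncio.Semaphore(n_slots)  # one permit per slot
        self._has_items = asyncio.Event()

    def put(self, subtask_id: str, priority: Tuple):
        heapq.heappush(self._heap, (tuple(-p for p in priority), subtask_id))
        self._has_items.set()

    async def get(self) -> str:
        # Block until both a free slot and a queued item are available
        await self._semaphore.acquire()
        while not self._heap:
            self._has_items.clear()
            await self._has_items.wait()
        _, subtask_id = heapq.heappop(self._heap)
        return subtask_id

    def release_slot(self):
        # Returning a slot raises the semaphore so get() can proceed
        self._semaphore.release()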

To overlap IO and CPU costs, two queues are set up inside each worker.

  • PrepareQueue: the queue of submitted subtasks. A prepare task consumes items from the queue and performs quota allocation as well as data movement. When a subtask starts execution, its prepare slot is released.
  • ExecutionQueue: the queue of prepared subtasks. An execution task consumes items from the queue and performs execution. When a subtask finishes execution, its slot is released. The chaining of the two stages is sketched below.
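A simplified sketch of how the two stages chain together is given below; allocate_quota, move_input_data and get_priority are hypothetical placeholders for the real quota and storage service calls:

async def prepare_loop(prepare_queue, execution_queue, band_name: str):
    while True:
        subtask_id, slot_id = await prepare_queue.get(band_name)
        # Hypothetical stand-ins for quota allocation and for moving
        # input data onto this worker
        await allocate_quota(subtask_id)
        await move_input_data(subtask_id)
        # Hand the prepared subtask over to the execution stage
        execution_queue.put(subtask_id, band_name, get_priority(subtask_id))
        # The prepare slot is released once the subtask starts execution
        prepare_queue.release_slot(subtask_id)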

Successor forwarding

When a subtask finishes execution and we need to choose another subtask to run, there are two kinds of subtasks to schedule: subtasks already enqueued in ExecutionQueue, and subtasks whose predecessor data has just been produced by the execution that finished. The latter group often has higher priority but has not gone through data preparation, and may miss being scheduled because of the latency introduced by the queues. We design a successor forwarding mechanism to resolve this.

When pushing ready subtasks to the scheduling service, their successors are also pushed to be cached. The scheduling service decides on and pushes subtasks to the correct workers. Subtasks whose successors can be forwarded must satisfy the conditions below:

  1. Some of the successors are cached on the worker

  2. All dependent data of the successors is already stored on the worker, so we do not need to consult the Meta or Storage service for remote data retrieval

  3. There is enough quota for the successor

When all conditions are met, PrepareQueue is skipped and the subtask is inserted into ExecutionQueue directly. When a slot is released, the successor will be scheduled as soon as possible.
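A condensed sketch of the forwarding decision follows; the three predicates mirror the conditions above and are hypothetical helpers, not existing service calls:

async def maybe_forward(successor, execution_queue, band_name: str) -> bool:
    # 1. the successor was pushed to this worker for caching
    # 2. all of its input data already resides on this worker
    # 3. quota can be allocated for it right away
    if (
        is_cached_locally(successor)
        and inputs_stored_locally(successor)
        and await has_enough_quota(successor)
    ):
        # Skip PrepareQueue and enqueue for execution directly, so the
        # successor runs as soon as a slot is released
        execution_queue.put(successor.subtask_id, band_name, successor.priority)
        return True
    return False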

Possible impacts

Autoscale

The current autoscale implementation is based on the queueing mechanism on the supervisor side, and must be redesigned around worker-side scheduling.

Fault Injection

As the worker side of the scheduling service is rewritten, fault injection needs to adapt to that change.
