Milestone 1 of Internal Process-level Fault Tolerance (RFC #27866) #28296
Conversation
💡 Codex Review
Here are some automated review suggestions for this pull request.
```python
def _execute_worker_method(self, method_name, timeout: int = 5, **kwargs) -> bool:
    identities = set()
    for tp_rank in range(self.tp_size):
        for pp_rank in range(self.pp_size):
            identity = f"{tp_rank}_{pp_rank}".encode()
            identities.add(identity)

    method_uuid = broadcast_instruction(
        self.worker_cmd_socket, identities, method_name, **kwargs
    )

    all_success = True
    worker_responses = wait_for_instruction_result(
        self.worker_cmd_socket, identities, method_name, timeout, method_uuid
    )
    for identity in identities:
        response = worker_responses.get(identity)
        if response is None or not response.get("success", False):
```
Preserve worker identity set when checking command results
Inside EngineCoreGuard._execute_worker_method the identities set is passed directly to wait_for_instruction_result, which removes identities from the set as responses arrive. The subsequent for identity in identities: loop therefore iterates over an empty set and never verifies worker_responses, so the method will return True even when some workers time out or return success=False. Fault-tolerance commands such as pause/retry can appear successful while underlying worker guards failed, leaving the system in an inconsistent state. Iterate over a copy of the identity set or over worker_responses.items() to correctly detect missing or failed acknowledgements.
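The failure mode described above can be reproduced in isolation. In the sketch below, `wait_for_instruction_result` is a hypothetical stand-in that destructively pops identities from the caller's set as acknowledgements arrive (as the review assumes); it is not the PR's actual helper. `check_buggy` mirrors the reported pattern, and `check_fixed` snapshots the identity set before the destructive call:

```python
def wait_for_instruction_result(identities, responses):
    """Toy stand-in: removes identities from the caller's set as responses arrive."""
    collected = {}
    for identity in list(identities):
        if identity in responses:
            collected[identity] = responses[identity]
            identities.discard(identity)  # destructive: the caller's set shrinks
    return collected

def check_buggy(identities, responses):
    worker_responses = wait_for_instruction_result(identities, responses)
    all_success = True
    for identity in identities:  # set already emptied, so this loop never runs
        r = worker_responses.get(identity)
        if r is None or not r.get("success", False):
            all_success = False
    return all_success

def check_fixed(identities, responses):
    expected = set(identities)  # snapshot before the destructive call
    worker_responses = wait_for_instruction_result(identities, responses)
    all_success = True
    for identity in expected:
        r = worker_responses.get(identity)
        if r is None or not r.get("success", False):
            all_success = False
    return all_success

responses = {b"0_0": {"success": True}, b"1_0": {"success": False}}
print(check_buggy({b"0_0", b"1_0"}, responses))  # True  (the failure is missed)
print(check_fixed({b"0_0", b"1_0"}, responses))  # False (the failure is detected)
```

Iterating over `worker_responses.items()` plus a check that every expected identity responded would detect both failed and missing acknowledgements.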
In the wait_for_instruction_result function, the line target_identities = set(target_identities) creates a new set as a shallow copy of the input target_identities. This means any modifications made to target_identities within wait_for_instruction_result will only affect this local copy, not the original identities set passed from _execute_worker_method.
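The point about `set(...)` creating a shallow copy is easy to demonstrate. The `drain` helper below is purely illustrative (not the PR's code): it rebinds the parameter to a local copy, as the reply describes, so mutations never reach the caller's set:

```python
def drain(target_identities):
    # Rebinding to set(...) makes a new set object; only the local name changes.
    target_identities = set(target_identities)
    target_identities.clear()
    return target_identities

original = {b"0_0", b"1_0"}
drained = drain(original)
print(len(original))  # 2: the caller's set is untouched
print(len(drained))   # 0: only the local copy was cleared
```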
This pull request has merge conflicts that must be resolved before it can be merged.
Thanks for the PR!
First pass. Just some nitpicks.
I think it makes sense overall. Any end-to-end tests/demonstration that this is functional and actually helps recover from faults (considering there are quite some changes added)?
vllm/v1/worker/gpu_worker.py
Outdated
```python
        nccl_comm.available = active
        nccl_comm.disabled = not active

    def restart_worker(self):
```
The name is a bit misleading? This looks like a resume/restore worker.
The name has been changed to restore_worker.
vllm/v1/worker/gpu_worker.py
Outdated
```python
        ).start()

    def _make_worker_logger(self):
        prefix = f"[WorkerGuard_dp{self.dp_rank}_tp{self.tp_rank}_pp{self.pp_rank}] "
```
nit: PP should come first?
The order is corrected.
vllm/v1/engine/core_client.py
Outdated
```python
        return False


class ClientGuard:
```
could we have a better name? :)
Thanks for the suggestion. Considering there’s already a start_engine_core_monitor function in MPClient, perhaps Watchdog would be a better fit to replace the new Guard classes?
We renamed the Guard classes to Sentinel, which we think better reflects their role (similar to Redis Sentinel for monitoring and fault handling).
vllm/entrypoints/api_server.py
Outdated
```python
@app.post("/fault_tolerance/apply")
async def send_fault_tolerance_instruction(request: Request) -> Response:
    """Generate completion for the request.
```
fix doc
The error in doc string has been corrected.
Thanks for the review!

1. Simulated Exception

The recovery process took only 2 seconds, compared with 32 seconds for a full restart recovery, demonstrating significantly improved fault recovery efficiency. After retry, normal inference resumed ("Engine 000: Avg prompt throughput: 3.9 tokens/s, Avg generation throughput: 4.3 tokens/s, ..."). We also tested with the ray backend with DP8/TP1; the log is attached.

2. Random process kill simulation
vllm/engine/arg_utils.py
Outdated
| internal_fault_report_port=self.internal_fault_report_port | ||
| or FaultToleranceConfig.internal_fault_report_port, | ||
| external_fault_notify_port=self.external_fault_notify_port | ||
| or FaultToleranceConfig.external_fault_notify_port, |
Are the `or` fallbacks necessary? `self.external_fault_notify_port` will always be truthy unless someone manually sets the port to 0.
Thanks for catching that! The or part was a remnant from some early debugging code and has been removed in the latest update.
This pull request has merge conflicts that must be resolved before it can be merged.
Squashed commit history (duplicate entries grouped, per-commit sign-off trailers collapsed; full attribution at the end):

* feat(fault-tolerance): add class skeletons for fault tolerance
* config: add configuration options for fault tolerance
* add `generate_identity` and `generate_identitys` functions to generate a unique identity for a ZMQ ROUTER node
* add service startup configuration for fault report addr
* add init WorkerGuard
* add engine_core_cmd_addr, fault_report_addr, client_cmd_addr, engine_core_identitys in EngineZmqAddresses; init them in launch_core_engines; add _report_engine_dead in CoreEngineProcManager
* init ClientGuard; init EngineZmqAddresses engine_core_identitys
* init EngineCoreGuard
* change generate_identitys to generate_identity_group
* optimize code typesetting (×2)
* reformat code so every line < 88 chars (×2); fix mypy error: Value of type "dict[Any, Any] | None" is not indexable (×3)
* fix lint errors: E501 line too long; F402 import `uuid` shadowed by loop variable
* fix mypy error: need type annotation for "uuids"
* add _send_msg in EngineCoreGuard
* add import torch.cuda
* add _recv_cmd docstring that clearly explains the return value
* rename recv_fault_msg to recv_msg; add parameter types to ClientGuard.__init__
* add engine monitor (×2)
* delete stray backup files (requirements/test.txt~, vllm/v1/engine/core_client.py~, vllm/v1/engine/exceptions.py~)
* simplify _send_msg and _recv_cmd in EngineCoreGuard; simplify recv_msg in ClientGuard
* engine: add fault tolerance features for EngineCore
* engine: add timeout mechanism in retry
* update actor_index; update EngineDead flag
* handle fault and report exception (×2)
* fix engine_actor; fix engine_actor fault_info
* delete num_identity; fix try/except; fix debug error
* add fault_report_addr in FaultToleranceConfig
* add handle-fault & get_fault_info API (×2)
* remove fault_report_address from CoreEngineActorManager.__init__; ruff format
* add fault_report_port in FaultToleranceConfig; concatenate zmq_addr from fault_report_addr and fault_report_port
* fault reporter bug fixes (×8); assorted bug fixes
* remove fault_report_addr from FaultToleranceConfig
* refactor: relocate method serialization functions to serial_util.py
* fix actor bugs (×3)
* add engine_core_cmd_addr in FaultToleranceConfig
* add and use _stop_worker_execution in EngineCoreGuard; add and use run in WorkerGuard
* fix sentinel; fix mypy errors: missing positional argument "tp_size" in the EngineCoreGuard call; missing "length"/"byteorder" arguments in int.to_bytes
* fix bugs in fault tolerance mode (×2)
* rename fault_report_port to internal_fault_report_port; add external_fault_notify_port (×2)
* add _recv_cmd; use deserialize_method_call and run_method in run; fix mypy error: need type annotation for "kwargs"
* call self.ctx.term() in shutdown(); fix imports of deserialize_method_call / serialize_method_call
* initialize WorkerGuard in init_device; type worker_guard as WorkerGuard | None; fix mypy arg-type error for deserialize_method_call; ruff format
* actively send exception information (×3)
* change engine_core_cmd_addr (str) to engine_core_cmd_addrs (list[str]) in EngineZmqAddresses (×2); later delete engine_core_cmd_addr from EngineZmqAddresses
* remove redundant configuration: fault-pub-port
* send pause instructions after receiving fault info in ClientGuard
* change engine_core_guard_identities from dict[int, bytes] to list[bytes]
* fix bug where only the worker guard of engine core 0 could receive messages sent from the engine core guard
* change local_rank to rank_in_group in WorkerGuard
* del self.client_cmd_registry[int(unhealthy_engine.engine_id)]
* add gloo communication timeout; add stateless_process_group gloo_comm_timeout
* reconstruct fault receiver & fault handler (×3); fix return format (×3)
* add abort request
* add DT for client guard (×3)
* implement two types of pause: a soft one using flag signals and a hard one by aborting NCCL communicators
* refine certain log forms; fix a minor bug in the pause function
* refactor and abstract the recv_msg logic in ClientGuard, EngineCoreGuard, WorkerGuard
* add and check method uuid when sending commands and receiving results
* abstract the logic of sending instructions and waiting for responses out of FaultHandler
* add options in EngineCoreGuard to receive execution results from WorkerGuard
* support worker reinitialization after hard pause; add task queue in FaultHandler to ensure sequential task execution
* resolve conflicts (×6)
* add engine core unit tests (×2)
* ensure WorkerGuard command execution returns a result; fix missing set_device when TP > 1
* rename & format logger (×2)
* feat(nccl): enable non-blocking NCCL communicators to support ncclCommAbort
* reinit dp_group; fix bugs (#54)
* move requests to the waiting queue instead of abandoning them directly
* add annotation; fix typos

Signed-off-by: fangyuchu <fangyuchu@qq.com>, a798347923 <2645302020@qq.com>, TianZhuo <2770730562@qq.com>, a798347923 <39047817+a798347923@users.noreply.github.com>, 205150940 <112750056+205150940@users.noreply.github.com>, w00689259 <wangzhuo66@huawei.com>, zWaNg3 <37772915+zWaNg3@users.noreply.github.com>
Co-authored-by: zWaNg3, a798347923, TianZhuo, 205150940, w00689259
…reinitialize dp_group with new port Signed-off-by: fangyuchu <fangyuchu@qq.com>
Signed-off-by: fangyuchu <fangyuchu@qq.com>
…thon side; use queue.Queue for engine_exception_q Signed-off-by: fangyuchu <fangyuchu@qq.com>
…converting engine_registry to a dict Signed-off-by: fangyuchu <fangyuchu@qq.com>
Signed-off-by: fangyuchu <fangyuchu@qq.com>
Co-authored with: @zWaNg3 @a798347923 @205150940
Purpose
Summary
This PR implements Milestone 1 of RFC #27866, introducing a fault tolerance framework for vLLM.
It provides mechanisms to detect runtime exceptions, pause execution, report fault events, and allow upper orchestration layers to apply fault tolerance methods (initially supporting retry).
Supported Functionality
Usage
Enable the Fault Tolerance feature via configuration, for example:
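The original example was attached as an image and is not reproduced here. A rough sketch of what enabling the feature might look like follows; the flag spellings (`--enable-fault-tolerance`, port and timeout flags) are inferred from names mentioned in this PR (`internal_fault_report_port`, `external_fault_notify_port`, `engine-recovery-timeout`) and may not match the final CLI:

```shell
# Illustrative only: flag names are assumptions, not the merged CLI surface.
vllm serve Qwen/Qwen2.5-7B-Instruct \
  --data-parallel-size 4 \
  --tensor-parallel-size 2 \
  --enable-fault-tolerance \
  --internal-fault-report-port 8001 \
  --external-fault-notify-port 8002 \
  --engine-recovery-timeout 120
```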
When an exception is raised in any rank (tested via code injection or killing a worker process), vLLM reports the event via a ZMQ PUB socket.
The status of each EngineCore can be queried via API:
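The `/fault_tolerance/apply` route appears in this PR's `api_server.py` diff; the request payload and any status-query endpoint below are illustrative assumptions, not confirmed API:

```shell
# Apply a fault-tolerance instruction (payload shape is an assumption):
curl -X POST http://localhost:8000/fault_tolerance/apply \
  -H "Content-Type: application/json" \
  -d '{"method": "retry"}'
```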
Implementation Notes
We actually implemented two approaches to pause healthy ranks. Due to the complexity of collective communications, we currently adopt the hard pause method as the default.
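To make the soft-pause idea concrete, here is a minimal, self-contained sketch; `SoftPausableLoop` and its methods are illustrative names, not the PR's classes. The busy loop polls a flag and idles while paused, leaving communicators untouched, which is only safe when no rank is blocked inside a collective:

```python
import threading
import time

class SoftPausableLoop:
    """Illustrative soft pause: the loop checks a flag each iteration and
    idles while it is set, instead of aborting communicators."""

    def __init__(self):
        self._paused = threading.Event()
        self.steps = 0

    def pause(self):
        self._paused.set()

    def resume(self):
        self._paused.clear()

    def run(self, n_steps):
        done = 0
        while done < n_steps:
            if self._paused.is_set():
                time.sleep(0.01)  # idle instead of executing the next step
                continue
            self.steps += 1  # stand-in for one execute_model step
            done += 1

loop = SoftPausableLoop()
t = threading.Thread(target=loop.run, args=(5,))
loop.pause()
t.start()
time.sleep(0.05)
print(loop.steps)  # still 0: the loop is idling
loop.resume()
t.join()
print(loop.steps)  # 5 once resumed and finished
```

The hard pause, by contrast, aborts the NCCL communicators so that ranks stuck inside collectives are unblocked, at the cost of requiring worker reinitialization afterward.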
NCCL: All communicators are set as nonblocking when fault tolerance is enabled, allowing safe use of ncclCommAbort as suggested by NCCL.
Currently, when querying EngineCore status via the API: EngineCores paused using the signal flag are reported as healthy. EngineCores paused due to aborted communicators are reported as unhealthy. Further refinements to the status reporting will be implemented in follow-ups.
We believe that this implementation is compatible with RFC #27774 and #27908 to isolate fault ranks through scale_down. This implementation establishes the overall fault tolerance framework within vLLM, providing mechanisms for detecting, pausing, reporting, and performing fault tolerance. Within this framework, fault isolation through elastic expert parallelism (#20323) is treated as one of the fault tolerance methods. As a result, this implementation and the elastic expert parallelism proposal are designed to be compatible.
We welcome further discussion and feedback from the community to refine and extend these fault tolerance mechanisms.
Test Plan
Test Result
DT results
End-to-end Test Results
We validated both the functional correctness and model precision after applying the retry-based fault tolerance mechanism. All fault tolerance features behaved as expected — including fault detection, pause/resume, event reporting, and retry recovery.
The precision tests showed no regression, indicating that the retry mechanism does not affect the model’s output quality.
Here are some vLLM log demonstrations from the end-to-end tests:
1. Simulated Exception
DP4_TP2_Exception_Retry.log
A fault was manually triggered by raising a `RuntimeError("Test Error")` inside `gpu_model_runner.execute_model` on DP rank 0. The recovery process took only 2 seconds, compared with 32 seconds for a full restart recovery, demonstrating significantly improved fault recovery efficiency. After retry, normal inference resumed ("Engine 000: Avg prompt throughput: 3.9 tokens/s, Avg generation throughput: 4.3 tokens/s, ...").
We also tested with ray backend with DP8/TP1, and here is the log:
Ray_DP8_TP1_Exception_Retry.log
2. Random process kill simulation
DP8_TP1_KillProcess.log
In another test, we simulated faults by randomly killing one of the worker processes (23:23:49, EngineCore DP4).
After the kill, the instance automatically entered the paused state.
As no fault-tolerance command was received within the pre-defined timeout (engine-recovery-timeout, 120s), the instance exited gracefully ("23:25:49 [core.py:389] [BusyLoopWrapper] Fault tolerance instruction not received within timeout. Proceeding with default exception handling.").
Essential Elements of an Effective PR Description Checklist
`supported_models.md` and `examples` for a new model.