
Commit f531051

fangyuchu, zWaNg3, a798347923, 205150940, and w00689259 authored
Milestone 1 of Internal Process-level Fault Tolerance (#61)
* feat(fault-tolerance): add class skeletons for fault tolerance
  Signed-off-by: fangyuchu <fangyuchu@qq.com>
* config: add configuration options for fault tolerance
  Signed-off-by: fangyuchu <fangyuchu@qq.com>
* add generate_identity and generate_identitys functions: generate a unique identity for ZMQ ROUTER node
* add service startup configuration: fault report addr
* add init WorkerGuard
* add engine_core_cmd_addr, fault_report_addr, client_cmd_addr, engine_core_identitys in EngineZmqAddresses; init engine_core_cmd_addr, fault_report_addr, client_cmd_addr in launch_core_engines func; add _report_engine_dead func in CoreEngineProcManager
* init ClientGuard; init EngineZmqAddresses engine_core_identitys
* init EngineCoreGuard
* change generate_identitys to generate_identity_group
* code typesetting is optimized
* code typesetting is optimized
* changed code format, ensure every line < 88 chars
* changed code format, ensure every line < 88 chars; fix error: Value of type "dict[Any, Any] | None" is not indexable [index]
* fix bug: Error: vllm/v1/engine/utils.py:122:89: E501 Line too long (117 > 88); Error: vllm/v1/engine/utils.py:1059:9: F402 Import `uuid` from line 6 shadowed by loop variable
* fix Error: vllm/v1/engine/utils.py:1045: error: Need type annotation for "uuids" (hint: "uuids: set[<type>] = ...") [var-annotated]
* fix error: Value of type "dict[Any, Any] | None" is not indexable [index]
* fix error: Value of type "dict[Any, Any] | None" is not indexable [index]
  Signed-off-by: a798347923 <2645302020@qq.com>
* add _send_msg in EngineCoreGuard
  Signed-off-by: a798347923 <2645302020@qq.com>
* add import torch.cuda
* add _recv_cmd function docstring that clearly explains the meaning of the return value.
* changed recv_fault_msg to recv_msg; add ClientGuard __init__ func parameter types
* add engine monitor
  Signed-off-by: TianZhuo <2770730562@qq.com>
* Delete requirements/test.txt~
  Signed-off-by: a798347923 <39047817+a798347923@users.noreply.github.com>
* Delete vllm/v1/engine/core_client.py~
  Signed-off-by: a798347923 <39047817+a798347923@users.noreply.github.com>
* simplify _send_msg and _recv_cmd in EngineCoreGuard
* simplify recv_msg in ClientGuard
* engine: add fault tolerance features for EngineCore.
  Signed-off-by: fangyuchu <fangyuchu@qq.com>
* engine: add timeout mechanism in retry.
  Signed-off-by: fangyuchu <fangyuchu@qq.com>
* add engine monitor
* Delete vllm/v1/engine/exceptions.py~
  Signed-off-by: 205150940 <112750056+205150940@users.noreply.github.com>
* update actor_index
* update enginedead flag
* handle fault and report exception
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* fix engine_actor
* fix engine_actor fault_info
* handle fault and report exception
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* delete num_identity
* changed try except
* fix debug error
* fix one bug.
  Signed-off-by: fangyuchu <fangyuchu@qq.com>
* add fault_report_addr in FaultToleranceConfig
* add handle fault & get_fault_info api
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* remove fault_report_address in CoreEngineActorManager __init__
  Signed-off-by: a798347923 <2645302020@qq.com>
* ruff format
  Signed-off-by: a798347923 <2645302020@qq.com>
* add handle fault & get_fault_info api
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* fix one bug.
  Signed-off-by: fangyuchu <fangyuchu@qq.com>
* add fault_report_port in FaultToleranceConfig
  Signed-off-by: a798347923 <2645302020@qq.com>
* add zmq_addr concatenate with fault_report_addr and fault_report_port
  Signed-off-by: a798347923 <2645302020@qq.com>
* fault reporter bug fix
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* fault reporter bug fix
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* fault reporter bug fix
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* fault reporter bug fix
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* fault reporter bug fix
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* fault reporter bug fix
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* fix some bug
* fault reporter bug fix
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* fault reporter bug fix
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* remove fault_report_addr in FaultToleranceConfig
  Signed-off-by: a798347923 <2645302020@qq.com>
* refactor: relocate method serialization functions to serial_util.py
  Signed-off-by: fangyuchu <fangyuchu@qq.com>
* fix actor bug
* fix actor bug
* add engine_core_cmd_addr in FaultToleranceConfig
  Signed-off-by: a798347923 <2645302020@qq.com>
* add and use _stop_worker_execution in EngineCoreGuard
  Signed-off-by: a798347923 <2645302020@qq.com>
* add and use run in WorkerGuard
  Signed-off-by: a798347923 <2645302020@qq.com>
* fix actor bug
* fix bug
* fix sentinel
* fix bug: vllm/v1/engine/core.py:847: error: Missing positional argument "tp_size" in call to "EngineCoreGuard"
  Signed-off-by: a798347923 <2645302020@qq.com>
* fix bug: error: Missing positional arguments "length", "byteorder" in call to "to_bytes" of "int"
  Signed-off-by: a798347923 <2645302020@qq.com>
* fix bug in fault tolerance mode
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* fix bug in fault tolerance mode
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* change fault_report_port to internal_fault_report_port; add external_fault_notify_port
  Signed-off-by: a798347923 <2645302020@qq.com>
* change fault_report_port to internal_fault_report_port; add external_fault_notify_port
  Signed-off-by: a798347923 <2645302020@qq.com>
* add _recv_cmd func; use deserialize_method_call and run_method in run func
  Signed-off-by: a798347923 <2645302020@qq.com>
* Update core.py: fix bug error: Need type annotation for "kwargs" (hint: "kwargs: dict[<type>, <type>] = ...")
  Signed-off-by: a798347923 <39047817+a798347923@users.noreply.github.com>
* add self.ctx.term() in shutdown()
  Signed-off-by: a798347923 <2645302020@qq.com>
* changed import deserialize_method_call, serialize_method_call
  Signed-off-by: a798347923 <2645302020@qq.com>
* changed init worker_guard in init_device
  Signed-off-by: a798347923 <2645302020@qq.com>
* Update core.py: add import serialize_method_call
  Signed-off-by: a798347923 <39047817+a798347923@users.noreply.github.com>
* Update gpu_worker.py: changed init WorkerGuard in init_device
  Signed-off-by: a798347923 <39047817+a798347923@users.noreply.github.com>
* Update gpu_worker.py: fix bug, self.worker_guard: WorkerGuard|None = None
  Signed-off-by: a798347923 <39047817+a798347923@users.noreply.github.com>
* Update gpu_worker.py: fix bug error: Argument 1 to "deserialize_method_call" has incompatible type "str | None"; expected "str" [arg-type]
  Signed-off-by: a798347923 <39047817+a798347923@users.noreply.github.com>
* Update gpu_worker.py: ruff format
  Signed-off-by: a798347923 <39047817+a798347923@users.noreply.github.com>
* Update core.py: ruff-format
  Signed-off-by: a798347923 <39047817+a798347923@users.noreply.github.com>
* actively send exception information
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* actively send exception information
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* actively send exception information
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* change engine_core_cmd_addr(str) to engine_core_cmd_addrs(list[str]) in EngineZmqAddresses
  Signed-off-by: a798347923 <2645302020@qq.com>
* change engine_core_cmd_addr(str) to engine_core_cmd_addrs(list[str]) in EngineZmqAddresses
  Signed-off-by: a798347923 <2645302020@qq.com>
* Update utils.py: delete engine_core_cmd_addr in EngineZmqAddresses
  Signed-off-by: a798347923 <39047817+a798347923@users.noreply.github.com>
* Remove redundant configuration: fault-pub-port
  Signed-off-by: fangyuchu <fangyuchu@qq.com>
* Send pause instructions after receiving fault info in ClientGuard
  Signed-off-by: fangyuchu <fangyuchu@qq.com>
* change engine_core_guard_identities from dict[int, bytes] to list[bytes]
  Signed-off-by: a798347923 <2645302020@qq.com>
* fix bug "only the worker guard of engine core 0 can receive messages sent from engine core guard"
  Signed-off-by: a798347923 <2645302020@qq.com>
* change local_rank to rank_in_group in WorkerGuard
  Signed-off-by: a798347923 <2645302020@qq.com>
* changed del self.client_cmd_registry[int(unhealthy_engine.engine_id)]
  Signed-off-by: a798347923 <2645302020@qq.com>
* add gloo communication timeout
* fix some bug
* add stateless_process_group gloo_comm_timeout
* reconstruct fault receiver & fault handler
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* fix some bug
* reconstruct fault receiver & fault handler
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* reconstruct fault receiver & fault handler
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* fix return format
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* fix return format
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* fix return format
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* add abort request
* fix some bug
* fix some bug
* fix some bug
* add dt for client guard
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* add dt for client guard
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* add dt for client guard
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* Implementation of two types of pause: a soft one by using flag signals and a hard one by aborting nccl communicators.
  Signed-off-by: fangyuchu <fangyuchu@qq.com>
* Refine certain log forms and fix a minor bug in pause function.
  Signed-off-by: fangyuchu <fangyuchu@qq.com>
* Refactor and abstract the recv_msg logic in CG, ECG, WG.
  Signed-off-by: fangyuchu <fangyuchu@qq.com>
* Add and check method uuid when sending commands and receiving results.
  Signed-off-by: fangyuchu <fangyuchu@qq.com>
* Abstract the logic of sending instructions and waiting responses from FaultHandler
  Signed-off-by: fangyuchu <fangyuchu@qq.com>
* Add options in EngineCoreGuard to recv execution results from WorkerGuard
  Signed-off-by: fangyuchu <fangyuchu@qq.com>
* Support worker reinitialization after hard pause; add task queue in FaultHandler to ensure sequential task execution
  Signed-off-by: fangyuchu <fangyuchu@qq.com>
* resolve conflicts
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* resolve conflicts
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* resolve conflicts
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* resolve conflicts
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* resolve conflicts
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* resolve conflicts
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* add engine core ut
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* add engine core ut
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* Ensure WorkerGuard command execution returns result; fix missing set_device when TP>1
  Signed-off-by: fangyuchu <fangyuchu@qq.com>
* rename & format logger
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* rename & format logger
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* feat(nccl): enable non-blocking NCCL communicators to support ncclCommAbort
  Signed-off-by: fangyuchu <fangyuchu@qq.com>
* reinit dp_group
* fix bug
* fix bug
* fix bug
* fix bug (#54)
* Move requests to waiting queue instead of abandoning them directly.
  Signed-off-by: fangyuchu <fangyuchu@qq.com>
* add annotation
  Signed-off-by: w00689259 <wangzhuo66@huawei.com>
* fix typos
  Signed-off-by: fangyuchu <fangyuchu@qq.com>

---------

Signed-off-by: fangyuchu <fangyuchu@qq.com>
Signed-off-by: a798347923 <2645302020@qq.com>
Signed-off-by: TianZhuo <2770730562@qq.com>
Signed-off-by: a798347923 <39047817+a798347923@users.noreply.github.com>
Signed-off-by: 205150940 <112750056+205150940@users.noreply.github.com>
Signed-off-by: w00689259 <wangzhuo66@huawei.com>
Signed-off-by: zWaNg3 <37772915+zWaNg3@users.noreply.github.com>
Co-authored-by: zWaNg3 <37772915+zWaNg3@users.noreply.github.com>
Co-authored-by: a798347923 <2645302020@qq.com>
Co-authored-by: TianZhuo <2770730562@qq.com>
Co-authored-by: 205150940 <112750056+205150940@users.noreply.github.com>
Co-authored-by: a798347923 <39047817+a798347923@users.noreply.github.com>
Co-authored-by: w00689259 <wangzhuo66@huawei.com>
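The message above distinguishes a soft pause driven by flag signals from a hard pause that aborts NCCL communicators. One common way to build a flag-based pause is to have a loop check a `threading.Event` before each step; the sketch below shows only that general idea. `run_steps` and its arguments are hypothetical names, and nothing here is taken from the vLLM implementation.

```python
import threading
from typing import Callable


def run_steps(steps: list[Callable[[], int]], active: threading.Event) -> list[int]:
    """Run steps in order, stopping before the next one once `active` is cleared."""
    results = []
    for step in steps:
        if not active.is_set():
            break  # paused: leave the remaining steps untouched
        results.append(step())
    return results


active = threading.Event()
active.set()


def second_step() -> int:
    active.clear()  # simulate a pause request arriving during this step
    return 2


results = run_steps([lambda: 1, second_step, lambda: 3], active)
```

In this sketch the step that is already running completes and only later steps are skipped, which is the sense in which such a pause is "soft".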
1 parent 4b1ff13 commit f531051

28 files changed, +2600 -79 lines changed
Lines changed: 222 additions & 0 deletions
@@ -0,0 +1,222 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

import asyncio
import json
import threading
import time
from unittest.mock import AsyncMock

import pytest
import zmq

from vllm.utils.collection_utils import ThreadSafeDict
from vllm.v1.engine.core_client import ClientGuard
from vllm.v1.engine.utils import FaultHandler, FaultInfo

FAULT_RECEIVER_ADDR = "tcp://127.0.0.1:8844"
CMD_ADDR = "tcp://127.0.0.1:8845"
FAULT_PUB_ADDR = "tcp://127.0.0.1:8846"
FAULT_PUB_TOPIC = "vllm_fault"


def create_test_thread_safe_dict(initial_data=None):
    # Default to a single healthy engine with index 1.
    if initial_data is None:
        initial_data = {1: "Healthy"}
    tsd = ThreadSafeDict()
    if initial_data:
        for k, v in initial_data.items():
            tsd[k] = v
    return tsd


def create_client_guard(
    engine_exception_q: asyncio.Queue, engine_status_dict: ThreadSafeDict[int, str]
):
    return ClientGuard(
        fault_receiver_addr=FAULT_RECEIVER_ADDR,
        cmd_addr=CMD_ADDR,
        engine_registry=[b"engine_identity"],
        engine_exception_q=engine_exception_q,
        engine_exception_q_lock=asyncio.Lock(),
        fault_pub_addr=FAULT_PUB_ADDR,
        engine_status_dict=engine_status_dict,
    )


def test_client_guard_initialization():
    engine_exception_q: asyncio.Queue[FaultInfo] = asyncio.Queue()
    engine_status_dict = create_test_thread_safe_dict({1: "Healthy"})
    guard = create_client_guard(engine_exception_q, engine_status_dict)

    assert guard.engine_registry == [b"engine_identity"]
    assert not guard.client_guard_dead
    assert isinstance(guard.fault_handler, FaultHandler)
    assert guard.engine_exception_q is engine_exception_q

    assert guard.fault_receiver_socket.type == zmq.ROUTER
    assert guard.cmd_socket.type == zmq.ROUTER
    assert guard.fault_pub_socket.type == zmq.PUB

    guard.shutdown_guard()


@pytest.mark.asyncio
async def test_handle_fault():
    engine_exception_q: asyncio.Queue[FaultInfo] = asyncio.Queue()
    engine_status_dict = create_test_thread_safe_dict({1: "Healthy"})
    guard = create_client_guard(engine_exception_q, engine_status_dict)

    engine_exception_q.put_nowait(
        FaultInfo(engine_id="1", message="test exception", type="test")
    )

    guard.fault_handler.handle_fault = AsyncMock(return_value=True)

    result = await guard.handle_fault("pause", 5)
    assert result is True
    guard.fault_handler.handle_fault.assert_awaited_once_with("pause", 5)

    guard.shutdown_guard()


def test_fault_receiver():
    engine_exception_q: asyncio.Queue[FaultInfo] = asyncio.Queue()
    engine_status_dict = create_test_thread_safe_dict({1: "Healthy"})
    guard = create_client_guard(engine_exception_q, engine_status_dict)

    def send_test_message():
        ctx = zmq.Context()
        socket = ctx.socket(zmq.DEALER)
        socket.setsockopt(zmq.IDENTITY, b"test_sender")
        socket.connect(FAULT_RECEIVER_ADDR)

        test_fault = FaultInfo(engine_id="1", type="dead", message="test error")
        socket.send_multipart([b"", test_fault.serialize().encode("utf-8")])
        socket.close()
        ctx.term()

    sender_thread = threading.Thread(target=send_test_message)
    sender_thread.start()

    def check_published_message():
        ctx = zmq.Context()
        sub_socket = ctx.socket(zmq.SUB)
        sub_socket.connect(FAULT_PUB_ADDR)
        sub_socket.setsockopt_string(zmq.SUBSCRIBE, FAULT_PUB_TOPIC)

        message = sub_socket.recv_string()
        sub_socket.close()
        ctx.term()

        prefix, data = message.split("|", 1)
        assert prefix == FAULT_PUB_TOPIC
        assert json.loads(data) == {"1": "Dead"}

    check_thread = threading.Thread(target=check_published_message)
    check_thread.start()

    time.sleep(0.1)

    assert not engine_exception_q.empty()
    received_fault = engine_exception_q.get_nowait()
    assert received_fault.engine_id == "1"
    assert received_fault.type == "dead"

    assert engine_status_dict[1] == "Dead"

    guard.shutdown_guard()


def test_fault_receiver_unhealthy():
    engine_exception_q: asyncio.Queue[FaultInfo] = asyncio.Queue()
    engine_status_dict = create_test_thread_safe_dict({1: "Healthy"})
    guard = create_client_guard(engine_exception_q, engine_status_dict)

    def send_unhealthy_message():
        ctx = zmq.Context()
        socket = ctx.socket(zmq.DEALER)
        socket.setsockopt(zmq.IDENTITY, b"engine_identity")
        socket.connect(FAULT_RECEIVER_ADDR)

        test_fault = FaultInfo(engine_id="1", type="error", message="test error")
        socket.send_multipart([b"", test_fault.serialize().encode()])
        socket.close()
        ctx.term()

    threading.Thread(target=send_unhealthy_message).start()
    time.sleep(0.1)

    assert engine_status_dict[1] == "Unhealthy"

    guard.shutdown_guard()


def test_shutdown_guard():
    engine_exception_q: asyncio.Queue[FaultInfo] = asyncio.Queue()
    engine_status_dict = create_test_thread_safe_dict({1: "Healthy"})
    guard = create_client_guard(engine_exception_q, engine_status_dict)

    original_fault_sock = guard.fault_receiver_socket
    original_cmd_sock = guard.cmd_socket
    original_pub_sock = guard.fault_pub_socket
    original_ctx = guard.zmq_ctx

    guard.shutdown_guard()

    assert guard.client_guard_dead is True

    with pytest.raises(zmq.ZMQError):
        original_fault_sock.recv()

    with pytest.raises(zmq.ZMQError):
        original_cmd_sock.recv()

    with pytest.raises(zmq.ZMQError):
        original_pub_sock.send(b"test")

    assert original_ctx.closed


@pytest.mark.asyncio
async def test_handle_fault_async():
    engine_exception_q: asyncio.Queue[FaultInfo] = asyncio.Queue()
    engine_status_dict = create_test_thread_safe_dict({1: "Unhealthy"})
    guard = create_client_guard(engine_exception_q, engine_status_dict)

    time.sleep(0.1)
    # Context.instance() is a classmethod; call it on the class rather than
    # on a throwaway Context() object.
    ctx = zmq.Context.instance()
    cmd_socket = ctx.socket(zmq.DEALER)
    cmd_socket.setsockopt(zmq.IDENTITY, b"engine_identity")
    cmd_socket.connect(CMD_ADDR)

    uuid = None

    def receive_cmd(cmd_socket):
        nonlocal uuid
        time.sleep(0.1)

        # The first frame is the empty delimiter, the second the JSON command.
        identity, msg = cmd_socket.recv_multipart()
        cmd_dict = json.loads(msg.decode("utf-8"))
        assert cmd_dict["method"] == "retry"
        assert cmd_dict["timeout"] == 3
        uuid = cmd_dict["method_uuid"]

    def response_cmd(cmd_socket):
        nonlocal uuid
        # Wait until receive_cmd has captured the command's UUID.
        while uuid is None:
            time.sleep(0.1)
        execute_result = {"engine_index": 1, "success": True, "method_uuid": uuid}
        cmd_socket.send_multipart([b"", json.dumps(execute_result).encode("utf-8")])

    threading.Thread(target=receive_cmd, args=(cmd_socket,)).start()
    threading.Thread(target=response_cmd, args=(cmd_socket,)).start()

    result = await guard.handle_fault("retry", 3)

    assert result is True
    assert engine_status_dict[1] == "Healthy"

    guard.shutdown_guard()
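The assertions in these tests imply a small contract: a fault of type "dead" marks the engine "Dead", a fault of type "error" marks it "Unhealthy", and status updates are published as `<topic>|<json>`. The sketch below restates that contract in plain Python so it can be checked without sockets. The helper names `status_for_fault`, `format_status_message`, and `parse_status_message` are hypothetical and not part of vLLM, and treating every non-"dead" type as "Unhealthy" is an assumption drawn only from the two cases the tests cover.

```python
import json

FAULT_PUB_TOPIC = "vllm_fault"  # same topic string the tests subscribe to


def status_for_fault(fault_type: str) -> str:
    """Map a fault type to an engine status (assumed rule, see the text above)."""
    return "Dead" if fault_type == "dead" else "Unhealthy"


def format_status_message(topic: str, statuses: dict[int, str]) -> str:
    """Build a `<topic>|<json>` string; JSON object keys are always strings."""
    payload = {str(engine_id): status for engine_id, status in statuses.items()}
    return f"{topic}|{json.dumps(payload)}"


def parse_status_message(message: str) -> tuple[str, dict[str, str]]:
    """Split on the first `|` only, so the JSON body may itself contain `|`."""
    topic, data = message.split("|", 1)
    return topic, json.loads(data)


statuses = {1: "Healthy"}
statuses[1] = status_for_fault("dead")
message = format_status_message(FAULT_PUB_TOPIC, statuses)
topic, decoded = parse_status_message(message)
```

This also shows why the subscriber compares against `{"1": "Dead"}` with a string key even though the status dict is indexed by the integer `1`: JSON has no integer keys.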
Lines changed: 125 additions & 0 deletions
@@ -0,0 +1,125 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

import json
import logging
import queue
import threading
import time

import pytest
import zmq

from vllm.utils.network_utils import make_zmq_socket
from vllm.v1.engine.core import (
    EngineCoreGuard,
    EngineLoopPausedError,
)
from vllm.v1.serial_utils import serialize_method_call

CLIENT_CMD_ADDR = "tcp://127.0.0.1:8844"
WORKER_CMD_ADDR = "tcp://127.0.0.1:8845"
FAULT_REPORT_ADDR = "tcp://127.0.0.1:8846"
GUARD_IDENTITY = b"engine_guard_0"


def create_engine_core_guard(
    fault_signal_q: queue.Queue, busy_loop_active: threading.Event
):
    return EngineCoreGuard(
        engine_index=0,
        fault_signal_q=fault_signal_q,
        cmd_q=queue.Queue(),
        busy_loop_active=busy_loop_active,
        engine_input_q=queue.Queue(),
        client_cmd_addr=CLIENT_CMD_ADDR,
        worker_cmd_addr=WORKER_CMD_ADDR,
        fault_report_addr=FAULT_REPORT_ADDR,
        guard_identity=GUARD_IDENTITY,
        tp_size=1,
        pp_size=1,
    )


def test_engine_core_guard_initialization():
    fault_signal_q: queue.Queue = queue.Queue()
    busy_loop_active = threading.Event()

    guard = create_engine_core_guard(fault_signal_q, busy_loop_active)

    assert guard.engine_index == 0
    assert guard.tp_size == 1
    assert guard.pp_size == 1
    assert not guard.communicator_aborted
    assert guard.engine_running is True
    assert guard.daemon is True

    assert guard.fault_report_socket.type == zmq.DEALER
    assert guard.client_cmd_socket.type == zmq.DEALER
    assert guard.worker_cmd_socket.type == zmq.ROUTER

    guard.shutdown()


@pytest.mark.parametrize("instruction", ["pause", "retry"])
def test_run_handle_instruction(instruction):
    fault_signal_q: queue.Queue = queue.Queue()
    busy_loop_active = threading.Event()

    client_socket = make_zmq_socket(
        ctx=zmq.Context(), path=CLIENT_CMD_ADDR, socket_type=zmq.ROUTER, bind=True
    )

    time.sleep(0.1)

    guard = create_engine_core_guard(fault_signal_q, busy_loop_active)
    time.sleep(0.1)

    ctx = zmq.Context()
    worker_cmd_socket = ctx.socket(zmq.DEALER)
    worker_cmd_socket.setsockopt(zmq.IDENTITY, b"0_0")
    worker_cmd_socket.connect(WORKER_CMD_ADDR)

    def mock_worker_receiver(cmd_socket):
        time.sleep(0.1)
        logging.info("start worker")
        identity, msg = cmd_socket.recv_multipart()
        logging.info(identity)
        cmd_dict = json.loads(msg.decode("utf-8"))
        # Compare against the expected name explicitly. Written as
        # `assert (a == b if cond else "retry")`, the else branch asserts a
        # non-empty string and always passes.
        expected_method = "pause_by_signal" if instruction == "pause" else "retry"
        assert cmd_dict["method"] == expected_method
        response_dict = {"success": True, "method_uuid": cmd_dict["method_uuid"]}
        logging.info(identity)
        cmd_socket.send_multipart([b"", json.dumps(response_dict).encode("utf-8")])

    threading.Thread(target=guard.run, daemon=True).start()
    time.sleep(0.1)

    param = {"timeout": 3}
    if instruction == "pause":
        param["soft_pause"] = True
    serial_instruction = serialize_method_call(instruction, **param)
    # ROUTER envelope: destination identity, empty delimiter, then payload.
    client_socket.send_multipart(
        [GUARD_IDENTITY, b"", serial_instruction.encode("utf-8")]
    )
    if instruction == "pause":
        fault_signal_q.put(EngineLoopPausedError(Exception("test error")))
    elif instruction == "retry":
        busy_loop_active.set()

    threading.Thread(target=mock_worker_receiver, args=(worker_cmd_socket,)).start()

    time.sleep(0.1)
    identity, _, msg = client_socket.recv_multipart()
    result_dict = json.loads(msg.decode("utf-8"))
    assert result_dict["engine_index"] == 0
    assert result_dict["success"]

    time.sleep(0.1)

    client_socket.close()
    worker_cmd_socket.close()
    guard.shutdown()
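Both test files rely on every command carrying a `method_uuid` that the responder echoes back, so a reply can be matched to the request that caused it. A minimal sketch of that request/response matching follows, using only the standard library. `build_command` and `is_reply_for` are hypothetical helpers, not vLLM's `serialize_method_call`; the flat JSON layout (`method`, `method_uuid`, plus keyword parameters at the top level) is inferred from the fields the tests read and may differ from the real wire format.

```python
import json
import uuid


def build_command(method: str, **params) -> str:
    """Serialize a method call with a fresh UUID (assumed flat JSON layout)."""
    command = {"method": method, "method_uuid": str(uuid.uuid4()), **params}
    return json.dumps(command)


def is_reply_for(command_json: str, reply_json: str) -> bool:
    """Return True only if the reply echoes the command's method_uuid."""
    command = json.loads(command_json)
    reply = json.loads(reply_json)
    return reply.get("method_uuid") == command["method_uuid"]


command_json = build_command("pause", timeout=3, soft_pause=True)
sent = json.loads(command_json)

# A well-behaved responder copies the UUID into its result.
good_reply = json.dumps({"success": True, "method_uuid": sent["method_uuid"]})
# A stale reply from an earlier command carries a different UUID.
stale_reply = json.dumps({"success": True, "method_uuid": str(uuid.uuid4())})
```

Checking the UUID lets a sender discard late replies to a command that already timed out instead of mistaking them for the result of the current one.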

vllm/config/__init__.py

Lines changed: 3 additions & 0 deletions
@@ -9,6 +9,7 @@
     PassConfig,
 )
 from vllm.config.device import DeviceConfig
+from vllm.config.fault_tolerance import FaultToleranceConfig
 from vllm.config.kv_events import KVEventsConfig
 from vllm.config.kv_transfer import KVTransferConfig
 from vllm.config.load import LoadConfig
@@ -83,6 +84,8 @@
     "SpeechToTextConfig",
     # From vllm.config.structured_outputs
     "StructuredOutputsConfig",
+    # From vllm.config.fault_tolerance
+    "FaultToleranceConfig",
     # From vllm.config.utils
     "ConfigType",
     "SupportsMetricsInfo",
