Skip to content

Commit 05c6828

Browse files
committed
Fix hanging issue in DT; fix hang when aborting communicators from Python side; use queue.Queue for engine_exception_q
Signed-off-by: fangyuchu <fangyuchu@qq.com>
1 parent ba183cf commit 05c6828

File tree

6 files changed

+75
-85
lines changed

6 files changed

+75
-85
lines changed

tests/v1/engine/test_client_guard.py

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# SPDX-License-Identifier: Apache-2.0
22
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
33

4-
import asyncio
54
import json
5+
import queue
66
import threading
77
import time
88
from unittest.mock import AsyncMock
@@ -32,21 +32,20 @@ def create_test_thread_safe_dict(initial_data=None):
3232

3333

3434
def create_client_guard(
35-
engine_exception_q: asyncio.Queue, engine_status_dict: ThreadSafeDict[int, str]
35+
engine_exception_q: queue.Queue, engine_status_dict: ThreadSafeDict[int, str]
3636
):
3737
return ClientGuard(
3838
fault_receiver_addr=FAULT_RECEIVER_ADDR,
3939
cmd_addr=CMD_ADDR,
4040
engine_registry=[b"engine_identity"],
4141
engine_exception_q=engine_exception_q,
42-
engine_exception_q_lock=asyncio.Lock(),
4342
fault_pub_addr=FAULT_PUB_ADDR,
4443
engine_status_dict=engine_status_dict,
4544
)
4645

4746

4847
def test_client_guard_initialization():
49-
engine_exception_q: asyncio.Queue[FaultInfo] = asyncio.Queue()
48+
engine_exception_q: queue.Queue[FaultInfo] = queue.Queue()
5049
engine_status_dict = create_test_thread_safe_dict({1: "Healthy"})
5150
guard = create_client_guard(engine_exception_q, engine_status_dict)
5251

@@ -64,7 +63,7 @@ def test_client_guard_initialization():
6463

6564
@pytest.mark.asyncio
6665
async def test_handle_fault():
67-
engine_exception_q: asyncio.Queue[FaultInfo] = asyncio.Queue()
66+
engine_exception_q: queue.Queue[FaultInfo] = queue.Queue()
6867
engine_status_dict = create_test_thread_safe_dict({1: "Healthy"})
6968
guard = create_client_guard(engine_exception_q, engine_status_dict)
7069

@@ -82,7 +81,7 @@ async def test_handle_fault():
8281

8382

8483
def test_fault_receiver():
85-
engine_exception_q: asyncio.Queue[FaultInfo] = asyncio.Queue()
84+
engine_exception_q: queue.Queue[FaultInfo] = queue.Queue()
8685
engine_status_dict = create_test_thread_safe_dict({1: "Healthy"})
8786
guard = create_client_guard(engine_exception_q, engine_status_dict)
8887

@@ -97,7 +96,7 @@ def send_test_message():
9796
socket.close()
9897
ctx.term()
9998

100-
sender_thread = threading.Thread(target=send_test_message)
99+
sender_thread = threading.Thread(target=send_test_message, daemon=True)
101100
sender_thread.start()
102101

103102
def check_published_message():
@@ -114,7 +113,7 @@ def check_published_message():
114113
assert prefix == FAULT_PUB_TOPIC
115114
assert json.loads(data) == {"1": "Dead"}
116115

117-
check_thread = threading.Thread(target=check_published_message)
116+
check_thread = threading.Thread(target=check_published_message, daemon=True)
118117
check_thread.start()
119118

120119
time.sleep(0.1)
@@ -130,7 +129,7 @@ def check_published_message():
130129

131130

132131
def test_fault_receiver_unhealthy():
133-
engine_exception_q: asyncio.Queue[FaultInfo] = asyncio.Queue()
132+
engine_exception_q: queue.Queue[FaultInfo] = queue.Queue()
134133
engine_status_dict = create_test_thread_safe_dict({1: "Healthy"})
135134
guard = create_client_guard(engine_exception_q, engine_status_dict)
136135

@@ -145,7 +144,7 @@ def send_unhealthy_message():
145144
socket.close()
146145
ctx.term()
147146

148-
threading.Thread(target=send_unhealthy_message).start()
147+
threading.Thread(target=send_unhealthy_message, daemon=True).start()
149148
time.sleep(0.1)
150149

151150
assert engine_status_dict[1] == "Unhealthy"
@@ -154,7 +153,7 @@ def send_unhealthy_message():
154153

155154

156155
def test_shutdown_guard():
157-
engine_exception_q: asyncio.Queue[FaultInfo] = asyncio.Queue()
156+
engine_exception_q: queue.Queue[FaultInfo] = queue.Queue()
158157
engine_status_dict = create_test_thread_safe_dict({1: "Healthy"})
159158
guard = create_client_guard(engine_exception_q, engine_status_dict)
160159

@@ -181,7 +180,7 @@ def test_shutdown_guard():
181180

182181
@pytest.mark.asyncio
183182
async def test_handle_fault_async():
184-
engine_exception_q: asyncio.Queue[FaultInfo] = asyncio.Queue()
183+
engine_exception_q: queue.Queue[FaultInfo] = queue.Queue()
185184
engine_status_dict = create_test_thread_safe_dict({0: "Unhealthy"})
186185
guard = create_client_guard(engine_exception_q, engine_status_dict)
187186

@@ -190,6 +189,7 @@ async def test_handle_fault_async():
190189
cmd_socket = ctx.socket(zmq.DEALER)
191190
cmd_socket.setsockopt(zmq.IDENTITY, b"engine_identity")
192191
cmd_socket.connect(CMD_ADDR)
192+
time.sleep(0.1)
193193

194194
uuid = None
195195

@@ -210,13 +210,14 @@ def response_cmd(cmd_socket):
210210
execute_result = {"engine_index": 0, "success": True, "method_uuid": uuid}
211211
cmd_socket.send_multipart([b"", json.dumps(execute_result).encode("utf-8")])
212212

213-
threading.Thread(target=receive_cmd, args=(cmd_socket,)).start()
214-
threading.Thread(target=response_cmd, args=(cmd_socket,)).start()
213+
threading.Thread(target=receive_cmd, args=(cmd_socket,), daemon=True).start()
214+
threading.Thread(target=response_cmd, args=(cmd_socket,), daemon=True).start()
215215

216216
result = await guard.handle_fault("retry", 3)
217217

218218
assert result is True
219219
assert engine_status_dict[0] == "Healthy"
220220

221221
cmd_socket.close()
222+
ctx.term()
222223
guard.shutdown_guard()

tests/v1/engine/test_engine_core_guard.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,9 @@ def mock_worker_receiver(cmd_socket):
113113
elif instruction == "retry":
114114
busy_loop_active.set()
115115

116-
threading.Thread(target=mock_worker_receiver, args=(worker_cmd_socket,)).start()
116+
threading.Thread(
117+
target=mock_worker_receiver, args=(worker_cmd_socket,), daemon=True
118+
).start()
117119

118120
time.sleep(0.1)
119121
identity, _, msg = client_socket.recv_multipart()

vllm/v1/engine/core.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,9 @@ def _stop_worker_execution(self, soft_pause: bool, timeout: int = 2) -> bool:
199199
pause_method = "pause_by_abort_communicators"
200200
self.communicator_aborted = True
201201

202-
success = self._execute_worker_method(pause_method, timeout=timeout)
202+
success = self._execute_worker_method(
203+
pause_method, timeout=timeout, worker_timeout=timeout
204+
)
203205
return success
204206

205207
def _execute_worker_method(self, method_name, timeout: int = 5, **kwargs) -> bool:
@@ -238,18 +240,19 @@ def _execute_cmd(self, cmd_str):
238240
try:
239241
success = run_method(self, method, args=(), kwargs=method_params)
240242
self.logger("Command (%s) succeeded: %s", method, success, level="info")
241-
243+
reason = None
242244
except Exception as e:
243245
self.logger(
244-
"Error executing method %s: %s %s",
246+
"Error executing method %s: %s, %s",
245247
method,
246248
type(e).__name__,
247249
e,
248250
level="error",
249251
)
250252
success = False
253+
reason = f"{type(e).__name__}: {e}"
251254

252-
self._send_execution_result(success, method_uuid)
255+
self._send_execution_result(success, method_uuid, reason)
253256

254257
def pause(self, timeout: int = 1, soft_pause: bool = True) -> bool:
255258
"""
@@ -297,6 +300,9 @@ def retry(self, new_stateless_dp_group_port: int, timeout: int = 1):
297300
This instruction tells the EngineCore to continue its busy loop
298301
after being suspended due to an exception.
299302
"""
303+
if self.engine_running:
304+
return True
305+
300306
start_time = time.monotonic()
301307

302308
success = self._execute_worker_method("restore_worker", timeout=timeout)
@@ -320,14 +326,19 @@ def retry(self, new_stateless_dp_group_port: int, timeout: int = 1):
320326
remaining_timeout = max(0, timeout - elapsed)
321327
success = self.busy_loop_active.wait(timeout=remaining_timeout)
322328
self.engine_running = success
329+
assert self.cmd_q.empty(), "cmd_q must be empty after execution"
323330
return success
324331

325-
def _send_execution_result(self, success: bool, method_uuid: str):
332+
def _send_execution_result(
333+
self, success: bool, method_uuid: str, reason: str | None
334+
):
326335
msg = {
327336
"engine_index": self.engine_index,
328337
"success": success,
329338
"method_uuid": method_uuid,
330339
}
340+
if not success and reason is not None:
341+
msg["reason"] = reason
331342
msg_bytes = json.dumps(msg).encode("utf-8")
332343
self.client_cmd_socket.send_multipart([b"", msg_bytes])
333344

@@ -949,7 +960,7 @@ def __init__(
949960
# Track whether the busy loop is currently active.
950961
self.busy_loop_active = threading.Event()
951962
self.fault_signal_q: queue.Queue[Exception] = queue.Queue()
952-
self.cmd_q: queue.Queue[str | None] = queue.Queue()
963+
self.cmd_q: queue.Queue[str | None] = queue.Queue(maxsize=1)
953964
self.engine_recovery_timeout = ft_config.engine_recovery_timeout
954965
engine_core_guard_ids = addresses.engine_core_guard_identities
955966
assert engine_core_guard_ids is not None

vllm/v1/engine/core_client.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -351,8 +351,7 @@ def __init__(
351351
fault_receiver_addr: str,
352352
cmd_addr: str,
353353
engine_registry: list[bytes],
354-
engine_exception_q: asyncio.Queue[FaultInfo],
355-
engine_exception_q_lock: asyncio.Lock,
354+
engine_exception_q: queue.Queue[FaultInfo],
356355
fault_pub_addr: str,
357356
engine_status_dict: ThreadSafeDict[int, str],
358357
):
@@ -372,17 +371,14 @@ def __init__(
372371
ctx=self.zmq_ctx, path=fault_pub_addr, socket_type=zmq.PUB, bind=True
373372
)
374373

375-
self.engine_exception_q: asyncio.Queue[FaultInfo] = engine_exception_q
376-
377-
self.engine_exception_q_lock = engine_exception_q_lock
374+
self.engine_exception_q: queue.Queue[FaultInfo] = engine_exception_q
378375

379376
self.engine_status_dict: ThreadSafeDict[int, str] = engine_status_dict
380377

381378
self.fault_handler = FaultHandler(
382379
self.cmd_socket,
383380
self.engine_registry,
384381
self.engine_exception_q,
385-
self.engine_exception_q_lock,
386382
self.engine_status_dict,
387383
)
388384

@@ -449,7 +445,7 @@ def fault_receiver(self):
449445
)
450446

451447
# Pause healthy engines on fault.
452-
# Pause will be invoked again during fault-tolerance handling,
448+
# Pause can be invoked again during fault-tolerance handling,
453449
# so it's unnecessary to track whether all engines are currently
454450
# paused.
455451
self.fault_handler.submit_fault("pause", 5, soft_pause=False)
@@ -684,7 +680,7 @@ def __init__(
684680
self.start_engine_core_monitor()
685681

686682
if vllm_config.fault_tolerance_config.enable_fault_tolerance:
687-
self.engine_exception_q: asyncio.Queue[FaultInfo] = asyncio.Queue()
683+
self.engine_exception_q: queue.Queue[FaultInfo] = queue.Queue()
688684
assert addresses.fault_report_addr is not None, (
689685
"addresses.fault_report_addr should not be None at fault tolerance"
690686
" scenario"
@@ -694,7 +690,6 @@ def __init__(
694690
" scenario"
695691
)
696692
self.engine_registry = addresses.engine_core_guard_identities
697-
self.engine_exception_q_lock = asyncio.Lock()
698693
assert self.engine_registry is not None
699694
assert addresses.fault_pub_socket_addr is not None, (
700695
"addresses.fault_pub_socket_addr should not be None at"
@@ -708,7 +703,6 @@ def __init__(
708703
addresses.client_cmd_addr,
709704
self.engine_registry,
710705
self.engine_exception_q,
711-
self.engine_exception_q_lock,
712706
addresses.fault_pub_socket_addr,
713707
self.engine_status_dict,
714708
)
@@ -758,7 +752,7 @@ def monitor_actors():
758752
if not all_actors:
759753
return
760754
while True:
761-
for actor in all_actors:
755+
for actor in all_actors[:]:
762756
actor_id = actor._actor_id.hex()
763757
if actor in engine_manager.local_engine_actors:
764758
actor_index = engine_manager.local_engine_actors.index(actor)

vllm/v1/engine/utils.py

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import json
66
import multiprocessing
77
import os
8+
import queue
89
import time
910
import uuid
1011
import weakref
@@ -1224,20 +1225,6 @@ def generate_identity_group(peer1, peer2, use, n):
12241225
return identitys
12251226

12261227

1227-
async def get_queue_snapshot(queue: asyncio.Queue, queue_lock: asyncio.Lock) -> list:
1228-
"""Thread-safe snapshot of the exception queue."""
1229-
async with queue_lock:
1230-
items = []
1231-
# get item at first
1232-
while not queue.empty():
1233-
item = queue.get_nowait()
1234-
items.append(item)
1235-
# put item into queue again
1236-
for item in items:
1237-
queue.put_nowait(item)
1238-
return items
1239-
1240-
12411228
def broadcast_instruction(
12421229
cmd_socket,
12431230
target_identities: set[bytes] | list[bytes],
@@ -1346,13 +1333,11 @@ def __init__(
13461333
self,
13471334
cmd_socket: zmq.Socket,
13481335
client_cmd_registry: list[bytes],
1349-
engine_exception_q: asyncio.Queue[FaultInfo],
1350-
engine_exception_q_lock: asyncio.Lock,
1336+
engine_exception_q: queue.Queue[FaultInfo],
13511337
engine_status_dict: ThreadSafeDict[int, str],
13521338
) -> None:
13531339
self.cmd_socket = cmd_socket
13541340
self.engine_exception_q = engine_exception_q
1355-
self.engine_exception_q_lock = engine_exception_q_lock
13561341
self.engine_status_dict: ThreadSafeDict[int, str] = engine_status_dict
13571342
self.engine_identity_to_index: dict[bytes, int] = {
13581343
identity: i for i, identity in enumerate(client_cmd_registry)
@@ -1394,9 +1379,8 @@ async def _dispatcher(self):
13941379
def retry(self, **kwargs):
13951380
if "Dead" in self.engine_status_dict.values():
13961381
self.logger(
1397-
"engine_core dead unexpectedly, retry is impossible,"
1398-
"shutdown will be performed",
1399-
level="info",
1382+
"Engine core is dead; retry won't work.",
1383+
level="warning",
14001384
)
14011385
return False, set(), kwargs
14021386

@@ -1472,7 +1456,12 @@ async def _handle_fault_internal(
14721456
if instruction == "retry" and all_success:
14731457
for engine_index, _ in self.engine_status_dict.items():
14741458
self.engine_status_dict[engine_index] = "Healthy"
1475-
# todo: should we also clear the engine_exception_q here?
1459+
while not self.engine_exception_q.empty():
1460+
try:
1461+
self.engine_exception_q.get_nowait()
1462+
except queue.Empty:
1463+
break
1464+
14761465
return all_success
14771466

14781467
async def handle_fault(self, instruction: str, timeout: int, **kwargs) -> bool:

0 commit comments

Comments
 (0)