Skip to content

Commit b5be237

Browse files
committed
Fix hanging issue in DT; fix hang when aborting communicators from Python side; use queue.Queue for engine_exception_q
Signed-off-by: fangyuchu <fangyuchu@qq.com>
1 parent 20a5a5a commit b5be237

File tree

6 files changed

+75
-85
lines changed

6 files changed

+75
-85
lines changed

tests/v1/engine/test_client_guard.py

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# SPDX-License-Identifier: Apache-2.0
22
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
33

4-
import asyncio
54
import json
5+
import queue
66
import threading
77
import time
88
from unittest.mock import AsyncMock
@@ -32,21 +32,20 @@ def create_test_thread_safe_dict(initial_data=None):
3232

3333

3434
def create_client_guard(
35-
engine_exception_q: asyncio.Queue, engine_status_dict: ThreadSafeDict[int, str]
35+
engine_exception_q: queue.Queue, engine_status_dict: ThreadSafeDict[int, str]
3636
):
3737
return ClientGuard(
3838
fault_receiver_addr=FAULT_RECEIVER_ADDR,
3939
cmd_addr=CMD_ADDR,
4040
engine_registry=[b"engine_identity"],
4141
engine_exception_q=engine_exception_q,
42-
engine_exception_q_lock=asyncio.Lock(),
4342
fault_pub_addr=FAULT_PUB_ADDR,
4443
engine_status_dict=engine_status_dict,
4544
)
4645

4746

4847
def test_client_guard_initialization():
49-
engine_exception_q: asyncio.Queue[FaultInfo] = asyncio.Queue()
48+
engine_exception_q: queue.Queue[FaultInfo] = queue.Queue()
5049
engine_status_dict = create_test_thread_safe_dict({1: "Healthy"})
5150
guard = create_client_guard(engine_exception_q, engine_status_dict)
5251

@@ -64,7 +63,7 @@ def test_client_guard_initialization():
6463

6564
@pytest.mark.asyncio
6665
async def test_handle_fault():
67-
engine_exception_q: asyncio.Queue[FaultInfo] = asyncio.Queue()
66+
engine_exception_q: queue.Queue[FaultInfo] = queue.Queue()
6867
engine_status_dict = create_test_thread_safe_dict({1: "Healthy"})
6968
guard = create_client_guard(engine_exception_q, engine_status_dict)
7069

@@ -82,7 +81,7 @@ async def test_handle_fault():
8281

8382

8483
def test_fault_receiver():
85-
engine_exception_q: asyncio.Queue[FaultInfo] = asyncio.Queue()
84+
engine_exception_q: queue.Queue[FaultInfo] = queue.Queue()
8685
engine_status_dict = create_test_thread_safe_dict({1: "Healthy"})
8786
guard = create_client_guard(engine_exception_q, engine_status_dict)
8887

@@ -97,7 +96,7 @@ def send_test_message():
9796
socket.close()
9897
ctx.term()
9998

100-
sender_thread = threading.Thread(target=send_test_message)
99+
sender_thread = threading.Thread(target=send_test_message, daemon=True)
101100
sender_thread.start()
102101

103102
def check_published_message():
@@ -114,7 +113,7 @@ def check_published_message():
114113
assert prefix == FAULT_PUB_TOPIC
115114
assert json.loads(data) == {"1": "Dead"}
116115

117-
check_thread = threading.Thread(target=check_published_message)
116+
check_thread = threading.Thread(target=check_published_message, daemon=True)
118117
check_thread.start()
119118

120119
time.sleep(0.1)
@@ -130,7 +129,7 @@ def check_published_message():
130129

131130

132131
def test_fault_receiver_unhealthy():
133-
engine_exception_q: asyncio.Queue[FaultInfo] = asyncio.Queue()
132+
engine_exception_q: queue.Queue[FaultInfo] = queue.Queue()
134133
engine_status_dict = create_test_thread_safe_dict({1: "Healthy"})
135134
guard = create_client_guard(engine_exception_q, engine_status_dict)
136135

@@ -145,7 +144,7 @@ def send_unhealthy_message():
145144
socket.close()
146145
ctx.term()
147146

148-
threading.Thread(target=send_unhealthy_message).start()
147+
threading.Thread(target=send_unhealthy_message, daemon=True).start()
149148
time.sleep(0.1)
150149

151150
assert engine_status_dict[1] == "Unhealthy"
@@ -154,7 +153,7 @@ def send_unhealthy_message():
154153

155154

156155
def test_shutdown_guard():
157-
engine_exception_q: asyncio.Queue[FaultInfo] = asyncio.Queue()
156+
engine_exception_q: queue.Queue[FaultInfo] = queue.Queue()
158157
engine_status_dict = create_test_thread_safe_dict({1: "Healthy"})
159158
guard = create_client_guard(engine_exception_q, engine_status_dict)
160159

@@ -181,7 +180,7 @@ def test_shutdown_guard():
181180

182181
@pytest.mark.asyncio
183182
async def test_handle_fault_async():
184-
engine_exception_q: asyncio.Queue[FaultInfo] = asyncio.Queue()
183+
engine_exception_q: queue.Queue[FaultInfo] = queue.Queue()
185184
engine_status_dict = create_test_thread_safe_dict({0: "Unhealthy"})
186185
guard = create_client_guard(engine_exception_q, engine_status_dict)
187186

@@ -190,6 +189,7 @@ async def test_handle_fault_async():
190189
cmd_socket = ctx.socket(zmq.DEALER)
191190
cmd_socket.setsockopt(zmq.IDENTITY, b"engine_identity")
192191
cmd_socket.connect(CMD_ADDR)
192+
time.sleep(0.1)
193193

194194
uuid = None
195195

@@ -210,13 +210,14 @@ def response_cmd(cmd_socket):
210210
execute_result = {"engine_index": 0, "success": True, "method_uuid": uuid}
211211
cmd_socket.send_multipart([b"", json.dumps(execute_result).encode("utf-8")])
212212

213-
threading.Thread(target=receive_cmd, args=(cmd_socket,)).start()
214-
threading.Thread(target=response_cmd, args=(cmd_socket,)).start()
213+
threading.Thread(target=receive_cmd, args=(cmd_socket,), daemon=True).start()
214+
threading.Thread(target=response_cmd, args=(cmd_socket,), daemon=True).start()
215215

216216
result = await guard.handle_fault("retry", 3)
217217

218218
assert result is True
219219
assert engine_status_dict[0] == "Healthy"
220220

221221
cmd_socket.close()
222+
ctx.term()
222223
guard.shutdown_guard()

tests/v1/engine/test_engine_core_guard.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,9 @@ def mock_worker_receiver(cmd_socket):
113113
elif instruction == "retry":
114114
busy_loop_active.set()
115115

116-
threading.Thread(target=mock_worker_receiver, args=(worker_cmd_socket,)).start()
116+
threading.Thread(
117+
target=mock_worker_receiver, args=(worker_cmd_socket,), daemon=True
118+
).start()
117119

118120
time.sleep(0.1)
119121
identity, _, msg = client_socket.recv_multipart()

vllm/v1/engine/core.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,9 @@ def _stop_worker_execution(self, soft_pause: bool, timeout: int = 2) -> bool:
198198
pause_method = "pause_by_abort_communicators"
199199
self.communicator_aborted = True
200200

201-
success = self._execute_worker_method(pause_method, timeout=timeout)
201+
success = self._execute_worker_method(
202+
pause_method, timeout=timeout, worker_timeout=timeout
203+
)
202204
return success
203205

204206
def _execute_worker_method(self, method_name, timeout: int = 5, **kwargs) -> bool:
@@ -237,18 +239,19 @@ def _execute_cmd(self, cmd_str):
237239
try:
238240
success = run_method(self, method, args=(), kwargs=method_params)
239241
self.logger("Command (%s) succeeded: %s", method, success, level="info")
240-
242+
reason = None
241243
except Exception as e:
242244
self.logger(
243-
"Error executing method %s: %s %s",
245+
"Error executing method %s: %s, %s",
244246
method,
245247
type(e).__name__,
246248
e,
247249
level="error",
248250
)
249251
success = False
252+
reason = f"{type(e).__name__}: {e}"
250253

251-
self._send_execution_result(success, method_uuid)
254+
self._send_execution_result(success, method_uuid, reason)
252255

253256
def pause(self, timeout: int = 1, soft_pause: bool = True) -> bool:
254257
"""
@@ -296,6 +299,9 @@ def retry(self, new_stateless_dp_group_port: int, timeout: int = 1):
296299
This instruction tells the EngineCore to continue its busy loop
297300
after being suspended due to an exception.
298301
"""
302+
if self.engine_running:
303+
return True
304+
299305
start_time = time.monotonic()
300306

301307
success = self._execute_worker_method("restore_worker", timeout=timeout)
@@ -319,14 +325,19 @@ def retry(self, new_stateless_dp_group_port: int, timeout: int = 1):
319325
remaining_timeout = max(0, timeout - elapsed)
320326
success = self.busy_loop_active.wait(timeout=remaining_timeout)
321327
self.engine_running = success
328+
assert self.cmd_q.empty(), "cmd_q must be empty after execution"
322329
return success
323330

324-
def _send_execution_result(self, success: bool, method_uuid: str):
331+
def _send_execution_result(
332+
self, success: bool, method_uuid: str, reason: str | None
333+
):
325334
msg = {
326335
"engine_index": self.engine_index,
327336
"success": success,
328337
"method_uuid": method_uuid,
329338
}
339+
if not success and reason is not None:
340+
msg["reason"] = reason
330341
msg_bytes = json.dumps(msg).encode("utf-8")
331342
self.client_cmd_socket.send_multipart([b"", msg_bytes])
332343

@@ -936,7 +947,7 @@ def __init__(
936947
# Track whether the busy loop is currently active.
937948
self.busy_loop_active = threading.Event()
938949
self.fault_signal_q: queue.Queue[Exception] = queue.Queue()
939-
self.cmd_q: queue.Queue[str | None] = queue.Queue()
950+
self.cmd_q: queue.Queue[str | None] = queue.Queue(maxsize=1)
940951
self.engine_recovery_timeout = ft_config.engine_recovery_timeout
941952
engine_core_guard_ids = addresses.engine_core_guard_identities
942953
assert engine_core_guard_ids is not None

vllm/v1/engine/core_client.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -351,8 +351,7 @@ def __init__(
351351
fault_receiver_addr: str,
352352
cmd_addr: str,
353353
engine_registry: list[bytes],
354-
engine_exception_q: asyncio.Queue[FaultInfo],
355-
engine_exception_q_lock: asyncio.Lock,
354+
engine_exception_q: queue.Queue[FaultInfo],
356355
fault_pub_addr: str,
357356
engine_status_dict: ThreadSafeDict[int, str],
358357
):
@@ -372,17 +371,14 @@ def __init__(
372371
ctx=self.zmq_ctx, path=fault_pub_addr, socket_type=zmq.PUB, bind=True
373372
)
374373

375-
self.engine_exception_q: asyncio.Queue[FaultInfo] = engine_exception_q
376-
377-
self.engine_exception_q_lock = engine_exception_q_lock
374+
self.engine_exception_q: queue.Queue[FaultInfo] = engine_exception_q
378375

379376
self.engine_status_dict: ThreadSafeDict[int, str] = engine_status_dict
380377

381378
self.fault_handler = FaultHandler(
382379
self.cmd_socket,
383380
self.engine_registry,
384381
self.engine_exception_q,
385-
self.engine_exception_q_lock,
386382
self.engine_status_dict,
387383
)
388384

@@ -449,7 +445,7 @@ def fault_receiver(self):
449445
)
450446

451447
# Pause healthy engines on fault.
452-
# Pause will be invoked again during fault-tolerance handling,
448+
# Pause can be invoked again during fault-tolerance handling,
453449
# so it's unnecessary to track whether all engines are currently
454450
# paused.
455451
self.fault_handler.submit_fault("pause", 5, soft_pause=False)
@@ -684,7 +680,7 @@ def __init__(
684680
self.start_engine_core_monitor()
685681

686682
if vllm_config.fault_tolerance_config.enable_fault_tolerance:
687-
self.engine_exception_q: asyncio.Queue[FaultInfo] = asyncio.Queue()
683+
self.engine_exception_q: queue.Queue[FaultInfo] = queue.Queue()
688684
assert addresses.fault_report_addr is not None, (
689685
"addresses.fault_report_addr should not be None at fault tolerance"
690686
" scenario"
@@ -694,7 +690,6 @@ def __init__(
694690
" scenario"
695691
)
696692
self.engine_registry = addresses.engine_core_guard_identities
697-
self.engine_exception_q_lock = asyncio.Lock()
698693
assert self.engine_registry is not None
699694
assert addresses.fault_pub_socket_addr is not None, (
700695
"addresses.fault_pub_socket_addr should not be None at"
@@ -708,7 +703,6 @@ def __init__(
708703
addresses.client_cmd_addr,
709704
self.engine_registry,
710705
self.engine_exception_q,
711-
self.engine_exception_q_lock,
712706
addresses.fault_pub_socket_addr,
713707
self.engine_status_dict,
714708
)
@@ -758,7 +752,7 @@ def monitor_actors():
758752
if not all_actors:
759753
return
760754
while True:
761-
for actor in all_actors:
755+
for actor in all_actors[:]:
762756
actor_id = actor._actor_id.hex()
763757
if actor in engine_manager.local_engine_actors:
764758
actor_index = engine_manager.local_engine_actors.index(actor)

vllm/v1/engine/utils.py

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import json
66
import multiprocessing
77
import os
8+
import queue
89
import time
910
import uuid
1011
import weakref
@@ -1233,20 +1234,6 @@ def generate_identity_group(peer1, peer2, use, n):
12331234
return identitys
12341235

12351236

1236-
async def get_queue_snapshot(queue: asyncio.Queue, queue_lock: asyncio.Lock) -> list:
1237-
"""Thread-safe snapshot of the exception queue."""
1238-
async with queue_lock:
1239-
items = []
1240-
# get item at first
1241-
while not queue.empty():
1242-
item = queue.get_nowait()
1243-
items.append(item)
1244-
# put item into queue again
1245-
for item in items:
1246-
queue.put_nowait(item)
1247-
return items
1248-
1249-
12501237
def broadcast_instruction(
12511238
cmd_socket,
12521239
target_identities: set[bytes] | list[bytes],
@@ -1355,13 +1342,11 @@ def __init__(
13551342
self,
13561343
cmd_socket: zmq.Socket,
13571344
client_cmd_registry: list[bytes],
1358-
engine_exception_q: asyncio.Queue[FaultInfo],
1359-
engine_exception_q_lock: asyncio.Lock,
1345+
engine_exception_q: queue.Queue[FaultInfo],
13601346
engine_status_dict: ThreadSafeDict[int, str],
13611347
) -> None:
13621348
self.cmd_socket = cmd_socket
13631349
self.engine_exception_q = engine_exception_q
1364-
self.engine_exception_q_lock = engine_exception_q_lock
13651350
self.engine_status_dict: ThreadSafeDict[int, str] = engine_status_dict
13661351
self.engine_identity_to_index: dict[bytes, int] = {
13671352
identity: i for i, identity in enumerate(client_cmd_registry)
@@ -1403,9 +1388,8 @@ async def _dispatcher(self):
14031388
def retry(self, **kwargs):
14041389
if "Dead" in self.engine_status_dict.values():
14051390
self.logger(
1406-
"engine_core dead unexpectedly, retry is impossible,"
1407-
"shutdown will be performed",
1408-
level="info",
1391+
"Engine core is dead; retry won't work.",
1392+
level="warning",
14091393
)
14101394
return False, set(), kwargs
14111395

@@ -1481,7 +1465,12 @@ async def _handle_fault_internal(
14811465
if instruction == "retry" and all_success:
14821466
for engine_index, _ in self.engine_status_dict.items():
14831467
self.engine_status_dict[engine_index] = "Healthy"
1484-
# todo: should we also clear the engine_exception_q here?
1468+
while not self.engine_exception_q.empty():
1469+
try:
1470+
self.engine_exception_q.get_nowait()
1471+
except queue.Empty:
1472+
break
1473+
14851474
return all_success
14861475

14871476
async def handle_fault(self, instruction: str, timeout: int, **kwargs) -> bool:

0 commit comments

Comments
 (0)