Skip to content

Commit 50d5b6e

Browse files
committed
Refactor fault tolerance modules by renaming classes to Sentinel and converting engine_registry to a dict
Signed-off-by: fangyuchu <fangyuchu@qq.com>
1 parent 05c6828 commit 50d5b6e

File tree

7 files changed

+135
-125
lines changed

7 files changed

+135
-125
lines changed

tests/v1/engine/test_client_guard.py

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import zmq
1212

1313
from vllm.utils.collection_utils import ThreadSafeDict
14-
from vllm.v1.engine.core_client import ClientGuard
14+
from vllm.v1.engine.core_client import ClientSentinel
1515
from vllm.v1.engine.utils import FaultHandler, FaultInfo
1616

1717
FAULT_RECEIVER_ADDR = "tcp://127.0.0.1:8844"
@@ -31,59 +31,59 @@ def create_test_thread_safe_dict(initial_data=None):
3131
return tsd
3232

3333

34-
def create_client_guard(
34+
def create_client_sentinel(
3535
engine_exception_q: queue.Queue, engine_status_dict: ThreadSafeDict[int, str]
3636
):
37-
return ClientGuard(
37+
return ClientSentinel(
3838
fault_receiver_addr=FAULT_RECEIVER_ADDR,
3939
cmd_addr=CMD_ADDR,
40-
engine_registry=[b"engine_identity"],
40+
engine_registry={0: b"engine_identity"},
4141
engine_exception_q=engine_exception_q,
4242
fault_pub_addr=FAULT_PUB_ADDR,
4343
engine_status_dict=engine_status_dict,
4444
)
4545

4646

47-
def test_client_guard_initialization():
47+
def test_client_sentinel_initialization():
4848
engine_exception_q: queue.Queue[FaultInfo] = queue.Queue()
4949
engine_status_dict = create_test_thread_safe_dict({1: "Healthy"})
50-
guard = create_client_guard(engine_exception_q, engine_status_dict)
50+
sentinel = create_client_sentinel(engine_exception_q, engine_status_dict)
5151

52-
assert guard.engine_registry == [b"engine_identity"]
53-
assert not guard.client_guard_dead
54-
assert isinstance(guard.fault_handler, FaultHandler)
55-
assert guard.engine_exception_q is engine_exception_q
52+
assert sentinel.engine_registry[0] == b"engine_identity"
53+
assert not sentinel.client_sentinel_dead
54+
assert isinstance(sentinel.fault_handler, FaultHandler)
55+
assert sentinel.engine_exception_q is engine_exception_q
5656

57-
assert guard.fault_receiver_socket.type == zmq.ROUTER
58-
assert guard.cmd_socket.type == zmq.ROUTER
59-
assert guard.fault_pub_socket.type == zmq.PUB
57+
assert sentinel.fault_receiver_socket.type == zmq.ROUTER
58+
assert sentinel.cmd_socket.type == zmq.ROUTER
59+
assert sentinel.fault_pub_socket.type == zmq.PUB
6060

61-
guard.shutdown_guard()
61+
sentinel.shutdown_sentinel()
6262

6363

6464
@pytest.mark.asyncio
6565
async def test_handle_fault():
6666
engine_exception_q: queue.Queue[FaultInfo] = queue.Queue()
6767
engine_status_dict = create_test_thread_safe_dict({1: "Healthy"})
68-
guard = create_client_guard(engine_exception_q, engine_status_dict)
68+
sentinel = create_client_sentinel(engine_exception_q, engine_status_dict)
6969

7070
engine_exception_q.put_nowait(
7171
FaultInfo(engine_id="1", message="test exception", type="test")
7272
)
7373

74-
guard.fault_handler.handle_fault = AsyncMock(return_value=True)
74+
sentinel.fault_handler.handle_fault = AsyncMock(return_value=True)
7575

76-
result = await guard.handle_fault("pause", 5)
76+
result = await sentinel.handle_fault("pause", 5)
7777
assert result is True
78-
guard.fault_handler.handle_fault.assert_awaited_once_with("pause", 5)
78+
sentinel.fault_handler.handle_fault.assert_awaited_once_with("pause", 5)
7979

80-
guard.shutdown_guard()
80+
sentinel.shutdown_sentinel()
8181

8282

8383
def test_fault_receiver():
8484
engine_exception_q: queue.Queue[FaultInfo] = queue.Queue()
8585
engine_status_dict = create_test_thread_safe_dict({1: "Healthy"})
86-
guard = create_client_guard(engine_exception_q, engine_status_dict)
86+
sentinel = create_client_sentinel(engine_exception_q, engine_status_dict)
8787

8888
def send_test_message():
8989
ctx = zmq.Context()
@@ -125,13 +125,13 @@ def check_published_message():
125125

126126
assert engine_status_dict[1] == "Dead"
127127

128-
guard.shutdown_guard()
128+
sentinel.shutdown_sentinel()
129129

130130

131131
def test_fault_receiver_unhealthy():
132132
engine_exception_q: queue.Queue[FaultInfo] = queue.Queue()
133133
engine_status_dict = create_test_thread_safe_dict({1: "Healthy"})
134-
guard = create_client_guard(engine_exception_q, engine_status_dict)
134+
sentinel = create_client_sentinel(engine_exception_q, engine_status_dict)
135135

136136
def send_unhealthy_message():
137137
ctx = zmq.Context()
@@ -149,22 +149,22 @@ def send_unhealthy_message():
149149

150150
assert engine_status_dict[1] == "Unhealthy"
151151

152-
guard.shutdown_guard()
152+
sentinel.shutdown_sentinel()
153153

154154

155-
def test_shutdown_guard():
155+
def test_shutdown_sentinel():
156156
engine_exception_q: queue.Queue[FaultInfo] = queue.Queue()
157157
engine_status_dict = create_test_thread_safe_dict({1: "Healthy"})
158-
guard = create_client_guard(engine_exception_q, engine_status_dict)
158+
sentinel = create_client_sentinel(engine_exception_q, engine_status_dict)
159159

160-
original_fault_sock = guard.fault_receiver_socket
161-
original_cmd_sock = guard.cmd_socket
162-
original_pub_sock = guard.fault_pub_socket
163-
original_ctx = guard.zmq_ctx
160+
original_fault_sock = sentinel.fault_receiver_socket
161+
original_cmd_sock = sentinel.cmd_socket
162+
original_pub_sock = sentinel.fault_pub_socket
163+
original_ctx = sentinel.zmq_ctx
164164

165-
guard.shutdown_guard()
165+
sentinel.shutdown_sentinel()
166166

167-
assert guard.client_guard_dead is True
167+
assert sentinel.client_sentinel_dead is True
168168

169169
with pytest.raises(zmq.ZMQError):
170170
original_fault_sock.recv()
@@ -182,7 +182,7 @@ def test_shutdown_guard():
182182
async def test_handle_fault_async():
183183
engine_exception_q: queue.Queue[FaultInfo] = queue.Queue()
184184
engine_status_dict = create_test_thread_safe_dict({0: "Unhealthy"})
185-
guard = create_client_guard(engine_exception_q, engine_status_dict)
185+
sentinel = create_client_sentinel(engine_exception_q, engine_status_dict)
186186

187187
time.sleep(0.1)
188188
ctx = zmq.Context().instance()
@@ -213,11 +213,11 @@ def response_cmd(cmd_socket):
213213
threading.Thread(target=receive_cmd, args=(cmd_socket,), daemon=True).start()
214214
threading.Thread(target=response_cmd, args=(cmd_socket,), daemon=True).start()
215215

216-
result = await guard.handle_fault("retry", 3)
216+
result = await sentinel.handle_fault("retry", 3)
217217

218218
assert result is True
219219
assert engine_status_dict[0] == "Healthy"
220220

221221
cmd_socket.close()
222222
ctx.term()
223-
guard.shutdown_guard()
223+
sentinel.shutdown_sentinel()

tests/v1/engine/test_engine_core_guard.py

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,21 @@
1212

1313
from vllm.utils.network_utils import make_zmq_socket
1414
from vllm.v1.engine.core import (
15-
EngineCoreGuard,
15+
EngineCoreSentinel,
1616
EngineLoopPausedError,
1717
)
1818
from vllm.v1.serial_utils import serialize_method_call
1919

2020
CLIENT_CMD_ADDR = "tcp://127.0.0.1:8844"
2121
WORKER_CMD_ADDR = "tcp://127.0.0.1:8845"
2222
FAULT_REPORT_ADDR = "tcp://127.0.0.1:8846"
23-
GUARD_IDENTITY = b"engine_guard_0"
23+
SENTINEL_IDENTITY = b"engine_sentinel_0"
2424

2525

26-
def create_engine_core_guard(
26+
def create_engine_core_sentinel(
2727
fault_signal_q: queue.Queue, busy_loop_active: threading.Event
2828
):
29-
return EngineCoreGuard(
29+
return EngineCoreSentinel(
3030
engine_index=0,
3131
fault_signal_q=fault_signal_q,
3232
cmd_q=queue.Queue(),
@@ -35,31 +35,31 @@ def create_engine_core_guard(
3535
client_cmd_addr=CLIENT_CMD_ADDR,
3636
worker_cmd_addr=WORKER_CMD_ADDR,
3737
fault_report_addr=FAULT_REPORT_ADDR,
38-
guard_identity=GUARD_IDENTITY,
38+
sentinel_identity=SENTINEL_IDENTITY,
3939
tp_size=1,
4040
pp_size=1,
4141
dp_size=1,
4242
)
4343

4444

45-
def test_engine_core_guard_initialization():
45+
def test_engine_core_sentinel_initialization():
4646
fault_signal_q: queue.Queue = queue.Queue()
4747
busy_loop_active = threading.Event()
4848

49-
guard = create_engine_core_guard(fault_signal_q, busy_loop_active)
49+
sentinel = create_engine_core_sentinel(fault_signal_q, busy_loop_active)
5050

51-
assert guard.engine_index == 0
52-
assert guard.tp_size == 1
53-
assert guard.pp_size == 1
54-
assert not guard.communicator_aborted
55-
assert guard.engine_running is True
56-
assert guard.daemon is True
51+
assert sentinel.engine_index == 0
52+
assert sentinel.tp_size == 1
53+
assert sentinel.pp_size == 1
54+
assert not sentinel.communicator_aborted
55+
assert sentinel.engine_running is True
56+
assert sentinel.daemon is True
5757

58-
assert guard.fault_report_socket.type == zmq.DEALER
59-
assert guard.client_cmd_socket.type == zmq.DEALER
60-
assert guard.worker_cmd_socket.type == zmq.ROUTER
58+
assert sentinel.fault_report_socket.type == zmq.DEALER
59+
assert sentinel.client_cmd_socket.type == zmq.DEALER
60+
assert sentinel.worker_cmd_socket.type == zmq.ROUTER
6161

62-
guard.shutdown()
62+
sentinel.shutdown()
6363

6464

6565
@pytest.mark.parametrize("instruction", ["pause", "retry"])
@@ -73,7 +73,7 @@ def test_run_handle_instruction(instruction):
7373

7474
time.sleep(0.1)
7575

76-
guard = create_engine_core_guard(fault_signal_q, busy_loop_active)
76+
sentinel = create_engine_core_sentinel(fault_signal_q, busy_loop_active)
7777
time.sleep(0.1)
7878

7979
ctx = zmq.Context()
@@ -96,7 +96,7 @@ def mock_worker_receiver(cmd_socket):
9696
logging.info(identity)
9797
cmd_socket.send_multipart([b"", json.dumps(response_dict).encode("utf-8")])
9898

99-
threading.Thread(target=guard.run, daemon=True).start()
99+
threading.Thread(target=sentinel.run, daemon=True).start()
100100
time.sleep(0.1)
101101

102102
param = {"timeout": 3}
@@ -106,7 +106,7 @@ def mock_worker_receiver(cmd_socket):
106106
param["new_stateless_dp_group_port"] = 23456
107107
serial_instruction = serialize_method_call(instruction, **param)
108108
client_socket.send_multipart(
109-
[GUARD_IDENTITY, b"", serial_instruction.encode("utf-8")]
109+
[SENTINEL_IDENTITY, b"", serial_instruction.encode("utf-8")]
110110
)
111111
if instruction == "pause":
112112
fault_signal_q.put(EngineLoopPausedError(Exception("test error")))
@@ -127,4 +127,4 @@ def mock_worker_receiver(cmd_socket):
127127

128128
client_socket.close()
129129
worker_cmd_socket.close()
130-
guard.shutdown()
130+
sentinel.shutdown()

vllm/config/fault_tolerance.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ class FaultToleranceConfig:
3939

4040
engine_core_cmd_addr: str = ""
4141
"""
42-
The ZMQ address between engine_core_guard and worker_guard.
42+
The ZMQ address between engine_core_sentinel and worker_sentinel.
4343
It will be initialized and assigned in EngineCore, then passed
4444
to the Worker via vllm_config—this is required for the Worker
45-
to spin up the WorkerGuard.
45+
to spin up the WorkerSentinel.
4646
"""
4747

4848
gloo_comm_timeout: int = 30

0 commit comments

Comments
 (0)