Skip to content

Commit 2b83a76

Browse files
committed
refactored client and server initialization methods
1 parent ff9525a commit 2b83a76

File tree

4 files changed

+27
-2
lines changed

4 files changed

+27
-2
lines changed

mooncake-integration/transfer_engine/transfer_engine_py.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,11 @@ void bind_coro_rpc_interface(py::module_ &m) {
675675
.def("initialize", &CoroRPCInterface::initialize,
676676
"listen_address"_a = "", "thread_count"_a = 0,
677677
"timeout_seconds"_a = 30, "pool_size"_a = 10)
678+
.def("initialize_client", &CoroRPCInterface::initializeClient,
679+
"pool_size"_a = 10, "timeout_seconds"_a = 30)
680+
.def("initialize_server", &CoroRPCInterface::initializeServer,
681+
"listen_address"_a, "thread_count"_a = 8,
682+
"timeout_seconds"_a = 30, "pool_size"_a = 4)
678683
.def("start_server", &CoroRPCInterface::startServer)
679684
.def("start_server_async", &CoroRPCInterface::startServerAsync)
680685
.def("stop_server", &CoroRPCInterface::stopServer)

mooncake-transfer-engine/include/transport/coro_rpc_connector/cororpc_interface.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@ class CoroRPCInterface {
4747
size_t thread_count = 0, size_t timeout_seconds = 30,
4848
size_t pool_size = 10);
4949

50+
// Convenience methods for common use cases
51+
bool initializeClient(size_t pool_size = 10, size_t timeout_seconds = 30);
52+
bool initializeServer(const std::string& listen_address,
53+
size_t thread_count = 8, size_t timeout_seconds = 30,
54+
size_t pool_size = 4);
55+
5056
bool startServer();
5157
bool startServerAsync();
5258
bool startServerImpl(bool is_async = true);

mooncake-transfer-engine/src/transport/coro_rpc_connector/cororpc_interface.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,18 @@ bool CoroRPCInterface::initialize(const std::string& local_address,
3838
return impl_->communicator->initialize(config);
3939
}
4040

41+
// Convenience method for client initialization
42+
bool CoroRPCInterface::initializeClient(size_t pool_size, size_t timeout_seconds) {
43+
return initialize("", 0, timeout_seconds, pool_size);
44+
}
45+
46+
// Convenience method for server initialization
47+
bool CoroRPCInterface::initializeServer(const std::string& listen_address,
48+
size_t thread_count, size_t timeout_seconds,
49+
size_t pool_size) {
50+
return initialize(listen_address, thread_count, timeout_seconds, pool_size);
51+
}
52+
4153
bool CoroRPCInterface::startServer() {
4254
if (!impl_->communicator) return false;
4355
return impl_->communicator->startServer();

mooncake-transfer-engine/tests/communicator_bandwidth_test.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ def run_server(bind_url, data_size_mb=1):
5959

6060
CoroRPCInterface = engine.CoroRPCInterface
6161
server = CoroRPCInterface()
62-
server.initialize(bind_url, 8, 30, 4)
62+
# Server使用专门的初始化方法
63+
server.initialize_server(bind_url, thread_count=8)
6364
server.start_server_async() #start the server asynchronously
6465

6566
# Start QPS statistics thread
@@ -85,7 +86,8 @@ def run_client(target_url, num_threads=8, data_size_mb=1):
8586

8687
CoroRPCInterface = engine.CoroRPCInterface
8788
client = CoroRPCInterface()
88-
client.initialize("", 0, 30, 100)
89+
# Client使用专门的初始化方法,只需要提供pool_size
90+
client.initialize_client(pool_size=100)
8991

9092
# Start QPS statistics thread
9193
qps_thread = threading.Thread(target=print_qps)

0 commit comments

Comments
 (0)