Skip to content

Commit d4a7035

Browse files
authored
优化并去掉二次寻址 (#34)
* feat: c++ 对齐其他语言SDK 1. 限流接口支持传入method 2. 去掉grpc以及动态权重调整的代码 3. 支持非二次寻址 4. 支持环境变量配置 * feat:修复用例失败问题 * feat: 默认地址导致用例失败问题 * feat: 优化并去掉二次寻址
1 parent 9f1c193 commit d4a7035

File tree

5 files changed

+53
-16
lines changed

5 files changed

+53
-16
lines changed

polaris/context/context.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,9 @@ Context* Context::Create(Config* config, ContextMode mode) {
4545
if (context_impl->Init(config, context.get(), mode) != kReturnOk) {
4646
return nullptr;
4747
}
48+
const PolarisCluster& discover_cluster = context_impl->GetDiscoverService();
4849
// Polaris discover先请求一下
49-
if (context_impl->InitSystemService(context_impl->GetDiscoverService()) != kReturnOk) {
50+
if (!discover_cluster.service_.name_.empty() && context_impl->InitSystemService(discover_cluster) != kReturnOk) {
5051
return nullptr;
5152
}
5253
// 如果有设置Metric Cluster则提前获取

polaris/context/context_impl.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,8 @@ ReturnCode ContextImpl::InitSystemConfig(Config* system_config) {
230230

231231
ReturnCode ContextImpl::InitSystemService(const PolarisCluster& cluster) {
232232
ServiceContext* service_context = this->GetServiceContext(cluster.service_);
233+
POLARIS_LOG(LOG_ERROR, "init system service for service[%s/%s] failed", cluster.service_.namespace_.c_str(),
234+
cluster.service_.name_.c_str());
233235
if (service_context == nullptr) {
234236
POLARIS_LOG(LOG_ERROR, "create service context for service[%s/%s] failed", cluster.service_.namespace_.c_str(),
235237
cluster.service_.name_.c_str());

polaris/monitor/monitor_reporter.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -831,6 +831,9 @@ bool MonitorReporter::GetInstance(ReportBase* report_data) {
831831
POLARIS_ASSERT(report_data != nullptr);
832832
POLARIS_ASSERT(report_data->instance_ == nullptr);
833833
const PolarisCluster& monitor_cluster = context_->GetContextImpl()->GetMonitorService();
834+
if (monitor_cluster.service_.name_.empty()) {
835+
return false;
836+
}
834837
Criteria criteria;
835838
criteria.ignore_half_open_ = (rand() % 10 != 0); // 10分之一的概率选择半开节点
836839
ReturnCode ret_code = ConsumerApiImpl::GetSystemServer(context_, monitor_cluster.service_, criteria,

polaris/plugin/server_connector/grpc_server_connector.cpp

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,8 @@ bool GrpcServerConnector::SendDiscoverRequest(ServiceListener& service_listener)
335335
}
336336
if (discover_stream_state_ != kDiscoverStreamInit) {
337337
const ServiceKey& discover_service = context_->GetContextImpl()->GetDiscoverService().service_;
338-
if (isEmpty(discover_service)) {
338+
if (discover_service.name_.empty()) {
339+
POLARIS_LOG(LOG_INFO, "discover service is empty, state transive to DiscoverStreamInit");
339340
discover_stream_state_ = kDiscoverStreamInit;
340341
} else {
341342
if (service_key.name_ != discover_service.name_ || service_key.namespace_ != discover_service.namespace_) {
@@ -572,6 +573,10 @@ ReturnCode GrpcServerConnector::SelectInstance(const ServiceKey& service_key, ui
572573
return ConsumerApiImpl::GetSystemServer(context_, service_key, criteria, *instance, timeout);
573574
}
574575

576+
SeedServer& GrpcServerConnector::SelectSeed() {
577+
return server_lists_[rand() % server_lists_.size()];
578+
}
579+
575580
void GrpcServerConnector::ServerSwitch() {
576581
if (server_switch_state_ == kServerSwitchNormal // 服务调用出错或超时触发切换
577582
|| server_switch_state_ == kServerSwitchBegin) { // 切换后异步连接回调触发重新
@@ -590,12 +595,12 @@ void GrpcServerConnector::ServerSwitch() {
590595
// 选择一个服务器
591596
std::string host;
592597
int port = 0;
593-
if (discover_stream_state_ >= kDiscoverStreamGetInstance) { // 说明内部服务已经返回
598+
const ServiceKey& discover_service = context_->GetContextImpl()->GetDiscoverService().service_;
599+
if (!discover_service.name_.empty() && discover_stream_state_ >= kDiscoverStreamGetInstance) { // 说明内部服务已经返回
594600
if (discover_instance_ != nullptr) {
595601
delete discover_instance_;
596602
discover_instance_ = nullptr;
597603
}
598-
const ServiceKey& discover_service = context_->GetContextImpl()->GetDiscoverService().service_;
599604
bool ignore_half_open = server_switch_state_ != kServerSwitchPeriodic; // 周期切换才选半开节点
600605
ReturnCode ret_code = SelectInstance(discover_service, 0, &discover_instance_, ignore_half_open);
601606
if (ret_code == kReturnOk) {
@@ -612,7 +617,7 @@ void GrpcServerConnector::ServerSwitch() {
612617
}
613618
if (host.empty()) {
614619
discover_stream_state_ = kDiscoverStreamNotInit;
615-
SeedServer& server = server_lists_[rand() % server_lists_.size()];
620+
SeedServer& server = SelectSeed();
616621
host = server.ip_;
617622
port = server.port_;
618623
POLARIS_LOG(LOG_INFO, "discover stream switch to seed server[%s:%d]", host.c_str(), port);
@@ -837,11 +842,19 @@ bool GrpcServerConnector::GetInstance(BlockRequest* block_request) {
837842
POLARIS_ASSERT(block_request != nullptr);
838843
POLARIS_ASSERT(block_request->instance_ == nullptr);
839844
const ServiceKey& service = GetPolarisService(context_, block_request->request_type_);
845+
if (service.name_.empty()) {
846+
SeedServer& seedServer = SelectSeed();
847+
block_request->host_ = seedServer.ip_;
848+
block_request->port_ = seedServer.port_;
849+
return true;
850+
}
840851
ReturnCode ret_code = SelectInstance(service, block_request->request_timeout_, &block_request->instance_);
841852
if (ret_code == kReturnOk) {
842853
POLARIS_ASSERT(block_request->instance_ != nullptr);
843854
POLARIS_LOG(LOG_DEBUG, "get server:%s:%d for %s", block_request->instance_->GetHost().c_str(),
844855
block_request->instance_->GetPort(), PolarisRequestTypeStr(block_request->request_type_));
856+
block_request->host_ = block_request->instance_->GetHost();
857+
block_request->port_ = block_request->instance_->GetPort();
845858
return true;
846859
} else {
847860
POLARIS_ASSERT(block_request->instance_ == nullptr);
@@ -852,7 +865,9 @@ bool GrpcServerConnector::GetInstance(BlockRequest* block_request) {
852865
}
853866

854867
void GrpcServerConnector::UpdateCallResult(BlockRequest* block_request) {
855-
POLARIS_ASSERT(block_request->instance_ != nullptr);
868+
if(block_request->instance_ == nullptr) {
869+
return;
870+
}
856871
const ServiceKey& service = GetPolarisService(context_, block_request->request_type_);
857872
CallRetStatus status = kCallRetOk;
858873
if (kServerCodeConnectError <= block_request->server_code_ &&
@@ -876,7 +891,9 @@ BlockRequest::BlockRequest(PolarisRequestType request_type, GrpcServerConnector&
876891
message_(nullptr),
877892
promise_(nullptr),
878893
instance_(nullptr),
879-
grpc_client_(nullptr) {}
894+
host_(""),
895+
port_(0),
896+
grpc_client_(nullptr) {}
880897

881898
BlockRequest::~BlockRequest() {
882899
if (instance_ != nullptr) {
@@ -934,18 +951,18 @@ bool BlockRequest::PrepareClient() {
934951

935952
// 建立grpc客户端,并尝试连接
936953
grpc_client_ = new grpc::GrpcClient(connector_.GetReactor());
937-
if (!grpc_client_->ConnectTo(instance_->GetHost(), instance_->GetPort()) ||
954+
if (!grpc_client_->ConnectTo(host_, port_) ||
938955
!grpc_client_->WaitConnected(request_timeout_)) {
939956
POLARIS_LOG(LOG_ERROR, "%s connect to server[%s:%d] timeout", PolarisRequestTypeStr(request_type_),
940-
instance_->GetHost().c_str(), instance_->GetPort());
957+
host_.c_str(), port_);
941958
server_code_ = kServerCodeConnectError;
942959
connector_.UpdateCallResult(this);
943960
return false;
944961
}
945962
uint64_t use_time = Time::GetCoarseSteadyTimeMs() - begin_time;
946963
if (use_time >= request_timeout_) {
947964
POLARIS_LOG(LOG_ERROR, "%s connect to server[%s:%d] timeout", PolarisRequestTypeStr(request_type_),
948-
instance_->GetHost().c_str(), instance_->GetPort());
965+
host_.c_str(), port_);
949966
server_code_ = kServerCodeConnectError;
950967
connector_.UpdateCallResult(this);
951968
return false;
@@ -1020,6 +1037,8 @@ AsyncRequest::AsyncRequest(Reactor& reactor, GrpcServerConnector* connector, Pol
10201037
timeout_(timeout),
10211038
callback_(callback),
10221039
server_(nullptr),
1040+
host_(""),
1041+
port_(0),
10231042
client_(nullptr),
10241043
timing_task_(connector->GetReactor().TimingTaskEnd()) {}
10251044

@@ -1046,16 +1065,23 @@ bool AsyncRequest::Submit() {
10461065
}
10471066

10481067
const ServiceKey& service = GetPolarisService(connector_->context_, request_type_);
1049-
ReturnCode ret_code = connector_->SelectInstance(service, 0, &server_);
1050-
if (ret_code != kReturnOk) {
1051-
callback_(ret_code, "select server failed", nullptr);
1052-
return false;
1068+
if (service.name_.empty()) {
1069+
SeedServer& seedServer = connector_->SelectSeed();
1070+
host_ = seedServer.ip_;
1071+
port_ = seedServer.port_;
1072+
} else {
1073+
ReturnCode ret_code = connector_->SelectInstance(service, 0, &server_);
1074+
if (ret_code != kReturnOk) {
1075+
callback_(ret_code, "select server failed", nullptr);
1076+
return false;
1077+
}
1078+
host_ = server_->GetHost();
1079+
port_ = server_->GetPort();
10531080
}
1054-
10551081
connector_->async_request_map_[request_id_] = this; // 记录请求
10561082
// 尝试建立连接
10571083
client_ = new grpc::GrpcClient(reactor_);
1058-
client_->Connect(server_->GetHost(), server_->GetPort(), GetTimeLeft(),
1084+
client_->Connect(host_, port_, GetTimeLeft(),
10591085
std::bind(&AsyncRequest::OnConnect, this, std::placeholders::_1));
10601086
return true;
10611087
}

polaris/plugin/server_connector/grpc_server_connector.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ class GrpcServerConnector : public ServerConnector, public grpc::StreamCallback<
175175
virtual ReturnCode SelectInstance(const ServiceKey& service_key, uint32_t timeout, Instance** instance,
176176
bool ignore_half_open = false);
177177

178+
SeedServer& SelectSeed();
178179
private:
179180
friend class AsyncRequest;
180181
Context* context_;
@@ -235,6 +236,8 @@ class BlockRequest : public grpc::RequestCallback<v1::Response> {
235236

236237
protected: // protected for test
237238
Instance* instance_;
239+
std::string host_;
240+
int port_;
238241
grpc::GrpcClient* grpc_client_;
239242
};
240243

@@ -299,6 +302,8 @@ class AsyncRequest : public grpc::RequestCallback<v1::Response> {
299302
uint64_t timeout_;
300303
PolarisCallback callback_;
301304
Instance* server_; // 选择连接的服务器
305+
std::string host_;
306+
int port_;
302307
grpc::GrpcClient* client_;
303308
TimingTaskIter timing_task_;
304309
};

0 commit comments

Comments
 (0)