Skip to content

Commit cf3469a

Browse files
committed
RSCBC-136: Rework kv client stack
1 parent a894767 commit cf3469a

25 files changed

+1791
-1636
lines changed

sdk/couchbase-core/src/agent.rs

Lines changed: 105 additions & 244 deletions
Large diffs are not rendered by default.

sdk/couchbase-core/src/collection_resolver_memd.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,9 @@
1818

1919
use crate::collectionresolver::CollectionResolver;
2020
use crate::error::{Error, Result};
21+
use crate::kv_orchestration::{orchestrate_kv_client, KvClientManagerClientType};
2122
use crate::kvclient_ops::KvClientOps;
22-
use crate::kvclientmanager::{
23-
orchestrate_random_memd_client, KvClientManager, KvClientManagerClientType,
24-
};
23+
use crate::kvendpointclientmanager::KvEndpointClientManager;
2524
use crate::memdx::request::GetCollectionIdRequest;
2625
use crate::memdx::response::GetCollectionIdResponse;
2726
use futures::TryFutureExt;
@@ -37,7 +36,7 @@ pub(crate) struct CollectionResolverMemdOptions<K> {
3736

3837
impl<K> CollectionResolverMemd<K>
3938
where
40-
K: KvClientManager,
39+
K: KvEndpointClientManager,
4140
{
4241
pub fn new(opts: CollectionResolverMemdOptions<K>) -> Self {
4342
Self {
@@ -48,14 +47,14 @@ where
4847

4948
impl<K> CollectionResolver for CollectionResolverMemd<K>
5049
where
51-
K: KvClientManager,
50+
K: KvEndpointClientManager + Send + Sync,
5251
{
5352
async fn resolve_collection_id(
5453
&self,
5554
scope_name: &str,
5655
collection_name: &str,
5756
) -> Result<(u32, u64)> {
58-
let resp: GetCollectionIdResponse = orchestrate_random_memd_client(
57+
let resp: GetCollectionIdResponse = orchestrate_kv_client(
5958
self.conn_mgr.clone(),
6059
async |client: Arc<KvClientManagerClientType<K>>| {
6160
client
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/*
2+
*
3+
* * Copyright (c) 2025 Couchbase, Inc.
4+
* *
5+
* * Licensed under the Apache License, Version 2.0 (the "License");
6+
* * you may not use this file except in compliance with the License.
7+
* * You may obtain a copy of the License at
8+
* *
9+
* * http://www.apache.org/licenses/LICENSE-2.0
10+
* *
11+
* * Unless required by applicable law or agreed to in writing, software
12+
* * distributed under the License is distributed on an "AS IS" BASIS,
13+
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* * See the License for the specific language governing permissions and
15+
* * limitations under the License.
16+
*
17+
*/
18+
use crate::address::Address;
19+
use crate::authenticator::Authenticator;
20+
use crate::configmanager::ConfigManagerMemdConfig;
21+
use crate::diagnosticscomponent::DiagnosticsComponentConfig;
22+
use crate::httpx::client::ClientConfig;
23+
use crate::kvclient_babysitter::KvTarget;
24+
use crate::mgmtcomponent::MgmtComponentConfig;
25+
use crate::parsedconfig::{ParsedConfig, ParsedConfigFeature};
26+
use crate::querycomponent::QueryComponentConfig;
27+
use crate::searchcomponent::SearchComponentConfig;
28+
use crate::service_type::ServiceType;
29+
use crate::tls_config::TlsConfig;
30+
use crate::vbucketrouter::VbucketRoutingInfo;
31+
use std::collections::HashMap;
32+
use std::time::Duration;
33+
34+
pub(crate) struct AgentComponentConfigs {
35+
pub kv_targets: HashMap<String, KvTarget>,
36+
pub auth: Authenticator,
37+
pub selected_bucket: Option<String>,
38+
39+
pub config_manager_memd_config: ConfigManagerMemdConfig,
40+
pub vbucket_routing_info: VbucketRoutingInfo,
41+
pub query_config: QueryComponentConfig,
42+
pub search_config: SearchComponentConfig,
43+
pub mgmt_config: MgmtComponentConfig,
44+
pub diagnostics_config: DiagnosticsComponentConfig,
45+
pub http_client_config: ClientConfig,
46+
}
47+
48+
pub(crate) struct HttpClientConfig {
49+
pub idle_connection_timeout: Duration,
50+
pub max_idle_connections_per_host: Option<usize>,
51+
pub tcp_keep_alive_time: Duration,
52+
}
53+
54+
impl AgentComponentConfigs {
55+
pub fn gen_from_config(
56+
config: &ParsedConfig,
57+
network_type: &str,
58+
tls_config: Option<TlsConfig>,
59+
bucket_name: Option<String>,
60+
authenticator: Authenticator,
61+
http_client_config: HttpClientConfig,
62+
) -> AgentComponentConfigs {
63+
let rev_id = config.rev_id;
64+
let network_info = config.addresses_group_for_network_type(network_type);
65+
66+
let mut gcccp_node_ids = Vec::new();
67+
let mut kv_data_node_ids = Vec::new();
68+
let mut kv_data_hosts: HashMap<String, Address> = HashMap::new();
69+
let mut mgmt_endpoints: HashMap<String, String> = HashMap::new();
70+
let mut query_endpoints: HashMap<String, String> = HashMap::new();
71+
let mut search_endpoints: HashMap<String, String> = HashMap::new();
72+
73+
for node in network_info.nodes {
74+
let kv_ep_id = format!("kv{}", node.node_id);
75+
let mgmt_ep_id = format!("mgmt{}", node.node_id);
76+
let query_ep_id = format!("query{}", node.node_id);
77+
let search_ep_id = format!("search{}", node.node_id);
78+
79+
gcccp_node_ids.push(kv_ep_id.clone());
80+
81+
if node.has_data {
82+
kv_data_node_ids.push(kv_ep_id.clone());
83+
}
84+
85+
if tls_config.is_some() {
86+
if let Some(p) = node.ssl_ports.kv {
87+
kv_data_hosts.insert(
88+
kv_ep_id,
89+
Address {
90+
host: node.hostname.clone(),
91+
port: p,
92+
},
93+
);
94+
}
95+
if let Some(p) = node.ssl_ports.mgmt {
96+
mgmt_endpoints.insert(mgmt_ep_id, format!("https://{}:{}", node.hostname, p));
97+
}
98+
if let Some(p) = node.ssl_ports.query {
99+
query_endpoints.insert(query_ep_id, format!("https://{}:{}", node.hostname, p));
100+
}
101+
if let Some(p) = node.ssl_ports.search {
102+
search_endpoints
103+
.insert(search_ep_id, format!("https://{}:{}", node.hostname, p));
104+
}
105+
} else {
106+
if let Some(p) = node.non_ssl_ports.kv {
107+
kv_data_hosts.insert(
108+
kv_ep_id,
109+
Address {
110+
host: node.hostname.clone(),
111+
port: p,
112+
},
113+
);
114+
}
115+
if let Some(p) = node.non_ssl_ports.mgmt {
116+
mgmt_endpoints.insert(mgmt_ep_id, format!("http://{}:{}", node.hostname, p));
117+
}
118+
if let Some(p) = node.non_ssl_ports.query {
119+
query_endpoints.insert(query_ep_id, format!("http://{}:{}", node.hostname, p));
120+
}
121+
if let Some(p) = node.non_ssl_ports.search {
122+
search_endpoints
123+
.insert(search_ep_id, format!("http://{}:{}", node.hostname, p));
124+
}
125+
}
126+
}
127+
128+
let mut kv_targets = HashMap::new();
129+
for (node_id, address) in kv_data_hosts {
130+
let target = KvTarget {
131+
address,
132+
tls_config: tls_config.clone(),
133+
};
134+
135+
kv_targets.insert(node_id, target);
136+
}
137+
138+
let vbucket_routing_info = if let Some(info) = &config.bucket {
139+
VbucketRoutingInfo {
140+
vbucket_info: info.vbucket_map.clone(),
141+
server_list: kv_data_node_ids,
142+
bucket_selected: true,
143+
}
144+
} else {
145+
VbucketRoutingInfo {
146+
vbucket_info: None,
147+
server_list: kv_data_node_ids,
148+
bucket_selected: false,
149+
}
150+
};
151+
152+
let mut available_services = vec![ServiceType::MEMD];
153+
if !query_endpoints.is_empty() {
154+
available_services.push(ServiceType::QUERY)
155+
}
156+
if !search_endpoints.is_empty() {
157+
available_services.push(ServiceType::SEARCH)
158+
}
159+
160+
AgentComponentConfigs {
161+
kv_targets,
162+
auth: authenticator.clone(),
163+
selected_bucket: bucket_name.clone(),
164+
config_manager_memd_config: ConfigManagerMemdConfig {
165+
endpoints: gcccp_node_ids,
166+
},
167+
168+
vbucket_routing_info,
169+
query_config: QueryComponentConfig {
170+
endpoints: query_endpoints,
171+
authenticator: authenticator.clone(),
172+
},
173+
search_config: SearchComponentConfig {
174+
endpoints: search_endpoints,
175+
authenticator: authenticator.clone(),
176+
vector_search_enabled: config
177+
.features
178+
.contains(&ParsedConfigFeature::FtsVectorSearch),
179+
},
180+
http_client_config: ClientConfig {
181+
tls_config,
182+
idle_connection_timeout: http_client_config.idle_connection_timeout,
183+
max_idle_connections_per_host: http_client_config.max_idle_connections_per_host,
184+
tcp_keep_alive_time: http_client_config.tcp_keep_alive_time,
185+
},
186+
mgmt_config: MgmtComponentConfig {
187+
endpoints: mgmt_endpoints,
188+
authenticator: authenticator.clone(),
189+
},
190+
diagnostics_config: DiagnosticsComponentConfig {
191+
bucket: bucket_name,
192+
services: available_services,
193+
rev_id,
194+
},
195+
}
196+
}
197+
}

sdk/couchbase-core/src/configfetcher.rs

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,38 @@
1919
use crate::cbconfig;
2020
use crate::configparser::ConfigParser;
2121
use crate::configwatcher::ConfigWatcherMemd;
22-
use crate::error::Error;
22+
use crate::error::{Error, ErrorKind};
23+
use crate::kv_orchestration::KvClientManagerClientType;
2324
use crate::kvclient::{KvClient, StdKvClient};
2425
use crate::kvclient_ops::KvClientOps;
25-
use crate::kvclientmanager::{KvClientManager, KvClientManagerClientType};
2626
use crate::kvclientpool::KvClientPool;
27+
use crate::kvendpointclientmanager::KvEndpointClientManager;
2728
use crate::memdx::hello_feature::HelloFeature;
2829
use crate::memdx::request::{GetClusterConfigKnownVersion, GetClusterConfigRequest};
2930
use crate::parsedconfig::ParsedConfig;
3031
use log::{debug, trace};
3132
use std::env;
3233
use std::sync::Arc;
34+
use std::time::Duration;
35+
use tokio::time::{timeout, timeout_at};
3336

3437
#[derive(Clone)]
35-
pub(crate) struct ConfigFetcherMemd<M: KvClientManager> {
38+
pub(crate) struct ConfigFetcherMemd<M: KvEndpointClientManager> {
3639
kv_client_manager: Arc<M>,
40+
fetch_timeout: Duration,
3741
}
3842

39-
impl<M: KvClientManager> ConfigFetcherMemd<M> {
40-
pub fn new(kv_client_manager: Arc<M>) -> Self {
41-
Self { kv_client_manager }
43+
pub(crate) struct ConfigFetcherMemdOptions<M: KvEndpointClientManager> {
44+
pub kv_client_manager: Arc<M>,
45+
pub fetch_timeout: Duration,
46+
}
47+
48+
impl<M: KvEndpointClientManager> ConfigFetcherMemd<M> {
49+
pub fn new(opts: ConfigFetcherMemdOptions<M>) -> Self {
50+
Self {
51+
kv_client_manager: opts.kv_client_manager.clone(),
52+
fetch_timeout: opts.fetch_timeout,
53+
}
4254
}
4355
pub(crate) async fn poll_one(
4456
&self,
@@ -47,7 +59,7 @@ impl<M: KvClientManager> ConfigFetcherMemd<M> {
4759
rev_epoch: i64,
4860
skip_fetch_cb: impl FnOnce(Arc<KvClientManagerClientType<M>>) -> bool,
4961
) -> crate::error::Result<Option<ParsedConfig>> {
50-
let client = self.kv_client_manager.get_client(endpoint).await?;
62+
let client = self.kv_client_manager.get_endpoint_client(endpoint).await?;
5163

5264
if skip_fetch_cb(client.clone()) {
5365
return Ok(None);
@@ -64,10 +76,13 @@ impl<M: KvClientManager> ConfigFetcherMemd<M> {
6476
}
6577
};
6678

67-
let resp = client
68-
.get_cluster_config(GetClusterConfigRequest { known_version })
69-
.await
70-
.map_err(Error::new_contextual_memdx_error)?;
79+
let resp = timeout(
80+
self.fetch_timeout,
81+
client.get_cluster_config(GetClusterConfigRequest { known_version }),
82+
)
83+
.await
84+
.map_err(|e| Error::new_message_error("get cluster config timed out"))?
85+
.map_err(Error::new_contextual_memdx_error)?;
7186

7287
if resp.config.is_empty() {
7388
return Ok(None);

0 commit comments

Comments
 (0)