Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
349 changes: 105 additions & 244 deletions sdk/couchbase-core/src/agent.rs

Large diffs are not rendered by default.

11 changes: 5 additions & 6 deletions sdk/couchbase-core/src/collection_resolver_memd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@

use crate::collectionresolver::CollectionResolver;
use crate::error::{Error, Result};
use crate::kv_orchestration::{orchestrate_kv_client, KvClientManagerClientType};
use crate::kvclient_ops::KvClientOps;
use crate::kvclientmanager::{
orchestrate_random_memd_client, KvClientManager, KvClientManagerClientType,
};
use crate::kvendpointclientmanager::KvEndpointClientManager;
use crate::memdx::request::GetCollectionIdRequest;
use crate::memdx::response::GetCollectionIdResponse;
use futures::TryFutureExt;
Expand All @@ -37,7 +36,7 @@ pub(crate) struct CollectionResolverMemdOptions<K> {

impl<K> CollectionResolverMemd<K>
where
K: KvClientManager,
K: KvEndpointClientManager,
{
pub fn new(opts: CollectionResolverMemdOptions<K>) -> Self {
Self {
Expand All @@ -48,14 +47,14 @@ where

impl<K> CollectionResolver for CollectionResolverMemd<K>
where
K: KvClientManager,
K: KvEndpointClientManager + Send + Sync,
{
async fn resolve_collection_id(
&self,
scope_name: &str,
collection_name: &str,
) -> Result<(u32, u64)> {
let resp: GetCollectionIdResponse = orchestrate_random_memd_client(
let resp: GetCollectionIdResponse = orchestrate_kv_client(
self.conn_mgr.clone(),
async |client: Arc<KvClientManagerClientType<K>>| {
client
Expand Down
197 changes: 197 additions & 0 deletions sdk/couchbase-core/src/componentconfigs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
*
* * Copyright (c) 2025 Couchbase, Inc.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
use crate::address::Address;
use crate::authenticator::Authenticator;
use crate::configmanager::ConfigManagerMemdConfig;
use crate::diagnosticscomponent::DiagnosticsComponentConfig;
use crate::httpx::client::ClientConfig;
use crate::kvclient_babysitter::KvTarget;
use crate::mgmtcomponent::MgmtComponentConfig;
use crate::parsedconfig::{ParsedConfig, ParsedConfigFeature};
use crate::querycomponent::QueryComponentConfig;
use crate::searchcomponent::SearchComponentConfig;
use crate::service_type::ServiceType;
use crate::tls_config::TlsConfig;
use crate::vbucketrouter::VbucketRoutingInfo;
use std::collections::HashMap;
use std::time::Duration;

pub(crate) struct AgentComponentConfigs {
pub kv_targets: HashMap<String, KvTarget>,
pub auth: Authenticator,
pub selected_bucket: Option<String>,

pub config_manager_memd_config: ConfigManagerMemdConfig,
pub vbucket_routing_info: VbucketRoutingInfo,
pub query_config: QueryComponentConfig,
pub search_config: SearchComponentConfig,
pub mgmt_config: MgmtComponentConfig,
pub diagnostics_config: DiagnosticsComponentConfig,
pub http_client_config: ClientConfig,
}

pub(crate) struct HttpClientConfig {
pub idle_connection_timeout: Duration,
pub max_idle_connections_per_host: Option<usize>,
pub tcp_keep_alive_time: Duration,
}

impl AgentComponentConfigs {
pub fn gen_from_config(
config: &ParsedConfig,
network_type: &str,
tls_config: Option<TlsConfig>,
bucket_name: Option<String>,
authenticator: Authenticator,
http_client_config: HttpClientConfig,
) -> AgentComponentConfigs {
let rev_id = config.rev_id;
let network_info = config.addresses_group_for_network_type(network_type);

let mut gcccp_node_ids = Vec::new();
let mut kv_data_node_ids = Vec::new();
let mut kv_data_hosts: HashMap<String, Address> = HashMap::new();
let mut mgmt_endpoints: HashMap<String, String> = HashMap::new();
let mut query_endpoints: HashMap<String, String> = HashMap::new();
let mut search_endpoints: HashMap<String, String> = HashMap::new();

for node in network_info.nodes {
let kv_ep_id = format!("kv{}", node.node_id);
let mgmt_ep_id = format!("mgmt{}", node.node_id);
let query_ep_id = format!("query{}", node.node_id);
let search_ep_id = format!("search{}", node.node_id);

gcccp_node_ids.push(kv_ep_id.clone());

if node.has_data {
kv_data_node_ids.push(kv_ep_id.clone());
}

if tls_config.is_some() {
if let Some(p) = node.ssl_ports.kv {
kv_data_hosts.insert(
kv_ep_id,
Address {
host: node.hostname.clone(),
port: p,
},
);
}
if let Some(p) = node.ssl_ports.mgmt {
mgmt_endpoints.insert(mgmt_ep_id, format!("https://{}:{}", node.hostname, p));
}
if let Some(p) = node.ssl_ports.query {
query_endpoints.insert(query_ep_id, format!("https://{}:{}", node.hostname, p));
}
if let Some(p) = node.ssl_ports.search {
search_endpoints
.insert(search_ep_id, format!("https://{}:{}", node.hostname, p));
}
} else {
if let Some(p) = node.non_ssl_ports.kv {
kv_data_hosts.insert(
kv_ep_id,
Address {
host: node.hostname.clone(),
port: p,
},
);
}
if let Some(p) = node.non_ssl_ports.mgmt {
mgmt_endpoints.insert(mgmt_ep_id, format!("http://{}:{}", node.hostname, p));
}
if let Some(p) = node.non_ssl_ports.query {
query_endpoints.insert(query_ep_id, format!("http://{}:{}", node.hostname, p));
}
if let Some(p) = node.non_ssl_ports.search {
search_endpoints
.insert(search_ep_id, format!("http://{}:{}", node.hostname, p));
}
}
}

let mut kv_targets = HashMap::new();
for (node_id, address) in kv_data_hosts {
let target = KvTarget {
address,
tls_config: tls_config.clone(),
};

kv_targets.insert(node_id, target);
}

let vbucket_routing_info = if let Some(info) = &config.bucket {
VbucketRoutingInfo {
vbucket_info: info.vbucket_map.clone(),
server_list: kv_data_node_ids,
bucket_selected: true,
}
} else {
VbucketRoutingInfo {
vbucket_info: None,
server_list: kv_data_node_ids,
bucket_selected: false,
}
};

let mut available_services = vec![ServiceType::MEMD];
if !query_endpoints.is_empty() {
available_services.push(ServiceType::QUERY)
}
if !search_endpoints.is_empty() {
available_services.push(ServiceType::SEARCH)
}

AgentComponentConfigs {
kv_targets,
auth: authenticator.clone(),
selected_bucket: bucket_name.clone(),
config_manager_memd_config: ConfigManagerMemdConfig {
endpoints: gcccp_node_ids,
},

vbucket_routing_info,
query_config: QueryComponentConfig {
endpoints: query_endpoints,
authenticator: authenticator.clone(),
},
search_config: SearchComponentConfig {
endpoints: search_endpoints,
authenticator: authenticator.clone(),
vector_search_enabled: config
.features
.contains(&ParsedConfigFeature::FtsVectorSearch),
},
http_client_config: ClientConfig {
tls_config,
idle_connection_timeout: http_client_config.idle_connection_timeout,
max_idle_connections_per_host: http_client_config.max_idle_connections_per_host,
tcp_keep_alive_time: http_client_config.tcp_keep_alive_time,
},
mgmt_config: MgmtComponentConfig {
endpoints: mgmt_endpoints,
authenticator: authenticator.clone(),
},
diagnostics_config: DiagnosticsComponentConfig {
bucket: bucket_name,
services: available_services,
rev_id,
},
}
}
}
37 changes: 26 additions & 11 deletions sdk/couchbase-core/src/configfetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,38 @@
use crate::cbconfig;
use crate::configparser::ConfigParser;
use crate::configwatcher::ConfigWatcherMemd;
use crate::error::Error;
use crate::error::{Error, ErrorKind};
use crate::kv_orchestration::KvClientManagerClientType;
use crate::kvclient::{KvClient, StdKvClient};
use crate::kvclient_ops::KvClientOps;
use crate::kvclientmanager::{KvClientManager, KvClientManagerClientType};
use crate::kvclientpool::KvClientPool;
use crate::kvendpointclientmanager::KvEndpointClientManager;
use crate::memdx::hello_feature::HelloFeature;
use crate::memdx::request::{GetClusterConfigKnownVersion, GetClusterConfigRequest};
use crate::parsedconfig::ParsedConfig;
use log::{debug, trace};
use std::env;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::{timeout, timeout_at};

#[derive(Clone)]
pub(crate) struct ConfigFetcherMemd<M: KvClientManager> {
pub(crate) struct ConfigFetcherMemd<M: KvEndpointClientManager> {
kv_client_manager: Arc<M>,
fetch_timeout: Duration,
}

impl<M: KvClientManager> ConfigFetcherMemd<M> {
pub fn new(kv_client_manager: Arc<M>) -> Self {
Self { kv_client_manager }
pub(crate) struct ConfigFetcherMemdOptions<M: KvEndpointClientManager> {
pub kv_client_manager: Arc<M>,
pub fetch_timeout: Duration,
}

impl<M: KvEndpointClientManager> ConfigFetcherMemd<M> {
pub fn new(opts: ConfigFetcherMemdOptions<M>) -> Self {
Self {
kv_client_manager: opts.kv_client_manager.clone(),
fetch_timeout: opts.fetch_timeout,
}
}
pub(crate) async fn poll_one(
&self,
Expand All @@ -47,7 +59,7 @@ impl<M: KvClientManager> ConfigFetcherMemd<M> {
rev_epoch: i64,
skip_fetch_cb: impl FnOnce(Arc<KvClientManagerClientType<M>>) -> bool,
) -> crate::error::Result<Option<ParsedConfig>> {
let client = self.kv_client_manager.get_client(endpoint).await?;
let client = self.kv_client_manager.get_endpoint_client(endpoint).await?;

if skip_fetch_cb(client.clone()) {
return Ok(None);
Expand All @@ -64,10 +76,13 @@ impl<M: KvClientManager> ConfigFetcherMemd<M> {
}
};

let resp = client
.get_cluster_config(GetClusterConfigRequest { known_version })
.await
.map_err(Error::new_contextual_memdx_error)?;
let resp = timeout(
self.fetch_timeout,
client.get_cluster_config(GetClusterConfigRequest { known_version }),
)
.await
.map_err(|e| Error::new_message_error("get cluster config timed out"))?
.map_err(Error::new_contextual_memdx_error)?;

if resp.config.is_empty() {
return Ok(None);
Expand Down
Loading