From cf3469a2b37808e6ded8abdd59e7f842963c65b8 Mon Sep 17 00:00:00 2001 From: Charles Dixon Date: Wed, 22 Oct 2025 15:12:29 +0100 Subject: [PATCH] RSCBC-136: Rework kv client stack --- sdk/couchbase-core/src/agent.rs | 349 +++----- .../src/collection_resolver_memd.rs | 11 +- sdk/couchbase-core/src/componentconfigs.rs | 197 +++++ sdk/couchbase-core/src/configfetcher.rs | 37 +- sdk/couchbase-core/src/configmanager.rs | 105 ++- sdk/couchbase-core/src/configwatcher.rs | 12 +- sdk/couchbase-core/src/crudcomponent.rs | 67 +- .../src/diagnosticscomponent.rs | 112 +-- sdk/couchbase-core/src/httpcomponent.rs | 10 +- sdk/couchbase-core/src/kv_orchestration.rs | 82 ++ sdk/couchbase-core/src/kvclient.rs | 257 +++--- sdk/couchbase-core/src/kvclient_babysitter.rs | 521 +++++++++++ sdk/couchbase-core/src/kvclient_ops.rs | 108 ++- sdk/couchbase-core/src/kvclientmanager.rs | 363 -------- sdk/couchbase-core/src/kvclientpool.rs | 827 ++++-------------- .../src/kvendpointclientmanager.rs | 315 +++++++ sdk/couchbase-core/src/lib.rs | 7 +- sdk/couchbase-core/src/memdx/client.rs | 18 +- sdk/couchbase-core/src/memdx/dispatcher.rs | 4 +- sdk/couchbase-core/src/mgmtcomponent.rs | 2 +- sdk/couchbase-core/src/options/agent.rs | 15 +- sdk/couchbase-core/src/querycomponent.rs | 2 +- sdk/couchbase-core/src/searchcomponent.rs | 2 +- sdk/couchbase-core/src/vbucketrouter.rs | 1 - sdk/couchbase/src/options/cluster_options.rs | 3 +- 25 files changed, 1791 insertions(+), 1636 deletions(-) create mode 100644 sdk/couchbase-core/src/componentconfigs.rs create mode 100644 sdk/couchbase-core/src/kv_orchestration.rs create mode 100644 sdk/couchbase-core/src/kvclient_babysitter.rs delete mode 100644 sdk/couchbase-core/src/kvclientmanager.rs create mode 100644 sdk/couchbase-core/src/kvendpointclientmanager.rs diff --git a/sdk/couchbase-core/src/agent.rs b/sdk/couchbase-core/src/agent.rs index 6f34bb2d..b9c13c09 100644 --- a/sdk/couchbase-core/src/agent.rs +++ b/sdk/couchbase-core/src/agent.rs @@ -36,14 +36,11 
@@ use crate::error::{Error, ErrorKind, Result}; use crate::features::BucketFeature; use crate::httpcomponent::HttpComponent; use crate::httpx::client::{ClientConfig, ReqwestClient}; -use crate::kvclient::{KvClient, KvClientConfig, KvClientOptions, StdKvClient}; -use crate::kvclient_ops::KvClientOps; -use crate::kvclientmanager::{ - KvClientManager, KvClientManagerConfig, KvClientManagerOptions, StdKvClientManager, -}; -use crate::kvclientpool::{ - KvClientPool, KvClientPoolConfig, KvClientPoolOptions, NaiveKvClientPool, +use crate::kvclient::{ + KvClient, KvClientBootstrapOptions, KvClientOptions, StdKvClient, UnsolicitedPacket, }; +use crate::kvclient_ops::KvClientOps; +use crate::kvclientpool::{KvClientPool, KvClientPoolOptions, StdKvClientPool}; use crate::memdx::client::Client; use crate::memdx::opcode::OpCode; use crate::memdx::packet::ResponsePacket; @@ -76,10 +73,15 @@ use std::error::Error as StdError; use std::fmt::format; use std::io::Cursor; use std::net::ToSocketAddrs; -use std::ops::Add; +use std::ops::{Add, Deref}; use std::sync::{Arc, Weak}; use std::time::Duration; +use crate::componentconfigs::{AgentComponentConfigs, HttpClientConfig}; +use crate::kvclient_babysitter::{KvTarget, StdKvClientBabysitter}; +use crate::kvendpointclientmanager::{ + KvEndpointClientManager, KvEndpointClientManagerOptions, StdKvEndpointClientManager, +}; use crate::orphan_reporter::OrphanReporter; use tokio::io::AsyncReadExt; use tokio::net; @@ -89,18 +91,18 @@ use tokio::sync::{broadcast, mpsc, Mutex}; use tokio::task::JoinHandle; use tokio::time::{sleep, timeout, timeout_at, Instant}; -#[derive(Clone)] +#[derive(Clone, Debug)] struct AgentState { bucket: Option, tls_config: Option, - authenticator: Arc, + authenticator: Authenticator, auth_mechanisms: Vec, num_pool_connections: usize, // http_transport: - last_clients: HashMap, latest_config: ParsedConfig, network_type: String, + disable_error_map: bool, disable_mutation_tokens: bool, disable_server_durations: bool, 
kv_connect_timeout: Duration, @@ -112,7 +114,10 @@ struct AgentState { client_name: String, } -type AgentClientManager = StdKvClientManager>>; +type AgentClientManager = StdKvEndpointClientManager< + StdKvClientPool>, StdKvClient>, + StdKvClient, +>; type AgentCollectionResolver = CollectionResolverCached>; pub(crate) struct AgentInner { @@ -144,19 +149,24 @@ pub struct Agent { pub(crate) inner: Arc, } -struct AgentComponentConfigs { - pub config_manager_memd_config: ConfigManagerMemdConfig, - pub kv_client_manager_client_configs: HashMap, - pub vbucket_routing_info: VbucketRoutingInfo, - pub query_config: QueryComponentConfig, - pub search_config: SearchComponentConfig, - pub mgmt_config: MgmtComponentConfig, - pub diagnostics_config: DiagnosticsComponentConfig, - pub http_client_config: ClientConfig, -} - impl AgentInner { - pub async fn unsolicited_packet_handler(&self, packet: ResponsePacket) { + fn gen_agent_component_configs_locked(state: &AgentState) -> AgentComponentConfigs { + AgentComponentConfigs::gen_from_config( + &state.latest_config, + &state.network_type, + state.tls_config.clone(), + state.bucket.clone(), + state.authenticator.clone(), + HttpClientConfig { + idle_connection_timeout: state.http_idle_connection_timeout, + max_idle_connections_per_host: state.http_max_idle_connections_per_host, + tcp_keep_alive_time: state.tcp_keep_alive_time, + }, + ) + } + + pub async fn unsolicited_packet_handler(&self, up: UnsolicitedPacket) { + let packet = up.packet; if packet.op_code == OpCode::Set { if let Some(ref extras) = packet.extras { if extras.len() < 16 { @@ -170,7 +180,7 @@ impl AgentInner { if let Some(config) = self .cfg_manager - .out_of_band_version(server_rev_id, server_rev_epoch) + .out_of_band_version(server_rev_id, server_rev_epoch, up.endpoint_id) .await { self.apply_config(config).await; @@ -191,13 +201,13 @@ impl AgentInner { ); state.latest_config = config; - self.update_state(&mut state).await; + self.update_state_locked(&mut 
state).await; } - async fn update_state(&self, state: &mut AgentState) { - debug!("Agent updating state"); + async fn update_state_locked(&self, state: &mut AgentState) { + debug!("Agent updating state {:?}", state); - let agent_component_configs = Self::gen_agent_component_configs(state); + let agent_component_configs = Self::gen_agent_component_configs_locked(state); // In order to avoid race conditions between operations selecting the // endpoint they need to send the request to, and fetching an actual @@ -206,26 +216,12 @@ impl AgentInner { // the routing table. Then go back and remove the old entries from // the connection manager list. - let mut old_clients = HashMap::new(); - for (client_name, client) in &state.last_clients { - old_clients.insert(client_name.clone(), client.clone()); - } - - for (client_name, client) in &agent_component_configs.kv_client_manager_client_configs { - old_clients - .entry(client_name.clone()) - .or_insert(client.clone()); - } - if let Err(e) = self .conn_mgr - .reconfigure(KvClientManagerConfig { - num_pool_connections: state.num_pool_connections, - clients: old_clients, - }) + .update_endpoints(agent_component_configs.kv_targets.clone(), true) .await { - error!("Failed to reconfigure connection manager (old clients); {e}"); + error!("Failed to reconfigure connection manager (add-only); {e}"); }; self.vb_router @@ -240,13 +236,10 @@ impl AgentInner { if let Err(e) = self .conn_mgr - .reconfigure(KvClientManagerConfig { - num_pool_connections: state.num_pool_connections, - clients: agent_component_configs.kv_client_manager_client_configs, - }) + .update_endpoints(agent_component_configs.kv_targets, false) .await { - error!("Failed to reconfigure connection manager (updated clients); {e}"); + error!("Failed to reconfigure connection manager; {e}"); } if let Err(e) = self @@ -264,150 +257,6 @@ impl AgentInner { .reconfigure(agent_component_configs.diagnostics_config); } - fn gen_agent_component_configs(state: &mut AgentState) -> 
AgentComponentConfigs { - let config = &state.latest_config; - let rev_id = config.rev_id; - let network_info = config.addresses_group_for_network_type(&state.network_type); - - let mut gcccp_node_ids = Vec::new(); - let mut kv_data_node_ids = Vec::new(); - let mut kv_data_hosts: HashMap = HashMap::new(); - let mut mgmt_endpoints: HashMap = HashMap::new(); - let mut query_endpoints: HashMap = HashMap::new(); - let mut search_endpoints: HashMap = HashMap::new(); - - for node in network_info.nodes { - let kv_ep_id = format!("kv{}", node.node_id); - let mgmt_ep_id = format!("mgmt{}", node.node_id); - let query_ep_id = format!("query{}", node.node_id); - let search_ep_id = format!("search{}", node.node_id); - - gcccp_node_ids.push(kv_ep_id.clone()); - - if node.has_data { - kv_data_node_ids.push(kv_ep_id.clone()); - } - - if state.tls_config.is_some() { - if let Some(p) = node.ssl_ports.kv { - kv_data_hosts.insert( - kv_ep_id, - Address { - host: node.hostname.clone(), - port: p, - }, - ); - } - if let Some(p) = node.ssl_ports.mgmt { - mgmt_endpoints.insert(mgmt_ep_id, format!("https://{}:{}", node.hostname, p)); - } - if let Some(p) = node.ssl_ports.query { - query_endpoints.insert(query_ep_id, format!("https://{}:{}", node.hostname, p)); - } - if let Some(p) = node.ssl_ports.search { - search_endpoints - .insert(search_ep_id, format!("https://{}:{}", node.hostname, p)); - } - } else { - if let Some(p) = node.non_ssl_ports.kv { - kv_data_hosts.insert( - kv_ep_id, - Address { - host: node.hostname.clone(), - port: p, - }, - ); - } - if let Some(p) = node.non_ssl_ports.mgmt { - mgmt_endpoints.insert(mgmt_ep_id, format!("http://{}:{}", node.hostname, p)); - } - if let Some(p) = node.non_ssl_ports.query { - query_endpoints.insert(query_ep_id, format!("http://{}:{}", node.hostname, p)); - } - if let Some(p) = node.non_ssl_ports.search { - search_endpoints - .insert(search_ep_id, format!("http://{}:{}", node.hostname, p)); - } - } - } - - let mut clients = HashMap::new(); - 
for (node_id, address) in kv_data_hosts { - let config = KvClientConfig { - address, - tls: state.tls_config.clone(), - client_name: state.client_name.clone(), - authenticator: state.authenticator.clone(), - selected_bucket: state.bucket.clone(), - disable_error_map: false, - auth_mechanisms: state.auth_mechanisms.clone(), - disable_mutation_tokens: state.disable_mutation_tokens, - disable_server_durations: state.disable_server_durations, - connect_timeout: state.kv_connect_timeout, - tcp_keep_alive_time: state.tcp_keep_alive_time, - }; - clients.insert(node_id, config); - } - - let vbucket_routing_info = if let Some(info) = &state.latest_config.bucket { - VbucketRoutingInfo { - vbucket_info: info.vbucket_map.clone(), - server_list: kv_data_node_ids, - bucket_selected: state.bucket.is_some(), - } - } else { - VbucketRoutingInfo { - vbucket_info: None, - server_list: kv_data_node_ids, - bucket_selected: state.bucket.is_some(), - } - }; - - let mut available_services = vec![ServiceType::MEMD]; - if !query_endpoints.is_empty() { - available_services.push(ServiceType::QUERY) - } - if !search_endpoints.is_empty() { - available_services.push(ServiceType::SEARCH) - } - - AgentComponentConfigs { - config_manager_memd_config: ConfigManagerMemdConfig { - endpoints: gcccp_node_ids, - }, - kv_client_manager_client_configs: clients, - vbucket_routing_info, - query_config: QueryComponentConfig { - endpoints: query_endpoints, - authenticator: state.authenticator.clone(), - }, - search_config: SearchComponentConfig { - endpoints: search_endpoints, - authenticator: state.authenticator.clone(), - vector_search_enabled: state - .latest_config - .features - .contains(&ParsedConfigFeature::FtsVectorSearch), - }, - http_client_config: ClientConfig { - tls_config: state.tls_config.clone(), - idle_connection_timeout: state.http_idle_connection_timeout, - max_idle_connections_per_host: state.http_max_idle_connections_per_host, - tcp_keep_alive_time: state.tcp_keep_alive_time, - }, - 
mgmt_config: MgmtComponentConfig { - endpoints: mgmt_endpoints, - authenticator: state.authenticator.clone(), - }, - diagnostics_config: DiagnosticsComponentConfig { - bucket: state.bucket.clone(), - services: available_services, - rev_id, - }, - } - } - - // TODO: This really shouldn't be async pub async fn bucket_features(&self) -> Result> { let guard = self.state.lock().await; @@ -491,14 +340,14 @@ impl Agent { let mut state = AgentState { bucket: opts.bucket_name.clone(), - authenticator: Arc::new(opts.authenticator), + authenticator: opts.authenticator.clone(), num_pool_connections: opts.kv_config.num_connections, - last_clients: Default::default(), latest_config: ParsedConfig::default(), network_type: "".to_string(), client_name: client_name.clone(), tls_config: opts.tls_config, - auth_mechanisms, + auth_mechanisms: auth_mechanisms.clone(), + disable_error_map: !opts.kv_config.enable_error_map, disable_mutation_tokens: !opts.kv_config.enable_mutation_tokens, disable_server_durations: !opts.kv_config.enable_server_durations, kv_connect_timeout: opts.kv_config.connect_timeout, @@ -527,6 +376,7 @@ impl Agent { Self::gen_first_http_endpoints(&opts.seed_config.http_addrs, &state); let first_config = Self::get_first_config( first_kv_client_configs, + &state, first_http_client_configs, http_client.clone(), err_map_component.clone(), @@ -539,32 +389,38 @@ impl Agent { let network_type = NetworkTypeHeuristic::identify(&state.latest_config); state.network_type = network_type; - let agent_component_configs = AgentInner::gen_agent_component_configs(&mut state); + let agent_component_configs = AgentInner::gen_agent_component_configs_locked(&state); let err_map_component_conn_mgr = err_map_component.clone(); let num_pool_connections = state.num_pool_connections; - let state = Arc::new(Mutex::new(state)); let (unsolicited_packet_tx, mut unsolicited_packet_rx) = mpsc::unbounded_channel(); let conn_mgr = Arc::new( - StdKvClientManager::new( - KvClientManagerConfig { - 
num_pool_connections, - clients: agent_component_configs.kv_client_manager_client_configs, - }, - KvClientManagerOptions { - connect_timeout, - connect_throttle_period: opts.kv_config.connect_throttle_timeout, - unsolicited_packet_tx: Some(unsolicited_packet_tx), - orphan_handler: opts.orphan_response_handler, - on_err_map_fetched_handler: Some(Arc::new(move |err_map| { + StdKvEndpointClientManager::new(KvEndpointClientManagerOptions { + on_close_handler: Arc::new(|_manager_id| {}), + num_pool_connections, + connect_throttle_period: opts.kv_config.connect_throttle_timeout, + bootstrap_options: KvClientBootstrapOptions { + client_name: client_name.clone(), + disable_error_map: state.disable_error_map, + disable_mutation_tokens: state.disable_mutation_tokens, + disable_server_durations: state.disable_server_durations, + on_err_map_fetched: Some(Arc::new(move |err_map| { err_map_component_conn_mgr.on_err_map(err_map); })), - disable_decompression: opts.compression_config.disable_decompression, + tcp_keep_alive_time: state.tcp_keep_alive_time, + auth_mechanisms, + connect_timeout, }, - ) + unsolicited_packet_tx: Some(unsolicited_packet_tx), + orphan_handler: opts.orphan_response_handler, + endpoints: agent_component_configs.kv_targets, + authenticator: opts.authenticator, + disable_decompression: opts.compression_config.disable_decompression, + selected_bucket: opts.bucket_name, + }) .await?, ); @@ -574,6 +430,7 @@ impl Agent { polling_period: opts.config_poller_config.poll_interval, kv_client_manager: conn_mgr.clone(), first_config, + fetch_timeout: opts.config_poller_config.fetch_timeout, }, )); let vb_router = Arc::new(StdVbucketRouter::new( @@ -640,6 +497,8 @@ impl Agent { agent_component_configs.diagnostics_config, ); + let state = Arc::new(Mutex::new(state)); + let inner = Arc::new(AgentInner { state, cfg_manager: cfg_manager.clone(), @@ -717,35 +576,46 @@ impl Agent { } async fn get_first_config( - kv_client_manager_client_configs: HashMap, + kv_targets: 
HashMap, + state: &AgentState, http_configs: HashMap, http_client: Arc, err_map_component: Arc, connect_timeout: Duration, ) -> Result { loop { - for endpoint_config in kv_client_manager_client_configs.values() { - let host = &endpoint_config.address; + for target in kv_targets.values() { + let host = &target.address; let err_map_component_clone = err_map_component.clone(); let timeout_result = timeout( connect_timeout, - StdKvClient::new( - endpoint_config.clone(), - KvClientOptions { - unsolicited_packet_tx: None, - orphan_handler: None, - on_close: Arc::new(|id| { - Box::pin(async move { - debug!("Bootstrap client {id} closed"); - }) - }), + StdKvClient::new(KvClientOptions { + address: target.clone(), + authenticator: state.authenticator.clone(), + selected_bucket: state.bucket.clone(), + bootstrap_options: KvClientBootstrapOptions { + client_name: state.client_name.clone(), + disable_error_map: state.disable_error_map, + disable_mutation_tokens: true, + disable_server_durations: true, on_err_map_fetched: Some(Arc::new(move |err_map| { err_map_component_clone.on_err_map(err_map); })), - disable_decompression: false, - id: Uuid::new_v4().to_string(), + tcp_keep_alive_time: state.tcp_keep_alive_time, + auth_mechanisms: state.auth_mechanisms.clone(), + connect_timeout, }, - ), + endpoint_id: "".to_string(), + unsolicited_packet_tx: None, + orphan_handler: None, + on_close: Arc::new(|id| { + Box::pin(async move { + debug!("Bootstrap client {id} closed"); + }) + }), + disable_decompression: false, + id: Uuid::new_v4().to_string(), + }), ) .await; @@ -793,7 +663,7 @@ impl Agent { for endpoint_config in http_configs.values() { let endpoint = endpoint_config.endpoint.clone(); let host = get_host_port_from_uri(&endpoint)?; - let user_pass = match endpoint_config.authenticator.as_ref() { + let user_pass = match &endpoint_config.authenticator { Authenticator::PasswordAuthenticator(authenticator) => { authenticator.get_credentials(&ServiceType::MGMT, host)? 
} @@ -878,24 +748,15 @@ impl Agent { fn gen_first_kv_client_configs( memd_addrs: &Vec
, state: &AgentState, - ) -> HashMap { + ) -> HashMap { let mut clients = HashMap::new(); for addr in memd_addrs { - let node_id = format!("kv{addr}"); - let config = KvClientConfig { + let node_id = format!("kv-{addr}"); + let target = KvTarget { address: addr.clone(), - tls: state.tls_config.clone(), - client_name: state.client_name.clone(), - authenticator: state.authenticator.clone(), - selected_bucket: state.bucket.clone(), - disable_error_map: false, - auth_mechanisms: state.auth_mechanisms.clone(), - disable_mutation_tokens: state.disable_mutation_tokens, - disable_server_durations: state.disable_server_durations, - connect_timeout: state.kv_connect_timeout, - tcp_keep_alive_time: state.tcp_keep_alive_time, + tls_config: state.tls_config.clone(), }; - clients.insert(node_id, config); + clients.insert(node_id, target); } clients @@ -952,6 +813,6 @@ struct FirstHttpConfig { pub endpoint: String, pub tls: Option, pub user_agent: String, - pub authenticator: Arc, + pub authenticator: Authenticator, pub bucket_name: Option, } diff --git a/sdk/couchbase-core/src/collection_resolver_memd.rs b/sdk/couchbase-core/src/collection_resolver_memd.rs index e2eea3f1..cd816778 100644 --- a/sdk/couchbase-core/src/collection_resolver_memd.rs +++ b/sdk/couchbase-core/src/collection_resolver_memd.rs @@ -18,10 +18,9 @@ use crate::collectionresolver::CollectionResolver; use crate::error::{Error, Result}; +use crate::kv_orchestration::{orchestrate_kv_client, KvClientManagerClientType}; use crate::kvclient_ops::KvClientOps; -use crate::kvclientmanager::{ - orchestrate_random_memd_client, KvClientManager, KvClientManagerClientType, -}; +use crate::kvendpointclientmanager::KvEndpointClientManager; use crate::memdx::request::GetCollectionIdRequest; use crate::memdx::response::GetCollectionIdResponse; use futures::TryFutureExt; @@ -37,7 +36,7 @@ pub(crate) struct CollectionResolverMemdOptions { impl CollectionResolverMemd where - K: KvClientManager, + K: KvEndpointClientManager, { pub 
fn new(opts: CollectionResolverMemdOptions) -> Self { Self { @@ -48,14 +47,14 @@ where impl CollectionResolver for CollectionResolverMemd where - K: KvClientManager, + K: KvEndpointClientManager + Send + Sync, { async fn resolve_collection_id( &self, scope_name: &str, collection_name: &str, ) -> Result<(u32, u64)> { - let resp: GetCollectionIdResponse = orchestrate_random_memd_client( + let resp: GetCollectionIdResponse = orchestrate_kv_client( self.conn_mgr.clone(), async |client: Arc>| { client diff --git a/sdk/couchbase-core/src/componentconfigs.rs b/sdk/couchbase-core/src/componentconfigs.rs new file mode 100644 index 00000000..1e6136ed --- /dev/null +++ b/sdk/couchbase-core/src/componentconfigs.rs @@ -0,0 +1,197 @@ +/* + * + * * Copyright (c) 2025 Couchbase, Inc. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ +use crate::address::Address; +use crate::authenticator::Authenticator; +use crate::configmanager::ConfigManagerMemdConfig; +use crate::diagnosticscomponent::DiagnosticsComponentConfig; +use crate::httpx::client::ClientConfig; +use crate::kvclient_babysitter::KvTarget; +use crate::mgmtcomponent::MgmtComponentConfig; +use crate::parsedconfig::{ParsedConfig, ParsedConfigFeature}; +use crate::querycomponent::QueryComponentConfig; +use crate::searchcomponent::SearchComponentConfig; +use crate::service_type::ServiceType; +use crate::tls_config::TlsConfig; +use crate::vbucketrouter::VbucketRoutingInfo; +use std::collections::HashMap; +use std::time::Duration; + +pub(crate) struct AgentComponentConfigs { + pub kv_targets: HashMap, + pub auth: Authenticator, + pub selected_bucket: Option, + + pub config_manager_memd_config: ConfigManagerMemdConfig, + pub vbucket_routing_info: VbucketRoutingInfo, + pub query_config: QueryComponentConfig, + pub search_config: SearchComponentConfig, + pub mgmt_config: MgmtComponentConfig, + pub diagnostics_config: DiagnosticsComponentConfig, + pub http_client_config: ClientConfig, +} + +pub(crate) struct HttpClientConfig { + pub idle_connection_timeout: Duration, + pub max_idle_connections_per_host: Option, + pub tcp_keep_alive_time: Duration, +} + +impl AgentComponentConfigs { + pub fn gen_from_config( + config: &ParsedConfig, + network_type: &str, + tls_config: Option, + bucket_name: Option, + authenticator: Authenticator, + http_client_config: HttpClientConfig, + ) -> AgentComponentConfigs { + let rev_id = config.rev_id; + let network_info = config.addresses_group_for_network_type(network_type); + + let mut gcccp_node_ids = Vec::new(); + let mut kv_data_node_ids = Vec::new(); + let mut kv_data_hosts: HashMap = HashMap::new(); + let mut mgmt_endpoints: HashMap = HashMap::new(); + let mut query_endpoints: HashMap = HashMap::new(); + let mut search_endpoints: HashMap = HashMap::new(); + + for node in network_info.nodes { + let 
kv_ep_id = format!("kv{}", node.node_id); + let mgmt_ep_id = format!("mgmt{}", node.node_id); + let query_ep_id = format!("query{}", node.node_id); + let search_ep_id = format!("search{}", node.node_id); + + gcccp_node_ids.push(kv_ep_id.clone()); + + if node.has_data { + kv_data_node_ids.push(kv_ep_id.clone()); + } + + if tls_config.is_some() { + if let Some(p) = node.ssl_ports.kv { + kv_data_hosts.insert( + kv_ep_id, + Address { + host: node.hostname.clone(), + port: p, + }, + ); + } + if let Some(p) = node.ssl_ports.mgmt { + mgmt_endpoints.insert(mgmt_ep_id, format!("https://{}:{}", node.hostname, p)); + } + if let Some(p) = node.ssl_ports.query { + query_endpoints.insert(query_ep_id, format!("https://{}:{}", node.hostname, p)); + } + if let Some(p) = node.ssl_ports.search { + search_endpoints + .insert(search_ep_id, format!("https://{}:{}", node.hostname, p)); + } + } else { + if let Some(p) = node.non_ssl_ports.kv { + kv_data_hosts.insert( + kv_ep_id, + Address { + host: node.hostname.clone(), + port: p, + }, + ); + } + if let Some(p) = node.non_ssl_ports.mgmt { + mgmt_endpoints.insert(mgmt_ep_id, format!("http://{}:{}", node.hostname, p)); + } + if let Some(p) = node.non_ssl_ports.query { + query_endpoints.insert(query_ep_id, format!("http://{}:{}", node.hostname, p)); + } + if let Some(p) = node.non_ssl_ports.search { + search_endpoints + .insert(search_ep_id, format!("http://{}:{}", node.hostname, p)); + } + } + } + + let mut kv_targets = HashMap::new(); + for (node_id, address) in kv_data_hosts { + let target = KvTarget { + address, + tls_config: tls_config.clone(), + }; + + kv_targets.insert(node_id, target); + } + + let vbucket_routing_info = if let Some(info) = &config.bucket { + VbucketRoutingInfo { + vbucket_info: info.vbucket_map.clone(), + server_list: kv_data_node_ids, + bucket_selected: true, + } + } else { + VbucketRoutingInfo { + vbucket_info: None, + server_list: kv_data_node_ids, + bucket_selected: false, + } + }; + + let mut available_services 
= vec![ServiceType::MEMD]; + if !query_endpoints.is_empty() { + available_services.push(ServiceType::QUERY) + } + if !search_endpoints.is_empty() { + available_services.push(ServiceType::SEARCH) + } + + AgentComponentConfigs { + kv_targets, + auth: authenticator.clone(), + selected_bucket: bucket_name.clone(), + config_manager_memd_config: ConfigManagerMemdConfig { + endpoints: gcccp_node_ids, + }, + + vbucket_routing_info, + query_config: QueryComponentConfig { + endpoints: query_endpoints, + authenticator: authenticator.clone(), + }, + search_config: SearchComponentConfig { + endpoints: search_endpoints, + authenticator: authenticator.clone(), + vector_search_enabled: config + .features + .contains(&ParsedConfigFeature::FtsVectorSearch), + }, + http_client_config: ClientConfig { + tls_config, + idle_connection_timeout: http_client_config.idle_connection_timeout, + max_idle_connections_per_host: http_client_config.max_idle_connections_per_host, + tcp_keep_alive_time: http_client_config.tcp_keep_alive_time, + }, + mgmt_config: MgmtComponentConfig { + endpoints: mgmt_endpoints, + authenticator: authenticator.clone(), + }, + diagnostics_config: DiagnosticsComponentConfig { + bucket: bucket_name, + services: available_services, + rev_id, + }, + } + } +} diff --git a/sdk/couchbase-core/src/configfetcher.rs b/sdk/couchbase-core/src/configfetcher.rs index fea2f3e6..9fd55418 100644 --- a/sdk/couchbase-core/src/configfetcher.rs +++ b/sdk/couchbase-core/src/configfetcher.rs @@ -19,26 +19,38 @@ use crate::cbconfig; use crate::configparser::ConfigParser; use crate::configwatcher::ConfigWatcherMemd; -use crate::error::Error; +use crate::error::{Error, ErrorKind}; +use crate::kv_orchestration::KvClientManagerClientType; use crate::kvclient::{KvClient, StdKvClient}; use crate::kvclient_ops::KvClientOps; -use crate::kvclientmanager::{KvClientManager, KvClientManagerClientType}; use crate::kvclientpool::KvClientPool; +use crate::kvendpointclientmanager::KvEndpointClientManager; 
use crate::memdx::hello_feature::HelloFeature; use crate::memdx::request::{GetClusterConfigKnownVersion, GetClusterConfigRequest}; use crate::parsedconfig::ParsedConfig; use log::{debug, trace}; use std::env; use std::sync::Arc; +use std::time::Duration; +use tokio::time::{timeout, timeout_at}; #[derive(Clone)] -pub(crate) struct ConfigFetcherMemd { +pub(crate) struct ConfigFetcherMemd { kv_client_manager: Arc, + fetch_timeout: Duration, } -impl ConfigFetcherMemd { - pub fn new(kv_client_manager: Arc) -> Self { - Self { kv_client_manager } +pub(crate) struct ConfigFetcherMemdOptions { + pub kv_client_manager: Arc, + pub fetch_timeout: Duration, +} + +impl ConfigFetcherMemd { + pub fn new(opts: ConfigFetcherMemdOptions) -> Self { + Self { + kv_client_manager: opts.kv_client_manager.clone(), + fetch_timeout: opts.fetch_timeout, + } } pub(crate) async fn poll_one( &self, @@ -47,7 +59,7 @@ impl ConfigFetcherMemd { rev_epoch: i64, skip_fetch_cb: impl FnOnce(Arc>) -> bool, ) -> crate::error::Result> { - let client = self.kv_client_manager.get_client(endpoint).await?; + let client = self.kv_client_manager.get_endpoint_client(endpoint).await?; if skip_fetch_cb(client.clone()) { return Ok(None); @@ -64,10 +76,13 @@ impl ConfigFetcherMemd { } }; - let resp = client - .get_cluster_config(GetClusterConfigRequest { known_version }) - .await - .map_err(Error::new_contextual_memdx_error)?; + let resp = timeout( + self.fetch_timeout, + client.get_cluster_config(GetClusterConfigRequest { known_version }), + ) + .await + .map_err(|e| Error::new_message_error("get cluster config timed out"))? 
+ .map_err(Error::new_contextual_memdx_error)?; if resp.config.is_empty() { return Ok(None); diff --git a/sdk/couchbase-core/src/configmanager.rs b/sdk/couchbase-core/src/configmanager.rs index 8b6a0109..4494f250 100644 --- a/sdk/couchbase-core/src/configmanager.rs +++ b/sdk/couchbase-core/src/configmanager.rs @@ -18,10 +18,10 @@ use crate::agent::AgentInner; use crate::cbconfig::TerseConfig; -use crate::configfetcher::ConfigFetcherMemd; +use crate::configfetcher::{ConfigFetcherMemd, ConfigFetcherMemdOptions}; use crate::configparser::ConfigParser; use crate::configwatcher::{ConfigWatcherMemd, ConfigWatcherMemdConfig, ConfigWatcherMemdOptions}; -use crate::kvclientmanager::KvClientManager; +use crate::kvendpointclientmanager::KvEndpointClientManager; use crate::nmvbhandler::ConfigUpdater; use crate::parsedconfig::{ParsedConfig, ParsedConfigBucket}; use log::{debug, warn}; @@ -41,6 +41,7 @@ pub(crate) trait ConfigManager: Sized + Send + Sync { &self, rev_id: i64, rev_epoch: i64, + endpoint_id: String, ) -> impl Future> + Send; fn out_of_band_config(&self, config: ParsedConfig) -> Option; } @@ -55,17 +56,18 @@ pub(crate) struct ConfigManagerMemdConfig { pub endpoints: Vec, } -pub(crate) struct ConfigManagerMemdOptions { +pub(crate) struct ConfigManagerMemdOptions { pub polling_period: Duration, pub first_config: ParsedConfig, pub kv_client_manager: Arc, + pub fetch_timeout: Duration, } -pub(crate) struct ConfigManagerMemd { +pub(crate) struct ConfigManagerMemd { inner: Arc>, } -pub(crate) struct ConfigManagerMemdInner { +pub(crate) struct ConfigManagerMemdInner { fetcher: Arc>, watcher: Arc>, @@ -81,7 +83,7 @@ pub(crate) struct ConfigManagerMemdInner { watcher_shutdown_tx: broadcast::Sender<()>, } -impl ConfigManagerMemdInner { +impl ConfigManagerMemdInner { pub fn watch(&self) -> watch::Receiver { self.on_new_config_tx.subscribe() } @@ -92,7 +94,12 @@ impl ConfigManagerMemdInner { }) } - async fn perform_out_of_band_fetch(&self, rev_id: i64, rev_epoch: i64) -> 
Option { + async fn perform_out_of_band_fetch( + &self, + rev_id: i64, + rev_epoch: i64, + endpoint_id: String, + ) -> Option { loop { let (latest_rev_epoch, latest_rev_id) = { let latest_config = self.latest_config.lock().unwrap(); @@ -104,8 +111,8 @@ impl ConfigManagerMemdInner { { debug!( "Skipping out-of-band fetch, already have newer or same version: new: \ - rev_epoch={rev_epoch}, rev_id={rev_id}, old: rev_epoch={latest_rev_epoch}, \ - rev_id={latest_rev_id}" + rev_epoch={rev_epoch}, rev_id={rev_id}, old: rev_epoch={latest_rev_epoch}, \ + rev_id={latest_rev_id}" ); // No need to poll, we already have a newer version. return None; @@ -123,34 +130,30 @@ impl ConfigManagerMemdInner { self.out_of_band_notify.notified().await; continue; } else { - let endpoints = self.watcher.endpoints(); - - for endpoint in endpoints { - let parsed_config = match self - .fetcher - .poll_one(&endpoint, latest_rev_id, latest_rev_epoch, |_c| false) - .await - { - Ok(c) => c, - Err(e) => { - warn!("Out-of-band fetch from {endpoint} failed: {e}"); - continue; - } - }; - - if let Some(parsed_config) = parsed_config { - if let Some(cfg) = Self::handle_config( - self.latest_config.lock().unwrap(), - parsed_config, - self.latest_version_tx.clone(), - ) { - self.performing_out_of_band_fetch - .store(false, std::sync::atomic::Ordering::SeqCst); - self.out_of_band_notify.notify_waiters(); - - return Some(cfg); - }; + let parsed_config = match self + .fetcher + .poll_one(&endpoint_id, latest_rev_id, latest_rev_epoch, |_c| false) + .await + { + Ok(c) => c, + Err(e) => { + debug!("Out-of-band fetch from {endpoint_id} failed: {e}"); + return None; } + }; + + if let Some(parsed_config) = parsed_config { + if let Some(cfg) = Self::handle_config( + self.latest_config.lock().unwrap(), + parsed_config, + self.latest_version_tx.clone(), + ) { + self.performing_out_of_band_fetch + .store(false, std::sync::atomic::Ordering::SeqCst); + self.out_of_band_notify.notify_waiters(); + + return Some(cfg); + }; 
} debug!("No newer config found during out-of-band fetch"); @@ -164,8 +167,14 @@ impl ConfigManagerMemdInner { } } - pub async fn out_of_band_version(&self, rev_id: i64, rev_epoch: i64) -> Option { - self.perform_out_of_band_fetch(rev_id, rev_epoch).await + pub async fn out_of_band_version( + &self, + rev_id: i64, + rev_epoch: i64, + endpoint_id: String, + ) -> Option { + self.perform_out_of_band_fetch(rev_id, rev_epoch, endpoint_id) + .await } pub fn out_of_band_config(&self, parsed_config: ParsedConfig) -> Option { @@ -267,7 +276,7 @@ impl ConfigManagerMemdInner { } } -impl ConfigManagerMemd { +impl ConfigManagerMemd { pub fn new( config: ConfigManagerMemdConfig, opts: ConfigManagerMemdOptions, @@ -279,7 +288,10 @@ impl ConfigManagerMemd { let (latest_version_tx, latest_version_rx) = watch::channel(latest_version.clone()); - let fetcher = Arc::new(ConfigFetcherMemd::new(opts.kv_client_manager)); + let fetcher = Arc::new(ConfigFetcherMemd::new(ConfigFetcherMemdOptions { + kv_client_manager: opts.kv_client_manager, + fetch_timeout: opts.fetch_timeout, + })); let watcher = Arc::new(ConfigWatcherMemd::new( ConfigWatcherMemdConfig { endpoints: config.endpoints, @@ -319,7 +331,7 @@ impl ConfigManagerMemd { } } -impl ConfigManager for ConfigManagerMemd { +impl ConfigManager for ConfigManagerMemd { fn watch(&self) -> watch::Receiver { self.inner.watch() } @@ -328,9 +340,16 @@ impl ConfigManager for ConfigManagerMemd { self.inner.reconfigure(config) } - async fn out_of_band_version(&self, rev_id: i64, rev_epoch: i64) -> Option { + async fn out_of_band_version( + &self, + rev_id: i64, + rev_epoch: i64, + endpoint_id: String, + ) -> Option { let inner = self.inner.clone(); - inner.out_of_band_version(rev_id, rev_epoch).await + inner + .out_of_band_version(rev_id, rev_epoch, endpoint_id) + .await } fn out_of_band_config(&self, config: ParsedConfig) -> Option { diff --git a/sdk/couchbase-core/src/configwatcher.rs b/sdk/couchbase-core/src/configwatcher.rs index 
57f6294d..120625f5 100644 --- a/sdk/couchbase-core/src/configwatcher.rs +++ b/sdk/couchbase-core/src/configwatcher.rs @@ -24,7 +24,7 @@ use crate::configparser::ConfigParser; use crate::error::{Error, Result}; use crate::kvclient::KvClient; use crate::kvclient_ops::KvClientOps; -use crate::kvclientmanager::KvClientManager; +use crate::kvendpointclientmanager::KvEndpointClientManager; use crate::memdx::hello_feature::HelloFeature; use crate::memdx::request::{GetClusterConfigKnownVersion, GetClusterConfigRequest}; use crate::parsedconfig::ParsedConfig; @@ -46,20 +46,20 @@ pub(crate) struct ConfigWatcherMemdConfig { pub endpoints: Vec, } -pub(crate) struct ConfigWatcherMemdOptions { +pub(crate) struct ConfigWatcherMemdOptions { pub polling_period: Duration, pub config_fetcher: Arc>, pub latest_version_rx: watch::Receiver, } -pub struct ConfigWatcherMemdInner { +pub struct ConfigWatcherMemdInner { config_fetcher: Arc>, polling_period: Duration, endpoints: Mutex>, latest_version_rx: watch::Receiver, } -impl ConfigWatcherMemdInner { +impl ConfigWatcherMemdInner { pub fn reconfigure(&self, config: ConfigWatcherMemdConfig) -> Result<()> { let mut endpoints = self.endpoints.lock().unwrap(); *endpoints = config.endpoints; @@ -179,13 +179,13 @@ impl ConfigWatcherMemdInner { } #[derive(Clone)] -pub(crate) struct ConfigWatcherMemd { +pub(crate) struct ConfigWatcherMemd { inner: Arc>, } impl ConfigWatcherMemd where - M: KvClientManager + 'static, + M: KvEndpointClientManager + 'static, { pub fn new(config: ConfigWatcherMemdConfig, opts: ConfigWatcherMemdOptions) -> Self { Self { diff --git a/sdk/couchbase-core/src/crudcomponent.rs b/sdk/couchbase-core/src/crudcomponent.rs index 9e10b398..555bb0cc 100644 --- a/sdk/couchbase-core/src/crudcomponent.rs +++ b/sdk/couchbase-core/src/crudcomponent.rs @@ -24,12 +24,12 @@ use crate::collectionresolver::{orchestrate_memd_collection_id, CollectionResolv use crate::compressionmanager::{CompressionManager, Compressor}; use crate::error; use 
crate::error::{Error, MemdxError, Result}; +use crate::kv_orchestration::{ + orchestrate_endpoint_kv_client, orchestrate_kv_client, KvClientManagerClientType, +}; use crate::kvclient::KvClient; use crate::kvclient_ops::KvClientOps; -use crate::kvclientmanager::{ - orchestrate_memd_client, orchestrate_random_memd_client, KvClientManager, - KvClientManagerClientType, -}; +use crate::kvendpointclientmanager::KvEndpointClientManager; use crate::memdx::datatype::DataTypeFlag; use crate::memdx::error::ServerErrorKind; use crate::memdx::hello_feature::HelloFeature; @@ -61,7 +61,7 @@ use log::debug; use tokio::time::sleep; pub(crate) struct CrudComponent< - M: KvClientManager, + M: KvEndpointClientManager, V: VbucketRouter, Nmvb: NotMyVbucketConfigHandler, C: CollectionResolver, @@ -77,7 +77,7 @@ pub(crate) struct CrudComponent< // TODO: So much clone. impl< - M: KvClientManager, + M: KvEndpointClientManager, V: VbucketRouter, Nmvb: NotMyVbucketConfigHandler, C: CollectionResolver, @@ -907,34 +907,33 @@ impl< let mut retry_info = RetryInfo::new("get_collection_id", true, opts.retry_strategy); loop { - let mut err = - match orchestrate_random_memd_client(self.conn_manager.clone(), async |client| { - client - .get_collection_id(GetCollectionIdRequest { - scope_name: opts.scope_name, - collection_name: opts.collection_name, - }) - .map_err(|e| { - let e = e - .set_bucket_name(client.bucket_name().unwrap_or_default()) - .set_collection_name(opts.collection_name.to_string()) - .set_scope_name(opts.scope_name.to_string()); + let mut err = match orchestrate_kv_client(self.conn_manager.clone(), async |client| { + client + .get_collection_id(GetCollectionIdRequest { + scope_name: opts.scope_name, + collection_name: opts.collection_name, + }) + .map_err(|e| { + let e = e + .set_bucket_name(client.bucket_name().unwrap_or_default()) + .set_collection_name(opts.collection_name.to_string()) + .set_scope_name(opts.scope_name.to_string()); - Error::new_contextual_memdx_error(e) - }) - 
.map_ok(|resp| GetCollectionIdResult { - collection_id: resp.collection_id, - manifest_rev: resp.manifest_rev, - }) - .await - }) - .await - { - Ok(r) => { - return Ok(r); - } - Err(e) => e, - }; + Error::new_contextual_memdx_error(e) + }) + .map_ok(|resp| GetCollectionIdResult { + collection_id: resp.collection_id, + manifest_rev: resp.manifest_rev, + }) + .await + }) + .await + { + Ok(r) => { + return Ok(r); + } + Err(e) => e, + }; if let Some(memdx_err) = err.is_memdx_error() { if memdx_err.is_server_error_kind(ServerErrorKind::UnknownCollectionName) { @@ -991,7 +990,7 @@ impl< key, 0, async |endpoint: String, vb_id: u16| { - orchestrate_memd_client( + orchestrate_endpoint_kv_client( self.conn_manager.clone(), &endpoint, async |client: Arc>| { diff --git a/sdk/couchbase-core/src/diagnosticscomponent.rs b/sdk/couchbase-core/src/diagnosticscomponent.rs index 404e89b0..c2b04b1c 100644 --- a/sdk/couchbase-core/src/diagnosticscomponent.rs +++ b/sdk/couchbase-core/src/diagnosticscomponent.rs @@ -23,8 +23,8 @@ use crate::httpx::client::Client; use crate::httpx::request::OnBehalfOfInfo; use crate::kvclient::KvClient; use crate::kvclient_ops::KvClientOps; -use crate::kvclientmanager::KvClientManager; -use crate::kvclientpool::{KvClientPool, KvClientPoolClient}; +use crate::kvclientpool::KvClientPool; +use crate::kvendpointclientmanager::KvEndpointClientManager; use crate::memdx::request::PingRequest; use crate::options::diagnostics::DiagnosticsOptions; use crate::options::ping::PingOptions; @@ -80,7 +80,7 @@ struct PingEveryKvConnectionOptions<'a> { pub(crate) on_behalf_of: Option<&'a str>, } -pub struct DiagnosticsComponent { +pub struct DiagnosticsComponent { kv_client_manager: Arc, query_component: Arc>, search_component: Arc>, @@ -103,7 +103,7 @@ struct DiagnosticsComponentState { rev_id: i64, } -impl DiagnosticsComponent { +impl DiagnosticsComponent { pub fn new( kv_client_manager: Arc, query_component: Arc>, @@ -139,36 +139,7 @@ impl DiagnosticsComponent { &self, 
_opts: &DiagnosticsOptions, ) -> crate::error::Result { - let pools = self.kv_client_manager.get_all_pools(); - - let mut endpoint_reports = vec![]; - for (endpoint, pool) in &pools { - for (id, client) in pool.get_all_clients().await? { - let (local_address, last_activity) = if let Some(cli) = &client.client { - ( - Some(cli.local_addr().to_string()), - Some( - Utc::now() - .sub(cli.last_activity().to_utc()) - .num_microseconds() - .unwrap_or_default(), - ), - ) - } else { - (None, None) - }; - - endpoint_reports.push(EndpointDiagnostics { - service_type: ServiceType::MEMD, - id, - local_address, - remote_address: endpoint.to_string(), - last_activity, - namespace: pool.get_bucket().await, - state: client.connection_state, - }); - } - } + let endpoint_reports = self.kv_client_manager.endpoint_diagnostics().await; Ok(DiagnosticsResult { version: 2, @@ -181,7 +152,7 @@ impl DiagnosticsComponent { pub async fn ping(&self, opts: &PingOptions) -> crate::error::Result where - <::Pool as KvClientPool>::Client: 'static, + ::Client: 'static, { let (rev_id, bucket, available_services) = { let state = self.state.lock().unwrap(); @@ -376,71 +347,22 @@ impl DiagnosticsComponent { } async fn is_kv_ready(&self, on_behalf_of: Option<&str>, desired_state: ClusterState) -> bool { - let pools = self.kv_client_manager.get_all_pools(); - let req = PingRequest { on_behalf_of }; - let mut handles = Vec::with_capacity(pools.len()); - for (pool_id, pool) in pools { - let clients = match pool.get_all_clients().await { - Ok(clients) => clients, - Err(e) => { - debug!("Failed to get clients from pool: {e}"); - return false; - } - }; + let responses = self.kv_client_manager.ping_all_clients(req).await; - let req = req.clone(); - let handle = async move { - debug!("Pinging pool {pool_id} with {} clients", clients.len()); - let mut pool_handles = Vec::with_capacity(clients.len()); - for (id, client) in clients { - let req = req.clone(); - - let handle = self.maybe_ping_client(id.clone(), req, 
client); - - pool_handles.push(handle); - } - - let results = join_all(pool_handles).await; - - if desired_state == ClusterState::Online { - results.into_iter().all(|ready| ready) - } else { - results.into_iter().any(|ready| ready) - } + let mut pools_ok = Vec::with_capacity(responses.len()); + for response in responses.values() { + let is_pool_ok = if desired_state == ClusterState::Online { + response.iter().all(|res| res.is_ok()) + } else { + response.iter().any(|res| res.is_ok()) }; - handles.push(handle); + pools_ok.push(is_pool_ok); } - let results = join_all(handles).await; - - results.into_iter().all(|ready| ready) - } - - async fn maybe_ping_client( - &self, - pool_id: String, - req: PingRequest<'_>, - client: KvClientPoolClient, - ) -> bool - where - K: KvClient + KvClientOps, - { - if let Some(client) = &client.client { - let client = client.clone(); - match client.ping(req).await { - Ok(_) => true, - Err(e) => { - debug!("Ping against client {} failed: {}", client.id(), e); - false - } - } - } else { - debug!("Client {pool_id} is not connected"); - false - } + pools_ok.into_iter().all(|ready| ready) } async fn ping_all_kv_nodes( @@ -448,7 +370,7 @@ impl DiagnosticsComponent { opts: PingKvOptions<'_>, ) -> crate::error::Result> where - <::Pool as KvClientPool>::Client: 'static, + ::Client: 'static, { let clients = self.kv_client_manager.get_client_per_endpoint().await?; @@ -498,7 +420,7 @@ impl DiagnosticsComponent { async fn ping_one_kv_node( &self, - client: Arc<<::Pool as KvClientPool>::Client>, + client: Arc<::Client>, timeout: Duration, req: PingRequest<'_>, ) -> EndpointPingReport { diff --git a/sdk/couchbase-core/src/httpcomponent.rs b/sdk/couchbase-core/src/httpcomponent.rs index f561c975..108e1e75 100644 --- a/sdk/couchbase-core/src/httpcomponent.rs +++ b/sdk/couchbase-core/src/httpcomponent.rs @@ -40,7 +40,7 @@ pub(crate) struct HttpComponent { pub(crate) struct HttpComponentState { endpoints: HashMap, - authenticator: Arc, + authenticator: 
Authenticator, } pub(crate) struct HttpEndpointProperties { @@ -94,7 +94,7 @@ impl HttpComponent { }; let host = get_host_port_from_uri(found_endpoint)?; - let user_pass = match state.authenticator.as_ref() { + let user_pass = match &state.authenticator { Authenticator::PasswordAuthenticator(authenticator) => { authenticator.get_credentials(&self.service_type, host)? } @@ -145,7 +145,7 @@ impl HttpComponent { let endpoint = remaining_endpoints[endpoint_id]; let host = get_host_port_from_uri(endpoint)?; - let user_pass = match state.authenticator.as_ref() { + let user_pass = match &state.authenticator { Authenticator::PasswordAuthenticator(authenticator) => { authenticator.get_credentials(&self.service_type, host)? } @@ -232,7 +232,7 @@ impl HttpComponent { for (_ep_id, endpoint) in remaining_endpoints { let host = get_host_port_from_uri(endpoint)?; - let user_pass = match state.authenticator.as_ref() { + let user_pass = match &state.authenticator { Authenticator::PasswordAuthenticator(authenticator) => { authenticator.get_credentials(&self.service_type, host)? } @@ -280,7 +280,7 @@ impl HttpComponent { } impl HttpComponentState { - pub fn new(endpoints: HashMap, authenticator: Arc) -> Self { + pub fn new(endpoints: HashMap, authenticator: Authenticator) -> Self { Self { endpoints, authenticator, diff --git a/sdk/couchbase-core/src/kv_orchestration.rs b/sdk/couchbase-core/src/kv_orchestration.rs new file mode 100644 index 00000000..57be9c73 --- /dev/null +++ b/sdk/couchbase-core/src/kv_orchestration.rs @@ -0,0 +1,82 @@ +/* + * + * * Copyright (c) 2025 Couchbase, Inc. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. 
+ * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ +use crate::error; +use crate::kvendpointclientmanager::KvEndpointClientManager; +use std::future::Future; +use std::sync::Arc; + +pub(crate) type KvClientManagerClientType = ::Client; + +pub(crate) async fn orchestrate_kv_client( + manager: Arc, + mut operation: impl FnMut(Arc>) -> Fut, +) -> error::Result +where + M: KvEndpointClientManager, + Fut: Future> + Send, +{ + loop { + let client = manager.get_client().await?; + + let res = operation(client.clone()).await; + return match res { + Ok(r) => Ok(r), + Err(e) => { + if let Some(memdx_err) = e.is_memdx_error() { + if memdx_err.is_dispatch_error() { + // this was a dispatch error, so we can just try with + // a different client instead... + continue; + } + } + + Err(e) + } + }; + } +} + +pub(crate) async fn orchestrate_endpoint_kv_client( + manager: Arc, + endpoint: &str, + mut operation: impl FnMut(Arc>) -> Fut, +) -> error::Result +where + M: KvEndpointClientManager, + Fut: Future> + Send, +{ + loop { + let client = manager.get_endpoint_client(endpoint).await?; + + let res = operation(client.clone()).await; + return match res { + Ok(r) => Ok(r), + Err(e) => { + if let Some(memdx_err) = e.is_memdx_error() { + if memdx_err.is_dispatch_error() { + // this was a dispatch error, so we can just try with + // a different client instead... 
+ continue; + } + } + + Err(e) + } + }; + } +} diff --git a/sdk/couchbase-core/src/kvclient.rs b/sdk/couchbase-core/src/kvclient.rs index 34ddc3d7..141a6b5a 100644 --- a/sdk/couchbase-core/src/kvclient.rs +++ b/sdk/couchbase-core/src/kvclient.rs @@ -20,8 +20,9 @@ use crate::address::Address; use crate::auth_mechanism::AuthMechanism; use crate::authenticator::{Authenticator, UserPassPair}; use crate::error::Error; +use crate::error::ErrorKind::Memdx; use crate::error::{MemdxError, Result}; -use crate::memdx; +use crate::kvclient_babysitter::KvTarget; use crate::memdx::connection::{ConnectOptions, ConnectionType, TcpConnection, TlsConnection}; use crate::memdx::dispatcher::{ Dispatcher, DispatcherOptions, OrphanResponseHandler, UnsolicitedPacketHandler, @@ -34,6 +35,8 @@ use crate::memdx::request::{GetErrorMapRequest, HelloRequest, SelectBucketReques use crate::service_type::ServiceType; use crate::tls_config::TlsConfig; use crate::util::hostname_from_addr_str; +use crate::{error, memdx}; +use arc_swap::ArcSwap; use chrono::{DateTime, FixedOffset, Local, NaiveDateTime, Utc}; use futures::future::BoxFuture; use log::{debug, info, warn}; @@ -41,7 +44,7 @@ use std::future::Future; use std::net::SocketAddr; use std::ops::{Add, Deref}; use std::sync::atomic::Ordering::SeqCst; -use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicI64, AtomicPtr, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::sync::{mpsc, Mutex}; @@ -49,57 +52,61 @@ use tokio::time::Instant; use uuid::Uuid; #[derive(Clone)] -pub(crate) struct KvClientConfig { - pub address: Address, - pub tls: Option, +pub(crate) struct KvClientBootstrapOptions { pub client_name: String, - pub authenticator: Arc, - pub selected_bucket: Option, + pub disable_error_map: bool, pub disable_mutation_tokens: bool, pub disable_server_durations: bool, + + pub on_err_map_fetched: Option, + pub tcp_keep_alive_time: Duration, pub auth_mechanisms: Vec, pub 
connect_timeout: Duration, - pub tcp_keep_alive_time: Duration, } -impl PartialEq for KvClientConfig { +impl PartialEq for KvClientBootstrapOptions { fn eq(&self, other: &Self) -> bool { - // TODO: compare root certs or something somehow. - self.address == other.address - && self.client_name == other.client_name - && self.selected_bucket == other.selected_bucket + self.client_name == other.client_name && self.disable_error_map == other.disable_error_map && self.disable_server_durations == other.disable_server_durations && self.disable_mutation_tokens == other.disable_mutation_tokens } } -pub(crate) type OnKvClientCloseHandler = - Arc BoxFuture<'static, ()> + Send + Sync>; - -pub(crate) type OnErrMapFetchedHandler = Arc; - -pub(crate) type UnsolicitedPacketSender = mpsc::UnboundedSender; - #[derive(Clone)] pub(crate) struct KvClientOptions { + pub address: KvTarget, + pub authenticator: Authenticator, + pub selected_bucket: Option, + + pub bootstrap_options: KvClientBootstrapOptions, + pub endpoint_id: String, + pub unsolicited_packet_tx: Option, pub orphan_handler: Option, pub on_close: OnKvClientCloseHandler, - pub on_err_map_fetched: Option, pub disable_decompression: bool, pub id: String, } +pub(crate) type OnKvClientCloseHandler = + Arc BoxFuture<'static, ()> + Send + Sync>; + +pub(crate) type OnErrMapFetchedHandler = Arc; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub(crate) struct UnsolicitedPacket { + pub packet: ResponsePacket, + pub endpoint_id: String, +} + +pub(crate) type UnsolicitedPacketSender = mpsc::UnboundedSender; + pub(crate) trait KvClient: Sized + PartialEq + Send + Sync { - fn new( - config: KvClientConfig, - opts: KvClientOptions, - ) -> impl Future> + Send; - fn reconfigure(&self, config: KvClientConfig) -> impl Future> + Send; + fn new(opts: KvClientOptions) -> impl Future> + Send; + fn select_bucket(&self, bucket_name: String) -> impl Future> + Send; fn has_feature(&self, feature: HelloFeature) -> bool; - fn load_factor(&self) -> 
f64; fn remote_hostname(&self) -> &str; fn remote_addr(&self) -> SocketAddr; fn local_addr(&self) -> SocketAddr; @@ -113,17 +120,14 @@ pub(crate) struct StdKvClient { remote_addr: SocketAddr, local_addr: SocketAddr, remote_hostname: String, + endpoint_id: String, - pending_operations: u64, cli: D, - current_config: Mutex, + closed: Arc, + on_close: OnKvClientCloseHandler, supported_features: Vec, - // selected_bucket atomically stores the currently selected bucket, - // so that we can use it in our errors. Note that it is set before - // we send the operation to select the bucket, since things happen - // asynchronously and we do not support changing selected buckets. pub(crate) selected_bucket: std::sync::Mutex>, pub(crate) last_activity_timestamp_micros: AtomicI64, @@ -144,7 +148,7 @@ impl KvClient for StdKvClient where D: Dispatcher, { - async fn new(config: KvClientConfig, opts: KvClientOptions) -> Result> { + async fn new(opts: KvClientOptions) -> Result> { let mut requested_features = vec![ HelloFeature::DataType, HelloFeature::Xattr, @@ -166,41 +170,44 @@ where HelloFeature::PreserveExpiry, ]; - if !config.disable_mutation_tokens { + if !opts.bootstrap_options.disable_mutation_tokens { requested_features.push(HelloFeature::SeqNo) } - if !config.disable_server_durations { + if !opts.bootstrap_options.disable_server_durations { requested_features.push(HelloFeature::Durations); } - let boostrap_hello = if !config.client_name.is_empty() { + let boostrap_hello = if !opts.bootstrap_options.client_name.is_empty() { Some(HelloRequest { - client_name: Vec::from(config.client_name.clone()), + client_name: Vec::from(opts.bootstrap_options.client_name.clone()), requested_features, }) } else { None }; - let bootstrap_get_error_map = if !config.disable_error_map { + let bootstrap_get_error_map = if !opts.bootstrap_options.disable_error_map { Some(GetErrorMapRequest { version: 2 }) } else { None }; - let creds = match config.authenticator.as_ref() { + let address = 
opts.address.address; + + let creds = match opts.authenticator { Authenticator::PasswordAuthenticator(a) => { - Some(a.get_credentials(&ServiceType::MEMD, config.address.to_string())?) + Some(a.get_credentials(&ServiceType::MEMD, address.to_string())?) } - Authenticator::CertificateAuthenticator(a) => None, + Authenticator::CertificateAuthenticator(_a) => None, }; let bootstrap_auth = if let Some(creds) = creds { Some(SASLAuthAutoOptions { username: creds.username.clone(), password: creds.password.clone(), - enabled_mechs: config + enabled_mechs: opts + .bootstrap_options .auth_mechanisms .iter() .cloned() @@ -212,8 +219,7 @@ where }; let bootstrap_select_bucket = - config - .selected_bucket + opts.selected_bucket .as_ref() .map(|bucket_name| SelectBucketRequest { bucket_name: bucket_name.clone(), @@ -232,15 +238,21 @@ where debug!( "Kvclient {} assigning client id {} for {}", - &id, &client_id, &config.address + &id, &client_id, &address ); let unsolicited_packet_tx = opts.unsolicited_packet_tx.clone(); let on_close = opts.on_close.clone(); + let endpoint_id = opts.endpoint_id.clone(); let memdx_client_opts = DispatcherOptions { - on_connection_close_handler: Arc::new(move || { + on_read_close_handler: Arc::new(move || { // There's not much to do when the connection closes so just mark us as closed. 
- closed_clone.store(true, Ordering::SeqCst); + if closed_clone.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_err() // CAS fails only when `closed` was already true, i.e. the close was already handled + { + return Box::pin(async move {}); + } + let on_close = on_close.clone(); let read_id = read_id.clone(); @@ -251,9 +263,13 @@ where orphan_handler: opts.orphan_handler, unsolicited_packet_handler: Arc::new(move |p| { let unsolicited_packet_tx = unsolicited_packet_tx.clone(); + let endpoint_id = endpoint_id.clone(); Box::pin(async move { if let Some(sender) = unsolicited_packet_tx { - if let Err(e) = sender.send(p) { + if let Err(e) = sender.send(UnsolicitedPacket { + packet: p, + endpoint_id, + }) { warn!("Failed to send unsolicited packet {e:?}"); }; } @@ -263,13 +279,13 @@ where id: client_id, }; - let conn = if let Some(tls) = config.tls.clone() { + let conn = if let Some(tls) = opts.address.tls_config { let conn = match TlsConnection::connect( - config.address.clone(), + address.clone(), tls, ConnectOptions { - deadline: Instant::now().add(config.connect_timeout), - tcp_keep_alive_time: config.tcp_keep_alive_time, + deadline: Instant::now().add(opts.bootstrap_options.connect_timeout), + tcp_keep_alive_time: opts.bootstrap_options.tcp_keep_alive_time, }, ) .await @@ -277,17 +293,17 @@ where Ok(conn) => conn, Err(e) => { return Err(Error::new_contextual_memdx_error( - MemdxError::new(e).with_dispatched_to(config.address.to_string()), + MemdxError::new(e).with_dispatched_to(address.to_string()), )) } }; ConnectionType::Tls(conn) } else { let conn = match TcpConnection::connect( - config.address.clone(), + address.clone(), ConnectOptions { - deadline: Instant::now().add(config.connect_timeout), - tcp_keep_alive_time: config.tcp_keep_alive_time, + deadline: Instant::now().add(opts.bootstrap_options.connect_timeout), + tcp_keep_alive_time: opts.bootstrap_options.tcp_keep_alive_time, }, ) .await @@ -295,7 +311,7 @@ where Ok(conn) => conn, Err(e) => { return Err(Error::new_contextual_memdx_error( - 
MemdxError::new(e).with_dispatched_to(config.address.to_string()), + MemdxError::new(e).with_dispatched_to(address.to_string()), )) } }; @@ -304,7 +320,7 @@ where let remote_addr = *conn.peer_addr(); let local_addr = *conn.local_addr(); - let remote_hostname = hostname_from_addr_str(config.address.host.as_str()); + let remote_hostname = hostname_from_addr_str(address.host.as_str()); let mut cli = D::new(conn, memdx_client_opts); @@ -312,9 +328,10 @@ where remote_addr, local_addr, remote_hostname, - pending_operations: 0, + endpoint_id: opts.endpoint_id, cli, - current_config: Mutex::new(config), + closed, + on_close: opts.on_close, supported_features: vec![], selected_bucket: std::sync::Mutex::new(None), id: id.clone(), @@ -350,7 +367,7 @@ where kv_cli.supported_features = hello.enabled_features; } - if let Some(handler) = opts.on_err_map_fetched { + if let Some(handler) = opts.bootstrap_options.on_err_map_fetched { if let Some(err_map) = res.error_map { handler(&err_map.error_map); } @@ -360,79 +377,41 @@ where Ok(kv_cli) } - async fn reconfigure(&self, config: KvClientConfig) -> Result<()> { - debug!("Reconfiguring KvClient {}", &self.id); - let mut current_config = self.current_config.lock().await; + async fn select_bucket(&self, bucket_name: String) -> Result<()> { + debug!("Selecting bucket on KvClient {}", &self.id); - // TODO: compare root certs or something somehow. 
- if !(current_config.address == config.address - && current_config.client_name == config.client_name - && current_config.disable_error_map == config.disable_error_map - && current_config.disable_server_durations == config.disable_server_durations - && current_config.disable_mutation_tokens == config.disable_mutation_tokens) { - return Err(Error::new_invalid_argument_error( - "cannot reconfigure due to conflicting options", - None, - )); - } - - let selected_bucket_name = if current_config.selected_bucket != config.selected_bucket { - if current_config.selected_bucket.is_some() { + let mut guard = self.selected_bucket.lock().unwrap(); + let selected_bucket = guard.as_ref(); + if selected_bucket.is_some() { return Err(Error::new_invalid_argument_error( - "cannot reconfigure from one selected bucket to another", - None, + "cannot select bucket when a bucket is already selected", + Some("bucket_name".to_string()), )); } - current_config - .selected_bucket - .clone_from(&config.selected_bucket); - config.selected_bucket.clone() - } else { - None - }; - - if *current_config.deref() != config { - return Err(Error::new_invalid_argument_error( - "client config after reconfigure did not match new configuration", - None, - )); + *guard = Some(bucket_name.clone()); } - if let Some(bucket_name) = selected_bucket_name { - { - let mut current_bucket = self.selected_bucket.lock().unwrap(); - *current_bucket = Some(bucket_name.clone()); - } - - match self - .select_bucket(SelectBucketRequest { bucket_name }) - .await - { - Ok(_) => {} - Err(_e) => { - { - let mut current_bucket = self.selected_bucket.lock().unwrap(); - *current_bucket = None; - } - - current_config.selected_bucket = None; - } + match self + .select_bucket(SelectBucketRequest { + bucket_name: bucket_name.clone(), + }) + .await + { + Ok(_r) => Ok(()), + Err(e) => { + let mut guard = self.selected_bucket.lock().unwrap(); + *guard = None; + Err(Error::new(Memdx(e))) } } - - Ok(()) } fn has_feature(&self, feature: 
HelloFeature) -> bool { self.supported_features.contains(&feature) } - fn load_factor(&self) -> f64 { - 0.0 - } - fn remote_hostname(&self) -> &str { &self.remote_hostname } @@ -454,10 +433,24 @@ where } async fn close(&self) -> Result<()> { + if self + .closed + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + != Ok(true) + { + return Ok(()); + } + + info!("Closing kvclient {}", self.id); + self.cli .close() .await - .map_err(|e| Error::new_contextual_memdx_error(MemdxError::new(e))) + .map_err(|e| Error::new_contextual_memdx_error(MemdxError::new(e)))?; + + (self.on_close)(self.id.clone()).await; + + Ok(()) } fn id(&self) -> &str { @@ -473,3 +466,27 @@ where self.id == other.id } } + +impl StdKvClient +where + D: Dispatcher, +{ + pub(crate) async fn mark_closed(&self) { + if self + .closed + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + != Ok(true) + { + return; + } + + if let Err(e) = self.cli.close().await { + debug!( + "Failed to close connection for kvclient {}: {}", + &self.id, e + ); + } + + (self.on_close)(self.id.clone()).await; + } +} diff --git a/sdk/couchbase-core/src/kvclient_babysitter.rs b/sdk/couchbase-core/src/kvclient_babysitter.rs new file mode 100644 index 00000000..403099ce --- /dev/null +++ b/sdk/couchbase-core/src/kvclient_babysitter.rs @@ -0,0 +1,521 @@ +/* + * + * * Copyright (c) 2025 Couchbase, Inc. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ +use crate::address::Address; +use crate::authenticator::Authenticator; +use crate::connection_state::ConnectionState; +use crate::error::{Error, ErrorKind}; +use crate::kvclient::{ + KvClient, KvClientBootstrapOptions, KvClientOptions, OnKvClientCloseHandler, + UnsolicitedPacketSender, +}; +use crate::kvclient_ops::KvClientOps; +use crate::memdx::dispatcher::OrphanResponseHandler; +use crate::memdx::op_bootstrap::BootstrapOptions; +use crate::memdx::packet::ResponsePacket; +use crate::orphan_reporter::OrphanContext; +use crate::results::diagnostics::EndpointDiagnostics; +use crate::service_type::ServiceType; +use crate::tls_config::TlsConfig; +use crate::{authenticator, error}; +use arc_swap::ArcSwap; +use chrono::Utc; +use futures_core::future::BoxFuture; +use log::{debug, info, warn}; +use std::error::Error as stdError; +use std::future::Future; +use std::mem::take; +use std::ops::{Add, Sub}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use tokio::select; +use tokio::sync::mpsc::Sender; +use tokio::sync::{watch, MutexGuard}; +use tokio::time::{sleep, Instant}; +use tokio_util::sync::CancellationToken; +use uuid::Uuid; + +#[derive(Clone, Debug)] +pub(crate) struct KvTarget { + pub address: Address, + pub tls_config: Option, +} + +pub(crate) type KvClientStateChangeHandler = + Arc>, Option) -> BoxFuture<'static, ()> + Send + Sync>; + +pub(crate) trait KvClientBabysitter { + type Client: KvClient + KvClientOps + Send + Sync; + + fn new(opts: KvClientBabysitterOptions) -> Self; + fn id(&self) -> &str; + fn get_client(&self) -> impl Future>> + Send; + fn endpoint_diagnostics(&self) -> EndpointDiagnostics; + fn update_auth(&self, authenticator: Authenticator) -> impl Future + Send; + // async fn update_selected_bucket(&self, bucket_name: String); + fn close(&self) -> impl Future> + Send; +} + +#[derive(Clone)] +pub(crate) struct KvClientBabysitterClientConfig { + pub target: KvTarget, + pub auth: Authenticator, + pub selected_bucket: Option, +} + 
+pub(crate) struct KvClientBabysitterOptions { + pub id: String, + + pub connect_throttle_period: Duration, + pub disable_decompression: bool, + pub bootstrap_opts: KvClientBootstrapOptions, + pub endpoint_id: String, + + pub state_change_handler: KvClientStateChangeHandler, + + pub unsolicited_packet_tx: Option, + pub orphan_handler: Option, + + pub target: KvTarget, + pub auth: Authenticator, + pub selected_bucket: Option, +} + +#[derive(Debug, Clone)] +struct ConnectionError { + pub connect_error: Error, + pub connect_error_time: Instant, +} + +struct StdKvClientBabysitterState { + // current_config: Option, + desired_config: KvClientBabysitterClientConfig, + connect_err: Option, + client: Option>, + current_state: ConnectionState, +} + +struct StdKvClientBabysitterClientState { + client: Option>, +} + +#[derive(Clone)] +struct StaticKvClientOptions { + pub bootstrap_options: KvClientBootstrapOptions, + + pub disable_decompression: bool, + pub unsolicited_packet_tx: Option, + pub orphan_handler: Option, +} + +struct ClientThreadOptions { + id: String, + endpoint_id: String, + + connect_throttle_period: Duration, + + static_kv_client_options: StaticKvClientOptions, + + // on_client_close_tx: watch::Sender, + state_change_handler: KvClientStateChangeHandler, + on_client_connected_tx: watch::Sender>>, + + fast_client: Arc>>, + slow_state: Arc>>, + + shutdown_token: CancellationToken, +} + +pub(crate) struct StdKvClientBabysitter { + id: String, + + connect_throttle_period: Duration, + + state_change_handler: KvClientStateChangeHandler, + on_client_connected_tx: watch::Sender>>, + + kv_client_options: StaticKvClientOptions, + + fast_client: Arc>>, + slow_state: Arc>>, + + shutdown_token: CancellationToken, +} + +impl StdKvClientBabysitter { + async fn maybe_throttle_on_error( + babysitter_id: &str, + throttle_period: Duration, + connection_error: Option, + shutdown_token: &CancellationToken, + ) -> error::Result<()> { + if let Some(e) = connection_error { + let 
elapsed = e.connect_error_time.elapsed(); + if elapsed < throttle_period { + let to_sleep = throttle_period.sub(elapsed); + debug!( + "Client pool {} throttling new connection attempt for {:?}", + &babysitter_id, to_sleep + ); + return select! { + _ = shutdown_token.cancelled() => { + debug!("Client babysitter {babysitter_id} shutdown notified during throttle sleep"); + Err(ErrorKind::Shutdown.into()) + } + _ = sleep(to_sleep) => Ok(()), + }; + } + } + + Ok(()) + } + + async fn create_client_with_shutdown( + babysitter_id: &str, + opts: KvClientOptions, + shutdown_token: &CancellationToken, + ) -> error::Result { + select! { + _ = shutdown_token.cancelled() => { + debug!("Client babysitter {babysitter_id} shutdown notified during client creation"); + Err(ErrorKind::Shutdown.into()) + } + c = K::new(opts) => c, + } + } + + fn begin_client_build(client_opts: Arc>) { + let state = client_opts.slow_state.clone(); + let client_id = Uuid::new_v4().to_string(); + + let opts = { + let desired_config = { + let guard = state.lock().unwrap(); + + guard.desired_config.clone() + }; + + let on_close_opts = client_opts.clone(); + + KvClientOptions { + address: desired_config.target.clone(), + authenticator: desired_config.auth.clone(), + selected_bucket: desired_config.selected_bucket.clone(), + bootstrap_options: client_opts + .static_kv_client_options + .bootstrap_options + .clone(), + endpoint_id: client_opts.endpoint_id.clone(), + unsolicited_packet_tx: client_opts + .static_kv_client_options + .unsolicited_packet_tx + .clone(), + orphan_handler: client_opts.static_kv_client_options.orphan_handler.clone(), + on_close: Arc::new(move |client_id| { + let babysitter_id = on_close_opts.id.clone(); + let opts_clone = on_close_opts.clone(); + let state_clone = on_close_opts.slow_state.clone(); + let fast_client_clone = on_close_opts.fast_client.clone(); + let state_change_handler = on_close_opts.state_change_handler.clone(); + + Box::pin(async move { + { + let mut guard = 
state_clone.lock().unwrap(); + if let Some(cli) = &guard.client { + if cli.id() != client_id { + return; + } + } else { + return; + } + + guard.client = None; + fast_client_clone + .store(Arc::new(StdKvClientBabysitterClientState { client: None })); + } + + state_change_handler(babysitter_id, None, None).await; + + Self::begin_client_build(opts_clone); + }) + }), + disable_decompression: client_opts.static_kv_client_options.disable_decompression, + id: client_id.clone(), + } + }; + + tokio::spawn(async move { + loop { + let connect_err = { + let mut guard = state.lock().unwrap(); + guard.connect_err.clone() + }; + if Self::maybe_throttle_on_error( + &client_opts.id, + client_opts.connect_throttle_period, + connect_err, + &client_opts.shutdown_token, + ) + .await + .is_err() + { + debug!( + "Client babysitter {} shutdown during connection throttling", + &client_opts.id + ); + return; + }; + + let opts = { + let mut guard = state.lock().unwrap(); + guard.current_state = ConnectionState::Connecting; + + let mut opts = opts.clone(); + opts.authenticator = guard.desired_config.auth.clone(); + opts.address = guard.desired_config.target.clone(); + opts.selected_bucket = guard.desired_config.selected_bucket.clone(); + + opts + }; + + match Self::create_client_with_shutdown( + &client_opts.id, + opts, + &client_opts.shutdown_token, + ) + .await + { + Ok(client) => { + let client = Arc::new(client); + debug!( + "Client babysitter {} changing client {} connection state to Connected", + &client_opts.id, + client.id() + ); + + { + let mut guard = state.lock().unwrap(); + guard.current_state = ConnectionState::Connected; + guard.client = Some(client.clone()); + } + + client_opts + .fast_client + .store(Arc::new(StdKvClientBabysitterClientState { + client: Some(client.clone()), + })); + + match client_opts + .on_client_connected_tx + .send(Some(client.clone())) + { + Ok(_) => {} + Err(e) => { + warn!("Client babysitter {} error sending new client notification: {}", 
&client_opts.id, e); + } + } + + (client_opts.state_change_handler)( + client_opts.id.clone(), + Some(client), + None, + ) + .await; + + return; + } + Err(e) => { + client_opts + .fast_client + .store(Arc::new(StdKvClientBabysitterClientState { client: None })); + let mut msg = format!( + "Client babysitter {} error creating new client {}", + &client_opts.id, e + ); + if *e.kind() == ErrorKind::Shutdown { + return; + } + + if let Some(source) = e.source() { + msg = format!("{msg} - {source}"); + } + debug!("{msg}"); + + let mut guard = state.lock().unwrap(); + + guard.current_state = ConnectionState::Disconnected; + guard.connect_err = Some(ConnectionError { + connect_error: e, + connect_error_time: Instant::now(), + }); + } + } + } + }); + } +} + +impl KvClientBabysitter for StdKvClientBabysitter { + type Client = K; + + fn new(opts: KvClientBabysitterOptions) -> StdKvClientBabysitter { + let (on_client_connected_tx, _) = watch::channel(None); + let babysitter = StdKvClientBabysitter { + id: opts.id, + connect_throttle_period: opts.connect_throttle_period, + state_change_handler: opts.state_change_handler, + on_client_connected_tx, + kv_client_options: StaticKvClientOptions { + bootstrap_options: opts.bootstrap_opts, + unsolicited_packet_tx: opts.unsolicited_packet_tx, + orphan_handler: opts.orphan_handler, + disable_decompression: opts.disable_decompression, + }, + fast_client: Arc::new(ArcSwap::from_pointee(StdKvClientBabysitterClientState { + client: None, + })), + slow_state: Arc::new(Mutex::new(StdKvClientBabysitterState { + // current_config: None, + desired_config: KvClientBabysitterClientConfig { + target: opts.target, + auth: opts.auth, + selected_bucket: opts.selected_bucket, + }, + connect_err: None, + client: None, + current_state: ConnectionState::Disconnected, + })), + shutdown_token: CancellationToken::new(), + }; + + Self::begin_client_build(Arc::new(ClientThreadOptions { + id: babysitter.id.clone(), + endpoint_id: opts.endpoint_id, + 
connect_throttle_period: babysitter.connect_throttle_period, + static_kv_client_options: babysitter.kv_client_options.clone(), + state_change_handler: babysitter.state_change_handler.clone(), + on_client_connected_tx: babysitter.on_client_connected_tx.clone(), + fast_client: babysitter.fast_client.clone(), + slow_state: babysitter.slow_state.clone(), + shutdown_token: babysitter.shutdown_token.clone(), + })); + + babysitter + } + + fn id(&self) -> &str { + &self.id + } + + async fn get_client(&self) -> error::Result> { + let state = self.fast_client.load(); + if let Some(client) = &state.client { + return Ok(client.clone()); + } + + { + let guard = self.slow_state.lock().unwrap(); + if let Some(client) = &guard.client { + return Ok(client.clone()); + } + } + + let mut rx = self.on_client_connected_tx.subscribe(); + loop { + let changed = select! { + () = self.shutdown_token.cancelled() => { + return Err(Error::new_message_error("babysitter shutdown")) + }, + (res) = rx.changed() => res + }; + + match changed { + Ok(_) => { + if let Some(client) = rx.borrow_and_update().clone() { + return Ok(client); + } + } + Err(e) => { + debug!( + "Client babysitter {} failed to wait for client to become available: {}", + &self.id, e + ); + + return Err(Error::new_message_error(format!( + "client babysitter failed to wait for client to become available {e}" + ))); + } + } + } + } + + fn endpoint_diagnostics(&self) -> EndpointDiagnostics { + let state = self.slow_state.lock().unwrap(); + + let connection_state = state.current_state; + + let (local_address, last_activity) = match &state.client { + Some(cli) => ( + Some(cli.local_addr().to_string()), + Some( + Utc::now() + .sub(cli.last_activity().to_utc()) + .num_microseconds() + .unwrap_or_default(), + ), + ), + None => (None, None), + }; + + EndpointDiagnostics { + service_type: ServiceType::MEMD, + id: self.id.to_string(), + local_address, + remote_address: state.desired_config.target.address.to_string(), + last_activity, + 
namespace: state.desired_config.selected_bucket.clone(), + state: connection_state, + } + } + + async fn update_auth(&self, authenticator: Authenticator) { + let mut guard = self.slow_state.lock().unwrap(); + guard.desired_config.auth = authenticator; + } + + async fn close(&self) -> error::Result<()> { + info!("Closing babysitter {}", self.id); + self.shutdown_token.cancel(); + + let client = { + let mut guard = self.slow_state.lock().unwrap(); + + self.fast_client + .store(Arc::new(StdKvClientBabysitterClientState { client: None })); + + take(&mut guard.client) + }; + + if let Some(client) = client { + client.close().await?; + } + + (self.state_change_handler)(self.id.clone(), None, None); + + Ok(()) + } +} diff --git a/sdk/couchbase-core/src/kvclient_ops.rs b/sdk/couchbase-core/src/kvclient_ops.rs index ccdd21de..cd592852 100644 --- a/sdk/couchbase-core/src/kvclient_ops.rs +++ b/sdk/couchbase-core/src/kvclient_ops.rs @@ -108,14 +108,14 @@ where D: Dispatcher, { fn bucket_name(&self) -> Option { - let guard = self.selected_bucket.lock().unwrap(); - guard.clone() + self.selected_bucket.lock().unwrap().clone() } async fn set(&self, req: SetRequest<'_>) -> KvResult { self.update_last_activity(); - let mut op = - self.handle_dispatch_side_result(self.ops_crud().set(self.client(), req).await)?; + let mut op = self + .handle_dispatch_side_result(self.ops_crud().set(self.client(), req).await) + .await?; let res = self.handle_response_side_result(op.recv().await)?; Ok(res) @@ -123,8 +123,9 @@ where async fn get(&self, req: GetRequest<'_>) -> KvResult { self.update_last_activity(); - let mut op = - self.handle_dispatch_side_result(self.ops_crud().get(self.client(), req).await)?; + let mut op = self + .handle_dispatch_side_result(self.ops_crud().get(self.client(), req).await) + .await?; let res = self.handle_response_side_result(op.recv().await)?; Ok(res) @@ -132,8 +133,9 @@ where async fn get_meta(&self, req: GetMetaRequest<'_>) -> KvResult { self.update_last_activity(); 
- let mut op = - self.handle_dispatch_side_result(self.ops_crud().get_meta(self.client(), req).await)?; + let mut op = self + .handle_dispatch_side_result(self.ops_crud().get_meta(self.client(), req).await) + .await?; let res = self.handle_response_side_result(op.recv().await)?; Ok(res) @@ -141,8 +143,9 @@ where async fn delete(&self, req: DeleteRequest<'_>) -> KvResult { self.update_last_activity(); - let mut op = - self.handle_dispatch_side_result(self.ops_crud().delete(self.client(), req).await)?; + let mut op = self + .handle_dispatch_side_result(self.ops_crud().delete(self.client(), req).await) + .await?; let res = self.handle_response_side_result(op.recv().await)?; Ok(res) @@ -151,7 +154,8 @@ where async fn get_and_lock(&self, req: GetAndLockRequest<'_>) -> KvResult { self.update_last_activity(); let mut op = self - .handle_dispatch_side_result(self.ops_crud().get_and_lock(self.client(), req).await)?; + .handle_dispatch_side_result(self.ops_crud().get_and_lock(self.client(), req).await) + .await?; let res = self.handle_response_side_result(op.recv().await)?; Ok(res) @@ -160,7 +164,8 @@ where async fn get_and_touch(&self, req: GetAndTouchRequest<'_>) -> KvResult { self.update_last_activity(); let mut op = self - .handle_dispatch_side_result(self.ops_crud().get_and_touch(self.client(), req).await)?; + .handle_dispatch_side_result(self.ops_crud().get_and_touch(self.client(), req).await) + .await?; let res = self.handle_response_side_result(op.recv().await)?; Ok(res) @@ -168,8 +173,9 @@ where async fn unlock(&self, req: UnlockRequest<'_>) -> KvResult { self.update_last_activity(); - let mut op = - self.handle_dispatch_side_result(self.ops_crud().unlock(self.client(), req).await)?; + let mut op = self + .handle_dispatch_side_result(self.ops_crud().unlock(self.client(), req).await) + .await?; let res = self.handle_response_side_result(op.recv().await)?; Ok(res) @@ -177,8 +183,9 @@ where async fn touch(&self, req: TouchRequest<'_>) -> KvResult { 
self.update_last_activity(); - let mut op = - self.handle_dispatch_side_result(self.ops_crud().touch(self.client(), req).await)?; + let mut op = self + .handle_dispatch_side_result(self.ops_crud().touch(self.client(), req).await) + .await?; let res = self.handle_response_side_result(op.recv().await)?; Ok(res) @@ -186,8 +193,9 @@ where async fn add(&self, req: AddRequest<'_>) -> KvResult { self.update_last_activity(); - let mut op = - self.handle_dispatch_side_result(self.ops_crud().add(self.client(), req).await)?; + let mut op = self + .handle_dispatch_side_result(self.ops_crud().add(self.client(), req).await) + .await?; let res = self.handle_response_side_result(op.recv().await)?; Ok(res) @@ -195,8 +203,9 @@ where async fn replace(&self, req: ReplaceRequest<'_>) -> KvResult { self.update_last_activity(); - let mut op = - self.handle_dispatch_side_result(self.ops_crud().replace(self.client(), req).await)?; + let mut op = self + .handle_dispatch_side_result(self.ops_crud().replace(self.client(), req).await) + .await?; let res = self.handle_response_side_result(op.recv().await)?; Ok(res) @@ -204,8 +213,9 @@ where async fn append(&self, req: AppendRequest<'_>) -> KvResult { self.update_last_activity(); - let mut op = - self.handle_dispatch_side_result(self.ops_crud().append(self.client(), req).await)?; + let mut op = self + .handle_dispatch_side_result(self.ops_crud().append(self.client(), req).await) + .await?; let res = self.handle_response_side_result(op.recv().await)?; Ok(res) @@ -213,8 +223,9 @@ where async fn prepend(&self, req: PrependRequest<'_>) -> KvResult { self.update_last_activity(); - let mut op = - self.handle_dispatch_side_result(self.ops_crud().prepend(self.client(), req).await)?; + let mut op = self + .handle_dispatch_side_result(self.ops_crud().prepend(self.client(), req).await) + .await?; let res = self.handle_response_side_result(op.recv().await)?; Ok(res) @@ -222,8 +233,9 @@ where async fn increment(&self, req: IncrementRequest<'_>) -> KvResult { 
self.update_last_activity(); - let mut op = - self.handle_dispatch_side_result(self.ops_crud().increment(self.client(), req).await)?; + let mut op = self + .handle_dispatch_side_result(self.ops_crud().increment(self.client(), req).await) + .await?; let res = self.handle_response_side_result(op.recv().await)?; Ok(res) @@ -231,8 +243,9 @@ where async fn decrement(&self, req: DecrementRequest<'_>) -> KvResult { self.update_last_activity(); - let mut op = - self.handle_dispatch_side_result(self.ops_crud().decrement(self.client(), req).await)?; + let mut op = self + .handle_dispatch_side_result(self.ops_crud().decrement(self.client(), req).await) + .await?; let res = self.handle_response_side_result(op.recv().await)?; Ok(res) @@ -240,8 +253,9 @@ where async fn lookup_in(&self, req: LookupInRequest<'_>) -> KvResult { self.update_last_activity(); - let mut op = - self.handle_dispatch_side_result(self.ops_crud().lookup_in(self.client(), req).await)?; + let mut op = self + .handle_dispatch_side_result(self.ops_crud().lookup_in(self.client(), req).await) + .await?; let res = self.handle_response_side_result(op.recv().await)?; Ok(res) @@ -249,8 +263,9 @@ where async fn mutate_in(&self, req: MutateInRequest<'_>) -> KvResult { self.update_last_activity(); - let mut op = - self.handle_dispatch_side_result(self.ops_crud().mutate_in(self.client(), req).await)?; + let mut op = self + .handle_dispatch_side_result(self.ops_crud().mutate_in(self.client(), req).await) + .await?; let res = self.handle_response_side_result(op.recv().await)?; Ok(res) @@ -262,7 +277,8 @@ where ) -> KvResult { self.update_last_activity(); let mut op = self - .handle_dispatch_side_result(OpsCore {}.get_cluster_config(self.client(), req).await)?; + .handle_dispatch_side_result(OpsCore {}.get_cluster_config(self.client(), req).await) + .await?; let res = self.handle_response_side_result(op.recv().await)?; Ok(res) @@ -274,7 +290,8 @@ where ) -> KvResult { self.update_last_activity(); let mut op = self - 
.handle_dispatch_side_result(OpsUtil {}.get_collection_id(self.client(), req).await)?; + .handle_dispatch_side_result(OpsUtil {}.get_collection_id(self.client(), req).await) + .await?; let res = self.handle_response_side_result(op.recv().await)?; Ok(res) @@ -282,7 +299,9 @@ where async fn ping(&self, req: PingRequest<'_>) -> KvResult { self.update_last_activity(); - let mut op = self.handle_dispatch_side_result(OpsUtil {}.ping(self.client(), req).await)?; + let mut op = self + .handle_dispatch_side_result(OpsUtil {}.ping(self.client(), req).await) + .await?; let res = self.handle_response_side_result(op.recv().await)?; Ok(res) @@ -298,12 +317,18 @@ where .store(Utc::now().timestamp_micros(), Ordering::SeqCst); } - fn handle_dispatch_side_result(&self, result: memdx::error::Result) -> KvResult { + async fn handle_dispatch_side_result(&self, result: memdx::error::Result) -> KvResult { match result { Ok(v) => Ok(v), - Err(e) => Err(MemdxError::new(e) - .with_dispatched_to(self.remote_addr().to_string()) - .with_dispatched_from(self.local_addr().to_string())), + Err(e) => { + if let memdx::error::ErrorKind::Dispatch { .. 
} = e.kind() { + self.mark_closed().await; + } + + Err(MemdxError::new(e) + .with_dispatched_to(self.remote_addr().to_string()) + .with_dispatched_from(self.local_addr().to_string())) + } } } @@ -328,8 +353,9 @@ where pub async fn select_bucket(&self, req: SelectBucketRequest) -> KvResult { self.update_last_activity(); - let mut op = - self.handle_dispatch_side_result(OpsCore {}.select_bucket(self.client(), req).await)?; + let mut op = self + .handle_dispatch_side_result(OpsCore {}.select_bucket(self.client(), req).await) + .await?; let res = self.handle_response_side_result(op.recv().await)?; Ok(res) diff --git a/sdk/couchbase-core/src/kvclientmanager.rs b/sdk/couchbase-core/src/kvclientmanager.rs deleted file mode 100644 index 67e2aea3..00000000 --- a/sdk/couchbase-core/src/kvclientmanager.rs +++ /dev/null @@ -1,363 +0,0 @@ -/* - * - * * Copyright (c) 2025 Couchbase, Inc. - * * - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. 
- * - */ - -use std::collections::HashMap; -use std::future::Future; -use std::sync::Arc; -use std::time::Duration; - -use arc_swap::ArcSwap; -use log::debug; -use tokio::sync::Mutex; - -use crate::error::ErrorKind; -use crate::error::Result; -use crate::kvclient::{KvClient, KvClientConfig, OnErrMapFetchedHandler, UnsolicitedPacketSender}; -use crate::kvclient_ops::KvClientOps; -use crate::kvclientpool::{ - KvClientPool, KvClientPoolClient, KvClientPoolConfig, KvClientPoolOptions, -}; -use crate::memdx::dispatcher::OrphanResponseHandler; - -pub(crate) type KvClientManagerClientType = - <::Pool as KvClientPool>::Client; - -pub(crate) trait KvClientManager: Sized + Send + Sync { - type Pool: KvClientPool + Send + Sync; - - fn new( - config: KvClientManagerConfig, - opts: KvClientManagerOptions, - ) -> impl Future> + Send; - fn reconfigure(&self, config: KvClientManagerConfig) - -> impl Future> + Send; - fn get_client( - &self, - endpoint: &str, - ) -> impl Future>>> + Send; - fn get_random_client( - &self, - ) -> impl Future>>> + Send; - async fn get_client_per_endpoint(&self) -> Result>>>; - async fn get_all_clients( - &self, - ) -> Result>>>; - fn get_all_pools(&self) -> HashMap>; - fn shutdown_client( - &self, - endpoint: Option<&str>, - client: Arc>, - ) -> impl Future> + Send; -} - -pub(crate) struct KvClientManagerConfig { - pub num_pool_connections: usize, - pub clients: HashMap, -} - -#[derive(Clone)] -pub(crate) struct KvClientManagerOptions { - pub connect_timeout: Duration, - pub connect_throttle_period: Duration, - pub unsolicited_packet_tx: Option, - pub orphan_handler: Option, - pub on_err_map_fetched_handler: Option, - pub disable_decompression: bool, -} - -#[derive(Debug, Default, Clone)] -struct KvClientManagerState

-where - P: KvClientPool, -{ - pub client_pools: HashMap>, -} - -pub(crate) struct StdKvClientManager

-where - P: KvClientPool, -{ - closed: Mutex, - state: ArcSwap>, - opts: KvClientManagerOptions, -} - -impl

StdKvClientManager

-where - P: KvClientPool, -{ - async fn get_pool(&self, endpoint: &str) -> Result> { - let state = self.state.load(); - - let pool = match state.client_pools.get(endpoint) { - Some(p) => p, - None => { - return Err(ErrorKind::EndpointNotKnown { - endpoint: endpoint.to_string(), - } - .into()); - } - }; - - Ok(pool.clone()) - } - - async fn get_random_pool(&self) -> Result> { - let state = self.state.load(); - - // Just pick one at random for now - if let Some((_, pool)) = state.client_pools.iter().next() { - return Ok(pool.clone()); - } - - Err(ErrorKind::NoEndpointsAvailable.into()) - } - - async fn create_pool(&self, pool_config: KvClientPoolConfig) -> Arc

{ - let pool = P::new( - pool_config.clone(), - KvClientPoolOptions { - connect_timeout: self.opts.connect_timeout, - connect_throttle_period: self.opts.connect_throttle_period, - unsolicited_packet_tx: self.opts.unsolicited_packet_tx.clone(), - orphan_handler: self.opts.orphan_handler.clone(), - disable_decompression: self.opts.disable_decompression, - on_err_map_fetched: self.opts.on_err_map_fetched_handler.clone(), - }, - ) - .await; - - Arc::new(pool) - } - - async fn shutdown_random_client( - &self, - client: Arc>, - ) -> Result<()> { - let state = self.state.load(); - - for (_, pool) in state.client_pools.iter() { - pool.shutdown_client(client.clone()).await; - } - - Ok(()) - } -} - -impl

KvClientManager for StdKvClientManager

-where - P: KvClientPool, -{ - type Pool = P; - - async fn new(config: KvClientManagerConfig, opts: KvClientManagerOptions) -> Result { - let manager = Self { - closed: Mutex::new(false), - state: ArcSwap::from_pointee(KvClientManagerState { - client_pools: Default::default(), - }), - opts, - }; - - manager.reconfigure(config).await?; - Ok(manager) - } - - async fn reconfigure(&self, config: KvClientManagerConfig) -> Result<()> { - let mut guard = self.closed.lock().await; - if *guard { - return Err(ErrorKind::IllegalState { - msg: "reconfigure called after close".to_string(), - } - .into()); - } - - debug!("Reconfiguring client manager"); - - let mut new_state = KvClientManagerState::

{ - client_pools: Default::default(), - }; - - let state = self.state.load(); - - let mut old_pools = HashMap::new(); - old_pools.clone_from(&state.client_pools); - - for (endpoint, endpoint_config) in config.clients { - let pool_config = KvClientPoolConfig { - num_connections: config.num_pool_connections, - client_config: endpoint_config, - }; - - let old_pool = old_pools.remove(&endpoint); - let new_pool = if let Some(pool) = old_pool { - match pool.reconfigure(pool_config.clone()).await { - Ok(_) => pool, - Err(e) => { - debug!("Failed to reconfigure client pool: {}", e); - self.create_pool(pool_config).await - } - } - } else { - self.create_pool(pool_config).await - }; - - new_state.client_pools.insert(endpoint, new_pool); - } - - for (_, pool) in old_pools { - // TODO: log? - pool.close().await.unwrap_or_default(); - } - - self.state.store(Arc::from(new_state)); - - Ok(()) - } - - async fn get_client(&self, endpoint: &str) -> Result>> { - let pool = self.get_pool(endpoint).await?; - - pool.get_client().await - } - - async fn get_random_client(&self) -> Result>> { - let pool = self.get_random_pool().await?; - - pool.get_client().await - } - - async fn get_client_per_endpoint(&self) -> Result>>> { - let state = self.state.load(); - - let mut clients = Vec::with_capacity(state.client_pools.len()); - for pool in state.client_pools.values() { - clients.push(pool.get_client().await?); - } - - Ok(clients) - } - - async fn get_all_clients( - &self, - ) -> Result>>> { - let state = self.state.load(); - - let mut clients = HashMap::new(); - for pool in state.client_pools.values() { - clients.extend(pool.get_all_clients().await?); - } - - Ok(clients) - } - - fn get_all_pools(&self) -> HashMap> { - let state = self.state.load(); - let mut pools = HashMap::new(); - for (endpoint, pool) in state.client_pools.iter() { - pools.insert(endpoint.clone(), pool.clone()); - } - - pools - } - - async fn shutdown_client( - &self, - endpoint: Option<&str>, - client: Arc>, - ) -> 
Result<()> { - if let Some(ep) = endpoint { - let pool = self.get_pool(ep).await?; - - pool.shutdown_client(client).await; - - return Ok(()); - } - - // we don't know which endpoint this belongs to, so we need to send the - // shutdown request to all the possibilities... - self.shutdown_random_client(client).await - } -} - -pub(crate) async fn orchestrate_memd_client( - manager: Arc, - endpoint: &str, - mut operation: impl FnMut(Arc>) -> Fut, -) -> Result -where - M: KvClientManager, - Fut: Future> + Send, -{ - let client = manager.get_client(endpoint).await?; - - let res = operation(client.clone()).await; - match res { - Ok(r) => Ok(r), - Err(e) => { - if let Some(memdx_err) = e.is_memdx_error() { - if memdx_err.is_dispatch_error() { - // TODO: Log something - manager - .shutdown_client(Some(endpoint), client) - .await - .unwrap_or_default(); - - return Err(e); - } - } - - Err(e) - } - } -} - -pub(crate) async fn orchestrate_random_memd_client( - manager: Arc, - mut operation: impl FnMut(Arc>) -> Fut, -) -> Result -where - M: KvClientManager, - Fut: Future> + Send, -{ - loop { - let client = manager.get_random_client().await?; - - let res = operation(client.clone()).await; - let res = match res { - Ok(r) => Ok(r), - Err(e) => { - if let Some(memdx_err) = e.is_memdx_error() { - if memdx_err.is_dispatch_error() { - // This was a dispatch error, so we can just try with - // a different client instead... 
- // TODO: Log something - manager - .shutdown_client(None, client) - .await - .unwrap_or_default(); - continue; - } - } - - Err(e) - } - }; - return res; - } -} diff --git a/sdk/couchbase-core/src/kvclientpool.rs b/sdk/couchbase-core/src/kvclientpool.rs index dcf62f75..f876f50a 100644 --- a/sdk/couchbase-core/src/kvclientpool.rs +++ b/sdk/couchbase-core/src/kvclientpool.rs @@ -26,19 +26,25 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; +use crate::authenticator::Authenticator; use crate::connection_state::ConnectionState; use crate::error; use crate::error::Result; use crate::error::{Error, ErrorKind}; use crate::kvclient::{ - KvClient, KvClientConfig, KvClientOptions, OnErrMapFetchedHandler, OnKvClientCloseHandler, - UnsolicitedPacketSender, + KvClient, KvClientBootstrapOptions, KvClientOptions, OnErrMapFetchedHandler, + OnKvClientCloseHandler, UnsolicitedPacketSender, }; +use crate::kvclient_babysitter::{KvClientBabysitter, KvClientBabysitterOptions, KvTarget}; use crate::kvclient_ops::KvClientOps; use crate::memdx::dispatcher::{Dispatcher, OrphanResponseHandler, UnsolicitedPacketHandler}; +use crate::memdx::request::PingRequest; +use crate::memdx::response::PingResponse; +use crate::results::diagnostics::EndpointDiagnostics; use arc_swap::ArcSwap; use futures::executor::block_on; -use log::{debug, warn}; +use futures::future::join_all; +use log::{debug, error, info, warn}; use tokio::select; use tokio::sync::mpsc::Sender; use tokio::sync::{broadcast, Mutex, MutexGuard, Notify}; @@ -46,728 +52,247 @@ use tokio::time::{sleep, Instant}; use tokio_util::sync::CancellationToken; use urlencoding::decode_binary; use uuid::Uuid; -// TODO: This needs some work, some more thought should go into the locking strategy as it's possible -// there are still races in this. Additionally it's extremely easy to write in deadlocks. 
-pub(crate) trait KvClientPool: Sized + Send + Sync { +pub(crate) trait KvClientPool: Send + Sync { type Client: KvClient + KvClientOps + Send + Sync; - fn new( - config: KvClientPoolConfig, - opts: KvClientPoolOptions, - ) -> impl Future + Send; + fn new(opts: KvClientPoolOptions) -> impl Future + Send; + fn id(&self) -> &str; fn get_client(&self) -> impl Future>> + Send; - fn shutdown_client(&self, client: Arc) -> impl Future + Send; - fn close(&self) -> impl Future> + Send; - fn reconfigure(&self, config: KvClientPoolConfig) -> impl Future> + Send; - fn get_all_clients( + fn ping_all_clients( &self, - ) -> impl Future>>> + Send; - async fn get_bucket(&self) -> Option; -} - -#[derive(Clone)] -pub(crate) struct KvClientPoolConfig { - pub num_connections: usize, - pub client_config: KvClientConfig, + req: PingRequest, + ) -> impl Future>> + Send; + fn endpoint_diagnostics(&self) -> impl Future> + Send; + fn update_auth(&self, authenticator: Authenticator) -> impl Future + Send; + // async fn update_selected_bucket(&self, bucket_name: String); + fn close(&self) -> impl Future> + Send; } pub(crate) struct KvClientPoolOptions { - pub connect_timeout: Duration, + pub num_connections: usize, pub connect_throttle_period: Duration, - pub unsolicited_packet_tx: Option, - pub orphan_handler: Option, - pub on_err_map_fetched: Option, pub disable_decompression: bool, -} + pub bootstrap_options: KvClientBootstrapOptions, + pub endpoint_id: String, -#[derive(Debug, Clone)] -struct ConnectionError { - pub connect_error: Error, - pub connect_error_time: Instant, -} - -#[derive(Debug)] -pub(crate) struct KvClientPoolClient { - pub client: Option>, - pub connection_state: ConnectionState, -} + pub target: KvTarget, + pub auth: Authenticator, + pub selected_bucket: Option, -impl Clone for KvClientPoolClient { - fn clone(&self) -> Self { - KvClientPoolClient { - client: self.client.clone(), - connection_state: self.connection_state, - } - } + pub unsolicited_packet_tx: Option, + pub 
orphan_handler: Option, } -impl KvClientPoolClient { - pub fn new() -> Self { - KvClientPoolClient { - client: None, - connection_state: ConnectionState::Disconnected, - } - } - - pub fn with_connection_state(connection_state: ConnectionState) -> Self { - KvClientPoolClient { - client: None, - connection_state, - } - } - - pub fn with_client(client: Arc, connection_state: ConnectionState) -> Self { - KvClientPoolClient { - client: Some(client), - connection_state, - } - } +struct KvClientPoolFastMap { + clients: Vec>, } -type KvClientPoolClients = HashMap>; - -struct KvClientPoolClientState { - current_config: KvClientConfig, - - connection_error: Option, - - clients: KvClientPoolClients, - reconfiguring_clients: KvClientPoolClients, +#[derive(Clone)] +struct KvClientPoolEntry +where + B: KvClientBabysitter, + K: KvClient + KvClientOps, +{ + babysitter: Arc, + client: Option>, } -struct KvClientPoolClientInner { - connect_timeout: Duration, - connect_throttle_period: Duration, - - unsolicited_packet_tx: Option, - orphan_handler: Option, - on_client_close_tx: Sender, - on_err_map_fetched: Option, - - disable_decompression: bool, - - num_connections_wanted: AtomicUsize, - fast_map: ArcSwap>>, - +pub(crate) struct StdKvClientPool +where + B: KvClientBabysitter, + K: KvClient + KvClientOps, +{ id: String, - state: Arc>>, - client_idx: AtomicUsize, - new_client_watcher_tx: broadcast::Sender<()>, - // We hold onto this to prevent the sender from erroring. 
- new_client_watcher_rx: broadcast::Receiver<()>, - - closed: AtomicBool, - shutdown_token: CancellationToken, -} + client_idx: AtomicUsize, + fast_map: Arc>>, -pub(crate) struct NaiveKvClientPool { - inner: Arc>, + babysitters: Arc>>>, } -impl KvClientPoolClientInner +impl KvClientPool for StdKvClientPool where - K: KvClient + KvClientOps + PartialEq + Sync + Send + 'static, + B: KvClientBabysitter + Send + 'static + std::marker::Sync, + K: KvClient + KvClientOps + 'static, { - pub async fn get_client(&self) -> Result> { - { - let fm = self.fast_map.load(); + type Client = K; - if !fm.is_empty() { - let idx = self.client_idx.fetch_add(1, Ordering::SeqCst); - if let Some(client) = fm.get(idx % fm.len()) { - return Ok(client.clone()); - } - } - } + async fn new(opts: KvClientPoolOptions) -> Self { + let id = Uuid::new_v4().to_string(); + debug!( + "Creating new client pool {} for {} - {:?}", + &id, &opts.target.address, &opts.selected_bucket + ); - self.get_client_slow().await - } + let fast_map = Arc::new(ArcSwap::from_pointee(KvClientPoolFastMap { + clients: vec![], + })); - pub async fn close(&self) -> Result<()> { - if self.closed.swap(true, Ordering::SeqCst) { - return Err(ErrorKind::Shutdown.into()); - } + let babysitters: Arc>>> = + Arc::new(Mutex::new(Vec::with_capacity(opts.num_connections))); - debug!("Closing pool {}", &self.id); + let babysitters_clone = babysitters.clone(); + let fast_map_clone = fast_map.clone(); { - let mut state = self.state.lock().await; - for (_id, client) in state.clients.iter() { - // TODO: probably log - if let Some(client) = &client.client { - client.close().await.unwrap_or_default(); - } - } - - state.clients = HashMap::new(); - } + let mut babysitters_guard = babysitters.lock().await; + for idx in 0..opts.num_connections { + let babysitters_clone = babysitters_clone.clone(); + let fast_map_clone = fast_map_clone.clone(); + let babysitter = KvClientBabysitter::new(KvClientBabysitterOptions { + id: Uuid::new_v4().to_string(), + 
endpoint_id: opts.endpoint_id.clone(), + + connect_throttle_period: opts.connect_throttle_period, + disable_decompression: opts.disable_decompression, + bootstrap_opts: opts.bootstrap_options.clone(), + state_change_handler: Arc::new(move |babysitter_id, client, error| { + let babysitters_clone = babysitters_clone.clone(); + let fast_map_clone = fast_map_clone.clone(); + Box::pin(async move { + let mut guard = babysitters_clone.lock().await; + + let entry = guard + .iter_mut() + .find(|entry| entry.babysitter.id() == babysitter_id); + if let Some(entry) = entry { + entry.client = client; + } - Ok(()) - } + let mut clients = vec![]; + for entry in guard.iter() { + if let Some(client) = &entry.client { + clients.push(client.clone()); + } + } - pub async fn check_connections(&self) { - let mut state = self.state.lock().await; - let num_clients = state.clients.len() as isize; - let num_wanted_clients = self.num_connections_wanted.load(Ordering::SeqCst) as isize; - let num_needed_clients = num_wanted_clients - num_clients; - - if num_needed_clients > 0 { - for _ in 0..num_needed_clients { - let client_id = Uuid::new_v4().to_string(); - let config = state.current_config.clone(); - state - .clients - .insert(client_id.clone(), KvClientPoolClient::new()); - - self.start_new_client(config, client_id).await; + fast_map_clone.store(Arc::new(KvClientPoolFastMap { clients })); + }) + }), + unsolicited_packet_tx: opts.unsolicited_packet_tx.clone(), + orphan_handler: opts.orphan_handler.clone(), + target: opts.target.clone(), + auth: opts.auth.clone(), + selected_bucket: opts.selected_bucket.clone(), + }); + + babysitters_guard.insert( + idx, + KvClientPoolEntry { + babysitter: Arc::new(babysitter), + client: None, + }, + ); } } - if num_needed_clients < 0 { - let num_excess_clients = num_clients - num_wanted_clients; - let mut clients = &mut state.clients; - let mut ids_to_remove = vec![]; - for (id, client) in clients.iter_mut() { - if let Some(cli) = &client.client { - 
client.connection_state = ConnectionState::Disconnecting; - cli.close().await.unwrap_or_default(); - ids_to_remove.push(id.clone()); - } - - if ids_to_remove.len() >= num_excess_clients as usize { - break; - } - } - for id in ids_to_remove { - clients.remove(&id); - } + StdKvClientPool { + id, + client_idx: Default::default(), + fast_map, + babysitters, } } - pub async fn reconfigure(&self, config: KvClientPoolConfig) -> Result<()> { - debug!("Reconfiguring client pool {}", &self.id); - self.num_connections_wanted - .store(config.num_connections, Ordering::SeqCst); - - { - let mut state = self.state.lock().await; - if state.current_config != config.client_config { - let mut clients_to_remove = Vec::new(); - for (id, client) in state.clients.iter_mut() { - if let Some(cli) = &client.client { - client.connection_state = ConnectionState::Connecting; - let res = select! { - _ = self.shutdown_token.cancelled() => { - debug!("Client pool {} shutdown notified during client reconfigure", &self.id); - return Ok(()); - } - res = cli.reconfigure(config.client_config.clone()) => res - }; - - if let Err(e) = res { - // TODO: log here. 
- dbg!(e); - cli.close().await.unwrap_or_default(); - client.connection_state = ConnectionState::Disconnected; - clients_to_remove.push(id.clone()); - continue; - }; - - client.connection_state = ConnectionState::Connected; - } - } - - for id in clients_to_remove { - state.clients.remove(&id); - } - } - } - - self.check_connections().await; - self.rebuild_fast_map().await; - - Ok(()) + fn id(&self) -> &str { + &self.id } - pub async fn get_all_clients(&self) -> Result>> { - if self.closed.load(Ordering::SeqCst) { - return Err(ErrorKind::Shutdown.into()); + async fn get_client(&self) -> Result> { + let fast_map = self.fast_map.load(); + let num_fast_map_connections = fast_map.clients.len(); + if num_fast_map_connections > 0 { + let client_idx = self.client_idx.fetch_add(1, Ordering::Relaxed); + let client = fast_map.clients[client_idx % num_fast_map_connections].clone(); + return Ok(client); } - let guard = self.state.lock().await; - let mut clients = HashMap::new(); - for (id, client) in &guard.clients { - clients.insert(id.clone(), client.clone()); - } + debug!("Client pool {} no client found in fast_map", self.id); - Ok(clients) + self.get_client_slow().await } - async fn get_client_slow(&self) -> Result> { - if self.closed.load(Ordering::SeqCst) { - return Err(ErrorKind::Shutdown.into()); - } - + async fn ping_all_clients(&self, req: PingRequest<'_>) -> Vec> { + let mut babysitters = vec![]; { - let state = self.state.lock().await; - let clients = &state.clients; - let available_clients: Vec<_> = clients - .iter() - .filter_map(|(_id, c)| c.client.as_ref().map(|client| client.clone())) - .collect(); - if !available_clients.is_empty() { - let idx = self.client_idx.fetch_add(1, Ordering::SeqCst); - if let Some(client) = available_clients.get(idx % available_clients.len()) { - return Ok(client.clone()); - }; - } + let guard = self.babysitters.lock().await; - if let Some(e) = &state.connection_error { - return Err(e.connect_error.clone()); + for babysitter_entry in 
guard.iter() { + babysitters.push(babysitter_entry.babysitter.clone()) } } - let mut rx = self.new_client_watcher_tx.subscribe(); - let _ = rx.recv().await; - Box::pin(self.get_client_slow()).await - } - - pub async fn handle_client_close(&self, client_id: String) { - // TODO: not sure the ordering of close leading to here is great. - if self.closed.load(Ordering::SeqCst) { - debug!( - "Client pool {} is closed, ignoring client close for {}", - &self.id, &client_id - ); - return; + let mut pool_handles = Vec::with_capacity(babysitters.len()); + for babysitter in babysitters { + let req = req.clone(); + let handle = async move { + let client = babysitter.get_client().await?; + client + .ping(req) + .await + .map_err(Error::new_contextual_memdx_error) + }; + + pool_handles.push(handle); } - { - let mut state = self.state.lock().await; - let mut clients = &mut state.clients; - // If the client is not in the pool, we don't need to do anything. - clients.remove(&client_id); - } - - self.check_connections().await; - self.rebuild_fast_map().await; - } - - async fn rebuild_fast_map(&self) { - let state = self.state.lock().await; - let clients = &state.clients; - let mut new_map = Vec::new(); - for client in clients.values() { - if let Some(client) = &client.client { - new_map.push(client.clone()); - } - } - self.fast_map.store(Arc::from(new_map)); + join_all(pool_handles).await } - pub async fn shutdown_client(&self, client: Arc) { - { - let mut state = self.state.lock().await; - let mut clients = &mut state.clients; - clients.remove(client.id()); - } - - self.rebuild_fast_map().await; - - // TODO: Should log - client.close().await.unwrap_or_default(); - } + async fn endpoint_diagnostics(&self) -> Vec { + let babysitters = self.babysitters.lock().await; - async fn maybe_throttle_on_error( - pool_id: &str, - throttle_period: Duration, - guard: &mut MutexGuard<'_, KvClientPoolClientState>, - shutdown_token: &CancellationToken, - ) -> Result<()> { - if let Some(e) = 
&guard.connection_error { - let elapsed = e.connect_error_time.elapsed(); - if elapsed < throttle_period { - let to_sleep = throttle_period.sub(elapsed); - debug!( - "Client pool {} throttling new connection attempt for {:?}", - &pool_id, to_sleep - ); - return select! { - _ = shutdown_token.cancelled() => { - debug!("Client pool {pool_id} shutdown notified during throttle sleep"); - Err(ErrorKind::Shutdown.into()) - } - _ = sleep(to_sleep) => Ok(()), - }; - } + let mut diags = vec![]; + for babysitter_entry in babysitters.iter() { + diags.push(babysitter_entry.babysitter.endpoint_diagnostics()); } - Ok(()) - } - - fn update_client_state_if_exists( - mut guard: MutexGuard<'_, KvClientPoolClientState>, - id: &str, - state: ConnectionState, - ) { - if let Some(client) = guard.clients.get_mut(id) { - client.connection_state = state; - } + diags } - async fn create_client_with_shutdown( - pool_id: &str, - config: KvClientConfig, - opts: KvClientOptions, - shutdown_token: &CancellationToken, - ) -> Result { - select! 
{ - _ = shutdown_token.cancelled() => { - debug!("Client pool {pool_id} shutdown notified during client creation"); - Err(ErrorKind::Shutdown.into()) - } - c = K::new(config, opts) => c, + async fn update_auth(&self, authenticator: Authenticator) { + let babysitters = self.babysitters.lock().await; + for babysitter_entry in babysitters.iter() { + babysitter_entry + .babysitter + .update_auth(authenticator.clone()) + .await; } } - async fn start_new_client_thread( - state: Arc>>, - throttle_period: Duration, - pool_id: String, - config: KvClientConfig, - opts: KvClientOptions, - shutdown_token: CancellationToken, - on_new_client_tx: broadcast::Sender<()>, - ) { - loop { - { - let mut guard = state.lock().await; - if Self::maybe_throttle_on_error( - &pool_id, - throttle_period, - &mut guard, - &shutdown_token, - ) - .await - .is_err() - { - debug!( - "Client pool {} shutdown during connection throttling", - &pool_id - ); - return; - }; - - Self::update_client_state_if_exists(guard, &opts.id, ConnectionState::Connecting); - } - - match Self::create_client_with_shutdown( - &pool_id, - config.clone(), - opts.clone(), - &shutdown_token, - ) - .await - { - Ok(r) => { - debug!( - "Client pool {} created client successfully: {}", - &pool_id, - r.id() - ); - let new_config = { - let mut guard = state.lock().await; - guard.connection_error = None; - - if config == guard.current_config { - let mut clients = &mut guard.clients; - if let Some(mut client) = clients.get_mut(r.id()) { - debug!("Client pool {} changing client {} connection state to Connected", &pool_id, r.id()); - client.connection_state = ConnectionState::Connected; - client.client = Some(Arc::new(r)); - - drop(guard); - - match on_new_client_tx.send(()) { - Ok(_) => {} - Err(e) => { - warn!("Client pool {pool_id} error sending new client notification: {e}"); - } - }; - } else { - drop(guard); - - match r.close().await { - Ok(_) => {} - Err(e) => { - debug!( - "Client pool {} error closing client {}: {}", - 
&pool_id, - r.id(), - e - ); - } - } - - continue; - }; - - return; - } - - guard.current_config.clone() - }; - - match r.reconfigure(new_config).await { - Ok(_) => { - debug!( - "Client pool {} reconfigured client {} to new config", - &pool_id, - r.id() - ); - let mut guard = state.lock().await; - let mut clients = &mut guard.clients; - if let Some(mut client) = clients.get_mut(r.id()) { - client.connection_state = ConnectionState::Connected; - client.client = Some(Arc::new(r)); - - drop(guard); - - match on_new_client_tx.send(()) { - Ok(_) => {} - Err(e) => { - warn!("Client pool {pool_id} error sending new client notification: {e}"); - } - }; - - return; - } else { - drop(guard); - - match r.close().await { - Ok(_) => {} - Err(e) => { - debug!( - "Client pool {} error closing client {}: {}", - &pool_id, - r.id(), - e - ); - } - } + async fn close(&self) -> Result<()> { + info!("Closing pool {}", self.id); - continue; - }; - } - Err(e) => { - let mut msg = format!( - "Client pool {} failed to reconfigure client {}: {}", - &pool_id, - r.id(), - e - ); - if let Some(source) = e.source() { - msg = format!("{msg} - {source}"); - } - debug!("{msg}"); - - match r.close().await { - Ok(_) => {} - Err(e) => { - debug!( - "Client pool {} error closing client {}: {}", - &pool_id, - r.id(), - e - ); - } - } + self.fast_map + .swap(Arc::new(KvClientPoolFastMap { clients: vec![] })); - continue; - } - }; - } - Err(e) => { - let mut msg = format!("Client pool {pool_id} error creating new client {e}"); - if *e.kind() == ErrorKind::Shutdown { - return; - } - - if let Some(source) = e.source() { - msg = format!("{msg} - {source}"); - } - debug!("{msg}"); - - let mut guard = state.lock().await; - - guard.connection_error = Some(ConnectionError { - connect_error: e, - connect_error_time: Instant::now(), - }); - } + let mut babysitters = self.babysitters.lock().await; + for babysitter_entry in babysitters.drain(..) 
{ + if let Err(e) = babysitter_entry.babysitter.close().await { + debug!("Failed to close babysitter: {e:?}"); } } - } - - async fn start_new_client(&self, config: KvClientConfig, id: String) { - debug!( - "Client pool {} creating new client with id {}", - &self.id, &id - ); - - let on_client_close_tx = self.on_client_close_tx.clone(); - let state = self.state.clone(); - let opts = KvClientOptions { - unsolicited_packet_tx: self.unsolicited_packet_tx.clone(), - orphan_handler: self.orphan_handler.clone(), - on_close: Arc::new(move |client_id| { - let on_client_close_tx = on_client_close_tx.clone(); - Box::pin(async move { - if let Err(e) = on_client_close_tx.send(client_id).await { - debug!("Failed to send client close notification: {e}"); - } - }) - }), - disable_decompression: self.disable_decompression, - on_err_map_fetched: self.on_err_map_fetched.clone(), - id: id.clone(), - }; - let throttle_period = self.connect_throttle_period; - - let tx = self.new_client_watcher_tx.clone(); - let shutdown_token = self.shutdown_token.child_token(); - - let pool_id = self.id.clone(); - - tokio::spawn(Self::start_new_client_thread( - state, - throttle_period, - pool_id, - config, - opts, - shutdown_token, - tx, - )); - } - - async fn get_bucket(&self) -> Option { - let state = self.state.lock().await; - state.current_config.selected_bucket.clone() + Ok(()) } } -impl KvClientPool for NaiveKvClientPool +impl StdKvClientPool where - K: KvClient + KvClientOps + PartialEq + Sync + Send + 'static, + B: KvClientBabysitter, + K: KvClient + KvClientOps, { - type Client = K; - - async fn new(config: KvClientPoolConfig, opts: KvClientPoolOptions) -> Self { - let id = uuid::Uuid::new_v4().to_string(); - debug!( - "Creating new client pool {} for {}", - &id, &config.client_config.address - ); - - let (on_client_close_tx, on_client_close_rx) = tokio::sync::mpsc::channel(1); - let mut clients = HashMap::with_capacity(config.num_connections); - for _ in 0..config.num_connections { - 
clients.insert(Uuid::new_v4().to_string(), KvClientPoolClient::new()); - } - - let (new_client_watcher_tx, new_client_watcher_rx) = broadcast::channel(1); - - let mut inner = Arc::new(KvClientPoolClientInner { - connect_timeout: opts.connect_timeout, - connect_throttle_period: opts.connect_throttle_period, - - num_connections_wanted: AtomicUsize::new(config.num_connections), - client_idx: AtomicUsize::new(0), - fast_map: ArcSwap::from_pointee(vec![]), - - id, - state: Arc::new(Mutex::new(KvClientPoolClientState { - current_config: config.client_config, - connection_error: None, - clients, - reconfiguring_clients: Default::default(), - })), - - unsolicited_packet_tx: opts.unsolicited_packet_tx, - orphan_handler: opts.orphan_handler.clone(), - on_client_close_tx, - on_err_map_fetched: opts.on_err_map_fetched, - - disable_decompression: opts.disable_decompression, - - new_client_watcher_tx, - new_client_watcher_rx, - - closed: AtomicBool::new(false), - shutdown_token: CancellationToken::new(), - }); - - { - let inner_clone = Arc::downgrade(&inner); - tokio::spawn(async move { - let mut on_client_close_rx = on_client_close_rx; - while let Some(client_id) = on_client_close_rx.recv().await { - if let Some(inner) = inner_clone.upgrade() { - inner.handle_client_close(client_id).await; - } else { - debug!("Client close handler exited"); - return; - } - } - }); - } - - { - let state = inner.state.lock().await; - for id in state.clients.keys() { - let config = state.current_config.clone(); - inner.start_new_client(config, id.clone()).await; - } - } - - inner.check_connections().await; - - NaiveKvClientPool { inner } - } - - async fn get_client(&self) -> Result> { - self.inner.get_client().await - } - - async fn shutdown_client(&self, client: Arc) { - self.inner.shutdown_client(client).await; - } - - async fn close(&self) -> Result<()> { - self.inner.close().await - } - - async fn reconfigure(&self, config: KvClientPoolConfig) -> Result<()> { - 
self.inner.reconfigure(config).await - } - - async fn get_all_clients(&self) -> Result>> { - self.inner.get_all_clients().await - } + async fn get_client_slow(&self) -> Result> { + let babysitter = { + let babysitters = self.babysitters.lock().await; + let client_idx = self.client_idx.fetch_add(1, Ordering::Relaxed); - async fn get_bucket(&self) -> Option { - self.inner.get_bucket().await - } -} + babysitters[client_idx % babysitters.len()] + .babysitter + .clone() + }; -impl Drop for KvClientPoolClientInner -where - K: KvClient, -{ - fn drop(&mut self) { - self.shutdown_token.cancel(); + babysitter.get_client().await } } diff --git a/sdk/couchbase-core/src/kvendpointclientmanager.rs b/sdk/couchbase-core/src/kvendpointclientmanager.rs new file mode 100644 index 00000000..c7a11967 --- /dev/null +++ b/sdk/couchbase-core/src/kvendpointclientmanager.rs @@ -0,0 +1,315 @@ +/* + * + * * Copyright (c) 2025 Couchbase, Inc. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ +use crate::authenticator::Authenticator; +use crate::error; +use crate::kvclient::{KvClient, KvClientBootstrapOptions, UnsolicitedPacketSender}; +use crate::kvclient_babysitter::{KvClientBabysitter, KvTarget}; +use crate::kvclient_ops::KvClientOps; +use crate::kvclientpool::{KvClientPool, KvClientPoolOptions}; +use crate::memdx::dispatcher::OrphanResponseHandler; +use crate::memdx::request::PingRequest; +use crate::memdx::response::PingResponse; +use crate::results::diagnostics::EndpointDiagnostics; +use arc_swap::ArcSwap; +use futures::future::join_all; +use futures::AsyncWriteExt; +use log::{debug, error}; +use std::collections::HashMap; +use std::future::Future; +use std::marker::PhantomData; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::Mutex; +use uuid::Uuid; + +pub(crate) trait KvEndpointClientManager: Sized + Send + Sync { + type Client: KvClient + KvClientOps + Send + Sync; + + async fn new(opts: KvEndpointClientManagerOptions) -> error::Result; + + fn get_client(&self) -> impl Future>> + Send; + fn get_endpoint_client( + &self, + endpoint: &str, + ) -> impl Future>> + Send; + fn update_endpoints( + &self, + endpoints: HashMap, + add_only: bool, + ) -> impl Future> + Send; + fn update_auth(&self, authenticator: Authenticator) -> impl Future + Send; + fn ping_all_clients( + &self, + req: PingRequest, + ) -> impl Future>>> + Send; + fn endpoint_diagnostics(&self) -> impl Future> + Send; + fn get_client_per_endpoint( + &self, + ) -> impl Future>>> + Send; + // async fn update_selected_bucket(&self, bucket_name: String); +} + +pub(crate) type KvEndpointClientManagerCloseHandler = Arc; + +pub(crate) struct KvEndpointClientManagerOptions { + pub on_close_handler: KvEndpointClientManagerCloseHandler, + + pub num_pool_connections: usize, + pub connect_throttle_period: Duration, + pub disable_decompression: bool, + pub bootstrap_options: KvClientBootstrapOptions, + pub unsolicited_packet_tx: Option, + pub orphan_handler: Option, + + pub 
endpoints: HashMap, + pub authenticator: Authenticator, + pub selected_bucket: Option, +} + +struct KvEndpointClientManagerFastState +where + P: KvClientPool, + K: KvClient, +{ + client_pools: HashMap>, +} + +struct KvEndpointClientManagerSlowState +where + P: KvClientPool, + K: KvClient, +{ + auth: Authenticator, + selected_bucket: Option, + client_pools: HashMap>, +} + +pub(crate) struct StdKvEndpointClientManager +where + P: KvClientPool, + K: KvClient, +{ + id: String, + + on_close_handler: KvEndpointClientManagerCloseHandler, + + num_pool_connections: usize, + connect_throttle_period: Duration, + disable_decompression: bool, + bootstrap_options: KvClientBootstrapOptions, + unsolicited_packet_tx: Option, + orphan_handler: Option, + + slow_state: Arc>>, + fast_state: ArcSwap>, +} + +impl KvEndpointClientManager for StdKvEndpointClientManager +where + P: KvClientPool, + K: KvClient + KvClientOps, +{ + type Client = K; + + async fn new(opts: KvEndpointClientManagerOptions) -> error::Result { + let slow_state = KvEndpointClientManagerSlowState { + auth: opts.authenticator, + selected_bucket: opts.selected_bucket, + client_pools: HashMap::new(), + }; + + let fast_state = KvEndpointClientManagerFastState { + client_pools: HashMap::new(), + }; + + let mgr = StdKvEndpointClientManager { + id: Uuid::new_v4().to_string(), + on_close_handler: opts.on_close_handler, + num_pool_connections: opts.num_pool_connections, + connect_throttle_period: opts.connect_throttle_period, + disable_decompression: opts.disable_decompression, + bootstrap_options: opts.bootstrap_options, + unsolicited_packet_tx: opts.unsolicited_packet_tx, + orphan_handler: opts.orphan_handler, + + slow_state: Arc::new(Mutex::new(slow_state)), + fast_state: ArcSwap::from_pointee(fast_state), + }; + + mgr.update_endpoints(opts.endpoints, false).await?; + + Ok(mgr) + } + + async fn get_client(&self) -> error::Result> { + let state = self.fast_state.load(); + + // Just pick one at random for now + if let 
Some(pool) = state.client_pools.values().next() { + return pool.get_client().await; + } + + Err(error::Error::new_message_error("invalid endpoint")) + } + + async fn get_endpoint_client(&self, endpoint: &str) -> error::Result> { + let state = self.fast_state.load(); + + let pool = match state.client_pools.get(endpoint) { + Some(p) => p, + None => { + return Err(error::Error::new_message_error("invalid endpoint")); + } + }; + + pool.get_client().await + } + + async fn update_endpoints( + &self, + endpoints: HashMap, + add_only: bool, + ) -> error::Result<()> { + debug!( + "Kvclientmanager {} updating endpoints to {:?}", + self.id, + endpoints.keys() + ); + + let mut slow_state = self.slow_state.lock().await; + + let mut old_pools = HashMap::with_capacity(slow_state.client_pools.len()); + for (pool_name, pool) in slow_state.client_pools.drain() { + old_pools.insert(pool_name, pool); + } + + let mut new_pools = HashMap::new(); + + for (endpoint, target) in endpoints.into_iter() { + let old_pool = old_pools.remove(&endpoint); + let pool = if let Some(old_pool) = old_pool { + old_pool + } else { + let pool = P::new(KvClientPoolOptions { + num_connections: self.num_pool_connections, + connect_throttle_period: self.connect_throttle_period, + disable_decompression: self.disable_decompression, + bootstrap_options: self.bootstrap_options.clone(), + endpoint_id: endpoint.clone(), + target, + auth: slow_state.auth.clone(), + selected_bucket: slow_state.selected_bucket.clone(), + unsolicited_packet_tx: self.unsolicited_packet_tx.clone(), + orphan_handler: self.orphan_handler.clone(), + }) + .await; + + Arc::new(pool) + }; + + new_pools.insert(endpoint, pool); + } + + if add_only { + // in add-only mode, we keep any existing pools that aren't in the new set + // this is useful for making sure all routers still work until we've updated + // the routers separately... 
+ for (endpoint, pool) in old_pools.into_iter() { + new_pools.insert(endpoint, pool); + } + } else { + for pool in old_pools.into_values() { + let id = pool.id(); + if let Err(e) = pool.close().await { + debug!("Failed to close pool {id}: {e}"); + }; + } + } + + slow_state.client_pools = new_pools; + + let mut client_pools = HashMap::with_capacity(slow_state.client_pools.len()); + for (endpoint, pool) in slow_state.client_pools.iter() { + client_pools.insert(endpoint.clone(), pool.clone()); + } + + self.fast_state + .store(Arc::new(KvEndpointClientManagerFastState { client_pools })); + + Ok(()) + } + + async fn endpoint_diagnostics(&self) -> Vec { + let state = self.fast_state.load(); + + let mut diags = Vec::with_capacity(state.client_pools.len()); + for pool in state.client_pools.values() { + diags.extend(pool.endpoint_diagnostics().await); + } + + diags + } + + async fn update_auth(&self, authenticator: Authenticator) { + let state = self.slow_state.lock().await; + + for pool in state.client_pools.values() { + pool.update_auth(authenticator.clone()).await; + } + } + + async fn get_client_per_endpoint(&self) -> error::Result>> { + let state = self.fast_state.load(); + + let mut clients = Vec::with_capacity(state.client_pools.len()); + for pool in state.client_pools.values() { + let client = pool.get_client().await?; + clients.push(client); + } + + Ok(clients) + } + + async fn ping_all_clients( + &self, + req: PingRequest<'_>, + ) -> HashMap>> { + let state = self.fast_state.load(); + + let mut handles = vec![]; + for (endpoint, pool) in state.client_pools.iter() { + let req = req.clone(); + + let handle = async move { + let pool_id = pool.id(); + debug!("Pinging pool {pool_id}"); + (endpoint, pool.ping_all_clients(req).await) + }; + + handles.push(handle); + } + + let resp = join_all(handles).await; + let mut results = HashMap::new(); + for (endpoint, resp) in resp { + results.insert(endpoint.clone(), resp); + } + + results + } +} diff --git 
a/sdk/couchbase-core/src/lib.rs b/sdk/couchbase-core/src/lib.rs index 638892d5..c7ba3bd4 100644 --- a/sdk/couchbase-core/src/lib.rs +++ b/sdk/couchbase-core/src/lib.rs @@ -45,9 +45,10 @@ mod helpers; mod httpcomponent; pub mod httpx; mod kvclient; +mod kvclient_babysitter; mod kvclient_ops; -mod kvclientmanager; mod kvclientpool; +mod kvendpointclientmanager; pub mod memdx; pub mod mgmtcomponent; pub mod mgmtx; @@ -57,6 +58,7 @@ mod nmvbhandler; pub mod on_behalf_of; pub mod ondemand_agentmanager; pub mod options; +pub mod orphan_reporter; mod parsedconfig; pub mod querycomponent; pub mod queryx; @@ -73,6 +75,7 @@ mod util; mod vbucketmap; mod vbucketrouter; +mod componentconfigs; #[cfg(feature = "rustls-tls")] pub mod insecure_certverfier; -pub mod orphan_reporter; +mod kv_orchestration; diff --git a/sdk/couchbase-core/src/memdx/client.rs b/sdk/couchbase-core/src/memdx/client.rs index 2b09bac0..fbc05df1 100644 --- a/sdk/couchbase-core/src/memdx/client.rs +++ b/sdk/couchbase-core/src/memdx/client.rs @@ -47,7 +47,7 @@ use crate::memdx::codec::KeyValueCodec; use crate::memdx::connection::{ConnectionType, Stream}; use crate::memdx::datatype::DataTypeFlag; use crate::memdx::dispatcher::{ - Dispatcher, DispatcherOptions, OnConnectionCloseHandler, OrphanResponseHandler, + Dispatcher, DispatcherOptions, OnReadLoopCloseHandler, OrphanResponseHandler, UnsolicitedPacketHandler, }; use crate::memdx::error; @@ -82,7 +82,7 @@ struct ReadLoopOptions { pub client_id: String, pub unsolicited_packet_handler: UnsolicitedPacketHandler, pub orphan_handler: Option, - pub on_connection_close_tx: OnConnectionCloseHandler, + pub on_read_close_handler: OnReadLoopCloseHandler, pub on_close_cancel: CancellationToken, pub disable_decompression: bool, pub local_addr: SocketAddr, @@ -151,13 +151,13 @@ impl Client { client_id: &str, stream: FramedRead>, KeyValueCodec>, opaque_map: Arc>, - on_connection_close: OnConnectionCloseHandler, + on_read_loop_close: OnReadLoopCloseHandler, ) { 
drop(stream); Self::drain_opaque_map(opaque_map).await; - on_connection_close().await; + on_read_loop_close().await; debug!("{client_id} read loop shut down"); } @@ -170,7 +170,7 @@ impl Client { loop { select! { (_) = opts.on_close_cancel.cancelled() => { - Self::on_read_loop_close(&opts.client_id, stream, opaque_map, opts.on_connection_close_tx).await; + Self::on_read_loop_close(&opts.client_id, stream, opaque_map, opts.on_read_close_handler).await; return; }, (next) = stream.next() => { @@ -247,6 +247,8 @@ impl Client { Ok(_) => {} Err(e) => { debug!("Sending response to caller failed: {e}"); + Self::on_read_loop_close(&opts.client_id, stream, opaque_map, opts.on_read_close_handler).await; + return; } }; drop(context); @@ -264,11 +266,13 @@ impl Client { } Err(e) => { warn!("{} failed to read frame {}", opts.client_id, e); + Self::on_read_loop_close(&opts.client_id, stream, opaque_map, opts.on_read_close_handler).await; + return; } } } None => { - Self::on_read_loop_close(&opts.client_id, stream, opaque_map, opts.on_connection_close_tx).await; + Self::on_read_loop_close(&opts.client_id, stream, opaque_map, opts.on_read_close_handler).await; return; } } @@ -313,7 +317,7 @@ impl Dispatcher for Client { client_id: read_uuid, unsolicited_packet_handler: opts.unsolicited_packet_handler, orphan_handler: opts.orphan_handler, - on_connection_close_tx: opts.on_connection_close_handler, + on_read_close_handler: opts.on_read_close_handler, on_close_cancel: cancel_child, disable_decompression: opts.disable_decompression, local_addr, diff --git a/sdk/couchbase-core/src/memdx/dispatcher.rs b/sdk/couchbase-core/src/memdx/dispatcher.rs index 69082b3a..d43a66bf 100644 --- a/sdk/couchbase-core/src/memdx/dispatcher.rs +++ b/sdk/couchbase-core/src/memdx/dispatcher.rs @@ -31,12 +31,12 @@ use crate::orphan_reporter::OrphanContext; pub type UnsolicitedPacketHandler = Arc BoxFuture<'static, ()> + Send + Sync>; pub type OrphanResponseHandler = Arc; -pub type OnConnectionCloseHandler = 
Arc BoxFuture<'static, ()> + Send + Sync>; +pub type OnReadLoopCloseHandler = Arc BoxFuture<'static, ()> + Send + Sync>; pub struct DispatcherOptions { pub unsolicited_packet_handler: UnsolicitedPacketHandler, pub orphan_handler: Option, - pub on_connection_close_handler: OnConnectionCloseHandler, + pub on_read_close_handler: OnReadLoopCloseHandler, pub disable_decompression: bool, pub id: String, } diff --git a/sdk/couchbase-core/src/mgmtcomponent.rs b/sdk/couchbase-core/src/mgmtcomponent.rs index dc90bd55..2025fff2 100644 --- a/sdk/couchbase-core/src/mgmtcomponent.rs +++ b/sdk/couchbase-core/src/mgmtcomponent.rs @@ -62,7 +62,7 @@ pub(crate) struct MgmtComponent { #[derive(Debug)] pub(crate) struct MgmtComponentConfig { pub endpoints: HashMap, - pub authenticator: Arc, + pub authenticator: Authenticator, } pub(crate) struct MgmtComponentOptions { diff --git a/sdk/couchbase-core/src/options/agent.rs b/sdk/couchbase-core/src/options/agent.rs index 52550fec..96c02498 100644 --- a/sdk/couchbase-core/src/options/agent.rs +++ b/sdk/couchbase-core/src/options/agent.rs @@ -22,7 +22,6 @@ use crate::authenticator::Authenticator; use crate::memdx::dispatcher::OrphanResponseHandler; use crate::tls_config::TlsConfig; use std::fmt::Debug; -use std::sync::Arc; use std::time::Duration; #[derive(Clone)] @@ -204,6 +203,7 @@ impl Default for CompressionMode { #[non_exhaustive] pub struct ConfigPollerConfig { pub poll_interval: Duration, + pub fetch_timeout: Duration, } impl ConfigPollerConfig { @@ -215,12 +215,18 @@ impl ConfigPollerConfig { self.poll_interval = poll_interval; self } + + pub fn fetch_timeout(mut self, fetch_timeout: Duration) -> Self { + self.fetch_timeout = fetch_timeout; + self + } } impl Default for ConfigPollerConfig { fn default() -> Self { Self { poll_interval: Duration::from_millis(2500), + fetch_timeout: Duration::from_millis(2500), } } } @@ -228,6 +234,7 @@ impl Default for ConfigPollerConfig { #[derive(Clone, Debug, PartialEq)] #[non_exhaustive] pub struct 
KvConfig { + pub enable_error_map: bool, pub enable_mutation_tokens: bool, pub enable_server_durations: bool, pub num_connections: usize, @@ -240,6 +247,11 @@ impl KvConfig { Self::default() } + pub fn enable_error_map(mut self, enable: bool) -> Self { + self.enable_error_map = enable; + self + } + pub fn enable_mutation_tokens(mut self, enable: bool) -> Self { self.enable_mutation_tokens = enable; self @@ -269,6 +281,7 @@ impl KvConfig { impl Default for KvConfig { fn default() -> Self { Self { + enable_error_map: true, enable_mutation_tokens: true, enable_server_durations: true, num_connections: 1, diff --git a/sdk/couchbase-core/src/querycomponent.rs b/sdk/couchbase-core/src/querycomponent.rs index f064f217..8f6dcb6b 100644 --- a/sdk/couchbase-core/src/querycomponent.rs +++ b/sdk/couchbase-core/src/querycomponent.rs @@ -57,7 +57,7 @@ pub(crate) struct QueryComponent { #[derive(Debug)] pub(crate) struct QueryComponentConfig { pub endpoints: HashMap, - pub authenticator: Arc, + pub authenticator: Authenticator, } pub(crate) struct QueryComponentOptions { diff --git a/sdk/couchbase-core/src/searchcomponent.rs b/sdk/couchbase-core/src/searchcomponent.rs index 0cc35503..a56302fa 100644 --- a/sdk/couchbase-core/src/searchcomponent.rs +++ b/sdk/couchbase-core/src/searchcomponent.rs @@ -66,7 +66,7 @@ pub(crate) struct SearchComponentState { #[derive(Debug)] pub(crate) struct SearchComponentConfig { pub endpoints: HashMap, - pub authenticator: Arc, + pub authenticator: Authenticator, pub vector_search_enabled: bool, } diff --git a/sdk/couchbase-core/src/vbucketrouter.rs b/sdk/couchbase-core/src/vbucketrouter.rs index 6dff4e7a..4c70f307 100644 --- a/sdk/couchbase-core/src/vbucketrouter.rs +++ b/sdk/couchbase-core/src/vbucketrouter.rs @@ -174,7 +174,6 @@ where #[cfg(test)] mod tests { use crate::cbconfig::TerseConfig; - use crate::kvclientmanager::KvClientManager; use crate::vbucketmap::VbucketMap; use crate::vbucketrouter::{ NotMyVbucketConfigHandler, StdVbucketRouter, 
VbucketRouter, VbucketRouterOptions, diff --git a/sdk/couchbase/src/options/cluster_options.rs b/sdk/couchbase/src/options/cluster_options.rs index 1d8bb069..d58c3eaa 100644 --- a/sdk/couchbase/src/options/cluster_options.rs +++ b/sdk/couchbase/src/options/cluster_options.rs @@ -265,7 +265,8 @@ impl KvOptions { impl From for couchbase_core::options::agent::KvConfig { fn from(opts: KvOptions) -> Self { - let mut core_opts = couchbase_core::options::agent::KvConfig::default(); + let mut core_opts = + couchbase_core::options::agent::KvConfig::default().enable_error_map(true); if let Some(enable) = opts.enable_mutation_tokens { core_opts = core_opts.enable_mutation_tokens(enable); }