diff --git a/sdk/couchbase-connstr/src/lib.rs b/sdk/couchbase-connstr/src/lib.rs index 6b5afbfa..e7aac4f1 100644 --- a/sdk/couchbase-connstr/src/lib.rs +++ b/sdk/couchbase-connstr/src/lib.rs @@ -198,26 +198,40 @@ pub async fn resolve( conn_spec: ConnSpec, dns_config: impl Into>, ) -> error::Result { - let (default_port, has_explicit_scheme, use_ssl) = if let Some(scheme) = &conn_spec.scheme { + if let Some(scheme) = &conn_spec.scheme { match scheme.as_str() { - "couchbase" => (DEFAULT_MEMD_PORT, true, false), - "couchbases" => (DEFAULT_SSL_MEMD_PORT, true, true), - "couchbase2" => { - return handle_couchbase2_scheme(conn_spec); + "couchbase" => { + handle_couchbase_scheme(conn_spec, dns_config, DEFAULT_MEMD_PORT, true, false).await } - "" => (DEFAULT_MEMD_PORT, false, false), - _ => { - return Err(ErrorKind::InvalidArgument { - msg: "unrecognized scheme".to_string(), - arg: "scheme".to_string(), - } - .into()); + "couchbases" => { + handle_couchbase_scheme(conn_spec, dns_config, DEFAULT_SSL_MEMD_PORT, true, true) + .await } + "couchbase2" => handle_couchbase2_scheme(conn_spec), + "http" => handle_http_scheme(conn_spec, DEFAULT_LEGACY_HTTP_PORT, true, false).await, + "https" => handle_http_scheme(conn_spec, DEFAULT_LEGACY_HTTPS_PORT, true, true).await, + "" => { + handle_couchbase_scheme(conn_spec, dns_config, DEFAULT_MEMD_PORT, false, false) + .await + } + _ => Err(ErrorKind::InvalidArgument { + msg: "unrecognized scheme".to_string(), + arg: "scheme".to_string(), + } + .into()), } } else { - (DEFAULT_MEMD_PORT, false, false) - }; + handle_couchbase_scheme(conn_spec, dns_config, DEFAULT_MEMD_PORT, false, false).await + } +} +async fn handle_couchbase_scheme( + conn_spec: ConnSpec, + dns_config: impl Into>, + default_port: u16, + has_explicit_scheme: bool, + use_ssl: bool, +) -> error::Result { if let Some(srv_record) = conn_spec.srv_record() { match lookup_srv( &srv_record.scheme, @@ -358,6 +372,80 @@ fn handle_couchbase2_scheme(conn_spec: ConnSpec) -> error::Result error::Result { + if conn_spec.hosts.is_empty() { + let (memd_port, http_port) = if use_ssl { + (DEFAULT_SSL_MEMD_PORT, DEFAULT_LEGACY_HTTPS_PORT) + } else { + (DEFAULT_MEMD_PORT, DEFAULT_LEGACY_HTTP_PORT) + }; + + return Ok(ResolvedConnSpec { + use_ssl, + memd_hosts: vec![Address { + host: "127.0.0.1".to_string(), + port: memd_port, + }], + http_hosts: vec![Address { + host: "127.0.0.1".to_string(), + port: http_port, + }], + couchbase2_host: None, + srv_record: None, + options: conn_spec.options, + }); + } + + let mut memd_hosts = vec![]; + let mut http_hosts = vec![]; + for address in conn_spec.hosts { + if let Some(port) = &address.port { + if !has_explicit_scheme && address.port != Some(default_port) { + return Err(ErrorKind::InvalidArgument { + msg: "ambiguous port without scheme".to_string(), + arg: "port".to_string(), + } + .into()); + } + + http_hosts.push(Address { + host: address.host, + port: *port, + }); + } else { + let (memd_port, http_port) = if use_ssl { + (DEFAULT_SSL_MEMD_PORT, DEFAULT_LEGACY_HTTPS_PORT) + } else { + (DEFAULT_MEMD_PORT, DEFAULT_LEGACY_HTTP_PORT) + }; + + memd_hosts.push(Address { + host: address.host.clone(), + port: memd_port, + }); + + http_hosts.push(Address { + host: address.host, + port: http_port, + }); + } + } + + Ok(ResolvedConnSpec { + use_ssl, + memd_hosts, + http_hosts, + couchbase2_host: None, + srv_record: None, + options: conn_spec.options, + }) +} + async fn lookup_srv( scheme: &str, proto: &str, diff --git a/sdk/couchbase-core/Cargo.toml b/sdk/couchbase-core/Cargo.toml index a6d55954..fd9aaaec 100644 --- a/sdk/couchbase-core/Cargo.toml +++ b/sdk/couchbase-core/Cargo.toml @@ -42,14 +42,14 @@ tokio-util = { version = "0.7", features = ["codec"] } tokio-rustls = { version = "0.26.0", optional = true } +couchbase-connstr = { path = "../couchbase-connstr" } + [dev-dependencies] chrono = "0.4.38" env_logger = "0.11" envconfig = "0.10" serial_test = "3.2.0" -couchbase-connstr = { path = "../couchbase-connstr" } - [features] dhat-heap = ["dhat"] diff --git a/sdk/couchbase-core/src/address.rs b/sdk/couchbase-core/src/address.rs deleted file mode 100644 index e65cefa8..00000000 --- a/sdk/couchbase-core/src/address.rs +++ /dev/null @@ -1,13 +0,0 @@ -use std::fmt::Display; - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Address { - pub host: String, - pub port: u16, -} - -impl Display for Address { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}:{}", self.host, self.port) - } -} diff --git a/sdk/couchbase-core/src/agent.rs b/sdk/couchbase-core/src/agent.rs index 503ab06d..49666b3c 100644 --- a/sdk/couchbase-core/src/agent.rs +++ b/sdk/couchbase-core/src/agent.rs @@ -1,4 +1,3 @@ -use crate::address::Address; use crate::auth_mechanism::AuthMechanism; use crate::authenticator::Authenticator; use crate::cbconfig::TerseConfig; @@ -48,6 +47,7 @@ use crate::vbucketrouter::{ use crate::{httpx, mgmtx}; use byteorder::BigEndian; +use couchbase_connstr::Address; use futures::executor::block_on; use log::{debug, error, info, warn}; use uuid::Uuid; diff --git a/sdk/couchbase-core/src/kvclient.rs b/sdk/couchbase-core/src/kvclient.rs index b5c054cd..0bf1eb05 100644 --- a/sdk/couchbase-core/src/kvclient.rs +++ b/sdk/couchbase-core/src/kvclient.rs @@ -1,4 +1,3 @@ -use crate::address::Address; use crate::auth_mechanism::AuthMechanism; use crate::authenticator::{Authenticator, UserPassPair}; use crate::error::Error; @@ -17,6 +16,7 @@ use crate::service_type::ServiceType; use crate::tls_config::TlsConfig; use crate::util::hostname_from_addr_str; use chrono::{DateTime, FixedOffset, Local, NaiveDateTime, Utc}; +use couchbase_connstr::Address; use futures::future::BoxFuture; use log::{debug, warn}; use std::future::Future; diff --git a/sdk/couchbase-core/src/lib.rs b/sdk/couchbase-core/src/lib.rs index 40fec4a9..f0373661 100644 --- a/sdk/couchbase-core/src/lib.rs +++ b/sdk/couchbase-core/src/lib.rs @@ -2,7 +2,6 @@ extern crate core; #[macro_use] extern crate lazy_static; -pub mod address; pub mod agent; pub mod agent_ops; pub mod auth_mechanism; @@ -50,7 +49,7 @@ mod scram; pub mod searchcomponent; pub mod searchx; pub mod service_type; -mod tls_config; +pub mod tls_config; mod util; mod vbucketmap; mod vbucketrouter; diff --git a/sdk/couchbase-core/src/memdx/connection.rs b/sdk/couchbase-core/src/memdx/connection.rs index 582e201e..61a94377 100644 --- a/sdk/couchbase-core/src/memdx/connection.rs +++ b/sdk/couchbase-core/src/memdx/connection.rs @@ -1,6 +1,7 @@ use crate::memdx::error::Error; use crate::memdx::error::Result; use crate::tls_config::TlsConfig; +use couchbase_connstr::Address; use socket2::TcpKeepalive; use std::fmt::Debug; use std::io; @@ -10,7 +11,6 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; use tokio::time::{timeout_at, Instant}; -use crate::address::Address; #[cfg(all(feature = "rustls-tls", not(feature = "native-tls")))] use { tokio_rustls::rustls::pki_types::DnsName, tokio_rustls::rustls::pki_types::ServerName, diff --git a/sdk/couchbase-core/src/mgmtx/mgmt.rs b/sdk/couchbase-core/src/mgmtx/mgmt.rs index 98035dc0..b3e14a48 100644 --- a/sdk/couchbase-core/src/mgmtx/mgmt.rs +++ b/sdk/couchbase-core/src/mgmtx/mgmt.rs @@ -6,6 +6,7 @@ use crate::mgmtx::error; use crate::mgmtx::options::{GetTerseBucketConfigOptions, GetTerseClusterConfigOptions}; use bytes::Bytes; use http::Method; +use log::debug; use serde::de::DeserializeOwned; use serde::Deserialize; use std::collections::HashMap; @@ -196,7 +197,7 @@ impl Management { opts: &GetTerseClusterConfigOptions<'_>, ) -> error::Result { let method = Method::GET; - let path = "/pools/default/nodeServicesStreaming".to_string(); + let path = "/pools/default/nodeServices".to_string(); let resp = self .execute( diff --git a/sdk/couchbase-core/src/options/agent.rs b/sdk/couchbase-core/src/options/agent.rs index 2bbc842b..72f0654c 100644 --- a/sdk/couchbase-core/src/options/agent.rs +++ b/sdk/couchbase-core/src/options/agent.rs @@ -1,4 +1,5 @@ -use crate::address::Address; +use couchbase_connstr::Address; + use crate::auth_mechanism::AuthMechanism; use crate::authenticator::Authenticator; use crate::tls_config::TlsConfig; diff --git a/sdk/couchbase-core/tests/common/default_agent_options.rs b/sdk/couchbase-core/tests/common/default_agent_options.rs index 206138a2..980ba77c 100644 --- a/sdk/couchbase-core/tests/common/default_agent_options.rs +++ b/sdk/couchbase-core/tests/common/default_agent_options.rs @@ -1,6 +1,7 @@ use crate::common::test_config::TestSetupConfig; use couchbase_core::authenticator::{Authenticator, PasswordAuthenticator}; use couchbase_core::options::agent::{AgentOptions, SeedConfig}; +use couchbase_core::tls_config::TlsConfig; #[cfg(feature = "rustls-tls")] use { couchbase_core::insecure_certverfier::InsecureCertVerifier, std::sync::Arc, @@ -9,7 +10,23 @@ use { }; pub async fn create_default_options(config: TestSetupConfig) -> AgentOptions { - let tls_config = if config.use_ssl { + let tls_config = create_tls_config(&config); + + AgentOptions::new( + SeedConfig::new() + .memd_addrs(config.memd_addrs.clone()) + .http_addrs(config.http_addrs.clone()), + Authenticator::PasswordAuthenticator(PasswordAuthenticator { + username: config.username.clone(), + password: config.password.clone(), + }), + ) + .tls_config(tls_config) + .bucket_name(config.bucket.clone()) +} + +pub fn create_tls_config(config: &TestSetupConfig) -> Option { + if config.use_ssl { #[cfg(feature = "native-tls")] { let mut builder = tokio_native_tls::native_tls::TlsConnector::builder(); @@ -20,17 +37,7 @@ pub async fn create_default_options(config: TestSetupConfig) -> AgentOptions { Some(get_rustls_config()) } else { None - }; - - AgentOptions::new( - SeedConfig::new().memd_addrs(config.memd_addrs.clone()), - Authenticator::PasswordAuthenticator(PasswordAuthenticator { - username: config.username.clone(), - password: config.password.clone(), - }), - ) - .tls_config(tls_config) - .bucket_name(config.bucket.clone()) + } } pub async fn create_options_without_bucket(config: TestSetupConfig) -> AgentOptions { @@ -48,7 +55,9 @@ pub async fn create_options_without_bucket(config: TestSetupConfig) -> AgentOpti }; AgentOptions::new( - SeedConfig::new().memd_addrs(config.memd_addrs.clone()), + SeedConfig::new() + .memd_addrs(config.memd_addrs.clone()) + .http_addrs(config.http_addrs.clone()), Authenticator::PasswordAuthenticator(PasswordAuthenticator { username: config.username.clone(), password: config.password.clone(), diff --git a/sdk/couchbase-core/tests/common/test_config.rs b/sdk/couchbase-core/tests/common/test_config.rs index 574c01e9..f7c120c2 100644 --- a/sdk/couchbase-core/tests/common/test_config.rs +++ b/sdk/couchbase-core/tests/common/test_config.rs @@ -1,8 +1,7 @@ use crate::common::default_agent_options; use crate::common::node_version::NodeVersion; use crate::common::test_agent::TestAgent; -use couchbase_connstr::ResolvedConnSpec; -use couchbase_core::address::Address; +use couchbase_connstr::{Address, ResolvedConnSpec}; use couchbase_core::agent::Agent; use envconfig::Envconfig; use lazy_static::lazy_static; @@ -49,6 +48,7 @@ pub struct TestSetupConfig { pub username: String, pub password: String, pub memd_addrs: Vec
, + pub http_addrs: Vec
, pub data_timeout: String, pub use_ssl: bool, pub bucket: String, @@ -166,14 +166,8 @@ pub async fn create_test_config(test_config: &EnvTestConfig) -> TestSetupConfig TestSetupConfig { username: test_config.username.clone(), password: test_config.password.clone(), - memd_addrs: resolved_conn_spec - .memd_hosts - .iter() - .map(|h| Address { - host: h.host.clone(), - port: h.port, - }) - .collect(), + memd_addrs: resolved_conn_spec.memd_hosts.clone(), + http_addrs: resolved_conn_spec.http_hosts.clone(), data_timeout: test_config.data_timeout.clone(), use_ssl: resolved_conn_spec.use_ssl, bucket: test_config.default_bucket.clone(), diff --git a/sdk/couchbase-core/tests/http.rs b/sdk/couchbase-core/tests/http.rs index 8308f5c7..fd98981f 100644 --- a/sdk/couchbase-core/tests/http.rs +++ b/sdk/couchbase-core/tests/http.rs @@ -1,6 +1,7 @@ +use crate::common::default_agent_options::create_tls_config; use crate::common::test_config::setup_test; use bytes::Bytes; -use couchbase_core::httpx::client::{Client, ClientConfig, ReqwestClient}; +use couchbase_core::httpx::client::{self, Client, ClientConfig, ReqwestClient}; use couchbase_core::httpx::decoder::Decoder; use couchbase_core::httpx::raw_json_row_streamer::{RawJsonRowItem, RawJsonRowStreamer}; use couchbase_core::httpx::request::{Auth, BasicAuth, Request}; @@ -47,14 +48,17 @@ pub struct QueryMetrics { #[test] fn test_row_streamer() { setup_test(async |config| { - let addrs = config.memd_addrs; + let scheme = if config.use_ssl { "https" } else { "http" }; + let addrs = &config.http_addrs; - let host = addrs.first().unwrap().host.clone(); + let addr = addrs.first().unwrap(); + let host = &addr.host; + let port = addr.port; - let basic_auth = BasicAuth::new(config.username, config.password); + let basic_auth = BasicAuth::new(&config.username, &config.password); let request_body = json!({"statement": "select i from array_range(1, 10000) AS i;"}); - let uri = format!("http://{host}:8093/query/service"); + let uri = format!("{scheme}://{host}:{port}/_p/query/query/service"); let request = Request::new(Method::POST, uri) .user_agent("rscbcorex".to_string()) @@ -62,7 +66,8 @@ fn test_row_streamer() { .content_type("application/json".to_string()) .body(Bytes::from(serde_json::to_vec(&request_body).unwrap())); - let client = ReqwestClient::new(ClientConfig::default()).unwrap(); + let client_config = ClientConfig::default().tls_config(create_tls_config(&config)); + let client = ReqwestClient::new(client_config).unwrap(); let resp = timeout(Duration::from_secs(10), client.execute(request)) .await @@ -116,19 +121,23 @@ fn test_row_streamer() { #[test] fn test_json_block_read() { setup_test(async |config| { - let addrs = config.memd_addrs; + let scheme = if config.use_ssl { "https" } else { "http" }; + let addrs = &config.http_addrs; - let host = addrs.first().unwrap().host.clone(); + let addr = addrs.first().unwrap(); + let host = &addr.host; + let port = addr.port; - let basic_auth = BasicAuth::new(config.username, config.password); - let uri = format!("http://{host}:8091/pools/default/terseClusterInfo"); + let basic_auth = BasicAuth::new(&config.username, &config.password); + let uri = format!("{scheme}://{host}:{port}/pools/default/terseClusterInfo"); let request = Request::new(Method::GET, uri) .user_agent("rscbcorex".to_string()) .auth(Auth::BasicAuth(basic_auth)) .content_type("application/json".to_string()); - let client = ReqwestClient::new(ClientConfig::default()).expect("could not create client"); + let client_config = ClientConfig::default().tls_config(create_tls_config(&config)); + let client = ReqwestClient::new(client_config).expect("could not create client"); let res = timeout(Duration::from_secs(10), client.execute(request)) .await diff --git a/sdk/couchbase/src/clients/cluster_client.rs b/sdk/couchbase/src/clients/cluster_client.rs index b480be0d..334aff9d 100644 --- a/sdk/couchbase/src/clients/cluster_client.rs +++ b/sdk/couchbase/src/clients/cluster_client.rs @@ -16,7 +16,6 @@ use crate::clients::user_mgmt_client::{ use crate::error; use crate::options::cluster_options::ClusterOptions; use couchbase_connstr::{parse, resolve, Address, SrvRecord}; -use couchbase_core::address; use couchbase_core::ondemand_agentmanager::OnDemandAgentManager; use couchbase_core::options::agent::{CompressionConfig, SeedConfig}; use couchbase_core::options::ondemand_agentmanager::OnDemandAgentManagerOptions; @@ -193,24 +192,8 @@ impl CouchbaseClusterBackend { }; let seed_config = SeedConfig::new() - .memd_addrs( - memd_hosts - .into_iter() - .map(|a| address::Address { - host: a.host, - port: a.port, - }) - .collect(), - ) - .http_addrs( - http_hosts - .into_iter() - .map(|a| address::Address { - host: a.host, - port: a.port, - }) - .collect(), - ); + .memd_addrs(memd_hosts.clone()) + .http_addrs(http_hosts.clone()); let mut compression_config = CompressionConfig::default(); if let Some(cm) = opts.compression_mode {