From 0638c3e20857dd6222db1f5a0a77db2186c3e2fb Mon Sep 17 00:00:00 2001
From: wayslog
Date: Thu, 13 Nov 2025 19:02:08 +0800
Subject: [PATCH 1/2] fixed: add backup request

---
 CHANGELOG.md          |   4 +
 README.md             |   5 +
 default.toml          |   1 +
 docs/functionality.md |   1 +
 docs/usage.md         |   5 +
 src/cluster/mod.rs    | 241 +++++++++++++++++++++++++++++++++++++-----
 src/config/mod.rs     | 164 +++++++++++++++++++++++++++-
 7 files changed, 395 insertions(+), 26 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c53bfbb..22c164f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,10 @@
 # change log

+## Unreleased
+
+- add backup request config + dispatcher to duplicate slow master reads to replicas in Redis Cluster mode
+
 # 1.3.3

 - avoid infinite call when cluster endpoint was down
diff --git a/README.md b/README.md
index 3346756..6128cfb 100644
--- a/README.md
+++ b/README.md
@@ -82,6 +82,11 @@ fetch = 600
 # read_from_slave is the feature make slave balanced readed by client and ignore side effects.
 read_from_slave = true

+# backup_request duplicates slow reads to replica nodes when enabled.
+# trigger_slow_ms sets the fixed delay in milliseconds (set "default" or omit the field to rely on the moving average only).
+# multiplier is applied to the rolling average latency to derive a second trigger threshold.
+backup_request = { enabled = false, trigger_slow_ms = 5, multiplier = 2.0 }
+
 ############################# Proxy Mode Special #######################################################
 # ping_fail_limit means when ping fail reach the limit number, the node will be ejected from the cluster
 # until the ping is ok in future.
diff --git a/default.toml b/default.toml
index f8a42e1..73f3f61 100644
--- a/default.toml
+++ b/default.toml
@@ -44,6 +44,7 @@ fetch_interval = 1800000 # 1800s , 30 minutes
 fetch_since_latest_cmd = 1000 # 3600s , 1 hour
 read_from_slave = false
+backup_request = { enabled = false, trigger_slow_ms = 5, multiplier = 2.0 }

 ping_fail_limit = 10
 ping_interval = 300
diff --git a/docs/functionality.md b/docs/functionality.md
index a45307f..600da90 100644
--- a/docs/functionality.md
+++ b/docs/functionality.md
@@ -72,6 +72,7 @@
 - 订阅与阻塞命令:
   - SUBSCRIBE / PSUBSCRIBE 会进入独占会话,按频道哈希槽选择节点,并在 MOVED / ASK 时自动重连与重放订阅;
   - BLPOP 等阻塞类命令复用独占连接,避免被 pipeline 请求阻塞。
+- 备份读(backup request):仅在 Redis Cluster 模式下可选启用;当 master 读命令超过配置阈值仍未返回时,会复制该请求至对应 replica,优先向客户端返回更快的副本响应,同时继续跟踪 master 延迟以动态更新阈值。
 - 依赖大量 `Rc<RefCell<...>>`、`futures::unsync::mpsc`,并使用 `tokio::runtime::current_thread`.

 ## 协议与命令抽象
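Editor's note (not part of the patch): the README comment above describes two triggers for a backup read, a fixed `trigger_slow_ms` delay and a `multiplier` applied to the rolling average of master latency, and the dispatcher fires on whichever delay is smaller. A minimal standalone sketch of that selection rule, with illustrative names (`effective_backup_delay` is not an API from this patch):

```rust
use std::time::Duration;

/// Pick the effective backup-request delay: the smaller of the fixed
/// threshold (trigger_slow_ms) and multiplier x rolling-average latency.
/// Returns None when neither trigger is available.
fn effective_backup_delay(
    trigger_slow_ms: Option<u64>,
    avg_master_micros: Option<f64>,
    multiplier: f64,
) -> Option<Duration> {
    let mut candidates = Vec::new();
    if let Some(ms) = trigger_slow_ms {
        candidates.push(Duration::from_millis(ms));
    }
    if let Some(avg) = avg_master_micros {
        if multiplier > 0.0 {
            candidates.push(Duration::from_micros((avg * multiplier) as u64));
        }
    }
    candidates.into_iter().min()
}

fn main() {
    // With trigger_slow_ms = 5 and a 1200 us rolling average at multiplier 2.0,
    // the 2.4 ms relative threshold wins because it is lower than the fixed 5 ms.
    let delay = effective_backup_delay(Some(5), Some(1200.0), 2.0);
    assert_eq!(delay, Some(Duration::from_micros(2_400)));
}
```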
diff --git a/docs/usage.md b/docs/usage.md
index 651d747..36e8705 100644
--- a/docs/usage.md
+++ b/docs/usage.md
@@ -26,6 +26,11 @@ cargo build --release
 - `hash_tag`:一致性 hash 标签,例如 `{}`。
 - `read_timeout` / `write_timeout`:后端超时(毫秒)。
 - `read_from_slave`:Cluster 模式下允许从 replica 读取。
+- `backup_request`:Cluster 模式下用于配置“副本兜底读”策略的表,包含:
+  - `enabled`:是否开启该策略(默认 `false`)。
+  - `trigger_slow_ms`:固定延迟阈值(毫秒,可写 `"default"` 关闭固定阈值),超过该延迟仍未返回则发送副本备份请求。
+  - `multiplier`:相对阈值,等于“master 累计平均耗时 × multiplier”;满足固定阈值或相对阈值中的任意一个即派发备份请求。
+- `backup_request` 的三个字段均可通过 `CONFIG SET cluster.<name>.backup-request-*` 在线调整。
 - `slowlog_log_slower_than`:慢查询阈值(微秒,默认 `10000`,设为 `-1` 关闭记录)。
 - `slowlog_max_len`:慢查询日志最大保留条数(默认 `128`)。
 - `hotkey_sample_every`:热点 Key 采样间隔(默认 `32`,越大代表对请求采样越稀疏)。
diff --git a/src/cluster/mod.rs b/src/cluster/mod.rs
index 36abfec..2437e65 100644
--- a/src/cluster/mod.rs
+++ b/src/cluster/mod.rs
@@ -1,4 +1,4 @@
-use std::collections::{HashSet, VecDeque};
+use std::collections::{HashMap, HashSet, VecDeque};
 use std::sync::Arc;
 use std::time::{Duration, Instant};

@@ -8,11 +8,12 @@ use bytes::Bytes;
 use futures::future::BoxFuture;
 use futures::stream::FuturesOrdered;
 use futures::{SinkExt, StreamExt};
+use parking_lot::RwLock;
 use rand::{seq::SliceRandom, thread_rng};
 #[cfg(any(unix, windows))]
 use socket2::{SockRef, TcpKeepalive};
 use tokio::net::TcpStream;
-use tokio::sync::{mpsc, watch};
+use tokio::sync::{mpsc, oneshot, watch};
 use tokio::time::{interval, sleep, timeout, MissedTickBehavior};
 use tokio_util::codec::{Framed, FramedParts};
 use tracing::{debug, info, warn};
@@ -20,7 +21,7 @@ use tracing::{debug, info, warn};
 use crate::auth::{AuthAction, BackendAuth, FrontendAuthenticator};
 use crate::backend::client::{ClientId, FrontConnectionGuard};
 use crate::backend::pool::{BackendNode, ConnectionPool, Connector, SessionCommand};
-use crate::config::{ClusterConfig, ClusterRuntime, ConfigManager};
+use crate::config::{BackupRequestRuntime, ClusterConfig, ClusterRuntime, ConfigManager};
 use crate::hotkey::Hotkey;
 use crate::info::{InfoContext, ProxyMode};
 use crate::metrics;
@@ -56,6 +57,7 @@ pub struct ClusterProxy {
     config_manager: Arc<ConfigManager>,
     slowlog: Arc,
     hotkey: Arc<Hotkey>,
+    backup: Arc<BackupRequestController>,
     listen_port: u16,
     seed_nodes: usize,
 }
@@ -94,6 +96,10 @@ impl ClusterProxy {
         let hotkey = config_manager
             .hotkey_for(&config.name)
             .ok_or_else(|| anyhow!("missing hotkey state for cluster {}", config.name))?;
+        let backup_runtime = config_manager
+            .backup_request_for(&config.name)
+            .ok_or_else(|| anyhow!("missing backup request state for cluster {}", config.name))?;
+        let backup = Arc::new(BackupRequestController::new(backup_runtime));
         let proxy = Self {
             cluster: cluster.clone(),
             hash_tag,
@@ -107,6 +113,7 @@ impl ClusterProxy {
             config_manager,
             slowlog,
             hotkey,
+            backup,
             listen_port,
             seed_nodes: config.servers.len(),
         };
@@ -727,6 +734,7 @@ impl ClusterProxy {
         let cluster = self.cluster.clone();
         let slowlog = self.slowlog.clone();
         let hotkey = self.hotkey.clone();
+        let backup = self.backup.clone();
         let kind_label = command.kind_label();
         Box::pin(async move {
             match dispatch_with_context(
@@ -738,6 +746,7 @@ impl ClusterProxy {
                 client_id,
                 slowlog,
                 hotkey,
+                backup,
                 command,
             )
             .await
@@ -1425,6 +1434,7 @@ async fn dispatch_with_context(
     client_id: ClientId,
     slowlog: Arc,
     hotkey: Arc<Hotkey>,
+    backup: Arc<BackupRequestController>,
     command: RedisCommand,
 ) -> Result<RespValue> {
     let command_snapshot = command.clone();
@@ -1438,6 +1448,7 @@ async fn dispatch_with_context(
             pool,
             fetch_trigger,
             client_id,
+            backup,
             multi,
         )
         .await
@@ -1449,6 +1460,7 @@ async fn dispatch_with_context(
             pool,
             fetch_trigger,
             client_id,
+            backup,
             command,
         )
         .await
@@ -1465,6 +1477,7 @@ async fn dispatch_multi(
     pool: Arc>,
     fetch_trigger: mpsc::UnboundedSender<()>,
     client_id: ClientId,
+    backup: Arc<BackupRequestController>,
     multi: MultiDispatch,
 ) -> Result<RespValue> {
     let mut tasks: FuturesOrdered>> = FuturesOrdered::new();
@@ -1473,6 +1486,7 @@ async fn dispatch_multi(
         let slots = slots.clone();
         let pool = pool.clone();
         let fetch_trigger = fetch_trigger.clone();
+        let backup = backup.clone();
         let SubCommand { positions, command } = sub;
         tasks.push_back(Box::pin(async move {
             let response = dispatch_single(
@@ -1482,6 +1496,7 @@ async fn dispatch_multi(
                 pool,
                 fetch_trigger,
                 client_id,
+                backup,
                 command,
             )
             .await?;
@@ -1506,9 +1521,11 @@ async fn dispatch_single(
     pool: Arc>,
     fetch_trigger: mpsc::UnboundedSender<()>,
     client_id: ClientId,
+    backup: Arc<BackupRequestController>,
     command: RedisCommand,
 ) -> Result<RespValue> {
     let blocking = command.as_blocking();
+    let is_read_only = command.is_read_only();
     let mut slot = command
         .hash_slot(hash_tag.as_deref())
         .ok_or_else(|| anyhow!("command missing key"))?;
@@ -1550,29 +1567,42 @@ async fn dispatch_single(
                 Err(_) => return Err(anyhow!("backend session closed")),
             }
         } else {
-            let response_rx = pool
-                .dispatch(target.clone(), client_id, command.clone())
-                .await?;
+            let backup_plan = if target_override.is_none()
+                && !read_from_slave
+                && is_read_only
+                && matches!(blocking, BlockingKind::None)
+            {
+                replica_node_for_slot(&slots, slot)
+                    .and_then(|replica| backup.plan(&target, Some(replica)))
+            } else {
+                None
+            };

-            match response_rx.await {
-                Ok(Ok(resp)) => match parse_redirect(resp.clone())? {
-                    Some(Redirect::Moved {
-                        slot: new_slot,
-                        address,
-                    }) => {
-                        let _ = fetch_trigger.send(());
-                        slot = new_slot;
-                        target_override = Some(BackendNode::new(address));
-                        continue;
-                    }
-                    Some(Redirect::Ask { address }) => {
-                        target_override = Some(BackendNode::new(address));
-                        continue;
-                    }
-                    None => return Ok(resp),
-                },
-                Ok(Err(err)) => return Err(err),
-                Err(_) => return Err(anyhow!("backend session closed")),
+            let resp = execute_with_backup(
+                pool.clone(),
+                client_id,
+                &command,
+                target.clone(),
+                backup_plan,
+                backup.clone(),
+            )
+            .await?;
+
+            match parse_redirect(resp.clone())? {
+                Some(Redirect::Moved {
+                    slot: new_slot,
+                    address,
+                }) => {
+                    let _ = fetch_trigger.send(());
+                    slot = new_slot;
+                    target_override = Some(BackendNode::new(address));
+                    continue;
+                }
+                Some(Redirect::Ask { address }) => {
+                    target_override = Some(BackendNode::new(address));
+                    continue;
+                }
+                None => return Ok(resp),
             }
         }
     }
@@ -1580,6 +1610,88 @@ async fn dispatch_single(
     Err(anyhow!("too many cluster redirects"))
 }

+#[derive(Clone)]
+struct BackupPlan {
+    replica: BackendNode,
+    delay: Duration,
+}
+
+async fn execute_with_backup(
+    pool: Arc>,
+    client_id: ClientId,
+    command: &RedisCommand,
+    target: BackendNode,
+    plan: Option<BackupPlan>,
+    controller: Arc<BackupRequestController>,
+) -> Result<RespValue> {
+    let primary_rx = pool
+        .dispatch(target.clone(), client_id, command.clone())
+        .await?;
+    if let Some(plan) = plan {
+        race_with_backup(
+            pool, client_id, command, target, primary_rx, plan, controller,
+        )
+        .await
+    } else {
+        let started = Instant::now();
+        let result = primary_rx.await;
+        if result.is_ok() {
+            controller.record_primary(&target, started.elapsed());
+        }
+        result?
+ } +} + +async fn race_with_backup( + pool: Arc>, + client_id: ClientId, + command: &RedisCommand, + master: BackendNode, + primary_rx: oneshot::Receiver>, + plan: BackupPlan, + controller: Arc, +) -> Result { + let master_start = Instant::now(); + let mut primary_future = Box::pin(primary_rx); + let mut delay_future = Box::pin(tokio::time::sleep(plan.delay)); + + if let Some(result) = tokio::select! { + res = primary_future.as_mut() => Some(res), + _ = delay_future.as_mut() => None, + } { + if result.is_ok() { + controller.record_primary(&master, master_start.elapsed()); + } + return result?; + } + + let backup_rx = pool + .dispatch(plan.replica.clone(), client_id, command.clone()) + .await?; + let mut backup_future = Box::pin(backup_rx); + + tokio::select! { + res = primary_future.as_mut() => { + if res.is_ok() { + controller.record_primary(&master, master_start.elapsed()); + } + res? + } + res = backup_future.as_mut() => { + let remaining = primary_future; + let controller_clone = controller.clone(); + let master_clone = master.clone(); + tokio::spawn(async move { + let mut future = remaining; + if let Ok(Ok(_)) = future.as_mut().await { + controller_clone.record_primary(&master_clone, master_start.elapsed()); + } + }); + res? + } + } +} + fn select_node_for_slot( slots: &watch::Sender, read_from_slave: bool, @@ -1597,6 +1709,85 @@ fn select_node_for_slot( bail!("slot {} not covered", slot) } +fn replica_node_for_slot(slots: &watch::Sender, slot: u16) -> Option { + slots + .borrow() + .replica_for_slot(slot) + .map(|addr| BackendNode::new(addr.to_string())) +} + +#[derive(Clone)] +struct BackupRequestController { + runtime: Arc, + averages: Arc>>, +} + +impl BackupRequestController { + fn new(runtime: Arc) -> Self { + Self { + runtime, + averages: Arc::new(RwLock::new(HashMap::new())), + } + } + + fn plan(&self, master: &BackendNode, replica: Option) -> Option { + let replica = replica?; + let delay = self.delay_for(master)?; + Some(BackupPlan { replica, delay }) + } + + fn record_primary(&self, master: &BackendNode, elapsed: Duration) { + let micros = elapsed.as_secs_f64() * 1_000_000.0; + let mut guard = self.averages.write(); + let entry = guard + .entry(master.as_str().to_string()) + .or_insert_with(LatencyAverage::default); + entry.update(micros); + } + + fn delay_for(&self, master: &BackendNode) -> Option { + if !self.runtime.enabled() { + return None; + } + let mut candidates = Vec::new(); + if let Some(ms) = self.runtime.threshold_ms() { + candidates.push(Duration::from_millis(ms)); + } + if let Some(avg) = self.average_for(master) { + let multiplier = self.runtime.multiplier(); + if multiplier > 0.0 { + let scaled = (avg * multiplier).clamp(0.0, u64::MAX as f64); + let micros = scaled as u64; + candidates.push(Duration::from_micros(micros)); + } + } + candidates.into_iter().min() + } + + fn average_for(&self, master: &BackendNode) -> Option { + let guard = self.averages.read(); + guard.get(master.as_str()).map(|stats| stats.avg_micros) + } +} + +#[derive(Default, Clone)] +struct LatencyAverage { + avg_micros: f64, + samples: u64, +} + +impl LatencyAverage { + fn update(&mut self, sample: f64) { + self.samples = self.samples.saturating_add(1); + if self.samples == 1 { + self.avg_micros = sample; + } else { + let count = self.samples as f64; + self.avg_micros += (sample - self.avg_micros) / count; + } + } +} + fn subscription_count(resp: &RespValue) -> Option<(SubscriptionKind, i64)> { if let RespValue::Array(items) = resp { if items.len() >= 3 { diff --git a/src/config/mod.rs 
diff --git a/src/config/mod.rs b/src/config/mod.rs
index af305c9..ee8ee81 100644
--- a/src/config/mod.rs
+++ b/src/config/mod.rs
@@ -1,7 +1,7 @@
 use std::collections::{HashMap, HashSet};
 use std::env;
 use std::path::{Path, PathBuf};
-use std::sync::atomic::{AtomicI64, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
 use std::sync::Arc;

 use anyhow::{anyhow, bail, Context, Result};
@@ -55,6 +55,10 @@ fn default_backend_resp_version() -> RespVersion {
     RespVersion::Resp2
 }

+fn default_backup_multiplier() -> f64 {
+    2.0
+}
+
 #[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct Config {
     #[serde(default)]
@@ -125,6 +129,24 @@ impl Default for CacheType {
     }
 }

+#[derive(Debug, Clone, Deserialize, Serialize)]
+#[serde(default)]
+pub struct BackupRequestConfig {
+    pub enabled: bool,
+    pub trigger_slow_ms: Option<u64>,
+    pub multiplier: f64,
+}
+
+impl Default for BackupRequestConfig {
+    fn default() -> Self {
+        Self {
+            enabled: false,
+            trigger_slow_ms: None,
+            multiplier: default_backup_multiplier(),
+        }
+    }
+}
+
 #[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct ClusterConfig {
     pub name: String,
@@ -183,6 +205,8 @@ pub struct ClusterConfig {
     pub hotkey_decay: f64,
     #[serde(default = "default_backend_resp_version")]
     pub backend_resp_version: RespVersion,
+    #[serde(default)]
+    pub backup_request: BackupRequestConfig,
 }

 impl ClusterConfig {
@@ -235,6 +259,12 @@ impl ClusterConfig {
         if !(self.hotkey_decay > 0.0 && self.hotkey_decay <= 1.0) {
             bail!("cluster {} hotkey_decay must be in (0, 1]", self.name);
         }
+        if self.backup_request.multiplier <= 0.0 {
+            bail!(
+                "cluster {} backup_request.multiplier must be greater than 0",
+                self.name
+            );
+        }
         Ok(())
     }

@@ -345,6 +375,49 @@ impl ClusterRuntime {
     }
 }

+#[derive(Debug)]
+pub struct BackupRequestRuntime {
+    enabled: AtomicBool,
+    threshold_ms: AtomicI64,
+    multiplier_bits: AtomicU64,
+}
+
+impl BackupRequestRuntime {
+    fn new(config: &BackupRequestConfig) -> Self {
+        Self {
+            enabled: AtomicBool::new(config.enabled),
+            threshold_ms: AtomicI64::new(option_to_atomic(config.trigger_slow_ms)),
+            multiplier_bits: AtomicU64::new(config.multiplier.to_bits()),
+        }
+    }
+
+    pub fn enabled(&self) -> bool {
+        self.enabled.load(Ordering::Relaxed)
+    }
+
+    pub fn set_enabled(&self, value: bool) {
+        self.enabled.store(value, Ordering::Relaxed);
+    }
+
+    pub fn threshold_ms(&self) -> Option<u64> {
+        atomic_to_option(self.threshold_ms.load(Ordering::Relaxed))
+    }
+
+    pub fn set_threshold_ms(&self, value: Option<u64>) {
+        self.threshold_ms
+            .store(option_to_atomic(value), Ordering::Relaxed);
+    }
+
+    pub fn multiplier(&self) -> f64 {
+        f64::from_bits(self.multiplier_bits.load(Ordering::Relaxed))
+    }
+
+    pub fn set_multiplier(&self, value: f64) {
+        self.multiplier_bits
+            .store(value.to_bits(), Ordering::Relaxed);
+    }
+}
+
 fn option_to_atomic(value: Option<u64>) -> i64 {
     match value {
         Some(v) => v as i64,
@@ -366,6 +439,7 @@ struct ClusterEntry {
     runtime: Arc<ClusterRuntime>,
     slowlog: Arc,
     hotkey: Arc<Hotkey>,
+    backup: Arc<BackupRequestRuntime>,
 }

 #[derive(Debug)]
@@ -396,6 +470,7 @@ impl ConfigManager {
             decay: cluster.hotkey_decay,
         };
         let hotkey = Arc::new(Hotkey::new(hotkey_config));
+        let backup = Arc::new(BackupRequestRuntime::new(&cluster.backup_request));
         clusters.insert(
             key,
             ClusterEntry {
@@ -403,6 +478,7 @@ impl ConfigManager {
                 runtime,
                 slowlog,
                 hotkey,
+                backup,
             },
         );
     }
@@ -432,6 +508,12 @@ impl ConfigManager {
             .map(|entry| entry.hotkey.clone())
     }

+    pub fn backup_request_for(&self, name: &str) -> Option<Arc<BackupRequestRuntime>> {
+        self.clusters
+            .get(&name.to_ascii_lowercase())
+            .map(|entry| entry.backup.clone())
+    }
+
     pub async fn handle_command(&self, command: &RedisCommand) -> Option {
         if !command.command_name().eq_ignore_ascii_case(b"CONFIG") {
             return None;
         }
@@ -609,6 +691,41 @@ impl ConfigManager {
                     "cluster hotkey_decay updated via CONFIG SET"
                 );
             }
+            ClusterField::BackupRequestEnabled => {
+                let enabled = parse_bool_flag(value)?;
+                entry.backup.set_enabled(enabled);
+                let mut guard = self.config.write();
+                guard.clusters_mut()[entry.index].backup_request.enabled = enabled;
+                info!(
+                    cluster = cluster_name,
+                    value = value,
+                    "cluster backup_request.enabled updated via CONFIG SET"
+                );
+            }
+            ClusterField::BackupRequestThreshold => {
+                let parsed = parse_timeout_value(value)?;
+                entry.backup.set_threshold_ms(parsed);
+                let mut guard = self.config.write();
+                guard.clusters_mut()[entry.index]
+                    .backup_request
+                    .trigger_slow_ms = parsed;
+                info!(
+                    cluster = cluster_name,
+                    value = value,
+                    "cluster backup_request.trigger_slow_ms updated via CONFIG SET"
+                );
+            }
+            ClusterField::BackupRequestMultiplier => {
+                let parsed = parse_backup_multiplier(value)?;
+                entry.backup.set_multiplier(parsed);
+                let mut guard = self.config.write();
+                guard.clusters_mut()[entry.index].backup_request.multiplier = parsed;
+                info!(
+                    cluster = cluster_name,
+                    value = value,
+                    "cluster backup_request.multiplier updated via CONFIG SET"
+                );
+            }
         }
         Ok(())
     }
@@ -666,6 +783,18 @@ impl ConfigManager {
                     format!("cluster.{}.hotkey-decay", name),
                     hotkey_cfg.decay.to_string(),
                 ));
+                entries.push((
+                    format!("cluster.{}.backup-request-enabled", name),
+                    bool_to_string(entry.backup.enabled()),
+                ));
+                entries.push((
+                    format!("cluster.{}.backup-request-threshold-ms", name),
+                    option_to_string(entry.backup.threshold_ms()),
+                ));
+                entries.push((
+                    format!("cluster.{}.backup-request-multiplier", name),
+                    entry.backup.multiplier().to_string(),
+                ));
             }
         }
         entries.sort_by(|a, b| a.0.cmp(&b.0));
@@ -707,6 +836,9 @@ fn parse_key(key: &str) -> Result<(String, ClusterField)> {
         "hotkey-sketch-depth" => ClusterField::HotkeySketchDepth,
         "hotkey-capacity" => ClusterField::HotkeyCapacity,
         "hotkey-decay" => ClusterField::HotkeyDecay,
+        "backup-request-enabled" => ClusterField::BackupRequestEnabled,
+        "backup-request-threshold-ms" => ClusterField::BackupRequestThreshold,
+        "backup-request-multiplier" => ClusterField::BackupRequestMultiplier,
         unknown => bail!("unknown cluster field '{}'", unknown),
     };
     Ok((cluster.to_string(), field))
@@ -800,12 +932,39 @@ fn parse_hotkey_decay(value: &str) -> Result<f64> {
     Ok(parsed)
 }

+fn parse_bool_flag(value: &str) -> Result<bool> {
+    match value.trim().to_ascii_lowercase().as_str() {
+        "yes" | "true" | "1" => Ok(true),
+        "no" | "false" | "0" => Ok(false),
+        other => bail!("invalid boolean value '{}'", other),
+    }
+}
+
+fn parse_backup_multiplier(value: &str) -> Result<f64> {
+    let parsed: f64 = value
+        .trim()
+        .parse()
+        .with_context(|| format!("invalid backup-request-multiplier value '{}'", value))?;
+    if parsed <= 0.0 {
+        bail!("backup-request-multiplier must be > 0");
+    }
+    Ok(parsed)
+}
+
 fn option_to_string(value: Option<u64>) -> String {
     value
         .map(|v| v.to_string())
         .unwrap_or_else(|| DUMP_VALUE_DEFAULT.to_string())
 }

+fn bool_to_string(value: bool) -> String {
+    if value {
+        "yes".to_string()
+    } else {
+        "no".to_string()
+    }
+}
+
 fn flatten_pairs(entries: Vec<(String, String)>) -> Vec {
     let mut values = Vec::with_capacity(entries.len() * 2);
     for (key, value) in entries {
@@ -830,6 +989,9 @@ enum ClusterField {
     HotkeySketchDepth,
     HotkeyCapacity,
     HotkeyDecay,
+    BackupRequestEnabled,
+    BackupRequestThreshold,
+    BackupRequestMultiplier,
 }

 fn wildcard_match(pattern: &str, target: &str) -> bool {
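Editor's note (not part of the patch): `BackupRequestRuntime` above stores the multiplier as raw bits in an `AtomicU64` because the standard library has no atomic `f64`; `to_bits`/`from_bits` round-trip the value losslessly. A standalone sketch of that pattern, with an illustrative `AtomicF64` wrapper that is not part of the patch:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Lock-free f64 cell: the IEEE-754 bit pattern lives in an AtomicU64.
struct AtomicF64 {
    bits: AtomicU64,
}

impl AtomicF64 {
    fn new(value: f64) -> Self {
        Self { bits: AtomicU64::new(value.to_bits()) }
    }

    fn load(&self) -> f64 {
        f64::from_bits(self.bits.load(Ordering::Relaxed))
    }

    fn store(&self, value: f64) {
        self.bits.store(value.to_bits(), Ordering::Relaxed);
    }
}

fn main() {
    // Mirrors a CONFIG SET cluster.<name>.backup-request-multiplier update at runtime.
    let multiplier = AtomicF64::new(2.0);
    multiplier.store(1.5);
    assert_eq!(multiplier.load(), 1.5);
}
```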

From faf6038cbb9ef25058896fb63246716f8470bf5f Mon Sep 17 00:00:00 2001
From: wayslog
Date: Fri, 14 Nov 2025 19:30:33 +0800
Subject: [PATCH 2/2] feat: add backup requests

---
 src/cluster/mod.rs | 63 +++++++++++++++++++++++++++++++++++++++++++---
 src/metrics/mod.rs | 18 +++++++++++++
 2 files changed, 78 insertions(+), 3 deletions(-)

diff --git a/src/cluster/mod.rs b/src/cluster/mod.rs
index 4add461..6da80d1 100644
--- a/src/cluster/mod.rs
+++ b/src/cluster/mod.rs
@@ -1,4 +1,5 @@
 use std::collections::{HashMap, HashSet, VecDeque};
+use std::pin::Pin;
 use std::sync::Arc;
 use std::time::{Duration, Instant};

@@ -779,6 +780,7 @@ impl ClusterProxy {
                 slowlog,
                 hotkey,
                 backup,
+                cluster.clone(),
                 command,
             )
             .await
@@ -1481,6 +1483,7 @@ async fn dispatch_with_context(
     slowlog: Arc,
     hotkey: Arc<Hotkey>,
     backup: Arc<BackupRequestController>,
+    cluster: Arc,
     command: RedisCommand,
 ) -> Result<RespValue> {
     let command_snapshot = command.clone();
@@ -1495,6 +1498,7 @@ async fn dispatch_with_context(
             fetch_trigger,
             client_id,
             backup,
+            cluster,
             multi,
         )
         .await
@@ -1507,6 +1511,7 @@ async fn dispatch_with_context(
             fetch_trigger,
             client_id,
             backup,
+            cluster,
             command,
         )
         .await
@@ -1524,6 +1529,7 @@ async fn dispatch_multi(
     fetch_trigger: mpsc::UnboundedSender<()>,
     client_id: ClientId,
     backup: Arc<BackupRequestController>,
+    cluster: Arc,
     multi: MultiDispatch,
 ) -> Result<RespValue> {
     let mut tasks: FuturesOrdered>> = FuturesOrdered::new();
@@ -1533,6 +1539,7 @@ async fn dispatch_multi(
         let slots = slots.clone();
         let pool = pool.clone();
         let fetch_trigger = fetch_trigger.clone();
         let backup = backup.clone();
+        let cluster = cluster.clone();
         let SubCommand { positions, command } = sub;
         tasks.push_back(Box::pin(async move {
             let response = dispatch_single(
@@ -1543,6 +1550,7 @@ async fn dispatch_multi(
                 pool,
                 fetch_trigger,
                 client_id,
                 backup,
+                cluster,
                 command,
             )
             .await?;
@@ -1568,6 +1576,7 @@ async fn dispatch_single(
     fetch_trigger: mpsc::UnboundedSender<()>,
     client_id: ClientId,
     backup: Arc<BackupRequestController>,
+    cluster: Arc,
     command: RedisCommand,
 ) -> Result<RespValue> {
     let blocking = command.as_blocking();
@@ -1623,12 +1632,16 @@ async fn dispatch_single(
             } else {
                 None
             };
+            if backup_plan.is_some() {
+                metrics::backup_event(cluster.as_ref(), "planned");
+            }

             let resp = execute_with_backup(
                 pool.clone(),
                 client_id,
                 &command,
                 target.clone(),
+                cluster.clone(),
                 backup_plan,
                 backup.clone(),
             )
             .await?;
@@ -1667,6 +1680,7 @@ async fn execute_with_backup(
     client_id: ClientId,
     command: &RedisCommand,
     target: BackendNode,
+    cluster: Arc,
     plan: Option<BackupPlan>,
     controller: Arc<BackupRequestController>,
 ) -> Result<RespValue> {
@@ -1675,7 +1689,14 @@ async fn execute_with_backup(
         .await?;
     if let Some(plan) = plan {
         race_with_backup(
-            pool, client_id, command, target, primary_rx, plan, controller,
+            pool,
+            client_id,
+            command,
+            target,
+            primary_rx,
+            cluster,
+            plan,
+            controller,
         )
         .await
     } else {
@@ -1694,6 +1715,7 @@ async fn race_with_backup(
     command: &RedisCommand,
     master: BackendNode,
     primary_rx: oneshot::Receiver<Result<RespValue>>,
+    cluster: Arc,
     plan: BackupPlan,
     controller: Arc<BackupRequestController>,
 ) -> Result<RespValue> {
@@ -1708,12 +1730,29 @@ async fn race_with_backup(
         if result.is_ok() {
             controller.record_primary(&master, master_start.elapsed());
         }
+        metrics::backup_event(cluster.as_ref(), "primary-before");
         return result?;
     }

-    let backup_rx = pool
+    let backup_rx = match pool
         .dispatch(plan.replica.clone(), client_id, command.clone())
-        .await?;
+        .await
+    {
+        Ok(rx) => {
+            metrics::backup_event(cluster.as_ref(), "dispatched");
+            rx
+        }
+        Err(err) => {
+            metrics::backup_event(cluster.as_ref(), "dispatch-fail");
+            warn!(
+                master = %master.as_str(),
+                replica = %plan.replica.as_str(),
+                error = %err,
+                "failed to dispatch backup request; falling back to primary"
+            );
+            return await_primary_only(primary_future, controller, master, master_start).await;
+        }
+    };
     let mut backup_future = Box::pin(backup_rx);

     tokio::select! {
@@ -1721,9 +1760,11 @@ async fn race_with_backup(
             if res.is_ok() {
                 controller.record_primary(&master, master_start.elapsed());
             }
+            metrics::backup_event(cluster.as_ref(), "primary-after");
             res?
         }
         res = backup_future.as_mut() => {
+            metrics::backup_event(cluster.as_ref(), "replica-win");
             let remaining = primary_future;
             let controller_clone = controller.clone();
             let master_clone = master.clone();
@@ -1738,6 +1779,22 @@ async fn race_with_backup(
     }
 }

+async fn await_primary_only(
+    mut primary_future: Pin<Box<oneshot::Receiver<Result<RespValue>>>>,
+    controller: Arc<BackupRequestController>,
+    master: BackendNode,
+    master_start: Instant,
+) -> Result<RespValue> {
+    match primary_future.as_mut().await {
+        Ok(Ok(resp)) => {
+            controller.record_primary(&master, master_start.elapsed());
+            Ok(resp)
+        }
+        Ok(Err(err)) => Err(err),
+        Err(_) => Err(anyhow!("backend session closed")),
+    }
+}
+
 fn select_node_for_slot(
     slots: &watch::Sender,
     read_from_slave: bool,
diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs
index c7b852c..aeff374 100644
--- a/src/metrics/mod.rs
+++ b/src/metrics/mod.rs
@@ -254,6 +254,17 @@ static REMOTE_TIMER: Lazy<HistogramVec> = Lazy::new(|| {
     .expect("remote timer histogram registration must succeed")
 });

+static BACKUP_REQUEST_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
+    register_int_counter_vec!(
+        opts!(
+            "aster_backup_request_total",
+            "backup request events grouped by cluster and event"
+        ),
+        &["cluster", "event"]
+    )
+    .expect("backup request counter registration must succeed")
+});
+
 /// Register the running version with metrics.
 pub fn register_version(version: &str) {
     VERSION_GAUGE.with_label_values(&[version]).set(1.0);
@@ -403,6 +414,13 @@ pub fn backend_request_result(cluster: &str, backend: &str, result: &str) {
         .inc();
 }

+/// Record a backup request related event for observability.
+pub fn backup_event(cluster: &str, event: &str) {
+    BACKUP_REQUEST_EVENTS
+        .with_label_values(&[cluster, event])
+        .inc();
+}
+
 /// Record the outcome of a backend heartbeat check.
 pub fn backend_heartbeat(cluster: &str, backend: &str, ok: bool) {
     let status = if ok { "ok" } else { "fail" };
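Editor's note (not part of the patch): the heart of `race_with_backup` is a two-stage `tokio::select!`: wait on the primary until the planned delay fires, then race the still-pending primary against a freshly dispatched backup and report whichever answers first. A simplified, self-contained sketch of that control flow with simulated backends (the `race` function and timings are illustrative; assumes the `tokio` crate with the `macros`, `rt-multi-thread`, and `time` features):

```rust
use std::time::Duration;
use tokio::time::sleep;

async fn race(delay: Duration) -> &'static str {
    // Stage 1: give the primary `delay` to answer on its own.
    let mut primary = Box::pin(async {
        sleep(Duration::from_millis(50)).await; // simulated slow master
        "primary"
    });
    tokio::select! {
        resp = primary.as_mut() => return resp,
        _ = sleep(delay) => {}
    }

    // Stage 2: dispatch the backup and return whichever side finishes first.
    let backup = async {
        sleep(Duration::from_millis(5)).await; // simulated fast replica
        "backup"
    };
    tokio::select! {
        resp = primary.as_mut() => resp,
        resp = backup => resp,
    }
}

#[tokio::main]
async fn main() {
    // With a 10 ms trigger, the 50 ms "master" loses to the duplicated read.
    assert_eq!(race(Duration::from_millis(10)).await, "backup");
}
```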