diff --git a/Cargo.lock b/Cargo.lock index 4940515df..c6e1d36f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2480,7 +2480,7 @@ dependencies = [ "bstr", "bytes", "cityhash-rs", - "clickhouse-derive", + "clickhouse-derive 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures", "futures-channel", "http-body-util", @@ -2489,7 +2489,7 @@ dependencies = [ "hyper-util", "lz4_flex", "replace_with", - "sealed", + "sealed 0.5.0", "serde", "static_assertions", "thiserror 1.0.69", @@ -2499,6 +2499,35 @@ dependencies = [ "uuid", ] +[[package]] +name = "clickhouse" +version = "0.13.3" +source = "git+https://github.com/ClickHouse/clickhouse-rs?rev=8cf3d2e138dd121367fa10e875d3f91374b075b2#8cf3d2e138dd121367fa10e875d3f91374b075b2" +dependencies = [ + "bstr", + "bytes", + "cityhash-rs", + "clickhouse-derive 0.2.0 (git+https://github.com/ClickHouse/clickhouse-rs?rev=8cf3d2e138dd121367fa10e875d3f91374b075b2)", + "clickhouse-types", + "futures-channel", + "futures-util", + "http-body-util", + "hyper 1.7.0", + "hyper-tls 0.6.0", + "hyper-util", + "lz4_flex", + "quanta", + "replace_with", + "sealed 0.6.0", + "serde", + "static_assertions", + "thiserror 2.0.17", + "time", + "tokio", + "url", + "uuid", +] + [[package]] name = "clickhouse-derive" version = "0.2.0" @@ -2511,6 +2540,26 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "clickhouse-derive" +version = "0.2.0" +source = "git+https://github.com/ClickHouse/clickhouse-rs?rev=8cf3d2e138dd121367fa10e875d3f91374b075b2#8cf3d2e138dd121367fa10e875d3f91374b075b2" +dependencies = [ + "proc-macro2 1.0.101", + "quote 1.0.41", + "serde_derive_internals", + "syn 2.0.106", +] + +[[package]] +name = "clickhouse-types" +version = "0.1.0" +source = "git+https://github.com/ClickHouse/clickhouse-rs?rev=8cf3d2e138dd121367fa10e875d3f91374b075b2#8cf3d2e138dd121367fa10e875d3f91374b075b2" +dependencies = [ + "bytes", + "thiserror 2.0.17", +] + [[package]] name = "cloudabi" version = "0.0.3" @@ -9341,7 +9390,7 @@ dependencies = [ "alloy-transport-http", "bid-scraper", "built", - "clickhouse", + "clickhouse 0.12.2", "ctor", "derivative", "exponential-backoff", @@ -9451,6 +9500,44 @@ dependencies = [ "tracing", ] +[[package]] +name = "rbuilder-utils" +version = "0.1.0" +dependencies = [ + "ahash", + "alloy-primitives 1.4.1", + "auto_impl", + "clickhouse 0.13.3", + "clickhouse-derive 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "derivative", + "derive_more 2.0.1", + "dyn-clone", + "eyre", + "futures", + "futures-util", + "governor", + "integer-encoding", + "rand 0.9.2", + "redb", + "reqwest 0.12.24", + "reth-tasks", + "serde", + "serde_json", + "serde_with", + "sha2 0.10.9", + "strum 0.27.2", + "strum_macros 0.27.2", + "tempfile", + "thiserror 1.0.69", + "time", + "tokio", + "toml 0.8.23", + "tracing", + "tracing-futures", + "tracing-subscriber 0.3.20", + "uuid", +] + [[package]] name = "rdrand" version = "0.4.0" @@ -9466,6 +9553,15 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3edd4d5d42c92f0a659926464d4cce56b562761267ecf0f469d85b7de384175" +[[package]] +name = "redb" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae323eb086579a3769daa2c753bb96deb95993c534711e0dbe881b5192906a06" +dependencies = [ + "libc", +] + [[package]] name = "redis" version = "0.25.4" @@ -13094,6 +13190,17 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "sealed" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "22f968c5ea23d555e670b449c1c5e7b2fc399fdaec1d304a17cd48e288abc107" +dependencies = [ + "proc-macro2 1.0.101", + "quote 1.0.41", + "syn 2.0.106", +] + [[package]] name = "sec1" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index 0acf086bd..8085b02f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ exclude = [".github/"] [workspace] members = [ "crates/rbuilder-primitives", + "crates/rbuilder-utils", "crates/rbuilder-config", "crates/rbuilder", "crates/rbuilder-operator", @@ -28,6 +29,7 @@ default-members = [ "crates/reth-rbuilder", "crates/rbuilder-rebalancer", "crates/rbuilder-primitives", + "crates/rbuilder-utils", "crates/test-relay", "crates/bid-scraper", ] @@ -197,6 +199,7 @@ eth-sparse-mpt = { path = "crates/eth-sparse-mpt" } bid-scraper = { path = "crates/bid-scraper" } rbuilder = { path = "crates/rbuilder" } rbuilder-primitives = { path = "crates/rbuilder-primitives" } +rbuilder-utils = { path = "crates/rbuilder-utils" } rbuilder-config = { path = "crates/rbuilder-config" } sysperf = { path = "crates/sysperf" } metrics_macros = { path = "crates/rbuilder/src/telemetry/metrics_macros" } diff --git a/crates/rbuilder-utils/Cargo.toml b/crates/rbuilder-utils/Cargo.toml new file mode 100644 index 000000000..af8576010 --- /dev/null +++ b/crates/rbuilder-utils/Cargo.toml @@ -0,0 +1,69 @@ +[package] +name = "rbuilder-utils" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +exclude.workspace = true + +[dependencies] +reth-tasks = { git = "https://github.com/paradigmxyz/reth", rev = "9c30bf7af5e0d45deaf5917375c9922c16654b28" } + +# misc +derivative.workspace = true +integer-encoding = "4.0.0" +sha2 = { workspace = true, features = ["asm"] } +uuid = { version = "1.6.1", features = ["serde", "v5", "v4"] } +governor = "0.6.3" +ahash.workspace = true +reqwest = { workspace = true, features = ["blocking"] } +serde_with = { workspace = true, features = ["time_0_3"] } +toml.workspace = true +tracing.workspace = true +time.workspace = true +thiserror.workspace = true +eyre.workspace = true +serde.workspace = true +derive_more.workspace = true +serde_json.workspace = true + + +# alloy +alloy-primitives.workspace = true + +strum = "0.27" +strum_macros = "0.27" +tokio = { version = "1.40.0", default-features = false, features = [ + "sync", + "time", + "rt-multi-thread", + "macros", + "test-util" +] } + +clickhouse = { git = "https://github.com/ClickHouse/clickhouse-rs", rev = "8cf3d2e138dd121367fa10e875d3f91374b075b2", features = [ + "inserter", + "time", + "uuid", + "native-tls" +] } +clickhouse-derive = { version = "0.2.0" } +redb = { version = "3.1.0" } +tempfile = { version = "3.23.0" } +rand = "0.9.2" +futures = { version = "0.3" } +futures-util = { version = "0.3.31" } + +# tracing +tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } +tracing-futures = "0.2.5" + +# misc +auto_impl = "1.3.0" +dyn-clone = "1.0.20" + +[features] +default = [] +test-utils = [] diff --git a/crates/rbuilder-utils/src/backoff.rs b/crates/rbuilder-utils/src/backoff.rs new file mode 100644 index 000000000..542eba7aa --- /dev/null +++ b/crates/rbuilder-utils/src/backoff.rs @@ -0,0 +1,325 @@ +//! Time-related utilies. + +use std::{ + future::{poll_fn, Future as _}, + iter::Iterator, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +/// A random number generator for applying jitter to [`std::time::Duration`]. 
+#[derive(Debug, Clone)] +pub(crate) struct Jitter; + +impl Jitter { + /// Apply jitter to provided duration, by multiplying it for a random number between 0 and 2. + pub(crate) fn apply_to(duration: Duration) -> Duration { + duration.mul_f64(rand::random::() * 2_f64) + } +} + +/// A retry strategy driven by exponential back-off. +/// +/// The power corresponds to the number of past attempts. +/// +/// Taken from +#[derive(Debug, Clone)] +pub(crate) struct ExponentialBackoff { + current: u64, + base: u64, + factor: u64, + max_delay: Option, +} + +#[allow(dead_code)] +impl ExponentialBackoff { + /// Constructs a new exponential back-off strategy, + /// given a base duration in milliseconds. + /// + /// The resulting duration is calculated by taking the base to the `n`-th power, + /// where `n` denotes the number of past attempts. + pub(crate) fn from_millis(base: u64) -> ExponentialBackoff { + ExponentialBackoff { + current: base, + base, + factor: 1u64, + max_delay: None, + } + } + + /// A multiplicative factor that will be applied to the retry delay. + /// + /// For example, using a factor of `1000` will make each delay in units of seconds. + /// + /// Default factor is `1`. + pub(crate) fn factor(mut self, factor: u64) -> ExponentialBackoff { + self.factor = factor; + self + } + + /// Apply a maximum delay. No retry delay will be longer than this `Duration`. + pub(crate) fn max_delay(mut self, duration: Duration) -> ExponentialBackoff { + self.max_delay = Some(duration); + self + } + + /// Reset the backoff to the initial state. + pub(crate) fn reset(&mut self) { + self.current = self.base; + } +} + +impl Iterator for ExponentialBackoff { + type Item = Duration; + + // TODO: change this logic, so that we can always multiply base by a factor. + // e.g. base = 8, factor = 2 yields to: 8ms, 16ms, 32ms, 64ms, ... + fn next(&mut self) -> Option { + // set delay duration by applying factor + let duration = if let Some(duration) = self.current.checked_mul(self.factor) { + Duration::from_millis(duration) + } else { + Duration::from_millis(u64::MAX) + }; + + // check if we reached max delay + if let Some(ref max_delay) = self.max_delay { + if duration > *max_delay { + return Some(*max_delay); + } + } + + if let Some(next) = self.current.checked_mul(self.base) { + self.current = next; + } else { + self.current = u64::MAX; + } + + Some(duration) + } +} + +/// An interval heavily inspired by [`tokio::time::Interval`], that supports exponential back-off +/// and jitter. +#[derive(Debug)] +pub(crate) struct BackoffInterval { + /// Future that completes the next time the `Interval` yields a value. + delay: Pin>, + + /// The exponential backoff configuration. + backoff: ExponentialBackoff, + + /// An optional jitter to apply to the ticks. + jitter: bool, +} + +impl BackoffInterval { + /// Creates a new interval that ticks immediately. + pub(crate) fn new(backoff: ExponentialBackoff) -> Self { + let start = tokio::time::Instant::now(); + let delay = Box::pin(tokio::time::sleep_until(start)); + Self { + delay, + backoff, + jitter: false, + } + } + + pub(crate) fn with_jitter(mut self) -> Self { + self.jitter = true; + self + } + + pub(crate) fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll { + // Wait for the delay to be done + std::task::ready!(Pin::new(&mut self.delay).poll(cx)); + + // Get the time when we were schedulued to tick + let timeout = self.delay.deadline(); + + // CHANGE: use custom logic that takes into a account backoff and jitter to calculate new + // instant. 
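+ // For reference: with the default configuration (base 4 ms, max delay 8192 ms, jitter
+ // enabled), successive ticks end up spaced roughly 4, 16, 64, ... ms apart, each
+ // scaled by a random factor in [0, 2).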
+ let next = self.next(); + + // CHANGE: Unfortunately, [`tokio::time::Sleep::reset_without_reregister`] isn't + // pub(crate)lic so we have to register the waker again. + self.delay.as_mut().reset(next); + + Poll::Ready(timeout) + } + + /// Completes when the next instant in the interval has been reached. + pub(crate) async fn tick(&mut self) -> tokio::time::Instant { + let instant = poll_fn(|cx| self.poll_tick(cx)); + + instant.await + } + + /// Resets backoff to the initial state, and the next tick will happen after the initial period + /// returned by [`ExponentialBackoff`]. + pub(crate) fn reset(&mut self) { + self.backoff.reset(); + let next = self.next(); + self.delay.as_mut().reset(next); + } + + /// Return the next instant at which the interval should tick. + fn next(&mut self) -> tokio::time::Instant { + let now = tokio::time::Instant::now(); + // We provide a [`tokio::time::MissedTickBehavior::Delay`] behavior but we also add backoff + // and jitter if the user configured it. + let mut period = self + .backoff + .next() + .expect("ExponentialBackoff never returns None"); + if self.jitter { + period = Jitter::apply_to(period); + } + now.checked_add(period).expect("no overflow") + } +} + +impl Default for BackoffInterval { + fn default() -> Self { + // So will return 4, 16, 64, 256, 1024, ... milliseconds with jitter. + Self::new(ExponentialBackoff::from_millis(4).max_delay(Duration::from_millis(8192))) + .with_jitter() + } +} + +#[cfg(test)] +mod tests { + use tokio::time::{Duration, Instant}; + + use super::*; + + #[test] + fn exp_backoff_returns_some_exponential_base_10() { + let mut s = ExponentialBackoff::from_millis(10); + + assert_eq!(s.next(), Some(Duration::from_millis(10))); + assert_eq!(s.next(), Some(Duration::from_millis(100))); + assert_eq!(s.next(), Some(Duration::from_millis(1000))); + } + + #[test] + fn exp_backoff_returns_some_exponential_base_2() { + let mut s = ExponentialBackoff::from_millis(2); + + assert_eq!(s.next(), Some(Duration::from_millis(2))); + assert_eq!(s.next(), Some(Duration::from_millis(4))); + assert_eq!(s.next(), Some(Duration::from_millis(8))); + } + + #[test] + fn exp_backoff_saturates_at_maximum_value() { + let mut s = ExponentialBackoff::from_millis(u64::MAX - 1); + + assert_eq!(s.next(), Some(Duration::from_millis(u64::MAX - 1))); + assert_eq!(s.next(), Some(Duration::from_millis(u64::MAX))); + assert_eq!(s.next(), Some(Duration::from_millis(u64::MAX))); + } + + #[test] + fn exp_backoff_can_use_factor_to_get_seconds() { + let factor = 1000; + let mut s = ExponentialBackoff::from_millis(2).factor(factor); + + assert_eq!(s.next(), Some(Duration::from_secs(2))); + assert_eq!(s.next(), Some(Duration::from_secs(4))); + assert_eq!(s.next(), Some(Duration::from_secs(8))); + } + + #[test] + fn exp_backoff_stops_increasing_at_max_delay() { + let mut s = ExponentialBackoff::from_millis(2).max_delay(Duration::from_millis(4)); + + assert_eq!(s.next(), Some(Duration::from_millis(2))); + assert_eq!(s.next(), Some(Duration::from_millis(4))); + assert_eq!(s.next(), Some(Duration::from_millis(4))); + } + + #[test] + fn exp_backoff_returns_max_when_max_less_than_base() { + let mut s = ExponentialBackoff::from_millis(20).max_delay(Duration::from_millis(10)); + + assert_eq!(s.next(), Some(Duration::from_millis(10))); + assert_eq!(s.next(), Some(Duration::from_millis(10))); + } + + // Tests with `start_paused = true` consists of tests with [`tokio::time::pause`] and + // require manual advancement of time with [`tokio::time::advance`] or with sleeps. 
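+
+ // For context, a typical consumer drives the interval in a retry loop along these
+ // (hypothetical) lines, ticking before each attempt and resetting once an attempt
+ // succeeds, so the delay only grows across consecutive failures:
+ //
+ //     let mut interval = BackoffInterval::default();
+ //     loop {
+ //         interval.tick().await;
+ //         if try_operation().await.is_ok() {
+ //             interval.reset();
+ //         }
+ //     }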
+ + #[tokio::test(start_paused = true)] + async fn backoff_interval_ticks_as_expected() { + let backoff = ExponentialBackoff::from_millis(2); + let mut backoff_clone = backoff.clone(); + let mut interval = BackoffInterval::new(backoff); + + let before = Instant::now(); + let t1 = interval.tick().await; + assert_eq!(t1, before); + let t2 = interval.tick().await; + assert_eq!(t2, t1 + backoff_clone.next().unwrap()); + let t3 = interval.tick().await; + assert_eq!(t3, t2 + backoff_clone.next().unwrap()); + let t4 = interval.tick().await; + assert_eq!(t4, t3 + backoff_clone.next().unwrap()); + } + + #[tokio::test(start_paused = true)] + async fn backoff_interval_resets_properly() { + let backoff = ExponentialBackoff::from_millis(2); + let mut backoff_clone = backoff.clone(); + let mut interval = BackoffInterval::new(backoff); + + interval.tick().await; + interval.tick().await; + interval.tick().await; + interval.tick().await; + + interval.reset(); + let now = Instant::now(); + let expected_delay = backoff_clone.next().unwrap(); + let actual = interval.tick().await; + + assert_eq!(now + expected_delay, actual); + } + + #[tokio::test(start_paused = true)] + async fn backoff_interval_with_jitter_works() { + // No jitter + { + let beginning = Instant::now(); + + let backoff = ExponentialBackoff::from_millis(5); + let mut backoff_clone = backoff.clone(); + let mut interval = BackoffInterval::new(backoff); + + let t1 = interval.tick().await; + assert_eq!(t1, beginning); // First tick is immediate + + let t2 = interval.tick().await; + assert_eq!(t2, t1 + backoff_clone.next().unwrap()); + + let t3 = interval.tick().await; + assert_eq!(t3, t2 + backoff_clone.next().unwrap()); + } + + // Jitter + { + let beginning = Instant::now(); + + let backoff = ExponentialBackoff::from_millis(5); + let mut backoff_clone = backoff.clone(); + let mut interval = BackoffInterval::new(backoff).with_jitter(); + let t1 = interval.tick().await; + assert_eq!(t1, beginning); // First tick is immediate + + // Next tick will be 5ms later, but jitter changes it. + let t2 = interval.tick().await; + assert_ne!(t2, t1 + backoff_clone.next().unwrap()); + } + } +} diff --git a/crates/rbuilder-utils/src/clickhouse/backup/macros.rs b/crates/rbuilder-utils/src/clickhouse/backup/macros.rs new file mode 100644 index 000000000..5b08591c4 --- /dev/null +++ b/crates/rbuilder-utils/src/clickhouse/backup/macros.rs @@ -0,0 +1,54 @@ +//! Helpful macros spawning clickhouse indexer tasks. + +// Rationale: a simple text-replacement macro was much more effective compared to fighting the +// compiler with additional trait bounds on the [`clickhouse::Row`] trait. + +#[macro_export] +macro_rules! spawn_clickhouse_inserter { + ($executor:ident, $runner:ident, $name:expr, $target:expr) => {{ + $executor.spawn_with_graceful_shutdown_signal(|shutdown| async move { + let mut shutdown_guard = None; + tokio::select! { + _ = $runner.run_loop() => { + tracing::info!(target: $target, "clickhouse {} indexer channel closed", $name); + } + guard = shutdown => { + tracing::info!(target: $target, "Received shutdown for {} indexer, performing cleanup", $name); + shutdown_guard = Some(guard); + }, + } + + match $runner.end().await { + Ok(quantities) => { + tracing::info!(target: $target, ?quantities, "finalized clickhouse {} inserter", $name); + } + Err(e) => { + tracing::error!(target: $target, ?e, "failed to write end insertion of {} to indexer", $name); + } + } + + drop(shutdown_guard); + }); + }}; +} + +#[macro_export] +macro_rules! 
spawn_clickhouse_backup { + ($executor:ident, $backup:ident, $name: expr, $target:expr) => {{ + $executor.spawn_with_graceful_shutdown_signal(|shutdown| async move { + let mut shutdown_guard = None; + tokio::select! { + _ = $backup.run() => { + tracing::info!(target: $target, "clickhouse {} backup channel closed", $name); + } + guard = shutdown => { + tracing::info!(target: $target, "Received shutdown for {} backup, performing cleanup", $name); + shutdown_guard = Some(guard); + }, + } + + $backup.end().await; + drop(shutdown_guard); + }); + }}; +} diff --git a/crates/rbuilder-utils/src/clickhouse/backup/metrics.rs b/crates/rbuilder-utils/src/clickhouse/backup/metrics.rs new file mode 100644 index 000000000..03293fb5a --- /dev/null +++ b/crates/rbuilder-utils/src/clickhouse/backup/metrics.rs @@ -0,0 +1,66 @@ +use crate::clickhouse::Quantities; +use std::time::Duration; + +/// Metrics updated by the clickhouse_with_backup mod. +pub trait Metrics { + fn increment_write_failures(err: String); + fn process_quantities(quantities: &Quantities); + fn record_batch_commit_time(duration: Duration); + fn increment_commit_failures(err: String); + fn set_queue_size(size: usize, order: &'static str); + fn set_disk_backup_size(size_bytes: u64, batches: usize, order: &'static str); + fn increment_backup_disk_errors(order: &'static str, error: &str); + fn set_memory_backup_size(size_bytes: u64, batches: usize, order: &'static str); + fn process_backup_data_lost_quantities(quantities: &Quantities); + fn process_backup_data_quantities(quantities: &Quantities); + fn set_backup_empty_size(order: &'static str); +} + +/// Feeling lazy? Grafana is too expensive for you? +/// Use NullMetrics! +pub struct NullMetrics {} +impl Metrics for NullMetrics { + fn increment_write_failures(_err: String) { + // No-op + } + + fn process_quantities(_quantities: &Quantities) { + // No-op + } + + fn record_batch_commit_time(_duration: Duration) { + // No-op + } + + fn increment_commit_failures(_err: String) { + // No-op + } + + fn set_queue_size(_size: usize, _order: &'static str) { + // No-op + } + + fn set_disk_backup_size(_size_bytes: u64, _batches: usize, _order: &'static str) { + // No-op + } + + fn increment_backup_disk_errors(_order: &'static str, _error: &str) { + // No-op + } + + fn set_memory_backup_size(_size_bytes: u64, _batches: usize, _order: &'static str) { + // No-op + } + + fn process_backup_data_lost_quantities(_quantities: &Quantities) { + // No-op + } + + fn process_backup_data_quantities(_quantities: &Quantities) { + // No-op + } + + fn set_backup_empty_size(_order: &'static str) { + // No-op + } +} diff --git a/crates/rbuilder-utils/src/clickhouse/backup/mod.rs b/crates/rbuilder-utils/src/clickhouse/backup/mod.rs new file mode 100644 index 000000000..67d800bd3 --- /dev/null +++ b/crates/rbuilder-utils/src/clickhouse/backup/mod.rs @@ -0,0 +1,766 @@ +pub mod macros; +pub mod metrics; +pub mod primitives; + +use std::{ + collections::VecDeque, + marker::PhantomData, + path::PathBuf, + sync::{Arc, RwLock}, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, +}; + +use clickhouse::inserter::Inserter; +use derive_more::{Deref, DerefMut}; +use redb::{ReadableDatabase, ReadableTable, ReadableTableMetadata}; +use strum::AsRefStr; +use tokio::sync::mpsc; + +use crate::{ + backoff::BackoffInterval, + clickhouse::{ + backup::{ + metrics::Metrics, + primitives::{ClickhouseIndexableOrder, ClickhouseRowExt}, + }, + indexer::{ + default_disk_backup_database_path, MAX_DISK_BACKUP_SIZE_BYTES, + MAX_MEMORY_BACKUP_SIZE_BYTES, 
+ }, + Quantities, + }, + format::FormatBytes, + tasks::TaskExecutor, +}; + +const TARGET: &str = "clickhouse_with_backup::backup"; + +/// A type alias for disk backup keys. +type DiskBackupKey = u128; +/// A type alias for disk backup tables. +type Table<'a> = redb::TableDefinition<'a, DiskBackupKey, Vec>; + +/// The source of a backed-up failed commit. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum BackupSource { + Disk(DiskBackupKey), + Memory, +} + +/// Generates a new unique key for disk backup entries, based on current system time in +/// milliseconds. +fn new_disk_backup_key() -> DiskBackupKey { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("time went backwards") + .as_micros() +} + +/// Represents data we failed to commit to clickhouse, including the rows and some information +/// about the size of such data. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct FailedCommit { + /// The actual rows we were trying to commit. + rows: Vec, + /// The quantities related to such commit, like the total size in bytes. + quantities: Quantities, +} + +impl FailedCommit { + pub fn new(rows: Vec, quantities: Quantities) -> Self { + Self { rows, quantities } + } +} + +impl Default for FailedCommit { + fn default() -> Self { + Self { + rows: Vec::new(), + quantities: Quantities::ZERO, + } + } +} + +/// A [`FailedCommit`] along with its source (disk or memory). +struct RetrievedFailedCommit { + source: BackupSource, + commit: FailedCommit, +} + +/// A wrapper over a [`VecDeque`] of [`FailedCommit`] with added functionality. +/// +/// Newly failed commits are pushed to the front of the queue, so the oldest are at the back. +#[derive(Deref, DerefMut)] +struct FailedCommits(VecDeque>); + +impl FailedCommits { + /// Get the aggregated quantities of the failed commits; + #[inline] + fn quantities(&self) -> Quantities { + let total_size_bytes = self.iter().map(|c| c.quantities.bytes).sum::(); + let total_rows = self.iter().map(|c| c.quantities.rows).sum::(); + let total_transactions = self.iter().map(|c| c.quantities.transactions).sum::(); + + Quantities { + bytes: total_size_bytes, + rows: total_rows, + transactions: total_transactions, + } + } +} + +impl Default for FailedCommits { + fn default() -> Self { + Self(VecDeque::default()) + } +} + +/// Configuration for the [`DiskBackup`] of failed commits. +#[derive(Debug)] +pub struct DiskBackupConfig { + /// The path where the backup database is stored. + path: PathBuf, + /// The maximum size in bytes for holding past failed commits on disk. + max_size_bytes: u64, + /// The interval at which buffered writes are flushed to disk. + flush_interval: tokio::time::Interval, +} + +impl DiskBackupConfig { + pub fn new() -> Self { + Self { + path: default_disk_backup_database_path().into(), + max_size_bytes: MAX_DISK_BACKUP_SIZE_BYTES, + flush_interval: tokio::time::interval(Duration::from_secs(30)), + } + } + + pub fn with_path>(mut self, path: Option

) -> Self { + if let Some(p) = path { + self.path = p.into(); + } + self + } + + pub fn with_max_size_bytes(mut self, max_size_bytes: Option) -> Self { + if let Some(max_size_bytes) = max_size_bytes { + self.max_size_bytes = max_size_bytes; + } + self + } + + #[allow(dead_code)] + pub fn with_immediate_commit_interval(mut self, interval: Option) -> Self { + if let Some(interval) = interval { + self.flush_interval = tokio::time::interval(interval); + } + self + } +} + +impl Default for DiskBackupConfig { + fn default() -> Self { + Self::new() + } +} + +impl Clone for DiskBackupConfig { + fn clone(&self) -> Self { + Self { + path: self.path.clone(), + max_size_bytes: self.max_size_bytes, + flush_interval: tokio::time::interval(self.flush_interval.period()), + } + } +} + +#[derive(Debug, Clone, Copy)] +pub struct MemoryBackupConfig { + /// The maximum size in bytes for holding past failed commits in-memory. Once we go over this + /// threshold, pressure is applied and old commits are dropped. + pub max_size_bytes: u64, +} + +impl MemoryBackupConfig { + pub fn new(max_size_bytes: u64) -> Self { + Self { max_size_bytes } + } +} + +impl Default for MemoryBackupConfig { + fn default() -> Self { + Self { + max_size_bytes: MAX_MEMORY_BACKUP_SIZE_BYTES, + } + } +} + +/// Data retrieved from disk, along with its key and some stats. +pub(crate) struct DiskRetrieval { + pub(crate) key: K, + pub(crate) value: V, + pub(crate) stats: BackupSourceStats, +} + +/// Errors that can occur during disk backup operations. Mostly wrapping redb and serde errors. +#[derive(Debug, thiserror::Error, AsRefStr)] +pub(crate) enum DiskBackupError { + #[error(transparent)] + Database(#[from] redb::DatabaseError), + #[error(transparent)] + Transactions(#[from] redb::TransactionError), + #[error(transparent)] + Table(#[from] redb::TableError), + #[error(transparent)] + Storage(#[from] redb::StorageError), + #[error(transparent)] + Commit(#[from] redb::CommitError), + #[error(transparent)] + Durability(#[from] redb::SetDurabilityError), + #[error(transparent)] + Compaction(#[from] redb::CompactionError), + #[error("serialization error: {0}")] + Serde(#[from] serde_json::Error), + #[error("backup size limit exceeded: {0} bytes")] + SizeExceeded(u64), + #[error("failed to join flushing task")] + JoinTask, +} + +/// A disk backup for failed commits. This handle to a database allows to write only to one table +/// for scoped access. If you want to write to another table, clone it using +/// [`Self::clone_with_table`]. +#[derive(Debug)] +pub struct DiskBackup { + db: Arc>, + config: DiskBackupConfig, + + _marker: PhantomData, +} + +impl DiskBackup { + pub fn new( + config: DiskBackupConfig, + task_executor: &TaskExecutor, + ) -> Result { + // Ensure all parent directories exist, so that the database can be initialized correctly. + if let Some(parent) = config.path.parent() { + std::fs::create_dir_all(parent)?; + } + + let db = redb::Database::create(&config.path)?; + + let disk_backup = Self { + db: Arc::new(RwLock::new(db)), + config, + _marker: Default::default(), + }; + + task_executor.spawn({ + let disk_backup: Self = disk_backup.clone(); + async move { + disk_backup.flush_routine().await; + } + }); + + Ok(disk_backup) + } + + /// Like `clone`, but allows to change the type parameter `U`. 
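+ ///
+ /// The clone shares the same underlying database and periodic flush task, but its
+ /// reads and writes are scoped to the table named after `U::ORDER` rather than
+ /// `T::ORDER`.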
+ pub fn clone_to(&self) -> DiskBackup { + DiskBackup { + db: self.db.clone(), + config: self.config.clone(), + _marker: Default::default(), + } + } +} + +impl Clone for DiskBackup { + fn clone(&self) -> Self { + Self { + db: self.db.clone(), + config: self.config.clone(), + _marker: Default::default(), + } + } +} + +impl DiskBackup { + /// Saves a new failed commit to disk. `commit_immediately` indicates whether to force + /// durability on write. + fn save(&mut self, data: &FailedCommit) -> Result { + let table_def = Table::new(T::ORDER); + // NOTE: not efficient, but we don't expect to store a lot of data here. + let bytes = serde_json::to_vec(&data)?; + + let writer = self.db.write().expect("not poisoned").begin_write()?; + let (stored_bytes, rows) = { + let mut table = writer.open_table(table_def)?; + if table.stats()?.stored_bytes() > self.config.max_size_bytes { + return Err(DiskBackupError::SizeExceeded(self.config.max_size_bytes)); + } + + table.insert(new_disk_backup_key(), bytes)?; + + (table.stats()?.stored_bytes(), table.len()?) + }; + writer.commit()?; + + Ok(BackupSourceStats { + size_bytes: stored_bytes, + total_batches: rows as usize, + }) + } + + /// Retrieves the oldest failed commit from disk, if any. + fn retrieve_oldest( + &mut self, + ) -> Result>>, DiskBackupError> { + let table_def = Table::new(T::ORDER); + + let reader = self.db.read().expect("not poisoned").begin_read()?; + let table = match reader.open_table(table_def) { + Ok(t) => t, + Err(redb::TableError::TableDoesNotExist(_)) => { + // No table means no data. + return Ok(None); + } + Err(e) => { + return Err(e.into()); + } + }; + + let stored_bytes = table.stats()?.stored_bytes(); + let rows = table.len()? as usize; + let stats = BackupSourceStats { + size_bytes: stored_bytes, + total_batches: rows, + }; + + // Retreives in sorted order. + let Some(entry_res) = table.iter()?.next() else { + return Ok(None); + }; + let (key, rows_raw) = entry_res?; + let commit: FailedCommit = serde_json::from_slice(&rows_raw.value())?; + + Ok(Some(DiskRetrieval { + key: key.value(), + value: commit, + stats, + })) + } + + /// Deletes the failed commit with the given key from disk. + fn delete(&mut self, key: DiskBackupKey) -> Result { + let table_def = Table::new(T::ORDER); + + let mut writer = self.db.write().expect("not poisoned").begin_write()?; + writer.set_durability(redb::Durability::Immediate)?; + + let (stored_bytes, rows) = { + let mut table = writer.open_table(table_def)?; + table.remove(key)?; + (table.stats()?.stored_bytes(), table.len()?) + }; + writer.commit()?; + + Ok(BackupSourceStats { + size_bytes: stored_bytes, + total_batches: rows as usize, + }) + } + + /// Explicity flushes any pending writes to disk. This is async to avoid blocking the main + /// thread. + async fn flush(&mut self) -> Result<(), DiskBackupError> { + let db = self.db.clone(); + + // Since this can easily block by a second or two, send it to a blocking thread. + tokio::task::spawn_blocking(move || { + let mut db = db.write().expect("not poisoned"); + let mut writer = db.begin_write()?; + + // If there is no data to flush, don't do anything. + if writer.stats()?.stored_bytes() == 0 { + return Ok(()); + } + + writer.set_durability(redb::Durability::Immediate)?; + writer.commit()?; + + db.compact()?; + Ok(()) + }) + .await + .map_err(|_| DiskBackupError::JoinTask)? + } + + /// Takes an instance of self and performs a flush routine if the immediate flush interval has + /// ticked. 
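+ ///
+ /// Spawned once from [`DiskBackup::new`] on the provided task executor; it loops for
+ /// the lifetime of the process, flushing buffered writes on every `flush_interval`
+ /// tick.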
+ async fn flush_routine(mut self) { + loop { + self.config.flush_interval.tick().await; + let start = Instant::now(); + match self.flush().await { + Ok(_) => { + tracing::debug!(target: TARGET, elapsed = ?start.elapsed(), "flushed backup write buffer to disk"); + } + Err(e) => { + tracing::error!(target: TARGET, ?e, "failed to flush backup write buffer to disk"); + } + } + } + } +} + +/// Statistics about the Clickhouse data stored in a certain backup source (disk or memory). +#[derive(Debug, Clone, Copy, Default)] +pub(crate) struct BackupSourceStats { + /// The total size in bytes of failed commit batches stored. + size_bytes: u64, + /// The total number of failed commit batches stored. + total_batches: usize, +} + +/// An in-memory backup for failed commits. +#[derive(Deref, DerefMut)] +struct MemoryBackup { + /// The in-memory cache of failed commits. + #[deref] + #[deref_mut] + failed_commits: FailedCommits, + /// The configuration for the in-memory backup. + config: MemoryBackupConfig, + /// The statistics about the in-memory backup. + stats: BackupSourceStats, +} + +impl MemoryBackup { + /// Updates the internal statistics and returns them. + fn update_stats(&mut self) -> BackupSourceStats { + let quantities = self.failed_commits.quantities(); + let new_len = self.failed_commits.len(); + + self.stats = BackupSourceStats { + size_bytes: quantities.bytes, + total_batches: new_len, + }; + self.stats + } + + /// Checks whether the threshold for maximum size has been exceeded. + fn threshold_exceeded(&self) -> bool { + self.stats.size_bytes > self.config.max_size_bytes && self.failed_commits.len() > 1 + } + + /// Drops the oldest failed commit if the threshold has been exceeded, returning the updated + /// stats + fn drop_excess(&mut self) -> Option<(BackupSourceStats, Quantities)> { + if self.threshold_exceeded() { + self.failed_commits.pop_back(); + Some((self.update_stats(), self.failed_commits.quantities())) + } else { + None + } + } + + /// Saves a new failed commit into memory, updating the stats. + fn save(&mut self, data: FailedCommit) -> BackupSourceStats { + self.failed_commits.push_front(data); + self.update_stats() + } + + /// Retrieves the oldest failed commit from memory, updating the stats. + fn retrieve_oldest(&mut self) -> Option> { + let oldest = self.failed_commits.pop_back(); + self.update_stats(); + oldest + } +} + +// Needed otherwise requires T: Default +impl Default for MemoryBackup { + fn default() -> Self { + Self { + failed_commits: FailedCommits::default(), + config: MemoryBackupConfig::default(), + stats: BackupSourceStats::default(), + } + } +} + +/// An backup actor for Clickhouse data. This actor receives [`FailedCommit`]s and saves them on +/// disk and in memory in case of failure of the former, and periodically tries to commit them back +/// again to Clickhouse. Since memory is finite, there is an upper bound on how much memory this +/// data structure holds. Once this has been hit, pressure applies, meaning that we try again a +/// certain failed commit for a finite number of times, and then we discard it to accomdate new +/// data. +pub struct Backup { + /// The receiver of failed commit attempts. + /// + /// Rationale for sending multiple rows instead of sending rows: the backup abstraction must + /// periodically block to write data to the inserter and try to commit it to clickhouse. Each + /// attempt results in doing the previous step. This could clog the channel which will receive + /// individual rows, leading to potential row losses. 
+ /// + /// By sending backup data less often, we give time gaps for these operation to be performed. + rx: mpsc::Receiver>, + /// The disk cache of failed commits. + disk_backup: DiskBackup, + /// The in-memory cache of failed commits. + memory_backup: MemoryBackup, + /// A clickhouse inserter for committing again the data. + inserter: Inserter, + /// The interval at which we try to backup data. + interval: BackoffInterval, + + /// A failed commit retrieved from either disk or memory, waiting to be retried. + last_cached: Option>, + + /// Whether to use only the in-memory backup (for testing purposes). + #[cfg(any(test, feature = "test-utils"))] + use_only_memory_backup: bool, + _metrics_phantom: std::marker::PhantomData, +} + +impl Backup { + pub fn new( + rx: mpsc::Receiver>, + inserter: Inserter, + disk_backup: DiskBackup, + ) -> Self { + Self { + rx, + inserter, + interval: Default::default(), + memory_backup: MemoryBackup::default(), + disk_backup, + last_cached: None, + #[cfg(any(test, feature = "test-utils"))] + use_only_memory_backup: false, + _metrics_phantom: std::marker::PhantomData, + } + } + + /// Override the default memory backup configuration. + pub fn with_memory_backup_config(mut self, config: MemoryBackupConfig) -> Self { + self.memory_backup.config = config; + self + } + + /// Backs up a failed commit, first trying to write to disk, then to memory. + fn backup(&mut self, failed_commit: FailedCommit) { + let quantities = failed_commit.quantities; + tracing::debug!(target: TARGET, order = T::ORDER, bytes = ?quantities.bytes, rows = ?quantities.rows, "backing up failed commit"); + + #[cfg(any(test, feature = "test-utils"))] + if self.use_only_memory_backup { + self.memory_backup.save(failed_commit); + self.last_cached = self + .last_cached + .take() + .filter(|cached| cached.source != BackupSource::Memory); + return; + } + + let start = Instant::now(); + match self.disk_backup.save(&failed_commit) { + Ok(stats) => { + tracing::debug!(target: TARGET, order = T::ORDER, total_size = stats.size_bytes.format_bytes(), elapsed = ?start.elapsed(), "saved failed commit to disk"); + MetricsType::set_disk_backup_size(stats.size_bytes, stats.total_batches, T::ORDER); + + return; + } + Err(e) => { + tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to write commit, trying in-memory"); + MetricsType::increment_backup_disk_errors(T::ORDER, e.as_ref()); + } + }; + + let stats = self.memory_backup.save(failed_commit); + MetricsType::set_memory_backup_size(stats.size_bytes, stats.total_batches, T::ORDER); + tracing::debug!(target: TARGET, order = T::ORDER, bytes = ?quantities.bytes, rows = ?quantities.rows, ?stats, "saved failed commit in-memory"); + + if let Some((stats, oldest_quantities)) = self.memory_backup.drop_excess() { + tracing::warn!(target: TARGET, order = T::ORDER, ?stats, "failed commits exceeded max memory backup size, dropping oldest"); + MetricsType::process_backup_data_lost_quantities(&oldest_quantities); + // Clear the cached last commit if it was from memory and we just dropped it. + self.last_cached = self + .last_cached + .take() + .filter(|cached| cached.source != BackupSource::Memory); + } + } + + /// Retrieves the oldest failed commit, first trying from memory, then from disk. 
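+ ///
+ /// A commit cached from a previously failed retry (`last_cached`) takes priority over
+ /// both sources.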
+ fn retrieve_oldest(&mut self) -> Option> { + if let Some(cached) = self.last_cached.take() { + tracing::debug!(target: TARGET, order = T::ORDER, rows = cached.commit.rows.len(), "retrieved last cached failed commit"); + return Some(cached); + } + + if let Some(commit) = self.memory_backup.retrieve_oldest() { + tracing::debug!(target: TARGET, order = T::ORDER, rows = commit.rows.len(), "retrieved oldest failed commit from memory"); + return Some(RetrievedFailedCommit { + source: BackupSource::Memory, + commit, + }); + } + + match self.disk_backup.retrieve_oldest() { + Ok(maybe_commit) => { + maybe_commit.inspect(|data| { + tracing::debug!(target: TARGET, order = T::ORDER, rows = data.stats.total_batches, "retrieved oldest failed commit from disk"); + }) + .map(|data| RetrievedFailedCommit { + source: BackupSource::Disk(data.key), + commit: data.value, + }) + } + Err(e) => { + tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to retrieve oldest failed commit from disk"); + MetricsType::increment_backup_disk_errors(T::ORDER, e.as_ref()); + None + } + } + } + + /// Populates the inserter with the rows from the given failed commit. + async fn populate_inserter(&mut self, commit: &FailedCommit) { + for row in &commit.rows { + let value_ref = T::to_row_ref(row); + + if let Err(e) = self.inserter.write(value_ref).await { + MetricsType::increment_write_failures(e.to_string()); + tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to write to backup inserter"); + continue; + } + } + } + + /// Purges a committed failed commit from disk, if applicable. + async fn purge_commit(&mut self, retrieved: &RetrievedFailedCommit) { + if let BackupSource::Disk(key) = retrieved.source { + let start = Instant::now(); + match self.disk_backup.delete(key) { + Ok(stats) => { + tracing::debug!(target: TARGET, order = T::ORDER, total_size = stats.size_bytes.format_bytes(), elapsed = ?start.elapsed(), "deleted failed commit from disk"); + MetricsType::set_disk_backup_size( + stats.size_bytes, + stats.total_batches, + T::ORDER, + ); + } + Err(e) => { + tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to purge failed commit from disk"); + } + } + tracing::debug!(target: TARGET, order = T::ORDER, "purged committed failed commit from disk"); + } + } + + /// Run the backup actor until it is possible to receive messages. + /// + /// If some data were stored on disk previously, they will be retried first. + pub async fn run(&mut self) { + loop { + tokio::select! { + maybe_failed_commit = self.rx.recv() => { + let Some(failed_commit) = maybe_failed_commit else { + tracing::error!(target: TARGET, order = T::ORDER, "backup channel closed"); + break; + }; + + self.backup(failed_commit); + } + _ = self.interval.tick() => { + let Some(oldest) = self.retrieve_oldest() else { + self.interval.reset(); + MetricsType::set_backup_empty_size(T::ORDER); + continue // Nothing to do! 
+ }; + + self.populate_inserter(&oldest.commit).await; + + let start = Instant::now(); + match self.inserter.force_commit().await { + Ok(quantities) => { + tracing::info!(target: TARGET, order = T::ORDER, ?quantities, "successfully backed up"); + MetricsType::process_backup_data_quantities(&quantities.into()); + MetricsType::record_batch_commit_time(start.elapsed()); + self.interval.reset(); + self.purge_commit(&oldest).await; + } + Err(e) => { + tracing::error!(target: TARGET, order = T::ORDER, ?e, quantities = ?oldest.commit.quantities, "failed to commit bundle to clickhouse from backup"); + MetricsType::increment_commit_failures(e.to_string()); + self.last_cached = Some(oldest); + continue; + } + } + } + } + } + } + + /// To call on shutdown, tries make a last-resort attempt to post back to Clickhouse all + /// in-memory data. + pub async fn end(mut self) { + for failed_commit in self.memory_backup.failed_commits.drain(..) { + for row in &failed_commit.rows { + let value_ref = T::to_row_ref(row); + + if let Err(e) = self.inserter.write(value_ref).await { + tracing::error!( target: TARGET, order = T::ORDER, ?e, "failed to write to backup inserter during shutdown"); + MetricsType::increment_write_failures(e.to_string()); + continue; + } + } + if let Err(e) = self.inserter.force_commit().await { + tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to commit backup to CH during shutdown, trying disk"); + MetricsType::increment_commit_failures(e.to_string()); + } + + if let Err(e) = self.disk_backup.save(&failed_commit) { + tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to write commit to disk backup during shutdown"); + MetricsType::increment_backup_disk_errors(T::ORDER, e.as_ref()); + } + } + + if let Err(e) = self.disk_backup.flush().await { + tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to flush disk backup during shutdown"); + MetricsType::increment_backup_disk_errors(T::ORDER, e.as_ref()); + } else { + tracing::info!(target: TARGET, order = T::ORDER, "flushed disk backup during shutdown"); + } + + if let Err(e) = self.inserter.end().await { + tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to end backup inserter during shutdown"); + } else { + tracing::info!(target: TARGET, order = T::ORDER, "successfully ended backup inserter during shutdown"); + } + } +} + +#[cfg(any(test, feature = "test-utils"))] +impl Backup { + pub fn new_test( + rx: mpsc::Receiver>, + inserter: Inserter, + disk_backup: DiskBackup, + use_only_memory_backup: bool, + ) -> Self { + Self { + rx, + inserter, + interval: Default::default(), + memory_backup: MemoryBackup::default(), + disk_backup, + last_cached: None, + use_only_memory_backup, + _metrics_phantom: PhantomData, + } + } +} diff --git a/crates/rbuilder-utils/src/clickhouse/backup/primitives.rs b/crates/rbuilder-utils/src/clickhouse/backup/primitives.rs new file mode 100644 index 000000000..9bc53031b --- /dev/null +++ b/crates/rbuilder-utils/src/clickhouse/backup/primitives.rs @@ -0,0 +1,34 @@ +use alloy_primitives::B256; +use clickhouse::{Row, RowWrite}; +use serde::{de::DeserializeOwned, Serialize}; + +pub trait ClickhouseRowExt: + Row + RowWrite + Serialize + DeserializeOwned + Sync + Send + 'static +{ + /// The type of such row, e.g. "bundles" or "bundle_receipts". Used as backup db table name and + /// for informational purposes. + const ORDER: &'static str; + + /// An identifier of such row. 
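+ ///
+ /// Only used to enrich tracing output, e.g. when a write to the inserter fails.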
+ fn hash(&self) -> B256; + + /// Internal function that takes the inner row types and extracts the reference needed for + /// Clickhouse inserter functions like `Inserter::write`. While a default implementation is not + /// provided, it should suffice to simply return `row`. + fn to_row_ref(row: &Self) -> &::Value<'_>; +} + +/// An high-level order type that can be indexed in clickhouse. +pub trait ClickhouseIndexableOrder: Sized { + /// The associated inner row type that can be serialized into Clickhouse data. + type ClickhouseRowType: ClickhouseRowExt; + + /// The type of such order, e.g. "bundles" or "transactions". For informational purposes. + const ORDER: &'static str; + + /// An identifier of such order. + fn hash(&self) -> B256; + + /// Converts such order into the associated Clickhouse row type. + fn to_row(self, builder_name: String) -> Self::ClickhouseRowType; +} diff --git a/crates/rbuilder-utils/src/clickhouse/indexer.rs b/crates/rbuilder-utils/src/clickhouse/indexer.rs new file mode 100644 index 000000000..bfc732ed8 --- /dev/null +++ b/crates/rbuilder-utils/src/clickhouse/indexer.rs @@ -0,0 +1,233 @@ +//! Indexing functionality powered by Clickhouse. + +use std::{ + fmt::Debug, + time::{Duration, Instant}, +}; + +/// The tracing target for this indexer crate. @PendingDX REMOVE +const TARGET: &str = "indexer"; + +use clickhouse::{ + error::Result as ClickhouseResult, inserter::Inserter, Client as ClickhouseClient, Row, +}; +use tokio::sync::mpsc; + +use crate::{ + clickhouse::{ + backup::{ + metrics::Metrics, + primitives::{ClickhouseIndexableOrder, ClickhouseRowExt}, + FailedCommit, + }, + Quantities, + }, + metrics::Sampler, +}; + +/// A default maximum size in bytes for the in-memory backup of failed commits. +pub const MAX_MEMORY_BACKUP_SIZE_BYTES: u64 = 1024 * 1024 * 1024; // 1 GiB +/// A default maximum size in bytes for the disk backup of failed commits. +pub const MAX_DISK_BACKUP_SIZE_BYTES: u64 = 10 * 1024 * 1024 * 1024; // 10 GiB + +/// The default path where the backup database is stored. For tests, a temporary file is used. +pub fn default_disk_backup_database_path() -> String { + #[cfg(test)] + return tempfile::NamedTempFile::new() + .unwrap() + .path() + .to_string_lossy() + .to_string(); + #[cfg(not(test))] + { + use std::path::PathBuf; + + let home = std::env::var("HOME").unwrap_or_else(|_| ".".to_string()); + PathBuf::from(home) + .join(".buildernet-orderflow-proxy") + .join("clickhouse_backup.db") + .to_string_lossy() + .to_string() + } +} + +/// An clickhouse inserter with some sane defaults. +pub fn default_inserter(client: &ClickhouseClient, table_name: &str) -> Inserter { + // TODO: make this configurable. + let send_timeout = Duration::from_secs(2); + let end_timeout = Duration::from_secs(3); + + client + .inserter::(table_name) + .with_period(Some(Duration::from_secs(4))) // Dump every 4s + .with_period_bias(0.1) // 4±(0.1*4) + .with_max_bytes(128 * 1024 * 1024) // 128MiB + .with_max_rows(65_536) + .with_timeouts(Some(send_timeout), Some(end_timeout)) +} + +/// A wrapper over a Clickhouse [`Inserter`] that supports a backup mechanism. +pub struct ClickhouseInserter { + /// The inner Clickhouse inserter client. + inner: Inserter, + /// A small in-memory backup of the current data we're trying to commit. In case this fails to + /// be inserted into Clickhouse, it is sent to the backup actor. + rows_backup: Vec, + /// The channel where to send data to be backed up. 
+ backup_tx: mpsc::Sender>, + _metrics_phantom: std::marker::PhantomData, +} + +impl ClickhouseInserter { + pub fn new(inner: Inserter, backup_tx: mpsc::Sender>) -> Self { + let rows_backup = Vec::new(); + Self { + inner, + rows_backup, + backup_tx, + _metrics_phantom: std::marker::PhantomData, + } + } + + /// Writes the provided order into the inner Clickhouse writer buffer. + async fn write(&mut self, row: T) { + let hash = row.hash(); + let value_ref = ClickhouseRowExt::to_row_ref(&row); + + if let Err(e) = self.inner.write(value_ref).await { + MetricsType::increment_write_failures(e.to_string()); + tracing::error!(target: TARGET, order = T::ORDER, ?e, %hash, "failed to write to clickhouse inserter"); + return; + } + + // NOTE: we don't backup if writing failes. The reason is that if this fails, then the same + // writing to the backup inserter should fail. + self.rows_backup.push(row); + } + + /// Tries to commit to Clickhouse if the conditions are met. In case of failures, data is sent + /// to the backup actor for retries. + async fn commit(&mut self) { + let pending = self.inner.pending().clone().into(); // This is cheap to clone. + + let start = Instant::now(); + match self.inner.commit().await { + Ok(quantities) => { + if quantities == Quantities::ZERO.into() { + tracing::trace!(target: TARGET, order = T::ORDER, "committed to inserter"); + } else { + tracing::debug!(target: TARGET, order = T::ORDER, ?quantities, "inserted batch to clickhouse"); + MetricsType::process_quantities(&quantities.into()); + MetricsType::record_batch_commit_time(start.elapsed()); + // Clear the backup rows. + self.rows_backup.clear(); + } + } + Err(e) => { + MetricsType::increment_commit_failures(e.to_string()); + tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to commit bundle to clickhouse"); + + let rows = std::mem::take(&mut self.rows_backup); + let failed_commit = FailedCommit::new(rows, pending); + + if let Err(e) = self.backup_tx.try_send(failed_commit) { + tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to send rows backup"); + } + } + } + } + + /// Ends the current `INSERT` and whole `Inserter` unconditionally. + pub async fn end(self) -> ClickhouseResult { + self.inner.end().await.map(Into::into) + } +} + +impl std::fmt::Debug for ClickhouseInserter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ClickhouseInserter") + .field("inserter", &T::ORDER.to_string()) + .field("rows_backup_len", &self.rows_backup.len()) + .finish() + } +} + +/// A long-lived actor to run a [`ClickhouseIndexer`] until it possible to receive new order to +/// index. +pub struct InserterRunner { + /// The channel from which we can receive new orders to index. + rx: mpsc::Receiver, + /// The underlying Clickhouse inserter. + inserter: ClickhouseInserter, + /// The name of the local operator to use when adding data to clickhouse. + builder_name: String, +} + +impl std::fmt::Debug + for InserterRunner +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("InserterRunner") + .field("inserter", &T::ORDER.to_string()) + .field("rx", &self.rx) + .finish() + } +} + +impl InserterRunner { + pub fn new( + rx: mpsc::Receiver, + inserter: ClickhouseInserter, + builder_name: String, + ) -> Self { + Self { + rx, + inserter, + builder_name, + } + } + + /// Run the inserter until it is possible to receive new orders. 
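+ ///
+ /// Each received order is converted into its row type via
+ /// [`ClickhouseIndexableOrder::to_row`], written to the inner inserter, and committed
+ /// opportunistically according to the inserter's period and size thresholds.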
+ pub async fn run_loop(&mut self) { + let mut sampler = Sampler::default() + .with_sample_size(self.rx.capacity() / 2) + .with_interval(Duration::from_secs(4)); + + while let Some(order) = self.rx.recv().await { + tracing::trace!(target: TARGET, order = T::ORDER, hash = %order.hash(), "received data to index"); + sampler.sample(|| { + MetricsType::set_queue_size(self.rx.len(), T::ORDER); + }); + + let row = order.to_row(self.builder_name.clone()); + self.inserter.write(row).await; + self.inserter.commit().await; + } + tracing::error!(target: TARGET, order = T::ORDER, "tx channel closed, indexer will stop running"); + } + + pub async fn end(self) -> ClickhouseResult { + self.inserter.end().await + } +} + +/// The configuration used in a [`ClickhouseClient`]. +#[derive(Debug, Clone)] +pub struct ClickhouseClientConfig { + pub host: String, + pub database: String, + pub username: String, + pub password: String, + pub validation: bool, +} + +impl From for ClickhouseClient { + fn from(config: ClickhouseClientConfig) -> Self { + ClickhouseClient::default() + .with_url(config.host) + .with_database(config.database) + .with_user(config.username) + .with_password(config.password) + .with_validation(config.validation) + } +} diff --git a/crates/rbuilder-utils/src/clickhouse/mod.rs b/crates/rbuilder-utils/src/clickhouse/mod.rs new file mode 100644 index 000000000..0176f1472 --- /dev/null +++ b/crates/rbuilder-utils/src/clickhouse/mod.rs @@ -0,0 +1,40 @@ +pub mod backup; +pub mod indexer; +use serde::{Deserialize, Serialize}; + +/// Equilalent of `clickhouse::inserter::Quantities` with more traits derived. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] +pub struct Quantities { + pub bytes: u64, + pub rows: u64, + pub transactions: u64, +} + +impl Quantities { + /// Just zero quantities, nothing special. + pub const ZERO: Quantities = Quantities { + bytes: 0, + rows: 0, + transactions: 0, + }; +} + +impl From for Quantities { + fn from(value: clickhouse::inserter::Quantities) -> Self { + Self { + bytes: value.bytes, + rows: value.rows, + transactions: value.transactions, + } + } +} + +impl From for clickhouse::inserter::Quantities { + fn from(value: Quantities) -> Self { + Self { + bytes: value.bytes, + rows: value.rows, + transactions: value.transactions, + } + } +} diff --git a/crates/rbuilder-utils/src/format/mod.rs b/crates/rbuilder-utils/src/format/mod.rs new file mode 100644 index 000000000..99fb22a52 --- /dev/null +++ b/crates/rbuilder-utils/src/format/mod.rs @@ -0,0 +1,18 @@ +/// A trait for types that can be formatted as a human-readable size in bytes. 
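+///
+/// Values are truncated to whole units, e.g. `1536u64.format_bytes()` returns `"1KiB"`
+/// and `(5 * 1024 * 1024u64).format_bytes()` returns `"5MiB"`.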
+pub trait FormatBytes { + fn format_bytes(&self) -> String; +} + +impl FormatBytes for u64 { + fn format_bytes(&self) -> String { + if *self < 1024 { + format!("{}B", self) + } else if *self < 1024 * 1024 { + format!("{}KiB", self / 1024) + } else if *self < 1024 * 1024 * 1024 { + format!("{}MiB", self / 1024 / 1024) + } else { + format!("{}GiB", self / 1024 / 1024 / 1024) + } + } +} diff --git a/crates/rbuilder-utils/src/lib.rs b/crates/rbuilder-utils/src/lib.rs new file mode 100644 index 000000000..3de0c1df0 --- /dev/null +++ b/crates/rbuilder-utils/src/lib.rs @@ -0,0 +1,7 @@ +pub mod backoff; +pub mod clickhouse; +pub mod format; +pub mod metrics; +pub mod tasks { + pub use reth_tasks::*; +} diff --git a/crates/rbuilder-utils/src/metrics/mod.rs b/crates/rbuilder-utils/src/metrics/mod.rs new file mode 100644 index 000000000..b5c608de9 --- /dev/null +++ b/crates/rbuilder-utils/src/metrics/mod.rs @@ -0,0 +1,46 @@ +use std::time::{Duration, Instant}; + +/// A simple sampler that executes a closure every `sample_size` calls, or if a certain amount of +/// time has passed since last sampling call. +#[derive(Debug, Clone)] +pub struct Sampler { + sample_size: usize, + counter: usize, + start: Instant, + interval: Duration, +} + +impl Default for Sampler { + fn default() -> Self { + Self { + sample_size: 4096, + counter: 0, + start: Instant::now(), + interval: Duration::from_secs(10), + } + } +} + +impl Sampler { + pub fn with_sample_size(mut self, sample_size: usize) -> Self { + self.sample_size = sample_size; + self + } + + pub fn with_interval(mut self, interval: Duration) -> Self { + self.start = Instant::now() - interval; + self + } + + /// Call this function to potentially execute the sample closure if we have reached the sample + /// size, or enough time has passed. Otherwise, it increments the internal counter. + pub fn sample(&mut self, f: impl FnOnce()) { + if self.counter >= self.sample_size || self.start.elapsed() >= self.interval { + self.counter = 0; + self.start = Instant::now(); + f(); + } else { + self.counter += 1; + } + } +}
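
Note: the sketch below shows how the new pieces are intended to compose end to end. It is illustrative only: the order/row types (`MyOrder: ClickhouseIndexableOrder` with `ClickhouseRowType = MyRow`), the `client_config`, the table name, the channel capacities, and the surrounding fallible setup function holding a `TaskExecutor` named `executor` are assumptions, not part of this patch. `NullMetrics`, `default_inserter`, and the `spawn_clickhouse_*!` macros come from the crate itself.

    // Hypothetical wiring inside a setup function whose error type can absorb
    // `DiskBackupError`; row types for `default_inserter` are left to inference.
    let client: ClickhouseClient = client_config.into();

    let (order_tx, order_rx) = tokio::sync::mpsc::channel::<MyOrder>(1024);
    let (backup_tx, backup_rx) = tokio::sync::mpsc::channel(64);

    let disk_backup = DiskBackup::new(DiskBackupConfig::new(), &executor)?;

    // Primary path: orders arriving on `order_rx` are batched into Clickhouse.
    let inserter = ClickhouseInserter::new(default_inserter(&client, "my_table"), backup_tx);
    let mut runner: InserterRunner<MyOrder, NullMetrics> =
        InserterRunner::new(order_rx, inserter, "my-builder".to_string());

    // Backup path: failed commits are retried with exponential back-off, spilling to
    // disk (and, failing that, memory) between attempts.
    let mut backup: Backup<MyRow, NullMetrics> =
        Backup::new(backup_rx, default_inserter(&client, "my_table"), disk_backup);

    spawn_clickhouse_inserter!(executor, runner, "my_table", "clickhouse");
    spawn_clickhouse_backup!(executor, backup, "my_table", "clickhouse");

    // Producers then hand orders to `order_tx`; failed commits flow through `backup_tx`
    // to the backup actor.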