diff --git a/justfile b/justfile
index 30d1aee..e9ffa21 100644
--- a/justfile
+++ b/justfile
@@ -3,6 +3,7 @@ _default:
 
 alias r := ready
 alias qc := quick-commit
+alias rg := reset-git
 
 # Installs the tools needed to develop
 install-tools:
diff --git a/src/metrics/failover.rs b/src/metrics/failover.rs
index 9d30c8e..332405a 100644
--- a/src/metrics/failover.rs
+++ b/src/metrics/failover.rs
@@ -16,9 +16,6 @@ const STREAM_FAILOVER_RECOVERED_TOTAL: &str = "stream_failover_recovered_total";
 /// Metric name for recording the time spent in failover mode.
 const STREAM_FAILOVER_DURATION_SECONDS: &str = "stream_failover_duration_seconds";
 
-/// Metric name for tracking the age of the checkpoint event being retried during failover.
-const STREAM_FAILOVER_CHECKPOINT_AGE_SECONDS: &str = "stream_failover_checkpoint_age_seconds";
-
 /// Label key for stream identifier.
 const STREAM_ID_LABEL: &str = "stream_id";
 
@@ -44,11 +41,6 @@ pub(crate) fn register_failover_metrics() {
         Unit::Seconds,
         "Time spent in failover mode before recovery"
     );
-    describe_gauge!(
-        STREAM_FAILOVER_CHECKPOINT_AGE_SECONDS,
-        Unit::Seconds,
-        "Age of the checkpoint event currently being retried during failover"
-    );
 }
 
 /// Records that the stream has entered failover mode.
@@ -83,12 +75,3 @@ pub fn record_failover_recovered(stream_id: u64, duration_seconds: f64) {
     )
     .record(duration_seconds);
 }
-
-/// Records the age of the checkpoint event during failover.
-pub fn record_checkpoint_age(stream_id: u64, age_seconds: f64) {
-    gauge!(
-        STREAM_FAILOVER_CHECKPOINT_AGE_SECONDS,
-        STREAM_ID_LABEL => stream_id.to_string()
-    )
-    .set(age_seconds);
-}
diff --git a/src/stream.rs b/src/stream.rs
index bc5366a..1a19b4c 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -8,6 +8,7 @@ use etl::{
 };
 use futures::StreamExt;
 use tokio::pin;
+use tracing::info;
 
 use crate::{
     concurrency::TimeoutBatchStream,
@@ -91,8 +92,14 @@ where
             None => return Ok(()),
         };
 
+        let last_event_timestamp = events.last().map(|e| e.id.created_at);
+
         let result = self.sink.publish_events(events).await;
         if result.is_err() {
+            info!(
+                "Publishing events failed, entering failover at checkpoint event id: {:?}",
+                checkpoint_id
+            );
             metrics::record_failover_entered(self.config.id);
             self.store
                 .store_stream_status(StreamStatus::Failover {
@@ -102,6 +109,12 @@ where
             return Ok(());
         }
 
+        // Record processing lag based on the last event's timestamp
+        if let Some(timestamp) = last_event_timestamp {
+            let lag_seconds = (Utc::now() - timestamp).num_seconds() as f64;
+            metrics::record_processing_lag(self.config.id, lag_seconds);
+        }
+
         Ok(())
     }
 
@@ -112,10 +125,9 @@ where
         // Schedule next run 24h after the SCHEDULED time, not execution time
         let next_run = next_maintenance_at + chrono::Duration::hours(24);
         self.store.store_next_maintenance_at(next_run).await?;
-        tracing::info!(
+        info!(
            "Maintenance completed at {}, next scheduled: {}",
-            completed_at,
-            next_run
+            completed_at, next_run
         );
     }
 
@@ -125,7 +137,7 @@ where
             let store = self.store.clone();
 
             tokio::spawn(async move {
-                tracing::info!("Starting background maintenance task");
+                info!("Starting background maintenance task");
                 match run_maintenance(&store).await {
                     Ok(ts) => {
                         let _ = tx.send(ts);
@@ -139,7 +151,7 @@ where
             });
         } else {
             // Task already running, skip
-            tracing::debug!("Maintenance already running, skipping");
+            info!("Maintenance already running, skipping");
         }
     }
 
@@ -155,10 +167,9 @@ where
 
         let checkpoint_event = self.store.get_checkpoint_event(checkpoint_event_id).await?;
 
-        // Record checkpoint age
-        let checkpoint_age_seconds =
-            (Utc::now() - checkpoint_event.id.created_at).num_seconds() as f64;
-        metrics::record_checkpoint_age(self.config.id, checkpoint_age_seconds);
+        // Record processing lag for checkpoint event
+        let lag_seconds = (Utc::now() - checkpoint_event.id.created_at).num_seconds() as f64;
+        metrics::record_processing_lag(self.config.id, lag_seconds);
 
         let result = self
             .sink
@@ -169,6 +180,11 @@ where
             return Ok(());
         }
 
+        info!(
+            "Sink recovered, starting failover replay from checkpoint event id: {:?}",
+            checkpoint_event.id
+        );
+
         let failover =
             FailoverClient::connect(self.config.id, self.config.pg_connection.clone()).await?;
         let table_schema = self.store.get_events_table_schema().await?;
@@ -187,10 +203,16 @@ where
             let table_rows = table_rows
                 .into_iter()
                 .collect::<Result<Vec<_>, _>>()?;
             let events =
                 convert_events_from_table_rows(table_rows, &table_schema.column_schemas)?;
+            let last_event_id = events.last().unwrap().id.clone();
+            let last_event_timestamp = events.last().unwrap().id.created_at;
 
             self.sink.publish_events(events).await?;
 
+            // Record processing lag during failover replay
+            let lag_seconds = (Utc::now() - last_event_timestamp).num_seconds() as f64;
+            metrics::record_processing_lag(self.config.id, lag_seconds);
+            failover.update_checkpoint(&last_event_id).await?;
         }
 
@@ -201,6 +223,9 @@ where
         self.store
             .store_stream_status(StreamStatus::Healthy)
             .await?;
+
+        info!("Failover recovery completed");
+
        Ok(())
     }
 }
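Note: the patch replaces the checkpoint-age gauge with calls to `metrics::record_processing_lag`, whose definition is not part of this diff. Below is a minimal sketch of what such a helper could look like if it follows the same conventions as `src/metrics/failover.rs` (the `metrics` crate's `describe_gauge!`/`gauge!` macros, `Unit::Seconds`, and a `stream_id` label). The metric name, function names, and module placement are assumptions for illustration, not code from this repository.

```rust
use metrics::{describe_gauge, gauge, Unit};

/// Hypothetical metric name; not defined in this diff.
const STREAM_PROCESSING_LAG_SECONDS: &str = "stream_processing_lag_seconds";

/// Label key for stream identifier (mirrors STREAM_ID_LABEL in failover.rs).
const STREAM_ID_LABEL: &str = "stream_id";

/// Registers the lag gauge so exporters pick up its unit and description.
pub(crate) fn register_processing_lag_metrics() {
    describe_gauge!(
        STREAM_PROCESSING_LAG_SECONDS,
        Unit::Seconds,
        "Seconds between the newest published event's creation time and now"
    );
}

/// Records how far behind real time the stream currently is, labeled by stream id.
pub fn record_processing_lag(stream_id: u64, lag_seconds: f64) {
    gauge!(
        STREAM_PROCESSING_LAG_SECONDS,
        STREAM_ID_LABEL => stream_id.to_string()
    )
    .set(lag_seconds);
}
```

A gauge (last observed value per stream) matches how the removed checkpoint-age metric was recorded; if the distribution of lag per batch matters more than the latest value, `histogram!` with `describe_histogram!` would be the alternative.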