1 change: 1 addition & 0 deletions justfile
@@ -3,6 +3,7 @@ _default:

alias r := ready
alias qc := quick-commit
alias rg := reset-git

# Installs the tools needed to develop
install-tools:
17 changes: 0 additions & 17 deletions src/metrics/failover.rs
@@ -16,9 +16,6 @@ const STREAM_FAILOVER_RECOVERED_TOTAL: &str = "stream_failover_recovered_total";
/// Metric name for recording the time spent in failover mode.
const STREAM_FAILOVER_DURATION_SECONDS: &str = "stream_failover_duration_seconds";

/// Metric name for tracking the age of the checkpoint event being retried during failover.
const STREAM_FAILOVER_CHECKPOINT_AGE_SECONDS: &str = "stream_failover_checkpoint_age_seconds";

/// Label key for stream identifier.
const STREAM_ID_LABEL: &str = "stream_id";

@@ -44,11 +41,6 @@ pub(crate) fn register_failover_metrics() {
Unit::Seconds,
"Time spent in failover mode before recovery"
);
describe_gauge!(
STREAM_FAILOVER_CHECKPOINT_AGE_SECONDS,
Unit::Seconds,
"Age of the checkpoint event currently being retried during failover"
);
}

/// Records that the stream has entered failover mode.
@@ -83,12 +75,3 @@ pub fn record_failover_recovered(stream_id: u64, duration_seconds: f64) {
)
.record(duration_seconds);
}

/// Records the age of the checkpoint event during failover.
pub fn record_checkpoint_age(stream_id: u64, age_seconds: f64) {
gauge!(
STREAM_FAILOVER_CHECKPOINT_AGE_SECONDS,
STREAM_ID_LABEL => stream_id.to_string()
)
.set(age_seconds);
}
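Note: the `metrics::record_processing_lag` helper called from `src/stream.rs` below is not defined anywhere in this diff. A minimal sketch of what it might look like, assuming it mirrors the removed checkpoint-age gauge above; the metric name `stream_processing_lag_seconds` and the `register_lag_metrics` function are assumptions, not taken from this PR:

```rust
// Hypothetical counterpart to the removed checkpoint-age gauge, using the
// same `metrics` crate macros seen in src/metrics/failover.rs.
use metrics::{describe_gauge, gauge, Unit};

/// Assumed metric name for how far event processing lags behind event creation.
const STREAM_PROCESSING_LAG_SECONDS: &str = "stream_processing_lag_seconds";

/// Label key for stream identifier.
const STREAM_ID_LABEL: &str = "stream_id";

/// Registers the processing-lag gauge with its unit and description.
pub(crate) fn register_lag_metrics() {
    describe_gauge!(
        STREAM_PROCESSING_LAG_SECONDS,
        Unit::Seconds,
        "Time between an event's creation and its publication"
    );
}

/// Records the processing lag, in seconds, for the given stream.
pub fn record_processing_lag(stream_id: u64, lag_seconds: f64) {
    gauge!(
        STREAM_PROCESSING_LAG_SECONDS,
        STREAM_ID_LABEL => stream_id.to_string()
    )
    .set(lag_seconds);
}
```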
43 changes: 34 additions & 9 deletions src/stream.rs
@@ -8,6 +8,7 @@ use etl::{
};
use futures::StreamExt;
use tokio::pin;
use tracing::info;

use crate::{
concurrency::TimeoutBatchStream,
@@ -91,8 +92,14 @@
None => return Ok(()),
};

let last_event_timestamp = events.last().map(|e| e.id.created_at);

let result = self.sink.publish_events(events).await;
if result.is_err() {
info!(
"Publishing events failed, entering failover at checkpoint event id: {:?}",
checkpoint_id
);
metrics::record_failover_entered(self.config.id);
self.store
.store_stream_status(StreamStatus::Failover {
@@ -102,6 +109,12 @@
return Ok(());
}

// Record processing lag based on the last event's timestamp
if let Some(timestamp) = last_event_timestamp {
let lag_seconds = (Utc::now() - timestamp).num_seconds() as f64;
metrics::record_processing_lag(self.config.id, lag_seconds);
}

Ok(())
}

@@ -112,10 +125,9 @@
// Schedule next run 24h after the SCHEDULED time, not execution time
let next_run = next_maintenance_at + chrono::Duration::hours(24);
self.store.store_next_maintenance_at(next_run).await?;
tracing::info!(
info!(
"Maintenance completed at {}, next scheduled: {}",
completed_at,
next_run
completed_at, next_run
);
}

@@ -125,7 +137,7 @@
let store = self.store.clone();

tokio::spawn(async move {
tracing::info!("Starting background maintenance task");
info!("Starting background maintenance task");
match run_maintenance(&store).await {
Ok(ts) => {
let _ = tx.send(ts);
@@ -139,7 +151,7 @@
});
} else {
// Task already running, skip
tracing::debug!("Maintenance already running, skipping");
info!("Maintenance already running, skipping");
}
}

@@ -155,10 +167,9 @@

let checkpoint_event = self.store.get_checkpoint_event(checkpoint_event_id).await?;

// Record checkpoint age
let checkpoint_age_seconds =
(Utc::now() - checkpoint_event.id.created_at).num_seconds() as f64;
metrics::record_checkpoint_age(self.config.id, checkpoint_age_seconds);
// Record processing lag for checkpoint event
let lag_seconds = (Utc::now() - checkpoint_event.id.created_at).num_seconds() as f64;
metrics::record_processing_lag(self.config.id, lag_seconds);

let result = self
.sink
@@ -169,6 +180,11 @@
return Ok(());
}

info!(
"Sink recovered, starting failover replay from checkpoint event id: {:?}",
checkpoint_event.id
);

let failover =
FailoverClient::connect(self.config.id, self.config.pg_connection.clone()).await?;
let table_schema = self.store.get_events_table_schema().await?;
@@ -187,10 +203,16 @@

let table_rows = table_rows.into_iter().collect::<Result<Vec<_>, _>>()?;
let events = convert_events_from_table_rows(table_rows, &table_schema.column_schemas)?;

let last_event_id = events.last().unwrap().id.clone();
let last_event_timestamp = events.last().unwrap().id.created_at;

self.sink.publish_events(events).await?;

// Record processing lag during failover replay
let lag_seconds = (Utc::now() - last_event_timestamp).num_seconds() as f64;
metrics::record_processing_lag(self.config.id, lag_seconds);

failover.update_checkpoint(&last_event_id).await?;
}

@@ -201,6 +223,9 @@
self.store
.store_stream_status(StreamStatus::Healthy)
.await?;

info!("Failover recovery completed");

Ok(())
}
}