diff --git a/collector/src/bin/collector.rs b/collector/src/bin/collector.rs
index 386bc98b9..3d53ce885 100644
--- a/collector/src/bin/collector.rs
+++ b/collector/src/bin/collector.rs
@@ -1042,7 +1042,12 @@ fn main_result() -> anyhow::Result<i32> {

         let compile_config = CompileBenchmarkConfig {
             benchmarks,
-            profiles: Profile::default_profiles(),
+            profiles: vec![
+                Profile::Check,
+                Profile::Debug,
+                Profile::Doc,
+                Profile::Opt,
+            ],
             scenarios: Scenario::all(),
             backends,
             iterations: runs.map(|v| v as usize),
diff --git a/collector/src/compile/benchmark/profile.rs b/collector/src/compile/benchmark/profile.rs
index 409a3a401..0946905e0 100644
--- a/collector/src/compile/benchmark/profile.rs
+++ b/collector/src/compile/benchmark/profile.rs
@@ -34,11 +34,6 @@ impl Profile {
         ]
     }
 
-    /// Set of default profiles that should be benchmarked for a master/try artifact.
-    pub fn default_profiles() -> Vec<Profile> {
-        vec![Profile::Check, Profile::Debug, Profile::Doc, Profile::Opt]
-    }
-
     pub fn is_doc(&self) -> bool {
         match self {
             Profile::Doc | Profile::DocJson => true,
diff --git a/collector/src/compile/benchmark/target.rs b/collector/src/compile/benchmark/target.rs
index 7af2213b6..8c88d85a7 100644
--- a/collector/src/compile/benchmark/target.rs
+++ b/collector/src/compile/benchmark/target.rs
@@ -14,3 +14,15 @@ impl Default for Target {
         Self::X86_64UnknownLinuxGnu
     }
 }
+
+impl Target {
+    pub fn all() -> Vec<Self> {
+        vec![Self::X86_64UnknownLinuxGnu]
+    }
+
+    pub fn from_db_target(target: &database::Target) -> Target {
+        match target {
+            database::Target::X86_64UnknownLinuxGnu => Self::X86_64UnknownLinuxGnu,
+        }
+    }
+}
diff --git a/database/src/lib.rs b/database/src/lib.rs
index b171f47ee..fc2510d99 100644
--- a/database/src/lib.rs
+++ b/database/src/lib.rs
@@ -227,6 +227,11 @@ impl Profile {
             Profile::Clippy => "clippy",
         }
     }
+
+    /// Set of default profiles that should be benchmarked for a master/try artifact.
+    pub fn default_profiles() -> Vec<Profile> {
+        vec![Profile::Check, Profile::Debug, Profile::Doc, Profile::Opt]
+    }
 }
 
 impl std::str::FromStr for Profile {
@@ -365,6 +370,10 @@ impl Target {
             Target::X86_64UnknownLinuxGnu => "x86_64-unknown-linux-gnu",
         }
     }
+
+    pub fn all() -> Vec<Self> {
+        vec![Self::X86_64UnknownLinuxGnu]
+    }
 }
 
 impl FromStr for Target {
@@ -988,6 +997,35 @@ impl BenchmarkRequest {
     pub fn is_release(&self) -> bool {
         matches!(self.commit_type, BenchmarkRequestType::Release { .. })
     }
+
+    /// Get the codegen backends for the request
+    pub fn backends(&self) -> anyhow::Result<Vec<CodegenBackend>> {
+        // Empty string; default to LLVM.
+        if self.backends.trim().is_empty() {
+            return Ok(vec![CodegenBackend::Llvm]);
+        }
+
+        self.backends
+            .split(',')
+            .map(|s| {
+                CodegenBackend::from_str(s).map_err(|_| anyhow::anyhow!("Invalid backend: {s}"))
+            })
+            .collect()
+    }
+
+    /// Get the profiles for the request
+    pub fn profiles(&self) -> anyhow::Result<Vec<Profile>> {
+        // No profile string; fall back to the library defaults.
+        if self.profiles.trim().is_empty() {
+            return Ok(Profile::default_profiles());
+        }
+
+        self.profiles
+            .split(',')
+            .map(Profile::from_str)
+            .collect::<Result<Vec<_>, _>>()
+            .map_err(|e| anyhow::anyhow!("Invalid profile: {e}"))
+    }
 }
 
 /// Cached information about benchmark requests in the DB
@@ -1010,3 +1048,64 @@ impl BenchmarkRequestIndex {
         &self.completed
     }
 }
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum BenchmarkJobStatus {
+    Queued,
+    InProgress {
+        started_at: DateTime<Utc>,
+    },
+    Completed {
+        started_at: DateTime<Utc>,
+        completed_at: DateTime<Utc>,
+        success: bool,
+    },
+}
+
+const BENCHMARK_JOB_STATUS_QUEUED_STR: &str = "queued";
+const BENCHMARK_JOB_STATUS_IN_PROGRESS_STR: &str = "in_progress";
+const BENCHMARK_JOB_STATUS_SUCCESS_STR: &str = "success";
+const BENCHMARK_JOB_STATUS_FAILURE_STR: &str = "failure";
+
+impl BenchmarkJobStatus {
+    pub fn as_str(&self) -> &str {
+        match self {
+            BenchmarkJobStatus::Queued => BENCHMARK_JOB_STATUS_QUEUED_STR,
+            BenchmarkJobStatus::InProgress { .. } => BENCHMARK_JOB_STATUS_IN_PROGRESS_STR,
+            BenchmarkJobStatus::Completed { success, .. } => {
+                if *success {
+                    BENCHMARK_JOB_STATUS_SUCCESS_STR
+                } else {
+                    BENCHMARK_JOB_STATUS_FAILURE_STR
+                }
+            }
+        }
+    }
+}
+
+impl fmt::Display for BenchmarkJobStatus {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.as_str())
+    }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct BenchmarkSet(u32);
+
+/// A single unit of work generated from a benchmark request, split by profile
+/// and backend.
+///
+/// Each request is split into several `BenchmarkJob`s. Collectors poll the
+/// queue and claim a job only when its `benchmark_set` matches one of the sets
+/// they are responsible for.
+#[derive(Debug, Clone, PartialEq)]
+pub struct BenchmarkJob {
+    target: Target,
+    backend: CodegenBackend,
+    profile: Profile,
+    request_tag: String,
+    benchmark_set: BenchmarkSet,
+    created_at: DateTime<Utc>,
+    status: BenchmarkJobStatus,
+    retry: u32,
+}
diff --git a/database/src/pool.rs b/database/src/pool.rs
index ac2fe31ac..dca4587c3 100644
--- a/database/src/pool.rs
+++ b/database/src/pool.rs
@@ -210,6 +210,16 @@ pub trait Connection: Send + Sync {
         sha: &str,
         parent_sha: &str,
     ) -> anyhow::Result<()>;
+
+    /// Add a benchmark job to the job queue.
+    async fn enqueue_benchmark_job(
+        &self,
+        request_tag: &str,
+        target: &Target,
+        backend: &CodegenBackend,
+        profile: &Profile,
+        benchmark_set: u32,
+    ) -> anyhow::Result<()>;
 }
 
 #[async_trait::async_trait]
@@ -584,4 +594,35 @@ mod tests {
         })
         .await;
     }
+
+    #[tokio::test]
+    async fn enqueue_benchmark_job() {
+        run_postgres_test(|ctx| async {
+            let db = ctx.db_client();
+            let db = db.connection().await;
+            let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap();
+            let benchmark_request =
+                BenchmarkRequest::create_master("sha-1", "parent-sha-1", 42, time);
+
+            // Insert the request so we don't violate the foreign key
+            db.insert_benchmark_request(&benchmark_request)
+                .await
+                .unwrap();
+
+            // Now we can insert the job
+            let result = db
+                .enqueue_benchmark_job(
+                    benchmark_request.tag().unwrap(),
+                    &Target::X86_64UnknownLinuxGnu,
+                    &CodegenBackend::Llvm,
+                    &Profile::Opt,
+                    0u32,
+                )
+                .await;
+            assert!(result.is_ok());
+
+            Ok(ctx)
+        })
+        .await;
+    }
 }
diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs
index 21c0d3717..0001285de 100644
--- a/database/src/pool/postgres.rs
+++ b/database/src/pool/postgres.rs
@@ -1,9 +1,9 @@
 use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction};
 use crate::{
-    ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, BenchmarkRequest,
-    BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkRequestType, CodegenBackend,
-    CollectionId, Commit, CommitType, CompileBenchmark, Date, Index, Profile, QueuedCommit,
-    Scenario, Target, BENCHMARK_REQUEST_MASTER_STR, BENCHMARK_REQUEST_RELEASE_STR,
+    ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, BenchmarkJobStatus,
+    BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkRequestType,
+    CodegenBackend, CollectionId, Commit, CommitType, CompileBenchmark, Date, Index, Profile,
+    QueuedCommit, Scenario, Target, BENCHMARK_REQUEST_MASTER_STR, BENCHMARK_REQUEST_RELEASE_STR,
     BENCHMARK_REQUEST_STATUS_ARTIFACTS_READY_STR, BENCHMARK_REQUEST_STATUS_COMPLETED_STR,
     BENCHMARK_REQUEST_STATUS_IN_PROGRESS_STR, BENCHMARK_REQUEST_TRY_STR,
 };
@@ -324,6 +324,42 @@ static MIGRATIONS: &[&str] = &[
         CREATE UNIQUE INDEX collector_config_target_bench_active_uniq
         ON collector_config (target, benchmark_set, is_active) WHERE is_active = TRUE;
     "#,
+    r#"
+        CREATE TABLE IF NOT EXISTS job_queue (
+            id SERIAL PRIMARY KEY,
+            request_tag TEXT NOT NULL,
+            target TEXT NOT NULL,
+            backend TEXT NOT NULL,
+            profile TEXT NOT NULL,
+            benchmark_set INTEGER NOT NULL,
+            collector_id INTEGER,
+            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+            started_at TIMESTAMPTZ,
+            completed_at TIMESTAMPTZ,
+            status TEXT NOT NULL,
+            retry INTEGER DEFAULT 0,
+
+            CONSTRAINT job_queue_request_fk
+                FOREIGN KEY (request_tag)
+                REFERENCES benchmark_request(tag)
+                ON DELETE CASCADE,
+
+            CONSTRAINT job_queue_collector
+                FOREIGN KEY (collector_id)
+                REFERENCES collector_config(id)
+                ON DELETE CASCADE,
+
+            CONSTRAINT job_queue_unique
+                UNIQUE (
+                    request_tag,
+                    target,
+                    backend,
+                    profile,
+                    benchmark_set
+                )
+        );
+        CREATE INDEX IF NOT EXISTS job_queue_request_tag_idx ON job_queue (request_tag);
+    "#,
 ];
 
 #[async_trait::async_trait]
@@ -1608,6 +1644,42 @@ where
             .collect();
         Ok(requests)
     }
+
+    async fn enqueue_benchmark_job(
+        &self,
+        request_tag: &str,
+        target: &Target,
+        backend: &CodegenBackend,
+        profile: &Profile,
+        benchmark_set: u32,
+    ) -> anyhow::Result<()> {
+        self.conn()
+            .execute(
+                r#"
+                INSERT INTO job_queue(
+                    request_tag,
+                    target,
+                    backend,
+                    profile,
+                    benchmark_set,
+                    status
+                )
+                VALUES ($1, $2, $3, $4, $5, $6)
+                ON CONFLICT DO NOTHING
+                "#,
+                &[
+                    &request_tag,
+                    &target,
+                    &backend,
+                    &profile,
+                    &(benchmark_set as i32),
+                    &BenchmarkJobStatus::Queued,
+                ],
+            )
+            .await
+            .context("failed to insert benchmark_job")?;
+        Ok(())
+    }
 }
 
 fn parse_artifact_id(ty: &str, sha: &str, date: Option<DateTime<Utc>>) -> ArtifactId {
@@ -1653,6 +1725,10 @@ macro_rules! impl_to_postgresql_via_to_string {
 
 impl_to_postgresql_via_to_string!(BenchmarkRequestType);
 impl_to_postgresql_via_to_string!(BenchmarkRequestStatus);
+impl_to_postgresql_via_to_string!(Target);
+impl_to_postgresql_via_to_string!(CodegenBackend);
+impl_to_postgresql_via_to_string!(Profile);
+impl_to_postgresql_via_to_string!(BenchmarkJobStatus);
 
 #[cfg(test)]
 mod tests {
diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs
index ecc28484f..d3ac2cab6 100644
--- a/database/src/pool/sqlite.rs
+++ b/database/src/pool/sqlite.rs
@@ -1296,6 +1296,17 @@ impl Connection for SqliteConnection {
     ) -> anyhow::Result<()> {
         no_queue_implementation_abort!()
     }
+
+    async fn enqueue_benchmark_job(
+        &self,
+        _request_tag: &str,
+        _target: &Target,
+        _backend: &CodegenBackend,
+        _profile: &Profile,
+        _benchmark_set: u32,
+    ) -> anyhow::Result<()> {
+        no_queue_implementation_abort!()
+    }
 }
 
 fn parse_artifact_id(ty: &str, sha: &str, date: Option<i64>) -> ArtifactId {
diff --git a/site/src/job_queue/mod.rs b/site/src/job_queue/mod.rs
index 43ac10986..d7cd44b11 100644
--- a/site/src/job_queue/mod.rs
+++ b/site/src/job_queue/mod.rs
@@ -5,7 +5,8 @@ use std::{str::FromStr, sync::Arc};
 use crate::job_queue::utils::{parse_release_string, ExtractIf};
 use crate::load::{partition_in_place, SiteCtxt};
 use chrono::Utc;
-use database::{BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus};
+use collector::benchmark_set::benchmark_set_count;
+use database::{BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus, Target};
 use hashbrown::HashSet;
 use parking_lot::RwLock;
 use tokio::time::{self, Duration};
@@ -26,7 +27,7 @@ async fn create_benchmark_request_master_commits(
 ) -> anyhow::Result<()> {
     let master_commits = &ctxt.get_master_commits().commits;
     // TODO; delete at some point in the future
-    let cutoff: chrono::DateTime<Utc> = chrono::DateTime::from_str("2025-06-01T00:00:00.000Z")?;
+    let cutoff: chrono::DateTime<Utc> = chrono::DateTime::from_str("2025-07-24T00:00:00.000Z")?;
 
     for master_commit in master_commits {
         // We don't want to add masses of obsolete data
@@ -38,6 +39,7 @@ async fn create_benchmark_request_master_commits(
                 pr,
                 master_commit.time,
             );
+            log::info!("Inserting master benchmark request {benchmark:?}");
             if let Err(error) = conn.insert_benchmark_request(&benchmark).await {
                 log::error!("Failed to insert master benchmark request: {error:?}");
             }
@@ -69,6 +71,7 @@ for (name, date_time) in releases {
         if date_time >= cutoff && !index.contains_tag(&name) {
             let release_request = BenchmarkRequest::create_release(&name, date_time);
+            log::info!("Inserting release benchmark request {release_request:?}");
             if let Err(error) = conn.insert_benchmark_request(&release_request).await {
                 log::error!("Failed to insert release benchmark request: {error}");
             }
         }
@@ -103,9 +106,9 @@ fn sort_benchmark_requests(index: &BenchmarkRequestIndex, request_queue: &mut [B
         // just won't have a parent result available.
         if level_len == 0 {
             if cfg!(test) {
-                panic!("No commit is ready for benchmarking");
+                panic!("No master/try commit is ready for benchmarking");
             } else {
-                log::warn!("No commit is ready for benchmarking");
+                log::warn!("No master/try commit is ready for benchmarking");
                 return;
             }
         }
@@ -174,24 +177,105 @@ pub async fn build_queue(
     Ok(queue)
 }
 
-/// Enqueue the job into the job_queue
-async fn enqueue_next_job(
-    conn: &dyn database::pool::Connection,
+/// Create all necessary jobs for the given benchmark request
+/// and mark it as being in progress.
+/// This is performed atomically, in a transaction.
+pub async fn enqueue_benchmark_request(
+    conn: &mut dyn database::pool::Connection,
+    benchmark_request: &BenchmarkRequest,
+) -> anyhow::Result<()> {
+    let mut tx = conn.transaction().await;
+
+    let Some(request_tag) = benchmark_request.tag() else {
+        panic!("Benchmark request {benchmark_request:?} has no tag");
+    };
+
+    log::info!("Enqueuing jobs for request {benchmark_request:?}");
+
+    let backends = benchmark_request.backends()?;
+    let profiles = benchmark_request.profiles()?;
+
+    // Target x benchmark_set x backend x profile -> BenchmarkJob
+    for target in Target::all() {
+        for benchmark_set in 0..benchmark_set_count(
+            collector::compile::benchmark::target::Target::from_db_target(&target),
+        ) {
+            for backend in backends.iter() {
+                for profile in profiles.iter() {
+                    tx.conn()
+                        .enqueue_benchmark_job(
+                            request_tag,
+                            &target,
+                            backend,
+                            profile,
+                            benchmark_set as u32,
+                        )
+                        .await?;
+                    // If there is a parent, we create a job for it too. The
+                    // database will ignore it if there is already a job there.
+                    // If the parent job has been deleted from the database
+                    // but was already benchmarked, the collector will ignore
+                    // it, as it will see that it already has results.
+                    if let Some(parent_sha) = benchmark_request.parent_sha() {
+                        tx.conn()
+                            .enqueue_benchmark_job(
+                                parent_sha,
+                                &target,
+                                backend,
+                                profile,
+                                benchmark_set as u32,
+                            )
+                            .await?;
+                    }
+                }
+            }
+        }
+    }
+
+    tx.conn()
+        .update_benchmark_request_status(request_tag, BenchmarkRequestStatus::InProgress)
+        .await?;
+    tx.commit().await?;
+    Ok(())
+}
+
+/// Try to find a benchmark request that should be enqueued next, and if such a request is found,
+/// enqueue it.
+async fn try_enqueue_next_benchmark_request(
+    conn: &mut dyn database::pool::Connection,
     index: &mut BenchmarkRequestIndex,
 ) -> anyhow::Result<()> {
-    let _queue = build_queue(conn, index).await?;
+    let queue = build_queue(conn, index).await?;
+
+    #[allow(clippy::never_loop)]
+    for request in queue {
+        match request.status() {
+            BenchmarkRequestStatus::ArtifactsReady => {
+                enqueue_benchmark_request(conn, &request).await?;
+                break;
+            }
+            BenchmarkRequestStatus::InProgress => {
+                // TODO: Try to mark as completed
+                break;
+            }
+            BenchmarkRequestStatus::WaitingForArtifacts
+            | BenchmarkRequestStatus::Completed { .. } => {
+                unreachable!("Unexpected request {request:?} found in request queue");
+            }
+        }
+    }
     Ok(())
 }
 
 /// For queueing jobs, add the jobs you want to queue to this function
 async fn cron_enqueue_jobs(site_ctxt: &Arc<SiteCtxt>) -> anyhow::Result<()> {
-    let conn = site_ctxt.conn().await;
+    let mut conn = site_ctxt.conn().await;
     let mut index = conn.load_benchmark_request_index().await?;
     // Put the master commits into the `benchmark_requests` queue
     create_benchmark_request_master_commits(site_ctxt, &*conn, &index).await?;
     // Put the releases into the `benchmark_requests` queue
     create_benchmark_request_releases(&*conn, &index).await?;
-    enqueue_next_job(&*conn, &mut index).await?;
+    try_enqueue_next_benchmark_request(&mut *conn, &mut index).await?;
     Ok(())
 }
 
@@ -208,8 +292,8 @@ pub async fn cron_main(site_ctxt: Arc<RwLock<Option<Arc<SiteCtxt>>>>, seconds: u
         guard.as_ref().cloned()
     } {
         match cron_enqueue_jobs(&ctxt_clone).await {
-            Ok(_) => log::info!("Cron job executed at: {:?}", std::time::SystemTime::now()),
-            Err(e) => log::error!("Cron job failed to execute {}", e),
+            Ok(_) => log::info!("Cron job finished"),
+            Err(e) => log::error!("Cron job failed to execute: {e:?}"),
         }
     }
 }
diff --git a/site/src/request_handlers/github.rs b/site/src/request_handlers/github.rs
index becbd1b27..6ccb450fd 100644
--- a/site/src/request_handlers/github.rs
+++ b/site/src/request_handlers/github.rs
@@ -86,6 +86,7 @@ async fn record_try_benchmark_request_without_artifacts(
     if run_new_queue() {
         let try_request =
            BenchmarkRequest::create_try_without_artifacts(pr, chrono::Utc::now(), backends, "");
+        log::info!("Inserting try benchmark request {try_request:?}");
         if let Err(e) = conn.insert_benchmark_request(&try_request).await {
             log::error!("Failed to insert try benchmark request: {}", e);
         }
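
For reviewers, here is a minimal sketch (not part of the patch) of the fan-out that `enqueue_benchmark_request` performs, written only against the `database`-crate APIs added above. The function name `fan_out` and the hard-coded benchmark set `0` are illustrative stand-ins; the real code derives the set count from `collector::benchmark_set::benchmark_set_count` and runs inside a transaction.

```rust
// Illustrative sketch only; assumes an async context and the `anyhow` crate.
use database::{pool::Connection, BenchmarkRequest, Target};

async fn fan_out(conn: &dyn Connection, request: &BenchmarkRequest) -> anyhow::Result<()> {
    // `tag()` is the SHA (or release name) that job rows reference through
    // the `job_queue_request_fk` foreign key.
    let tag = request.tag().expect("request has no tag");

    for target in Target::all() {
        for backend in request.backends()? {
            for profile in request.profiles()? {
                // Re-inserting the same (tag, target, backend, profile, set)
                // tuple is a no-op thanks to ON CONFLICT DO NOTHING.
                conn.enqueue_benchmark_job(tag, &target, &backend, &profile, 0)
                    .await?;
            }
        }
    }
    Ok(())
}
```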