Skip to content

Feat: Split request job into benchmark jobs #2207

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion collector/src/bin/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1042,7 +1042,12 @@ fn main_result() -> anyhow::Result<i32> {

let compile_config = CompileBenchmarkConfig {
benchmarks,
profiles: Profile::default_profiles(),
profiles: vec![
Profile::Check,
Profile::Debug,
Profile::Doc,
Profile::Opt,
],
scenarios: Scenario::all(),
backends,
iterations: runs.map(|v| v as usize),
Expand Down
5 changes: 0 additions & 5 deletions collector/src/compile/benchmark/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ impl Profile {
]
}

/// Set of default profiles that should be benchmarked for a master/try artifact.
pub fn default_profiles() -> Vec<Self> {
vec![Profile::Check, Profile::Debug, Profile::Doc, Profile::Opt]
}

pub fn is_doc(&self) -> bool {
match self {
Profile::Doc | Profile::DocJson => true,
Expand Down
12 changes: 12 additions & 0 deletions collector/src/compile/benchmark/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,15 @@ impl Default for Target {
Self::X86_64UnknownLinuxGnu
}
}

impl Target {
    /// Every target the collector can currently benchmark.
    pub fn all() -> Vec<Self> {
        Vec::from([Self::X86_64UnknownLinuxGnu])
    }

    /// Map a database-layer target onto its collector-layer equivalent.
    ///
    /// Exhaustive match: adding a variant to `database::Target` forces this
    /// mapping to be updated at compile time.
    pub fn from_db_target(target: &database::Target) -> Target {
        match *target {
            database::Target::X86_64UnknownLinuxGnu => Self::X86_64UnknownLinuxGnu,
        }
    }
}
99 changes: 99 additions & 0 deletions database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ impl Profile {
Profile::Clippy => "clippy",
}
}

/// Set of default profiles that should be benchmarked for a master/try artifact.
pub fn default_profiles() -> Vec<Self> {
    Vec::from([Profile::Check, Profile::Debug, Profile::Doc, Profile::Opt])
}
}

impl std::str::FromStr for Profile {
Expand Down Expand Up @@ -365,6 +370,10 @@ impl Target {
Target::X86_64UnknownLinuxGnu => "x86_64-unknown-linux-gnu",
}
}

/// Every target known to the database layer.
pub fn all() -> Vec<Self> {
    Vec::from([Self::X86_64UnknownLinuxGnu])
}
}

impl FromStr for Target {
Expand Down Expand Up @@ -988,6 +997,35 @@ impl BenchmarkRequest {
/// Returns `true` if this request is for a release artifact
/// (as opposed to a master or try commit).
pub fn is_release(&self) -> bool {
    matches!(self.commit_type, BenchmarkRequestType::Release { .. })
}

/// Get the codegen backends for the request.
///
/// An empty (or whitespace-only) backend string means "no explicit
/// selection" and defaults to LLVM.
pub fn backends(&self) -> anyhow::Result<Vec<CodegenBackend>> {
    if self.backends.trim().is_empty() {
        return Ok(vec![CodegenBackend::Llvm]);
    }

    // NOTE(review): individual entries are not trimmed, so a string like
    // "llvm, cranelift" would fail on the padded entry — confirm the stored
    // format is always unpadded.
    let mut parsed = Vec::new();
    for entry in self.backends.split(',') {
        let backend = CodegenBackend::from_str(entry)
            .map_err(|_| anyhow::anyhow!("Invalid backend: {entry}"))?;
        parsed.push(backend);
    }
    Ok(parsed)
}

/// Get the profiles for the request.
///
/// An empty (or whitespace-only) profile string falls back to
/// [`Profile::default_profiles`].
pub fn profiles(&self) -> anyhow::Result<Vec<Profile>> {
    // No profile string; fall back to the library defaults.
    if self.profiles.trim().is_empty() {
        return Ok(Profile::default_profiles());
    }

    self.profiles
        .split(',')
        .map(Profile::from_str)
        .collect::<Result<Vec<_>, _>>()
        // Fixed: previously reported "Invalid backend" for a bad profile,
        // which made failures here misleading to debug.
        .map_err(|e| anyhow::anyhow!("Invalid profile: {e}"))
}
}

/// Cached information about benchmark requests in the DB
Expand All @@ -1010,3 +1048,64 @@ impl BenchmarkRequestIndex {
&self.completed
}
}

/// Lifecycle state of a benchmark job in the job queue.
#[derive(Debug, Clone, PartialEq)]
pub enum BenchmarkJobStatus {
    /// Waiting in the queue for a collector to claim it.
    Queued,
    /// Claimed by a collector and currently running.
    InProgress {
        started_at: DateTime<Utc>,
    },
    /// Finished executing; `success` records whether the run succeeded.
    Completed {
        started_at: DateTime<Utc>,
        completed_at: DateTime<Utc>,
        success: bool,
    },
}

// Stable string forms of `BenchmarkJobStatus`, used when persisting the
// status to the database (see `BenchmarkJobStatus::as_str`).
const BENCHMARK_JOB_STATUS_QUEUED_STR: &str = "queued";
const BENCHMARK_JOB_STATUS_IN_PROGRESS_STR: &str = "in_progress";
const BENCHMARK_JOB_STATUS_SUCCESS_STR: &str = "success";
const BENCHMARK_JOB_STATUS_FAILURE_STR: &str = "failure";

impl BenchmarkJobStatus {
    /// Stable string form used when persisting the status to the database.
    pub fn as_str(&self) -> &str {
        // Matching on the `success` flag directly keeps every DB string on
        // its own match arm.
        match self {
            BenchmarkJobStatus::Queued => BENCHMARK_JOB_STATUS_QUEUED_STR,
            BenchmarkJobStatus::InProgress { .. } => BENCHMARK_JOB_STATUS_IN_PROGRESS_STR,
            BenchmarkJobStatus::Completed { success: true, .. } => BENCHMARK_JOB_STATUS_SUCCESS_STR,
            BenchmarkJobStatus::Completed { success: false, .. } => BENCHMARK_JOB_STATUS_FAILURE_STR,
        }
    }
}

impl fmt::Display for BenchmarkJobStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate to the canonical DB string form.
        f.write_str(self.as_str())
    }
}

/// Identifier of a set of benchmarks. Collectors claim only jobs whose
/// `benchmark_set` matches one of the sets they are responsible for
/// (see `BenchmarkJob`).
#[derive(Debug, Clone, PartialEq)]
pub struct BenchmarkSet(u32);

/// A single unit of work generated from a benchmark request. Split by profiles
/// and backends
///
/// Each request is split into several `BenchmarkJob`s. Collectors poll the
/// queue and claim a job only when its `benchmark_set` matches one of the sets
/// they are responsible for.
#[derive(Debug, Clone, PartialEq)]
pub struct BenchmarkJob {
    /// Target platform the job should run on.
    target: Target,
    /// Codegen backend to benchmark with.
    backend: CodegenBackend,
    /// Compile profile to benchmark.
    profile: Profile,
    /// Tag of the parent `BenchmarkRequest` this job was split from.
    request_tag: String,
    /// The benchmark set used by collectors to decide whether to claim the job.
    benchmark_set: BenchmarkSet,
    /// When the job was inserted into the queue.
    created_at: DateTime<Utc>,
    /// Current lifecycle state of the job.
    status: BenchmarkJobStatus,
    /// Number of times the job has been retried.
    retry: u32,
}
41 changes: 41 additions & 0 deletions database/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,16 @@ pub trait Connection: Send + Sync {
sha: &str,
parent_sha: &str,
) -> anyhow::Result<()>;

/// Add a benchmark job to the job queue.
///
/// Inserts a single `(request_tag, target, backend, profile, benchmark_set)`
/// row for the given request; `request_tag` must reference an already-inserted
/// benchmark request. Implementations may treat duplicate rows as a no-op.
async fn enqueue_benchmark_job(
    &self,
    request_tag: &str,
    target: &Target,
    backend: &CodegenBackend,
    profile: &Profile,
    benchmark_set: u32,
) -> anyhow::Result<()>;
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -584,4 +594,35 @@ mod tests {
})
.await;
}

#[tokio::test]
async fn enqueue_benchmark_job() {
    run_postgres_test(|ctx| async {
        let db = ctx.db_client();
        let db = db.connection().await;
        let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap();
        let benchmark_request =
            BenchmarkRequest::create_master("sha-1", "parent-sha-1", 42, time);

        // Insert the request so we don't violate the foreign key
        db.insert_benchmark_request(&benchmark_request)
            .await
            .unwrap();

        // Now we can insert the job. `expect` surfaces the underlying DB
        // error in the failure message, unlike `assert!(result.is_ok())`,
        // which would hide it.
        db.enqueue_benchmark_job(
            benchmark_request.tag().unwrap(),
            &Target::X86_64UnknownLinuxGnu,
            &CodegenBackend::Llvm,
            &Profile::Opt,
            0u32,
        )
        .await
        .expect("failed to enqueue benchmark job");

        Ok(ctx)
    })
    .await;
}
}
84 changes: 80 additions & 4 deletions database/src/pool/postgres.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction};
use crate::{
ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, BenchmarkRequest,
BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkRequestType, CodegenBackend,
CollectionId, Commit, CommitType, CompileBenchmark, Date, Index, Profile, QueuedCommit,
Scenario, Target, BENCHMARK_REQUEST_MASTER_STR, BENCHMARK_REQUEST_RELEASE_STR,
ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, BenchmarkJobStatus,
BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkRequestType,
CodegenBackend, CollectionId, Commit, CommitType, CompileBenchmark, Date, Index, Profile,
QueuedCommit, Scenario, Target, BENCHMARK_REQUEST_MASTER_STR, BENCHMARK_REQUEST_RELEASE_STR,
BENCHMARK_REQUEST_STATUS_ARTIFACTS_READY_STR, BENCHMARK_REQUEST_STATUS_COMPLETED_STR,
BENCHMARK_REQUEST_STATUS_IN_PROGRESS_STR, BENCHMARK_REQUEST_TRY_STR,
};
Expand Down Expand Up @@ -324,6 +324,42 @@ static MIGRATIONS: &[&str] = &[
CREATE UNIQUE INDEX collector_config_target_bench_active_uniq ON collector_config
(target, benchmark_set, is_active) WHERE is_active = TRUE;
"#,
r#"
CREATE TABLE IF NOT EXISTS job_queue (
id SERIAL PRIMARY KEY,
request_tag TEXT NOT NULL,
target TEXT NOT NULL,
backend TEXT NOT NULL,
profile TEXT NOT NULL,
benchmark_set INTEGER NOT NULL,
collector_id INTEGER,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
status TEXT NOT NULL,
retry INTEGER DEFAULT 0,

CONSTRAINT job_queue_request_fk
FOREIGN KEY (request_tag)
REFERENCES benchmark_request(tag)
ON DELETE CASCADE,

CONSTRAINT job_queue_collector
FOREIGN KEY (collector_id)
REFERENCES collector_config(id)
ON DELETE CASCADE,

CONSTRAINT job_queue_unique
UNIQUE (
request_tag,
target,
backend,
profile,
benchmark_set
)
);
CREATE INDEX IF NOT EXISTS job_queue_request_tag_idx ON job_queue (request_tag);
"#,
];

#[async_trait::async_trait]
Expand Down Expand Up @@ -1608,6 +1644,42 @@ where
.collect();
Ok(requests)
}

async fn enqueue_benchmark_job(
&self,
request_tag: &str,
target: &Target,
backend: &CodegenBackend,
profile: &Profile,
benchmark_set: u32,
) -> anyhow::Result<()> {
self.conn()
.execute(
r#"
INSERT INTO job_queue(
request_tag,
target,
backend,
profile,
benchmark_set,
status
)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT DO NOTHING
"#,
&[
&request_tag,
&target,
&backend,
&profile,
&(benchmark_set as i32),
&BenchmarkJobStatus::Queued,
],
)
.await
.context("failed to insert benchmark_job")?;
Ok(())
}
}

fn parse_artifact_id(ty: &str, sha: &str, date: Option<DateTime<Utc>>) -> ArtifactId {
Expand Down Expand Up @@ -1653,6 +1725,10 @@ macro_rules! impl_to_postgresql_via_to_string {

// Persist these enums to Postgres via their string (`ToString`/`Display`)
// representation rather than a custom SQL type.
impl_to_postgresql_via_to_string!(BenchmarkRequestType);
impl_to_postgresql_via_to_string!(BenchmarkRequestStatus);
impl_to_postgresql_via_to_string!(Target);
impl_to_postgresql_via_to_string!(CodegenBackend);
impl_to_postgresql_via_to_string!(Profile);
impl_to_postgresql_via_to_string!(BenchmarkJobStatus);

#[cfg(test)]
mod tests {
Expand Down
11 changes: 11 additions & 0 deletions database/src/pool/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,17 @@ impl Connection for SqliteConnection {
) -> anyhow::Result<()> {
no_queue_implementation_abort!()
}

/// The job queue is not supported on SQLite; this aborts via
/// `no_queue_implementation_abort!`, matching the other queue methods above.
async fn enqueue_benchmark_job(
    &self,
    _request_tag: &str,
    _target: &Target,
    _backend: &CodegenBackend,
    _profile: &Profile,
    _benchmark_set: u32,
) -> anyhow::Result<()> {
    no_queue_implementation_abort!()
}
}

fn parse_artifact_id(ty: &str, sha: &str, date: Option<i64>) -> ArtifactId {
Expand Down
Loading
Loading