Skip to content

Commit 67cadfa

Browse files
committed
add e2e test & comments
1 parent a233c8b commit 67cadfa

File tree

3 files changed

+136
-8
lines changed

3 files changed

+136
-8
lines changed

benchmarks/README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,15 +328,17 @@ The `mem_profile` program wraps benchmark execution to measure memory usage stat
328328
Subcommands supported by mem_profile are the subset of those in `dfbench`.
329329
Currently supported benchmarks include: Clickbench, H2o, Imdb, SortTpch, Tpch
330330

331-
Before running benchmarks, `mem_profile` automatically compiles the benchmark binary (`dfbench`) using `cargo build` with the same cargo profile (e.g., --release) as mem_profile itself. By prebuilding the binary and running each query in a separate process, we can ensure accurate memory statistics.
331+
Before running benchmarks, `mem_profile` automatically compiles the benchmark binary (`dfbench`) using `cargo build`. Note that the build profile used for `dfbench` is not tied to the profile used for running `mem_profile` itself. We can explicitly specify the desired build profile using the `--bench-profile` option (e.g. release-nonlto). By prebuilding the binary and running each query in a separate process, we can ensure accurate memory statistics.
332332

333333
Currently, `mem_profile` only supports `mimalloc` as the memory allocator, since it relies on `mimalloc`'s API to collect memory statistics.
334334

335335
Because it runs the compiled binary directly from the target directory, make sure your working directory is the top-level datafusion/ directory, where the target/ is also located.
336336

337+
The benchmark subcommand (e.g., `tpch`) and all following arguments are passed directly to `dfbench`. Be sure to specify `--bench-profile` before the benchmark subcommand.
338+
337339
Example:
338340
```shell
339-
datafusion$ cargo run --profile release-nonlto --bin mem_profile -- tpch --path benchmarks/data/tpch_sf1 --partitions 4 --format parquet
341+
datafusion$ cargo run --profile release-nonlto --bin mem_profile -- --bench-profile release-nonlto tpch --path benchmarks/data/tpch_sf1 --partitions 4 --format parquet
340342
```
341343
Example Output:
342344
```

benchmarks/src/bin/mem_profile.rs

Lines changed: 129 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use datafusion::error::Result;
2020
use std::{
2121
env,
2222
io::{BufRead, BufReader},
23+
path::Path,
2324
process::{Command, Stdio},
2425
};
2526
use structopt::StructOpt;
@@ -31,7 +32,18 @@ use datafusion_benchmarks::{
3132
};
3233

3334
#[derive(Debug, StructOpt)]
34-
#[structopt(about = "benchmark command")]
35+
#[structopt(name = "Memory Profiling Utility")]
36+
struct MemProfileOpt {
37+
/// Cargo profile to use in dfbench (e.g. release, release-nonlto)
38+
#[structopt(long, default_value = "release")]
39+
bench_profile: String,
40+
41+
#[structopt(subcommand)]
42+
command: Options,
43+
}
44+
45+
#[derive(Debug, StructOpt)]
46+
#[structopt(about = "Benchmark command")]
3547
enum Options {
3648
Clickbench(clickbench::RunOpt),
3749
H2o(h2o::RunOpt),
@@ -43,9 +55,9 @@ enum Options {
4355
#[tokio::main]
4456
pub async fn main() -> Result<()> {
4557
// 1. Parse args and check which benchmarks should be run
46-
let profile = env::var("PROFILE").unwrap_or_else(|_| "release".to_string());
47-
let args = env::args().skip(1);
48-
let query_range = match Options::from_args() {
58+
let mem_profile_opt = MemProfileOpt::from_args();
59+
let profile = mem_profile_opt.bench_profile;
60+
let query_range = match mem_profile_opt.command {
4961
Options::Clickbench(opt) => {
5062
let entries = std::fs::read_dir(&opt.queries_path)?
5163
.filter_map(Result::ok)
@@ -102,7 +114,18 @@ pub async fn main() -> Result<()> {
102114
println!("Benchmark binary built successfully.");
103115

104116
// 3. Create a new process per each benchmark query and print summary
105-
let mut dfbench_args: Vec<String> = args.collect();
117+
// Find position of subcommand to collect args for dfbench
118+
let args: Vec<_> = env::args().collect();
119+
let subcommands = ["tpch", "clickbench", "h2o", "imdb", "sort-tpch"];
120+
let sub_pos = args
121+
.iter()
122+
.position(|s| subcommands.iter().any(|&cmd| s == cmd))
123+
.expect("No benchmark subcommand found");
124+
125+
// Args starting from subcommand become dfbench args
126+
let mut dfbench_args: Vec<String> =
127+
args[sub_pos..].iter().map(|s| s.to_string()).collect();
128+
106129
run_benchmark_as_child_process(&profile, query_range, &mut dfbench_args)?;
107130

108131
Ok(())
@@ -118,7 +141,15 @@ fn run_benchmark_as_child_process(
118141
query_strings.push(i.to_string());
119142
}
120143

121-
let command = format!("target/{profile}/dfbench");
144+
let target_dir =
145+
env::var("CARGO_TARGET_DIR").unwrap_or_else(|_| "target".to_string());
146+
let command = format!("{target_dir}/{profile}/dfbench");
147+
// Check whether benchmark binary exists
148+
if !Path::new(&command).exists() {
149+
panic!(
150+
"Benchmark binary not found: `{command}`\nRun this command from the top-level `datafusion/` directory so `target/{profile}/dfbench` can be found.",
151+
);
152+
}
122153
args.insert(0, command);
123154
let mut results = vec![];
124155

@@ -235,3 +266,95 @@ fn print_summary_table(results: &[QueryResult]) {
235266
);
236267
}
237268
}
269+
270+
#[cfg(test)]
271+
// Only run with "ci" mode when we have the data
272+
#[cfg(feature = "ci")]
273+
mod tests {
274+
use datafusion::common::exec_err;
275+
use datafusion::error::Result;
276+
use std::path::{Path, PathBuf};
277+
use std::process::Command;
278+
279+
fn get_tpch_data_path() -> Result<String> {
280+
let path =
281+
std::env::var("TPCH_DATA").unwrap_or_else(|_| "benchmarks/data".to_string());
282+
if !Path::new(&path).exists() {
283+
return exec_err!(
284+
"Benchmark data not found (set TPCH_DATA env var to override): {}",
285+
path
286+
);
287+
}
288+
Ok(path)
289+
}
290+
291+
// Try to find target/ dir upward
292+
fn find_target_dir(start: &Path) -> Option<PathBuf> {
293+
let mut dir = start;
294+
295+
while let Some(current) = Some(dir) {
296+
if current.join("target").is_dir() {
297+
return Some(current.join("target"));
298+
}
299+
300+
dir = match current.parent() {
301+
Some(parent) => parent,
302+
None => break,
303+
};
304+
}
305+
306+
None
307+
}
308+
309+
#[test]
310+
// This test checks whether `mem_profile` runs successfully and produces expected output
311+
// using TPC-H query 6 (which runs quickly).
312+
fn mem_profile_e2e_tpch_q6() -> Result<()> {
313+
let profile = "ci";
314+
let tpch_data = get_tpch_data_path()?;
315+
316+
// The current working directory may not be the top-level datafusion/ directory,
317+
// so we manually walkdir upward, locate the target directory
318+
// and set it explicitly via CARGO_TARGET_DIR for the mem_profile command.
319+
let target_dir = find_target_dir(&std::env::current_dir()?);
320+
let output = Command::new("cargo")
321+
.env("CARGO_TARGET_DIR", target_dir.unwrap())
322+
.args([
323+
"run",
324+
"--profile",
325+
profile,
326+
"--bin",
327+
"mem_profile",
328+
"--",
329+
"--bench-profile",
330+
profile,
331+
"tpch",
332+
"--query",
333+
"6",
334+
"--path",
335+
&tpch_data,
336+
"--format",
337+
"tbl",
338+
])
339+
.output()
340+
.expect("Failed to run mem_profile");
341+
342+
let stdout = String::from_utf8_lossy(&output.stdout);
343+
let stderr = String::from_utf8_lossy(&output.stderr);
344+
345+
if !output.status.success() {
346+
panic!(
347+
"mem_profile failed\nstdout:\n{stdout}\nstderr:\n{stderr}---------------------",
348+
);
349+
}
350+
351+
assert!(
352+
stdout.contains("Peak RSS")
353+
&& stdout.contains("Query")
354+
&& stdout.contains("Time"),
355+
"Unexpected output:\n{stdout}",
356+
);
357+
358+
Ok(())
359+
}
360+
}

benchmarks/src/util/memory.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ pub fn print_memory_stats() {
3636
);
3737
}
3838

39+
// When modifying this output format, make sure to update the corresponding
40+
// parsers in `mem_profile.rs`, specifically `parse_vm_line` and `parse_query_time`,
41+
// to keep the log output and parser logic in sync.
3942
println!(
4043
"Peak RSS: {}, Peak Commit: {}, Page Faults: {}",
4144
if peak_rss == 0 {

0 commit comments

Comments
 (0)