Skip to content

Commit aec3191

Browse files
committed
add benchmark utility to profile memory usage
1 parent 8674454 commit aec3191

File tree

8 files changed

+307
-2
lines changed

8 files changed

+307
-2
lines changed

Cargo.lock

Lines changed: 27 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ mimalloc = { version = "0.1", optional = true, default-features = false }
4545
object_store = { workspace = true }
4646
parquet = { workspace = true, default-features = true }
4747
rand = { workspace = true }
48+
regex.workspace = true
4849
serde = { version = "1.0.219", features = ["derive"] }
4950
serde_json = { workspace = true }
5051
snmalloc-rs = { version = "0.3", optional = true }
@@ -53,5 +54,8 @@ test-utils = { path = "../test-utils/", version = "0.1.0" }
5354
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
5455
tokio-util = { version = "0.7.15" }
5556

57+
[target.'cfg(target_os = "linux")'.dependencies]
58+
procfs = "0.17.0"
59+
5660
[dev-dependencies]
5761
datafusion-proto = { workspace = true }

benchmarks/src/bin/mem_profile.rs

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! mem_profile binary entrypoint
19+
use datafusion::error::Result;
20+
use std::{
21+
io::{BufRead, BufReader},
22+
process::{Command, Stdio},
23+
};
24+
use structopt::StructOpt;
25+
26+
#[derive(Debug, StructOpt)]
27+
#[structopt(about = "memory profile command")]
28+
struct MemProfileOpt {
29+
#[structopt(subcommand)]
30+
command: BenchmarkCommand,
31+
}
32+
33+
#[derive(Debug, StructOpt)]
34+
enum BenchmarkCommand {
35+
Tpch(TpchOpt),
36+
// TODO Add other benchmark commands here
37+
}
38+
39+
#[derive(Debug, StructOpt)]
40+
struct TpchOpt {
41+
#[structopt(long, required = true)]
42+
path: String,
43+
44+
/// Query number. If not specified, runs all queries
45+
#[structopt(short, long)]
46+
query: Option<usize>,
47+
}
48+
49+
#[tokio::main]
50+
pub async fn main() -> Result<()> {
51+
// 1. parse args and check which benchmarks should be run
52+
let opt = MemProfileOpt::from_args();
53+
54+
// 2. prebuild test binary so that memory does not blow up due to build process
55+
// check binary file location
56+
println!("Pre-building benchmark binary...");
57+
let status = Command::new("cargo")
58+
.args(["build", "--profile", "release-nonlto", "--bin", "dfbench"])
59+
.status()
60+
.expect("Failed to build dfbench");
61+
62+
if !status.success() {
63+
panic!("Failed to build dfbench");
64+
}
65+
println!("Benchmark binary built successfully.");
66+
67+
// 3. create a subprocess, run each benchmark with args (1) (2)
68+
match opt.command {
69+
BenchmarkCommand::Tpch(tpch_opt) => {
70+
run_tpch_benchmark(tpch_opt).await?;
71+
}
72+
}
73+
74+
// (maybe we cannot support result file.. and just have to print..)
75+
Ok(())
76+
}
77+
78+
async fn run_tpch_benchmark(opt: TpchOpt) -> Result<()> {
79+
let mut args: Vec<String> = vec![
80+
"./target/release-nonlto/dfbench".to_string(),
81+
"tpch".to_string(),
82+
"--iterations".to_string(),
83+
"1".to_string(),
84+
"--path".to_string(),
85+
opt.path.clone(),
86+
"--format".to_string(),
87+
"parquet".to_string(),
88+
"--partitions".to_string(),
89+
"4".to_string(),
90+
"--memory-stat-enabled".to_string(),
91+
"--query".to_string(),
92+
];
93+
94+
let mut query_strings: Vec<String> = Vec::new();
95+
if let Some(query_id) = opt.query {
96+
query_strings.push(query_id.to_string());
97+
} else {
98+
// run all queries.
99+
for i in 1..=22 {
100+
query_strings.push(i.to_string());
101+
}
102+
}
103+
104+
let mut results = vec![];
105+
for query_str in query_strings {
106+
args.push(query_str);
107+
let _ = run_query(&args, &mut results);
108+
args.pop();
109+
}
110+
111+
print_summary_table(&results);
112+
Ok(())
113+
}
114+
115+
fn run_query(args: &[String], results: &mut Vec<QueryResult>) -> Result<()> {
116+
let exec_path = &args[0];
117+
let exec_args = &args[1..];
118+
119+
let mut child = Command::new(exec_path)
120+
.args(exec_args)
121+
.stdout(Stdio::piped())
122+
.spawn()
123+
.expect("Failed to start benchmark");
124+
125+
let stdout = child.stdout.take().unwrap();
126+
let reader = BufReader::new(stdout);
127+
128+
// buffer stdout
129+
let lines: Result<Vec<String>, std::io::Error> =
130+
reader.lines().collect::<Result<_, _>>();
131+
132+
child
133+
.wait()
134+
.expect("Benchmark process exited with an error");
135+
136+
// parse after child process terminates
137+
let lines = lines?;
138+
let mut iter = lines.iter().peekable();
139+
140+
while let Some(line) = iter.next() {
141+
if let Some((query, duration_ms)) = parse_query_time(line) {
142+
if let Some(next_line) = iter.peek() {
143+
if let Some((vmpeak, vmhwm, resident)) = parse_vm_line(next_line) {
144+
results.push(QueryResult {
145+
query,
146+
duration_ms,
147+
vmpeak,
148+
vmhwm,
149+
resident,
150+
});
151+
break;
152+
}
153+
}
154+
}
155+
}
156+
157+
Ok(())
158+
}
159+
160+
#[derive(Debug)]
161+
struct QueryResult {
162+
query: usize,
163+
duration_ms: f64,
164+
vmpeak: String,
165+
vmhwm: String,
166+
resident: String,
167+
}
168+
169+
fn parse_query_time(line: &str) -> Option<(usize, f64)> {
170+
let re = regex::Regex::new(r"Query (\d+) avg time: ([\d.]+) ms").unwrap();
171+
if let Some(caps) = re.captures(line) {
172+
let query_id = caps[1].parse::<usize>().ok()?;
173+
let avg_time = caps[2].parse::<f64>().ok()?;
174+
Some((query_id, avg_time))
175+
} else {
176+
None
177+
}
178+
}
179+
180+
fn parse_vm_line(line: &str) -> Option<(String, String, String)> {
181+
let re = regex::Regex::new(
182+
r"VmPeak:\s*([\d.]+\s*[A-Z]+),\s*VmHWM:\s*([\d.]+\s*[A-Z]+),\s*RSS:\s*([\d.]+\s*[A-Z]+)"
183+
).ok()?;
184+
let caps = re.captures(line)?;
185+
let vmpeak = caps.get(1)?.as_str().to_string();
186+
let vmhwm = caps.get(2)?.as_str().to_string();
187+
let resident = caps.get(3)?.as_str().to_string();
188+
Some((vmpeak, vmhwm, resident))
189+
}
190+
191+
// Print as simple aligned table
192+
fn print_summary_table(results: &[QueryResult]) {
193+
println!(
194+
"\n{:<8} {:>10} {:>12} {:>12} {:>12}",
195+
"Query", "Time (ms)", "VmPeak", "VmHWM", "RSS"
196+
);
197+
println!("{}", "-".repeat(68));
198+
199+
for r in results {
200+
println!(
201+
"{:<8} {:>10.2} {:>12} {:>12} {:>12}",
202+
r.query, r.duration_ms, r.vmpeak, r.vmhwm, r.resident
203+
);
204+
}
205+
}

benchmarks/src/clickbench.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::fs;
1919
use std::io::ErrorKind;
2020
use std::path::{Path, PathBuf};
2121

22-
use crate::util::{BenchmarkRun, CommonOpt, QueryResult};
22+
use crate::util::{print_memory_stats, BenchmarkRun, CommonOpt, QueryResult};
2323
use datafusion::logical_expr::{ExplainFormat, ExplainOption};
2424
use datafusion::{
2525
error::{DataFusionError, Result},
@@ -192,6 +192,10 @@ impl RunOpt {
192192
}
193193
let avg = millis.iter().sum::<f64>() / millis.len() as f64;
194194
println!("Query {query_id} avg time: {avg:.2} ms");
195+
196+
if self.common.memory_stat_enabled {
197+
print_memory_stats();
198+
}
195199
Ok(query_results)
196200
}
197201

benchmarks/src/tpch/run.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::sync::Arc;
2121
use super::{
2222
get_query_sql, get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_TABLES,
2323
};
24-
use crate::util::{BenchmarkRun, CommonOpt, QueryResult};
24+
use crate::util::{print_memory_stats, BenchmarkRun, CommonOpt, QueryResult};
2525

2626
use arrow::record_batch::RecordBatch;
2727
use arrow::util::pretty::{self, pretty_format_batches};
@@ -184,6 +184,10 @@ impl RunOpt {
184184
let avg = millis.iter().sum::<f64>() / millis.len() as f64;
185185
println!("Query {query_id} avg time: {avg:.2} ms");
186186

187+
if self.common.memory_stat_enabled {
188+
print_memory_stats();
189+
}
190+
187191
Ok(query_results)
188192
}
189193

benchmarks/src/util/memory.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use datafusion::execution::memory_pool::human_readable_size;
19+
20+
#[derive(Debug)]
21+
pub struct MemoryStats {
22+
pub vm_rss_kb: Option<u64>,
23+
pub vm_hwm_kb: Option<u64>,
24+
pub vm_size_kb: Option<u64>,
25+
pub vm_peak_kb: Option<u64>,
26+
}
27+
28+
pub fn print_memory_stats() {
29+
#[cfg(target_os = "linux")]
30+
{
31+
use procfs::process::Process;
32+
33+
let pid = std::process::id();
34+
let process = Process::new(pid as i32).unwrap();
35+
let statm = process.statm().unwrap();
36+
let status = process.status().unwrap();
37+
let page_size = procfs::page_size();
38+
39+
let resident_bytes = (statm.resident * page_size) as usize;
40+
let vmpeak_bytes = status.vmpeak.map(|kb| (kb * 1024) as usize);
41+
let vmhwm_bytes = status.vmhwm.map(|kb| (kb * 1024) as usize);
42+
43+
println!(
44+
"VmPeak: {}, VmHWM: {}, RSS: {}",
45+
vmpeak_bytes
46+
.map(human_readable_size)
47+
.unwrap_or_else(|| "N/A".to_string()),
48+
vmhwm_bytes
49+
.map(human_readable_size)
50+
.unwrap_or_else(|| "N/A".to_string()),
51+
human_readable_size(resident_bytes)
52+
);
53+
}
54+
}

benchmarks/src/util/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
// under the License.
1717

1818
//! Shared benchmark utilities
19+
mod memory;
1920
mod options;
2021
mod run;
2122

23+
pub use memory::{print_memory_stats, MemoryStats};
2224
pub use options::CommonOpt;
2325
pub use run::{BenchQuery, BenchmarkRun, QueryResult};

benchmarks/src/util/options.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ pub struct CommonOpt {
6161
/// Activate debug mode to see more details
6262
#[structopt(short, long)]
6363
pub debug: bool,
64+
65+
/// Enable memory profiling to see VmPeak, VmHwm for running benchmark.
66+
/// See more details in TODO
67+
#[structopt(long)]
68+
pub memory_stat_enabled: bool,
6469
}
6570

6671
impl CommonOpt {

0 commit comments

Comments
 (0)