Skip to content

Commit 483444a

Browse files
committed
cherry pick nlj benchmark
1 parent 6fef6af commit 483444a

File tree

4 files changed

+245
-1
lines changed

4 files changed

+245
-1
lines changed

benchmarks/bench.sh

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ imdb: Join Order Benchmark (JOB) using the IMDB dataset conver
124124
125125
# Micro-Benchmarks (specific operators and features)
126126
cancellation: How long cancelling a query takes
127+
nlj: Benchmark for simple nested loop joins, testing various join scenarios
127128
128129
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
129130
Supported Configuration (Environment Variables)
@@ -196,6 +197,7 @@ main() {
196197
data_clickbench_1
197198
data_clickbench_partitioned
198199
data_imdb
200+
# nlj uses range() function, no data generation needed
199201
;;
200202
tpch)
201203
data_tpch "1"
@@ -298,6 +300,10 @@ main() {
298300
# same data as for tpch
299301
data_tpch "1"
300302
;;
303+
nlj)
304+
# nlj uses range() function, no data generation needed
305+
echo "NLJ benchmark does not require data generation"
306+
;;
301307
*)
302308
echo "Error: unknown benchmark '$BENCHMARK' for data generation"
303309
usage
@@ -354,6 +360,7 @@ main() {
354360
run_h2o_join "BIG" "PARQUET" "join"
355361
run_imdb
356362
run_external_aggr
363+
run_nlj
357364
;;
358365
tpch)
359366
run_tpch "1" "parquet"
@@ -458,6 +465,9 @@ main() {
458465
topk_tpch)
459466
run_topk_tpch
460467
;;
468+
nlj)
469+
run_nlj
470+
;;
461471
*)
462472
echo "Error: unknown benchmark '$BENCHMARK' for run"
463473
usage
@@ -1085,6 +1095,14 @@ run_topk_tpch() {
10851095
$CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100 ${QUERY_ARG}
10861096
}
10871097

1098+
# Runs the nlj benchmark
1099+
run_nlj() {
1100+
RESULTS_FILE="${RESULTS_DIR}/nlj.json"
1101+
echo "RESULTS_FILE: ${RESULTS_FILE}"
1102+
echo "Running nlj benchmark..."
1103+
debug_run $CARGO_COMMAND --bin dfbench -- nlj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG}
1104+
}
1105+
10881106

10891107
compare_benchmarks() {
10901108
BASE_RESULTS_DIR="${SCRIPT_DIR}/results"

benchmarks/src/bin/dfbench.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
3333
#[global_allocator]
3434
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
3535

36-
use datafusion_benchmarks::{cancellation, clickbench, h2o, imdb, sort_tpch, tpch};
36+
use datafusion_benchmarks::{cancellation, clickbench, h2o, imdb, nlj, sort_tpch, tpch};
3737

3838
#[derive(Debug, StructOpt)]
3939
#[structopt(about = "benchmark command")]
@@ -42,6 +42,7 @@ enum Options {
4242
Clickbench(clickbench::RunOpt),
4343
H2o(h2o::RunOpt),
4444
Imdb(imdb::RunOpt),
45+
Nlj(nlj::RunOpt),
4546
SortTpch(sort_tpch::RunOpt),
4647
Tpch(tpch::RunOpt),
4748
TpchConvert(tpch::ConvertOpt),
@@ -57,6 +58,7 @@ pub async fn main() -> Result<()> {
5758
Options::Clickbench(opt) => opt.run().await,
5859
Options::H2o(opt) => opt.run().await,
5960
Options::Imdb(opt) => Box::pin(opt.run()).await,
61+
Options::Nlj(opt) => opt.run().await,
6062
Options::SortTpch(opt) => opt.run().await,
6163
Options::Tpch(opt) => Box::pin(opt.run()).await,
6264
Options::TpchConvert(opt) => opt.run().await,

benchmarks/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ pub mod cancellation;
2020
pub mod clickbench;
2121
pub mod h2o;
2222
pub mod imdb;
23+
pub mod nlj;
2324
pub mod sort_tpch;
2425
pub mod tpch;
2526
pub mod util;

benchmarks/src/nlj.rs

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::util::{BenchmarkRun, CommonOpt, QueryResult};
19+
use datafusion::{error::Result, prelude::SessionContext};
20+
use datafusion_common::instant::Instant;
21+
use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError};
22+
use structopt::StructOpt;
23+
24+
/// Run the Nested Loop Join (NLJ) benchmark
25+
///
26+
/// This micro-benchmark focuses on the performance characteristics of NLJs.
27+
///
28+
/// It always tries to use fast scanners (without decoding overhead) and
29+
/// efficient predicate expressions to ensure it can reflect the performance
30+
/// of the NLJ operator itself.
31+
///
32+
/// In this micro-benchmark, the following workload characteristics will be
33+
/// varied:
34+
/// - Join type: Inner/Left/Right/Full (all for the NestedLoopJoin physical
35+
/// operator)
36+
/// TODO: Include special join types (Semi/Anti/Mark joins)
37+
/// - Input size: Different combinations of left (build) side and right (probe)
38+
/// side sizes
39+
/// - Selectivity of join filters
40+
#[derive(Debug, StructOpt, Clone)]
41+
#[structopt(verbatim_doc_comment)]
42+
pub struct RunOpt {
43+
/// Query number (between 1 and 10). If not specified, runs all queries
44+
#[structopt(short, long)]
45+
query: Option<usize>,
46+
47+
/// Common options
48+
#[structopt(flatten)]
49+
common: CommonOpt,
50+
51+
/// If present, write results json here
52+
#[structopt(parse(from_os_str), short = "o", long = "output")]
53+
output_path: Option<std::path::PathBuf>,
54+
}
55+
56+
/// Inline SQL queries for NLJ benchmarks
57+
///
58+
/// Each query's comment includes:
59+
/// - Left (build) side row count × Right (probe) side row count
60+
/// - Join predicate selectivity (1% means the output size is 1% * input size)
61+
const NLJ_QUERIES: &[&str] = &[
62+
// Q1: INNER 10K x 10K | LOW 0.1%
63+
r#"
64+
SELECT *
65+
FROM range(10000) AS t1
66+
JOIN range(10000) AS t2
67+
ON (t1.value + t2.value) % 1000 = 0;
68+
"#,
69+
// Q2: INNER 10K x 10K | Medium 20%
70+
r#"
71+
SELECT *
72+
FROM range(10000) AS t1
73+
JOIN range(10000) AS t2
74+
ON (t1.value + t2.value) % 5 = 0;
75+
"#,
76+
// Q3: INNER 10K x 10K | High 90%
77+
r#"
78+
SELECT *
79+
FROM range(10000) AS t1
80+
JOIN range(10000) AS t2
81+
ON (t1.value + t2.value) % 10 <> 0;
82+
"#,
83+
// Q4: INNER 30K x 30K | Medium 20%
84+
r#"
85+
SELECT *
86+
FROM range(30000) AS t1
87+
JOIN range(30000) AS t2
88+
ON (t1.value + t2.value) % 5 = 0;
89+
"#,
90+
// Q5: INNER 10K x 200K | LOW 0.1% (small to large)
91+
r#"
92+
SELECT *
93+
FROM range(10000) AS t1
94+
JOIN range(200000) AS t2
95+
ON (t1.value + t2.value) % 1000 = 0;
96+
"#,
97+
// Q6: INNER 200K x 10K | LOW 0.1% (large to small)
98+
r#"
99+
SELECT *
100+
FROM range(200000) AS t1
101+
JOIN range(10000) AS t2
102+
ON (t1.value + t2.value) % 1000 = 0;
103+
"#,
104+
// Q7: RIGHT OUTER 10K x 200K | LOW 0.1%
105+
r#"
106+
SELECT *
107+
FROM range(10000) AS t1
108+
RIGHT JOIN range(200000) AS t2
109+
ON (t1.value + t2.value) % 1000 = 0;
110+
"#,
111+
// Q8: LEFT OUTER 200K x 10K | LOW 0.1%
112+
r#"
113+
SELECT *
114+
FROM range(200000) AS t1
115+
LEFT JOIN range(10000) AS t2
116+
ON (t1.value + t2.value) % 1000 = 0;
117+
"#,
118+
// Q9: FULL OUTER 30K x 30K | LOW 0.1%
119+
r#"
120+
SELECT *
121+
FROM range(30000) AS t1
122+
FULL JOIN range(30000) AS t2
123+
ON (t1.value + t2.value) % 1000 = 0;
124+
"#,
125+
// Q10: FULL OUTER 30K x 30K | High 90%
126+
r#"
127+
SELECT *
128+
FROM range(30000) AS t1
129+
FULL JOIN range(30000) AS t2
130+
ON (t1.value + t2.value) % 10 <> 0;
131+
"#,
132+
];
133+
134+
impl RunOpt {
135+
pub async fn run(self) -> Result<()> {
136+
println!("Running NLJ benchmarks with the following options: {self:#?}\n");
137+
138+
// Define query range
139+
let query_range = match self.query {
140+
Some(query_id) => {
141+
if query_id >= 1 && query_id <= NLJ_QUERIES.len() {
142+
query_id..=query_id
143+
} else {
144+
return exec_err!(
145+
"Query {query_id} not found. Available queries: 1 to {}",
146+
NLJ_QUERIES.len()
147+
);
148+
}
149+
}
150+
None => 1..=NLJ_QUERIES.len(),
151+
};
152+
153+
let config = self.common.config()?;
154+
let rt_builder = self.common.runtime_env_builder()?;
155+
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
156+
157+
let mut benchmark_run = BenchmarkRun::new();
158+
for query_id in query_range {
159+
let query_index = query_id - 1; // Convert 1-based to 0-based index
160+
161+
let sql = NLJ_QUERIES[query_index];
162+
benchmark_run.start_new_case(&format!("Query {query_id}"));
163+
let query_run = self.benchmark_query(sql, &query_id.to_string(), &ctx).await;
164+
match query_run {
165+
Ok(query_results) => {
166+
for iter in query_results {
167+
benchmark_run.write_iter(iter.elapsed, iter.row_count);
168+
}
169+
}
170+
Err(e) => {
171+
return Err(DataFusionError::Context(
172+
"NLJ benchmark Q{query_id} failed with error:".to_string(),
173+
Box::new(e),
174+
));
175+
}
176+
}
177+
}
178+
179+
benchmark_run.maybe_write_json(self.output_path.as_ref())?;
180+
Ok(())
181+
}
182+
183+
/// Validates that the query's physical plan uses a NestedLoopJoin (NLJ),
184+
/// then executes the query and collects execution times.
185+
///
186+
/// TODO: ensure the optimizer won't change the join order (it's not at
187+
/// v48.0.0).
188+
async fn benchmark_query(
189+
&self,
190+
sql: &str,
191+
query_name: &str,
192+
ctx: &SessionContext,
193+
) -> Result<Vec<QueryResult>> {
194+
let mut query_results = vec![];
195+
196+
// Validate that the query plan includes a Nested Loop Join
197+
let df = ctx.sql(sql).await?;
198+
let physical_plan = df.create_physical_plan().await?;
199+
let plan_string = format!("{physical_plan:#?}");
200+
201+
if !plan_string.contains("NestedLoopJoinExec") {
202+
return Err(exec_datafusion_err!(
203+
"Query {query_name} does not use Nested Loop Join. Physical plan: {plan_string}"
204+
));
205+
}
206+
207+
for i in 0..self.common.iterations {
208+
let start = Instant::now();
209+
let df = ctx.sql(sql).await?;
210+
let batches = df.collect().await?;
211+
let elapsed = start.elapsed();
212+
213+
let row_count = batches.iter().map(|b| b.num_rows()).sum();
214+
println!(
215+
"Query {query_name} iteration {i} returned {row_count} rows in {elapsed:?}"
216+
);
217+
218+
query_results.push(QueryResult { elapsed, row_count });
219+
}
220+
221+
Ok(query_results)
222+
}
223+
}

0 commit comments

Comments
 (0)