From 90c96f491dd0249d3e91bd78f488fd0f7c5414a5 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Fri, 18 Jul 2025 16:38:58 +0800 Subject: [PATCH 1/7] Micro-benchmark for NLJ --- benchmarks/bench.sh | 18 +++ benchmarks/src/bin/dfbench.rs | 4 +- benchmarks/src/lib.rs | 1 + benchmarks/src/nlj.rs | 244 ++++++++++++++++++++++++++++++++++ 4 files changed, 266 insertions(+), 1 deletion(-) create mode 100644 benchmarks/src/nlj.rs diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 7339aba78f20..10b55ce0bbd4 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -115,6 +115,7 @@ imdb: Join Order Benchmark (JOB) using the IMDB dataset conver # Micro-Benchmarks (specific operators and features) cancellation: How long cancelling a query takes +nlj: Benchmark for simple nested loop joins, testing various join scenarios ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ Supported Configuration (Environment Variables) @@ -187,6 +188,7 @@ main() { data_clickbench_1 data_clickbench_partitioned data_imdb + # nlj uses range() function, no data generation needed ;; tpch) data_tpch "1" @@ -261,6 +263,10 @@ main() { # same data as for tpch data_tpch "1" ;; + nlj) + # nlj uses range() function, no data generation needed + echo "NLJ benchmark does not require data generation" + ;; *) echo "Error: unknown benchmark '$BENCHMARK' for data generation" usage @@ -317,6 +323,7 @@ main() { run_h2o_join "BIG" "PARQUET" "join" run_imdb run_external_aggr + run_nlj ;; tpch) run_tpch "1" "parquet" @@ -393,6 +400,9 @@ main() { topk_tpch) run_topk_tpch ;; + nlj) + run_nlj + ;; *) echo "Error: unknown benchmark '$BENCHMARK' for run" usage @@ -1020,6 +1030,14 @@ run_topk_tpch() { $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100 ${QUERY_ARG} } +# Runs the nlj benchmark +run_nlj() { + RESULTS_FILE="${RESULTS_DIR}/nlj.json" + echo "RESULTS_FILE: ${RESULTS_FILE}" + echo "Running nlj benchmark..." 
+ debug_run $CARGO_COMMAND --bin dfbench -- nlj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} +} + compare_benchmarks() { BASE_RESULTS_DIR="${SCRIPT_DIR}/results" diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs index e92fd115c7d8..88378492b726 100644 --- a/benchmarks/src/bin/dfbench.rs +++ b/benchmarks/src/bin/dfbench.rs @@ -33,7 +33,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; #[global_allocator] static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; -use datafusion_benchmarks::{cancellation, clickbench, h2o, imdb, sort_tpch, tpch}; +use datafusion_benchmarks::{cancellation, clickbench, h2o, imdb, nlj, sort_tpch, tpch}; #[derive(Debug, StructOpt)] #[structopt(about = "benchmark command")] @@ -42,6 +42,7 @@ enum Options { Clickbench(clickbench::RunOpt), H2o(h2o::RunOpt), Imdb(imdb::RunOpt), + Nlj(nlj::RunOpt), SortTpch(sort_tpch::RunOpt), Tpch(tpch::RunOpt), TpchConvert(tpch::ConvertOpt), @@ -57,6 +58,7 @@ pub async fn main() -> Result<()> { Options::Clickbench(opt) => opt.run().await, Options::H2o(opt) => opt.run().await, Options::Imdb(opt) => Box::pin(opt.run()).await, + Options::Nlj(opt) => opt.run().await, Options::SortTpch(opt) => opt.run().await, Options::Tpch(opt) => Box::pin(opt.run()).await, Options::TpchConvert(opt) => opt.run().await, diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs index e7657c4078d1..5d982fad6f77 100644 --- a/benchmarks/src/lib.rs +++ b/benchmarks/src/lib.rs @@ -20,6 +20,7 @@ pub mod cancellation; pub mod clickbench; pub mod h2o; pub mod imdb; +pub mod nlj; pub mod sort_tpch; pub mod tpch; pub mod util; diff --git a/benchmarks/src/nlj.rs b/benchmarks/src/nlj.rs new file mode 100644 index 000000000000..6cfa062a1643 --- /dev/null +++ b/benchmarks/src/nlj.rs @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::util::{BenchmarkRun, CommonOpt, QueryResult}; +use datafusion::{error::Result, prelude::SessionContext}; +use datafusion_common::exec_datafusion_err; +use datafusion_common::instant::Instant; +use structopt::StructOpt; + +/// Run the Nested Loop Join (NLJ) benchmark +/// +/// This micro-benchmark focuses on the performance characteristics of NLJs. +/// +/// It always tries to use fast scanners (without decoding overhead) and +/// efficient predicate expressions to ensure it can reflect the performance +/// of the NLJ operator itself. +/// +/// In this micro-benchmark, the following workload characteristics will be +/// varied: +/// - Join type: Inner/Left/Right/Full (all for the NestedLoopJoin physical +/// operator) +/// TODO: Include special join types (Semi/Anti/Mark joins) +/// - Input size: Different combinations of left (build) side and right (probe) +/// side sizes +/// - Selectivity of join filters +#[derive(Debug, StructOpt, Clone)] +#[structopt(verbatim_doc_comment)] +pub struct RunOpt { + /// Query number (between 1 and 10). 
If not specified, runs all queries + #[structopt(short, long)] + query_name: Option, + + /// Common options + #[structopt(flatten)] + common: CommonOpt, + + /// If present, write results json here + #[structopt(parse(from_os_str), short = "o", long = "output")] + output_path: Option, +} + +/// Inline SQL queries for NLJ benchmarks +/// +/// Each query's comment includes: +/// - Left (build) side row count × Right (probe) side row count +/// - Join predicate selectivity (1% means the output size is 1% * input size) +const NLJ_QUERIES: &[&str] = &[ + // Q1: INNER 10K x 10K | LOW 0.1% + r#" + SELECT * + FROM range(10000) AS t1 + JOIN range(10000) AS t2 + ON (t1.value + t2.value) % 1000 = 0; + "#, + // Q2: INNER 10K x 10K | Medium 20% + r#" + SELECT * + FROM range(10000) AS t1 + JOIN range(10000) AS t2 + ON (t1.value + t2.value) % 5 = 0; + "#, + // Q3: INNER 10K x 10K | High 90% + r#" + SELECT * + FROM range(10000) AS t1 + JOIN range(10000) AS t2 + ON (t1.value + t2.value) % 10 <> 0; + "#, + // Q4: INNER 30K x 30K | Medium 20% + r#" + SELECT * + FROM range(30000) AS t1 + JOIN range(30000) AS t2 + ON (t1.value + t2.value) % 5 = 0; + "#, + // Q5: INNER 10K x 200K | LOW 0.1% (small to large) + r#" + SELECT * + FROM range(10000) AS t1 + JOIN range(200000) AS t2 + ON (t1.value + t2.value) % 1000 = 0; + "#, + // Q6: INNER 200K x 10K | LOW 0.1% (large to small) + r#" + SELECT * + FROM range(200000) AS t1 + JOIN range(10000) AS t2 + ON (t1.value + t2.value) % 1000 = 0; + "#, + // Q7: RIGHT OUTER 10K x 200K | LOW 0.1% + r#" + SELECT * + FROM range(10000) AS t1 + RIGHT JOIN range(200000) AS t2 + ON (t1.value + t2.value) % 1000 = 0; + "#, + // Q8: LEFT OUTER 200K x 10K | LOW 0.1% + r#" + SELECT * + FROM range(200000) AS t1 + LEFT JOIN range(10000) AS t2 + ON (t1.value + t2.value) % 1000 = 0; + "#, + // Q9: FULL OUTER 30K x 30K | LOW 0.1% + r#" + SELECT * + FROM range(30000) AS t1 + FULL JOIN range(30000) AS t2 + ON (t1.value + t2.value) % 1000 = 0; + "#, + // Q10: FULL OUTER 30K 
x 30K | High 90% + r#" + SELECT * + FROM range(30000) AS t1 + FULL JOIN range(30000) AS t2 + ON (t1.value + t2.value) % 10 <> 0; + "#, +]; + +impl RunOpt { + pub async fn run(self) -> Result<()> { + println!("Running NLJ benchmarks with the following options: {self:#?}\n"); + + // Define available queries + let available_queries = + vec!["q1", "q2", "q3", "q4", "q5", "q6", "q7", "q8", "q9", "q10"]; + let query_list = match &self.query_name { + Some(query_name) => { + if available_queries.contains(&query_name.as_str()) { + vec![query_name.as_str()] + } else { + return Err(exec_datafusion_err!( + "Query '{}' not found. Available queries: {:?}", + query_name, + available_queries + )); + } + } + None => available_queries, + }; + + let config = self.common.config()?; + let rt_builder = self.common.runtime_env_builder()?; + let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + + let mut benchmark_run = BenchmarkRun::new(); + for query_name in query_list { + let query_index = match query_name { + "q1" => 0, + "q2" => 1, + "q3" => 2, + "q4" => 3, + "q5" => 4, + "q6" => 5, + "q7" => 6, + "q8" => 7, + "q9" => 8, + "q10" => 9, + _ => { + if self.query_name.is_some() { + return Err(exec_datafusion_err!( + "Could not find query '{}'.", + query_name + )); + } + continue; + } + }; + + let sql = NLJ_QUERIES[query_index]; + benchmark_run.start_new_case(&format!("Query {query_name}")); + let query_run = self.benchmark_query(sql, query_name, &ctx).await; + match query_run { + Ok(query_results) => { + for iter in query_results { + benchmark_run.write_iter(iter.elapsed, iter.row_count); + } + } + Err(e) => { + eprintln!("Query {query_name} failed: {e}"); + benchmark_run.write_iter(std::time::Duration::from_secs(0), 0); + } + } + } + + benchmark_run.maybe_write_json(self.output_path.as_ref())?; + Ok(()) + } + + /// Validates that the query's physical plan uses a NestedLoopJoin (NLJ), + /// then executes the query and collects execution times. 
+ /// + /// TODO: ensure the optimizer won't change the join order (it does not as of + /// v48.0.0). + async fn benchmark_query( + &self, + sql: &str, + query_name: &str, + ctx: &SessionContext, + ) -> Result> { + let mut query_results = vec![]; + + // Validate that the query plan includes a Nested Loop Join + let df = ctx.sql(sql).await?; + let physical_plan = df.create_physical_plan().await?; + let plan_string = format!("{:#?}", physical_plan); + + if !plan_string.contains("NestedLoopJoinExec") { + return Err(exec_datafusion_err!( + "Query {query_name} does not use Nested Loop Join. Physical plan: {plan_string}" + )); + } + + for i in 0..self.common.iterations { + let start = Instant::now(); + let df = ctx.sql(sql).await?; + let batches = df.collect().await?; + let elapsed = start.elapsed(); //.as_secs_f64() * 1000.0; + + let row_count = batches.iter().map(|b| b.num_rows()).sum(); + println!( + "Query {query_name} iteration {i} returned {row_count} rows in {elapsed:?}" + ); + + query_results.push(QueryResult { elapsed, row_count }); + } + + Ok(query_results) + } +} From 305826cf4b71a5774d2056b9b1ceaa0f9e49e8b0 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Fri, 18 Jul 2025 19:37:26 +0800 Subject: [PATCH 2/7] clippy --- benchmarks/src/nlj.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/src/nlj.rs b/benchmarks/src/nlj.rs index 6cfa062a1643..9589749d9dc5 100644 --- a/benchmarks/src/nlj.rs +++ b/benchmarks/src/nlj.rs @@ -217,7 +217,7 @@ impl RunOpt { // Validate that the query plan includes a Nested Loop Join let df = ctx.sql(sql).await?; let physical_plan = df.create_physical_plan().await?; - let plan_string = format!("{:#?}", physical_plan); + let plan_string = format!("{physical_plan:#?}"); if !plan_string.contains("NestedLoopJoinExec") { return Err(exec_datafusion_err!( From a3f5d05c77713bf99ee2f865942a9f8e4ab92903 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sat, 19 Jul 2025
11:41:11 +0800 Subject: [PATCH 3/7] review --- benchmarks/src/nlj.rs | 60 +++++++++++++++---------------------------- 1 file changed, 20 insertions(+), 40 deletions(-) diff --git a/benchmarks/src/nlj.rs b/benchmarks/src/nlj.rs index 9589749d9dc5..39b517c32c84 100644 --- a/benchmarks/src/nlj.rs +++ b/benchmarks/src/nlj.rs @@ -17,8 +17,8 @@ use crate::util::{BenchmarkRun, CommonOpt, QueryResult}; use datafusion::{error::Result, prelude::SessionContext}; -use datafusion_common::exec_datafusion_err; use datafusion_common::instant::Instant; +use datafusion_common::{exec_datafusion_err, DataFusionError}; use structopt::StructOpt; /// Run the Nested Loop Join (NLJ) benchmark @@ -42,7 +42,7 @@ use structopt::StructOpt; pub struct RunOpt { /// Query number (between 1 and 10). If not specified, runs all queries #[structopt(short, long)] - query_name: Option, + query: Option, /// Common options #[structopt(flatten)] @@ -135,22 +135,20 @@ impl RunOpt { pub async fn run(self) -> Result<()> { println!("Running NLJ benchmarks with the following options: {self:#?}\n"); - // Define available queries - let available_queries = - vec!["q1", "q2", "q3", "q4", "q5", "q6", "q7", "q8", "q9", "q10"]; - let query_list = match &self.query_name { - Some(query_name) => { - if available_queries.contains(&query_name.as_str()) { - vec![query_name.as_str()] + // Define query range + let query_range = match self.query { + Some(query_id) => { + if query_id >= 1 && query_id <= NLJ_QUERIES.len() { + query_id..=query_id } else { return Err(exec_datafusion_err!( - "Query '{}' not found. Available queries: {:?}", - query_name, - available_queries + "Query {} not found. 
Available queries: 1 to {}", + query_id, + NLJ_QUERIES.len() )); } } - None => available_queries, + None => 1..=NLJ_QUERIES.len(), }; let config = self.common.config()?; @@ -158,32 +156,12 @@ impl RunOpt { let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); let mut benchmark_run = BenchmarkRun::new(); - for query_name in query_list { - let query_index = match query_name { - "q1" => 0, - "q2" => 1, - "q3" => 2, - "q4" => 3, - "q5" => 4, - "q6" => 5, - "q7" => 6, - "q8" => 7, - "q9" => 8, - "q10" => 9, - _ => { - if self.query_name.is_some() { - return Err(exec_datafusion_err!( - "Could not find query '{}'.", - query_name - )); - } - continue; - } - }; + for query_id in query_range { + let query_index = query_id - 1; // Convert 1-based to 0-based index let sql = NLJ_QUERIES[query_index]; - benchmark_run.start_new_case(&format!("Query {query_name}")); - let query_run = self.benchmark_query(sql, query_name, &ctx).await; + benchmark_run.start_new_case(&format!("Query {query_id}")); + let query_run = self.benchmark_query(sql, &query_id.to_string(), &ctx).await; match query_run { Ok(query_results) => { for iter in query_results { @@ -191,8 +169,10 @@ impl RunOpt { } } Err(e) => { - eprintln!("Query {query_name} failed: {e}"); - benchmark_run.write_iter(std::time::Duration::from_secs(0), 0); + return Err(DataFusionError::Context( + format!("NLJ benchmark Q{query_id} failed with error:"), + Box::new(e), + )); } } } @@ -229,7 +209,7 @@ impl RunOpt { let start = Instant::now(); let df = ctx.sql(sql).await?; let batches = df.collect().await?; - let elapsed = start.elapsed(); //.as_secs_f64() * 1000.0; + let elapsed = start.elapsed(); let row_count = batches.iter().map(|b| b.num_rows()).sum(); println!( From 25ffcf3949804a2f472f8dac1587318f4c893f99 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sat, 19 Jul 2025 11:44:28 +0800 Subject: [PATCH 4/7] small clean-up --- benchmarks/src/nlj.rs | 14 +++++++++----- 1 file changed,
9 insertions(+), 5 deletions(-) diff --git a/benchmarks/src/nlj.rs b/benchmarks/src/nlj.rs index 39b517c32c84..1a34dda69119 100644 --- a/benchmarks/src/nlj.rs +++ b/benchmarks/src/nlj.rs @@ -18,7 +18,7 @@ use crate::util::{BenchmarkRun, CommonOpt, QueryResult}; use datafusion::{error::Result, prelude::SessionContext}; use datafusion_common::instant::Instant; -use datafusion_common::{exec_datafusion_err, DataFusionError}; +use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError}; use structopt::StructOpt; /// Run the Nested Loop Join (NLJ) benchmark @@ -141,11 +141,15 @@ impl RunOpt { if query_id >= 1 && query_id <= NLJ_QUERIES.len() { query_id..=query_id } else { - return Err(exec_datafusion_err!( - "Query {} not found. Available queries: 1 to {}", - query_id, + return exec_err!( + "Query {query_id} not found. Available queries: 1 to {}", NLJ_QUERIES.len() - )); + ); + // return Err(exec_datafusion_err!( + // "Query {} not found. Available queries: 1 to {}", + // query_id, + // NLJ_QUERIES.len() + // )); } } None => 1..=NLJ_QUERIES.len(), From 569007894af91fbf51dd498ee0ea18b966be7b4a Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sat, 19 Jul 2025 16:41:56 +0800 Subject: [PATCH 5/7] Update benchmarks/src/nlj.rs --- benchmarks/src/nlj.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/benchmarks/src/nlj.rs b/benchmarks/src/nlj.rs index 1a34dda69119..4466641170ca 100644 --- a/benchmarks/src/nlj.rs +++ b/benchmarks/src/nlj.rs @@ -145,11 +145,6 @@ impl RunOpt { "Query {query_id} not found. Available queries: 1 to {}", NLJ_QUERIES.len() ); - // return Err(exec_datafusion_err!( - // "Query {} not found. 
Available queries: 1 to {}", - // query_id, - // NLJ_QUERIES.len() - // )); } } None => 1..=NLJ_QUERIES.len(), From fe02cc3c36ca696b9c3969956fc92a622e5681a7 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Fri, 25 Jul 2025 11:03:50 +0800 Subject: [PATCH 6/7] add doc to ./benchmark/README.md --- benchmarks/README.md | 78 ++++++++++++++++++++++++++------------------ 1 file changed, 47 insertions(+), 31 deletions(-) diff --git a/benchmarks/README.md b/benchmarks/README.md index d0f413b2e97b..f7266eea9cb1 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -379,37 +379,6 @@ Your benchmark should create and use an instance of `BenchmarkRun` defined in `b The output of `dfbench` help includes a description of each benchmark, which is reproduced here for convenience. -## Cancellation - -Test performance of cancelling queries. - -Queries in DataFusion should stop executing "quickly" after they are -cancelled (the output stream is dropped). - -The queries are executed on a synthetic dataset generated during -the benchmark execution that is an anonymized version of a -real-world data set. - -The query is an anonymized version of a real-world query, and the -test starts the query then cancels it and reports how long it takes -for the runtime to fully exit. - -Example output: - -``` -Using 7 files found on disk -Starting to load data into in-memory object store -Done loading data into in-memory object store -in main, sleeping -Starting spawned -Creating logical plan... -Creating physical plan... -Executing physical plan... -Getting results... 
-cancelling thread -done dropping runtime in 83.531417ms -``` - ## ClickBench The ClickBench[1] benchmarks are widely cited in the industry and @@ -680,3 +649,50 @@ For example, to run query 1 with the small data generated above: ```bash cargo run --release --bin dfbench -- h2o --join-paths ./benchmarks/data/h2o/J1_1e7_NA_0.csv,./benchmarks/data/h2o/J1_1e7_1e1_0.csv,./benchmarks/data/h2o/J1_1e7_1e4_0.csv,./benchmarks/data/h2o/J1_1e7_1e7_NA.csv --queries-path ./benchmarks/queries/h2o/window.sql --query 1 ``` + +# Micro-Benchmarks + +## Nested Loop Join + +This benchmark focuses on the performance of queries with nested loop joins, minimizing other overheads such as scanning data sources or evaluating predicates. + +Different queries are included to test nested loop joins under various workloads. + +### Example Run + +```bash +# No need to generate data: this benchmark uses table function `range()` as the data source + +./bench.sh run nlj +``` + +## Cancellation + +Test performance of cancelling queries. + +Queries in DataFusion should stop executing "quickly" after they are +cancelled (the output stream is dropped). + +The queries are executed on a synthetic dataset generated during +the benchmark execution that is an anonymized version of a +real-world data set. + +The query is an anonymized version of a real-world query, and the +test starts the query then cancels it and reports how long it takes +for the runtime to fully exit. + +Example output: + +``` +Using 7 files found on disk +Starting to load data into in-memory object store +Done loading data into in-memory object store +in main, sleeping +Starting spawned +Creating logical plan... +Creating physical plan... +Executing physical plan... +Getting results... 
+cancelling thread +done dropping runtime in 83.531417ms +``` From 97415fbb089d2d61b3d88d14df95755faa67e1a9 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Thu, 31 Jul 2025 22:37:40 +0800 Subject: [PATCH 7/7] don't buffer result in benchmark runner --- benchmarks/src/nlj.rs | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/benchmarks/src/nlj.rs b/benchmarks/src/nlj.rs index 4466641170ca..cf19d041d63a 100644 --- a/benchmarks/src/nlj.rs +++ b/benchmarks/src/nlj.rs @@ -16,11 +16,14 @@ // under the License. use crate::util::{BenchmarkRun, CommonOpt, QueryResult}; +use datafusion::physical_plan::execute_stream; use datafusion::{error::Result, prelude::SessionContext}; use datafusion_common::instant::Instant; use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError}; use structopt::StructOpt; +use futures::StreamExt; + /// Run the Nested Loop Join (NLJ) benchmark /// /// This micro-benchmark focuses on the performance characteristics of NLJs. @@ -206,11 +209,11 @@ impl RunOpt { for i in 0..self.common.iterations { let start = Instant::now(); - let df = ctx.sql(sql).await?; - let batches = df.collect().await?; + + let row_count = Self::execute_sql_without_result_buffering(sql, ctx).await?; + let elapsed = start.elapsed(); - let row_count = batches.iter().map(|b| b.num_rows()).sum(); println!( "Query {query_name} iteration {i} returned {row_count} rows in {elapsed:?}" ); @@ -220,4 +223,28 @@ impl RunOpt { Ok(query_results) } + + /// Executes the SQL query and drops each result batch after evaluation, to + /// minimize memory usage by not buffering results.
+ /// + /// Returns the total result row count + async fn execute_sql_without_result_buffering( + sql: &str, + ctx: &SessionContext, + ) -> Result { + let mut row_count = 0; + + let df = ctx.sql(sql).await?; + let physical_plan = df.create_physical_plan().await?; + let mut stream = execute_stream(physical_plan, ctx.task_ctx())?; + + while let Some(batch) = stream.next().await { + row_count += batch?.num_rows(); + + // Evaluate the result and do nothing, the result will be dropped + // to reduce memory pressure + } + + Ok(row_count) + } }