From 24709803a64a84a336013cc30e363d24ec1cc5e9 Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 22 Oct 2025 15:04:04 -0700 Subject: [PATCH 01/11] chore: add TPC queries to be run by fuzzer correctness checker --- .../main/scala/org/apache/comet/fuzz/Main.scala | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala index b9e63c76a0..146fe52ecf 100644 --- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala +++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala @@ -61,6 +61,17 @@ class Conf(arguments: Seq[String]) extends ScallopConf(arguments) { opt[Int](required = false, descr = "Number of input files to use") } addSubcommand(runQueries) + object runTPCQueries extends Subcommand("runTPC") { + val dataFolder: ScallopOption[String] = + opt[String]( + required = true, + descr = "Folder for input data. Expected folder struct `$dataFolder/tableName/*.parquet`") + val queriesFolder: ScallopOption[String] = + opt[String]( + required = true, + descr = "Folder for test queries. Expected folder struct `$queriesFolder/*.sql`") + } + addSubcommand(runTPCQueries) verify() } @@ -108,6 +119,11 @@ object Main { conf.generateQueries.numQueries()) case Some(conf.runQueries) => QueryRunner.runQueries(spark, conf.runQueries.numFiles(), conf.runQueries.filename()) + case Some(conf.runTPCQueries) => + QueryRunner.runTPCQueries( + spark, + conf.runTPCQueries.dataFolder(), + conf.runTPCQueries.queriesFolder()) case _ => // scalastyle:off println println("Invalid subcommand") From 3aea54e41e96008f1afa81f60fca1a21c1218e5d Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 23 Oct 2025 11:04:26 -0700 Subject: [PATCH 02/11] chore: extract comparison tool from fuzzer --- .../apache/comet/fuzz/ComparisonTool.scala | 157 ++++++++++++++++++ .../scala/org/apache/comet/fuzz/Main.scala | 16 -- 2 files changed, 157 insertions(+), 16 deletions(-) create mode 100644 fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala new file mode 100644 index 0000000000..1f83b9a01f --- /dev/null +++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.comet.fuzz + +import java.io.File + +import scala.util.Random + +import org.rogach.scallop.{ScallopConf, Subcommand} +import org.rogach.scallop.ScallopOption + +import org.apache.spark.sql.SparkSession + +import org.apache.comet.testing.{DataGenOptions, ParquetGenerator} + +class Conf(arguments: Seq[String]) extends ScallopConf(arguments) { + object compareParquet extends Subcommand("compareParquet") { + val inputSparkFolder: ScallopOption[String] = + opt[String](required = true, descr = "Folder with Spark produced results in Parquet format") + val inputCometFolder: ScallopOption[String] = + opt[String](required = true, descr = "Folder with Comet produced results in Parquet format") + } + addSubcommand(compareParquet) + verify() +} + +object Main { + + lazy val spark: SparkSession = SparkSession + .builder() + .getOrCreate() + + def main(args: Array[String]): Unit = { + val conf = new Conf(args.toIndexedSeq) + conf.subcommand match { + case Some(conf.compareParquet) => + compareParquetFolders( + spark, + conf.compareParquet.inputSparkFolder(), + conf.compareParquet.inputCometFolder()) + + case _ => + // scalastyle:off println + println("Invalid subcommand") + // scalastyle:on println + sys.exit(-1) + } + } + + private def compareParquetFolders( + spark: SparkSession, + sparkFolderPath: String, + cometFolderPath: String): Unit = { + + val output = QueryRunner.createOutputMdFile() + + try { + val sparkFolder = new File(sparkFolderPath) + val cometFolder = new File(cometFolderPath) + + if (!sparkFolder.exists() || !sparkFolder.isDirectory) { + throw new IllegalArgumentException( + s"Spark folder does not exist or is not a directory: $sparkFolderPath") + } + + if (!cometFolder.exists() || !cometFolder.isDirectory) { + throw new IllegalArgumentException( + s"Comet folder does not exist or is not a directory: $cometFolderPath") + } + + // Get all subdirectories from the Spark folder + val sparkSubfolders = sparkFolder + .listFiles() + .filter(_.isDirectory) + .map(_.getName) + .sorted + + output.write(s"# Comparing Parquet Folders\n\n") + output.write(s"Spark folder: $sparkFolderPath\n") + output.write(s"Comet folder: $cometFolderPath\n") + output.write(s"Found ${sparkSubfolders.length} subfolders to compare\n\n") + + // Compare each subfolder + sparkSubfolders.foreach { subfolderName => + val sparkSubfolderPath = new File(sparkFolder, subfolderName) + val cometSubfolderPath = new File(cometFolder, subfolderName) + + if (!cometSubfolderPath.exists() || !cometSubfolderPath.isDirectory) { + output.write(s"## Subfolder: $subfolderName\n") + output.write( + s"[WARNING] Comet subfolder not found: ${cometSubfolderPath.getAbsolutePath}\n\n") + } else { + output.write(s"## Comparing subfolder: $subfolderName\n\n") + + try { + // Read Spark parquet files + spark.conf.set("spark.comet.enabled", "false") + val sparkDf = spark.read.parquet(sparkSubfolderPath.getAbsolutePath) + val sparkRows = sparkDf.collect() + val sparkPlan = sparkDf.queryExecution.executedPlan.toString + + // Read Comet parquet files + val cometDf = spark.read.parquet(cometSubfolderPath.getAbsolutePath) + val cometRows = cometDf.collect() + val cometPlan = cometDf.queryExecution.executedPlan.toString + + // Compare the results + QueryComparison.assertSameRows( + sparkRows, + cometRows, + sqlText = s"Reading parquet from subfolder: $subfolderName", + sparkPlan, + cometPlan, + output) + + output.write(s"✓ Subfolder $subfolderName: ${sparkRows.length} rows matched\n\n") + + } catch { + case e: Exception => + 
output.write( + s"[ERROR] Failed to compare subfolder $subfolderName: ${e.getMessage}\n") + val sw = new java.io.StringWriter() + val p = new java.io.PrintWriter(sw) + e.printStackTrace(p) + p.close() + output.write(s"```\n${sw.toString}\n```\n\n") + } + } + + output.flush() + } + + output.write(s"\n# Comparison Complete\n") + output.write(s"Compared ${sparkSubfolders.length} subfolders\n") + + } finally { + output.close() + } + } +} diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala index 146fe52ecf..b9e63c76a0 100644 --- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala +++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala @@ -61,17 +61,6 @@ class Conf(arguments: Seq[String]) extends ScallopConf(arguments) { opt[Int](required = false, descr = "Number of input files to use") } addSubcommand(runQueries) - object runTPCQueries extends Subcommand("runTPC") { - val dataFolder: ScallopOption[String] = - opt[String]( - required = true, - descr = "Folder for input data. Expected folder struct `$dataFolder/tableName/*.parquet`") - val queriesFolder: ScallopOption[String] = - opt[String]( - required = true, - descr = "Folder for test queries. Expected folder struct `$queriesFolder/*.sql`") - } - addSubcommand(runTPCQueries) verify() } @@ -119,11 +108,6 @@ object Main { conf.generateQueries.numQueries()) case Some(conf.runQueries) => QueryRunner.runQueries(spark, conf.runQueries.numFiles(), conf.runQueries.filename()) - case Some(conf.runTPCQueries) => - QueryRunner.runTPCQueries( - spark, - conf.runTPCQueries.dataFolder(), - conf.runTPCQueries.queriesFolder()) case _ => // scalastyle:off println println("Invalid subcommand") From ced0c0da523630abf129e8cefe79ae96067037bf Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 23 Oct 2025 11:43:33 -0700 Subject: [PATCH 03/11] chore: extract comparison tool from fuzzer --- .../org/apache/comet/fuzz/ComparisonTool.scala | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala index 1f83b9a01f..cc816dfe00 100644 --- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala +++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala @@ -21,16 +21,11 @@ package org.apache.comet.fuzz import java.io.File -import scala.util.Random - -import org.rogach.scallop.{ScallopConf, Subcommand} -import org.rogach.scallop.ScallopOption +import org.rogach.scallop.{ScallopConf, ScallopOption, Subcommand} import org.apache.spark.sql.SparkSession -import org.apache.comet.testing.{DataGenOptions, ParquetGenerator} - -class Conf(arguments: Seq[String]) extends ScallopConf(arguments) { +class ComparisonToolConf(arguments: Seq[String]) extends ScallopConf(arguments) { object compareParquet extends Subcommand("compareParquet") { val inputSparkFolder: ScallopOption[String] = opt[String](required = true, descr = "Folder with Spark produced results in Parquet format") @@ -41,14 +36,14 @@ class Conf(arguments: Seq[String]) extends ScallopConf(arguments) { verify() } -object Main { +object ComparisonToolMain { lazy val spark: SparkSession = SparkSession .builder() .getOrCreate() def main(args: Array[String]): Unit = { - val conf = new Conf(args.toIndexedSeq) + val conf = new ComparisonToolConf(args.toIndexedSeq) conf.subcommand match { case Some(conf.compareParquet) => 
compareParquetFolders( @@ -92,7 +87,7 @@ object Main { .map(_.getName) .sorted - output.write(s"# Comparing Parquet Folders\n\n") + output.write("# Comparing Parquet Folders\n\n") output.write(s"Spark folder: $sparkFolderPath\n") output.write(s"Comet folder: $cometFolderPath\n") output.write(s"Found ${sparkSubfolders.length} subfolders to compare\n\n") @@ -147,7 +142,7 @@ object Main { output.flush() } - output.write(s"\n# Comparison Complete\n") + output.write("\n# Comparison Complete\n") output.write(s"Compared ${sparkSubfolders.length} subfolders\n") } finally { From 3631b54dfa1dbe890e1fc28ab099acda975ef5e8 Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 23 Oct 2025 12:22:14 -0700 Subject: [PATCH 04/11] chore: extract comparison tool from fuzzer --- .../src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala index cc816dfe00..a60f8218f4 100644 --- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala +++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala @@ -125,7 +125,7 @@ object ComparisonToolMain { cometPlan, output) - output.write(s"✓ Subfolder $subfolderName: ${sparkRows.length} rows matched\n\n") + output.write(s"Subfolder $subfolderName: ${sparkRows.length} rows matched\n\n") } catch { case e: Exception => From f381c3df046dff60112f5e9c40eb2084a13df59e Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 27 Oct 2025 10:05:41 -0700 Subject: [PATCH 05/11] chore: extract comparison tool from fuzzer --- .../org/apache/comet/fuzz/QueryRunner.scala | 122 +++++++++++------- 1 file changed, 73 insertions(+), 49 deletions(-) diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala index bcc9f98d06..c43eebc99c 100644 --- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala +++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala @@ -21,13 +21,22 @@ package org.apache.comet.fuzz import java.io.{BufferedWriter, FileWriter, PrintWriter, StringWriter} -import scala.collection.mutable.WrappedArray +import scala.collection.mutable import scala.io.Source import org.apache.spark.sql.{Row, SparkSession} object QueryRunner { + def createOutputMdFile(): BufferedWriter = { + val outputFilename = s"results-${System.currentTimeMillis()}.md" + // scalastyle:off println + println(s"Writing results to $outputFilename") + // scalastyle:on println + + new BufferedWriter(new FileWriter(outputFilename)) + } + def runQueries( spark: SparkSession, numFiles: Int, @@ -39,12 +48,7 @@ object QueryRunner { var cometFailureCount = 0 var cometSuccessCount = 0 - val outputFilename = s"results-${System.currentTimeMillis()}.md" - // scalastyle:off println - println(s"Writing results to $outputFilename") - // scalastyle:on println - - val w = new BufferedWriter(new FileWriter(outputFilename)) + val w = createOutputMdFile() // register input files for (i <- 0 until numFiles) { @@ -76,42 +80,13 @@ object QueryRunner { val cometRows = df.collect() val cometPlan = df.queryExecution.executedPlan.toString - var success = true - if (sparkRows.length == cometRows.length) { - var i = 0 - while (i < sparkRows.length) { - val l = sparkRows(i) - val r = cometRows(i) - assert(l.length == r.length) - for (j <- 0 until l.length) { - if (!same(l(j), r(j))) { - 
success = false - showSQL(w, sql) - showPlans(w, sparkPlan, cometPlan) - w.write(s"First difference at row $i:\n") - w.write("Spark: `" + formatRow(l) + "`\n") - w.write("Comet: `" + formatRow(r) + "`\n") - i = sparkRows.length - } - } - i += 1 - } - } else { - success = false - showSQL(w, sql) - showPlans(w, sparkPlan, cometPlan) - w.write( - s"[ERROR] Spark produced ${sparkRows.length} rows and " + - s"Comet produced ${cometRows.length} rows.\n") - } - - // check that the plan contains Comet operators - if (!cometPlan.contains("Comet")) { - success = false - showSQL(w, sql) - showPlans(w, sparkPlan, cometPlan) - w.write("[ERROR] Comet did not accelerate any part of the plan\n") - } + val success = QueryComparison.assertSameRows( + sparkRows, + cometRows, + sqlText = sql, + sparkPlan, + cometPlan, + output = w) if (success) { cometSuccessCount += 1 @@ -123,7 +98,7 @@ object QueryRunner { case e: Exception => // the query worked in Spark but failed in Comet, so this is likely a bug in Comet cometFailureCount += 1 - showSQL(w, sql) + QueryComparison.showSQL(w, sql) w.write("### Spark Plan\n") w.write(s"```\n$sparkPlan\n```\n") @@ -145,7 +120,7 @@ object QueryRunner { // we expect many generated queries to be invalid invalidQueryCount += 1 if (showFailedSparkQueries) { - showSQL(w, sql) + QueryComparison.showSQL(w, sql) w.write(s"Query failed in Spark: ${e.getMessage}\n") } } @@ -161,6 +136,56 @@ object QueryRunner { querySource.close() } } +} + +object QueryComparison { + def assertSameRows( + sparkRows: Array[Row], + cometRows: Array[Row], + sqlText: String, + sparkPlan: String, + cometPlan: String, + output: BufferedWriter): Boolean = { + var success = true + if (sparkRows.length == cometRows.length) { + var i = 0 + while (i < sparkRows.length) { + val l = sparkRows(i) + val r = cometRows(i) + assert(l.length == r.length) + for (j <- 0 until l.length) { + if (!same(l(j), r(j))) { + success = false + showSQL(output, sqlText) + showPlans(output, sparkPlan, cometPlan) + output.write(s"First difference at row $i:\n") + output.write("Spark: `" + formatRow(l) + "`\n") + output.write("Comet: `" + formatRow(r) + "`\n") + i = sparkRows.length + } + } + i += 1 + } + } else { + success = false + showSQL(output, sqlText) + showPlans(output, sparkPlan, cometPlan) + output.write( + s"[ERROR] Spark produced ${sparkRows.length} rows and " + + s"Comet produced ${cometRows.length} rows.\n") + } + + // check that the plan contains Comet operators + if (!cometPlan.contains("Comet")) { + success = false + showSQL(output, sqlText) + showPlans(output, sparkPlan, cometPlan) + output.write("[ERROR] Comet did not accelerate any part of the plan\n") + } + + success + + } private def same(l: Any, r: Any): Boolean = { if (l == null || r == null) { @@ -179,7 +204,7 @@ object QueryRunner { case (a: Double, b: Double) => (a - b).abs <= 0.000001 case (a: Array[_], b: Array[_]) => a.length == b.length && a.zip(b).forall(x => same(x._1, x._2)) - case (a: WrappedArray[_], b: WrappedArray[_]) => + case (a: mutable.WrappedArray[_], b: mutable.WrappedArray[_]) => a.length == b.length && a.zip(b).forall(x => same(x._1, x._2)) case (a: Row, b: Row) => val aa = a.toSeq @@ -192,7 +217,7 @@ object QueryRunner { private def format(value: Any): String = { value match { case null => "NULL" - case v: WrappedArray[_] => s"[${v.map(format).mkString(",")}]" + case v: mutable.WrappedArray[_] => s"[${v.map(format).mkString(",")}]" case v: Array[Byte] => s"[${v.mkString(",")}]" case r: Row => formatRow(r) case other => other.toString @@ 
-203,7 +228,7 @@ object QueryRunner {
     row.toSeq.map(format).mkString(",")
   }
 
-  private def showSQL(w: BufferedWriter, sql: String, maxLength: Int = 120): Unit = {
+  def showSQL(w: BufferedWriter, sql: String, maxLength: Int = 120): Unit = {
     w.write("## SQL\n")
     w.write("```\n")
     val words = sql.split(" ")
@@ -229,5 +254,4 @@ object QueryRunner {
     w.write("### Comet Plan\n")
     w.write(s"```\n$cometPlan\n```\n")
   }
-
 }

From 720d77e4f4cd829304950702b8330253339aae09 Mon Sep 17 00:00:00 2001
From: comphead
Date: Mon, 27 Oct 2025 13:49:10 -0700
Subject: [PATCH 06/11] chore: extract comparison tool from fuzzer

---
 dev/benchmarks/tpcbench.py |  9 ++++++---
 fuzz-testing/README.md     | 16 ++++++++++++++++
 2 files changed, 22 insertions(+), 3 deletions(-)

diff --git a/dev/benchmarks/tpcbench.py b/dev/benchmarks/tpcbench.py
index 39c34ca7ce..a7b16a301c 100644
--- a/dev/benchmarks/tpcbench.py
+++ b/dev/benchmarks/tpcbench.py
@@ -91,9 +91,12 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
             df.explain()
 
             if write_path is not None:
-                output_path = f"{write_path}/q{query}"
-                df.coalesce(1).write.mode("overwrite").parquet(output_path)
-                print(f"Query {query} results written to {output_path}")
+                if len(df.columns) > 0:
+                    output_path = f"{write_path}/q{query}"
+                    df.coalesce(1).write.mode("overwrite").parquet(output_path)
+                    print(f"Query {query} results written to {output_path}")
+                else:
+                    print(f"Skipping write: DataFrame has no schema for {output_path}")
             else:
                 rows = df.collect()
                 print(f"Query {query} returned {len(rows)} rows")
diff --git a/fuzz-testing/README.md b/fuzz-testing/README.md
index c8cea5be82..c3b9a9753d 100644
--- a/fuzz-testing/README.md
+++ b/fuzz-testing/README.md
@@ -103,3 +103,19 @@ $SPARK_HOME/bin/spark-submit \
 ```
 
 Note that the output filename is currently hard-coded as `results-${System.currentTimeMillis()}.md`
+
+### Compare existing datasets
+
+To compare a pair of existing datasets, you can use the comparison tool.
+The example below is for TPC-H query results generated by pure Spark and by Comet.
+
+
+```shell
+$SPARK_HOME/bin/spark-submit \
+    --master $SPARK_MASTER \
+    --class org.apache.comet.fuzz.ComparisonToolMain
+    target/comet-fuzz-spark3.5_2.12-0.12.0-SNAPSHOT-jar-with-dependencies.jar \
+    compareParquet --input-spark-folder=/tmp/tpch/spark --input-comet-folder=/tmp/tpch/comet
+```
+
+The tool takes a pair of existing folders with the same layout and compares the matching subfolders, treating each one as a Parquet dataset.
\ No newline at end of file

From ce2a5b261197d58bc8019648fa0c698cef864631 Mon Sep 17 00:00:00 2001
From: comphead
Date: Tue, 28 Oct 2025 10:24:18 -0700
Subject: [PATCH 07/11] chore: extract comparison tool from fuzzer

---
 dev/benchmarks/tpcbench.py                  | 23 +++++++++++++++++--
 .../org/apache/comet/fuzz/QueryRunner.scala |  4 ++++
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/dev/benchmarks/tpcbench.py b/dev/benchmarks/tpcbench.py
index a7b16a301c..b464033c85 100644
--- a/dev/benchmarks/tpcbench.py
+++ b/dev/benchmarks/tpcbench.py
@@ -21,6 +21,22 @@ from pyspark.sql import SparkSession
 import time
 
+# rename duplicate column names
+# a, a, b, b -> a, a_1, b, b_1
+#
+# Important for writing data where column name uniqueness is required
+def dedup_columns(df):
+    counts = {}
+    new_cols = []
+    for c in df.columns:
+        if c not in counts:
+            counts[c] = 0
+            new_cols.append(c)
+        else:
+            counts[c] += 1
+            new_cols.append(f"{c}_{counts[c]}")
+    return df.toDF(*new_cols)
+
 def main(benchmark: str, data_path: str, query_path: str, iterations: int, output: str, name: str, query_num: int = None, write_path: str = None):
     # Initialize a SparkSession
@@ -91,9 +107,11 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
             df.explain()
 
             if write_path is not None:
+                # skip results with empty schema
+                # coming across for running DDL stmt
                 if len(df.columns) > 0:
                     output_path = f"{write_path}/q{query}"
-                    df.coalesce(1).write.mode("overwrite").parquet(output_path)
+                    dedup_columns(df).coalesce(1).write.mode("overwrite").parquet(output_path)
                     print(f"Query {query} results written to {output_path}")
                 else:
                     print(f"Skipping write: DataFrame has no schema for {output_path}")
@@ -135,4 +153,5 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
     parser.add_argument("--write", required=False, help="Path to save query results to, in Parquet format.")
     args = parser.parse_args()
 
-    main(args.benchmark, args.data, args.queries, int(args.iterations), args.output, args.name, args.query, args.write)
\ No newline at end of file
+    main(args.benchmark, args.data, args.queries, int(args.iterations), args.output, args.name, args.query, args.write)
+
diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala
index c43eebc99c..387a7cce1c 100644
--- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala
+++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala
@@ -152,6 +152,10 @@ object QueryComparison {
       while (i < sparkRows.length) {
         val l = sparkRows(i)
         val r = cometRows(i)
+        // Check the schema is equal for first row only
+        if (i == 0)
+          assert(l.schema == r.schema)
+
         assert(l.length == r.length)
         for (j <- 0 until l.length) {
           if (!same(l(j), r(j))) {

From 6d28c0cd0d452e01caade48c4f1cd68ad67d4342 Mon Sep 17 00:00:00 2001
From: comphead
Date: Tue, 28 Oct 2025 11:19:45 -0700
Subject: [PATCH 08/11] chore: extract comparison tool from fuzzer

---
dev/benchmarks/tpcbench.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/dev/benchmarks/tpcbench.py b/dev/benchmarks/tpcbench.py index b464033c85..dc9b9108b1 100644 --- a/dev/benchmarks/tpcbench.py +++ b/dev/benchmarks/tpcbench.py @@ -111,7 +111,12 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu # coming across for running DDL stmt if len(df.columns) > 0: output_path = f"{write_path}/q{query}" - dedup_columns(df).coalesce(1).write.mode("overwrite").parquet(output_path) + # sort by all columns to have predictable output dataset for comparison + df_sorted = df.orderBy(*df.columns) + # rename same column names for output + # output doesn't allow non unique column names + # a, a, b, b => a, a_1, b, b_1 + dedup_columns(df_sorted).coalesce(1).write.mode("overwrite").parquet(output_path) print(f"Query {query} results written to {output_path}") else: print(f"Skipping write: DataFrame has no schema for {output_path}") From 95955be62021c17a9ffae088693d215e250789bb Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 28 Oct 2025 15:00:52 -0700 Subject: [PATCH 09/11] chore: extract comparison tool from fuzzer --- dev/benchmarks/tpcbench.py | 8 ++-- fuzz-testing/README.md | 2 +- .../apache/comet/fuzz/ComparisonTool.scala | 18 +++------ .../org/apache/comet/fuzz/QueryRunner.scala | 38 +++++++------------ 4 files changed, 24 insertions(+), 42 deletions(-) diff --git a/dev/benchmarks/tpcbench.py b/dev/benchmarks/tpcbench.py index dc9b9108b1..0a91bf0339 100644 --- a/dev/benchmarks/tpcbench.py +++ b/dev/benchmarks/tpcbench.py @@ -111,12 +111,12 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu # coming across for running DDL stmt if len(df.columns) > 0: output_path = f"{write_path}/q{query}" - # sort by all columns to have predictable output dataset for comparison - df_sorted = df.orderBy(*df.columns) # rename same column names for output - # output doesn't allow non unique column names # a, a, b, b => a, a_1, b, b_1 - dedup_columns(df_sorted).coalesce(1).write.mode("overwrite").parquet(output_path) + # output doesn't allow non unique column names + deduped = dedup_columns(df) + # sort by all columns to have predictable output dataset for comparison + deduped.orderBy(*deduped.columns).coalesce(1).write.mode("overwrite").parquet(output_path) print(f"Query {query} results written to {output_path}") else: print(f"Skipping write: DataFrame has no schema for {output_path}") diff --git a/fuzz-testing/README.md b/fuzz-testing/README.md index c3b9a9753d..74141fbf30 100644 --- a/fuzz-testing/README.md +++ b/fuzz-testing/README.md @@ -113,7 +113,7 @@ The example below is for TPC-H queries results generated by pure Spark and Comet ```shell $SPARK_HOME/bin/spark-submit \ --master $SPARK_MASTER \ - --class org.apache.comet.fuzz.ComparisonToolMain + --class org.apache.comet.fuzz.ComparisonTool target/comet-fuzz-spark3.5_2.12-0.12.0-SNAPSHOT-jar-with-dependencies.jar \ compareParquet --input-spark-folder=/tmp/tpch/spark --input-comet-folder=/tmp/tpch/comet ``` diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala index a60f8218f4..e02318e1c2 100644 --- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala +++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala @@ -23,7 +23,7 @@ import java.io.File import org.rogach.scallop.{ScallopConf, ScallopOption, Subcommand} -import 
org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{functions, SparkSession} class ComparisonToolConf(arguments: Seq[String]) extends ScallopConf(arguments) { object compareParquet extends Subcommand("compareParquet") { @@ -36,7 +36,7 @@ class ComparisonToolConf(arguments: Seq[String]) extends ScallopConf(arguments) verify() } -object ComparisonToolMain { +object ComparisonTool { lazy val spark: SparkSession = SparkSession .builder() @@ -108,22 +108,14 @@ object ComparisonToolMain { // Read Spark parquet files spark.conf.set("spark.comet.enabled", "false") val sparkDf = spark.read.parquet(sparkSubfolderPath.getAbsolutePath) - val sparkRows = sparkDf.collect() - val sparkPlan = sparkDf.queryExecution.executedPlan.toString + val sparkRows = sparkDf.orderBy(sparkDf.columns.map(functions.col): _*).collect() // Read Comet parquet files val cometDf = spark.read.parquet(cometSubfolderPath.getAbsolutePath) - val cometRows = cometDf.collect() - val cometPlan = cometDf.queryExecution.executedPlan.toString + val cometRows = cometDf.orderBy(cometDf.columns.map(functions.col): _*).collect() // Compare the results - QueryComparison.assertSameRows( - sparkRows, - cometRows, - sqlText = s"Reading parquet from subfolder: $subfolderName", - sparkPlan, - cometPlan, - output) + QueryComparison.assertSameRows(sparkRows, cometRows, output) output.write(s"Subfolder $subfolderName: ${sparkRows.length} rows matched\n\n") diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala index 387a7cce1c..bf972b11ac 100644 --- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala +++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala @@ -26,6 +26,8 @@ import scala.io.Source import org.apache.spark.sql.{Row, SparkSession} +import org.apache.comet.fuzz.QueryComparison.showPlans + object QueryRunner { def createOutputMdFile(): BufferedWriter = { @@ -80,17 +82,21 @@ object QueryRunner { val cometRows = df.collect() val cometPlan = df.queryExecution.executedPlan.toString - val success = QueryComparison.assertSameRows( - sparkRows, - cometRows, - sqlText = sql, - sparkPlan, - cometPlan, - output = w) + var success = QueryComparison.assertSameRows(sparkRows, cometRows, output = w) + + // check that the plan contains Comet operators + if (!cometPlan.contains("Comet")) { + success = false + w.write("[ERROR] Comet did not accelerate any part of the plan\n") + } + + QueryComparison.showSQL(w, sql) if (success) { cometSuccessCount += 1 } else { + // show plans for failed queries + showPlans(w, sparkPlan, cometPlan) cometFailureCount += 1 } @@ -142,9 +148,6 @@ object QueryComparison { def assertSameRows( sparkRows: Array[Row], cometRows: Array[Row], - sqlText: String, - sparkPlan: String, - cometPlan: String, output: BufferedWriter): Boolean = { var success = true if (sparkRows.length == cometRows.length) { @@ -160,8 +163,6 @@ object QueryComparison { for (j <- 0 until l.length) { if (!same(l(j), r(j))) { success = false - showSQL(output, sqlText) - showPlans(output, sparkPlan, cometPlan) output.write(s"First difference at row $i:\n") output.write("Spark: `" + formatRow(l) + "`\n") output.write("Comet: `" + formatRow(r) + "`\n") @@ -172,23 +173,12 @@ object QueryComparison { } } else { success = false - showSQL(output, sqlText) - showPlans(output, sparkPlan, cometPlan) output.write( s"[ERROR] Spark produced ${sparkRows.length} rows and " + s"Comet produced ${cometRows.length} rows.\n") } - // check 
that the plan contains Comet operators - if (!cometPlan.contains("Comet")) { - success = false - showSQL(output, sqlText) - showPlans(output, sparkPlan, cometPlan) - output.write("[ERROR] Comet did not accelerate any part of the plan\n") - } - success - } private def same(l: Any, r: Any): Boolean = { @@ -252,7 +242,7 @@ object QueryComparison { w.write("```\n") } - private def showPlans(w: BufferedWriter, sparkPlan: String, cometPlan: String): Unit = { + def showPlans(w: BufferedWriter, sparkPlan: String, cometPlan: String): Unit = { w.write("### Spark Plan\n") w.write(s"```\n$sparkPlan\n```\n") w.write("### Comet Plan\n") From c75ecfcda386d595ec96559a7eb6e50c5c28850f Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 28 Oct 2025 16:09:14 -0700 Subject: [PATCH 10/11] chore: extract comparison tool from fuzzer --- .../src/main/scala/org/apache/comet/fuzz/QueryRunner.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala index bf972b11ac..5d5ac77fab 100644 --- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala +++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala @@ -156,8 +156,9 @@ object QueryComparison { val l = sparkRows(i) val r = cometRows(i) // Check the schema is equal for first row only - if (i == 0) + if (i == 0) { assert(l.schema == r.schema) + } assert(l.length == r.length) for (j <- 0 until l.length) { From b81158320bc26162a8797def5df5211a85f3c37e Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 28 Oct 2025 17:40:38 -0700 Subject: [PATCH 11/11] chore: extract comparison tool from fuzzer --- .../org/apache/comet/fuzz/ComparisonTool.scala | 7 +++---- .../org/apache/comet/fuzz/QueryRunner.scala | 17 +++++++++++------ 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala index e02318e1c2..a4fd011fee 100644 --- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala +++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala @@ -115,10 +115,9 @@ object ComparisonTool { val cometRows = cometDf.orderBy(cometDf.columns.map(functions.col): _*).collect() // Compare the results - QueryComparison.assertSameRows(sparkRows, cometRows, output) - - output.write(s"Subfolder $subfolderName: ${sparkRows.length} rows matched\n\n") - + if (QueryComparison.assertSameRows(sparkRows, cometRows, output)) { + output.write(s"Subfolder $subfolderName: ${sparkRows.length} rows matched\n\n") + } } catch { case e: Exception => output.write( diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala index 5d5ac77fab..f4f3452962 100644 --- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala +++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala @@ -149,37 +149,42 @@ object QueryComparison { sparkRows: Array[Row], cometRows: Array[Row], output: BufferedWriter): Boolean = { - var success = true if (sparkRows.length == cometRows.length) { var i = 0 while (i < sparkRows.length) { val l = sparkRows(i) val r = cometRows(i) // Check the schema is equal for first row only - if (i == 0) { - assert(l.schema == r.schema) + if (i == 0 && l.schema != r.schema) { + output.write( + s"[ERROR] Spark produced schema 
${l.schema} and " +
+              s"Comet produced schema ${r.schema}.\n")
+
+          return false
         }
 
         assert(l.length == r.length)
         for (j <- 0 until l.length) {
           if (!same(l(j), r(j))) {
-            success = false
             output.write(s"First difference at row $i:\n")
             output.write("Spark: `" + formatRow(l) + "`\n")
             output.write("Comet: `" + formatRow(r) + "`\n")
             i = sparkRows.length
+
+            return false
           }
         }
         i += 1
       }
     } else {
-      success = false
       output.write(
         s"[ERROR] Spark produced ${sparkRows.length} rows and " +
           s"Comet produced ${cometRows.length} rows.\n")
+
+      return false
     }
 
-    success
+    true
   }
 
   private def same(l: Any, r: Any): Boolean = {
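
For reference, the end-to-end flow that these patches enable can be sketched as follows. The benchmark name, data and query paths, and the Spark/Comet submit options below are illustrative assumptions (the full Comet setup, such as the plugin jar and related configs, is omitted); only the `tpcbench.py` flags, the `spark.comet.enabled` config, and the `ComparisonTool` invocation come from the changes above.

```shell
# Sketch only: paths, benchmark name, and submit options are illustrative.
# 1. Write baseline results with Comet disabled.
$SPARK_HOME/bin/spark-submit --master $SPARK_MASTER \
    --conf spark.comet.enabled=false \
    dev/benchmarks/tpcbench.py --benchmark tpch --data /tmp/tpch/data \
    --queries /tmp/tpch/queries --iterations 1 --output . --name spark \
    --write /tmp/tpch/spark

# 2. Write the same results with Comet enabled (Comet jars/configs not shown).
$SPARK_HOME/bin/spark-submit --master $SPARK_MASTER \
    --conf spark.comet.enabled=true \
    dev/benchmarks/tpcbench.py --benchmark tpch --data /tmp/tpch/data \
    --queries /tmp/tpch/queries --iterations 1 --output . --name comet \
    --write /tmp/tpch/comet

# 3. Compare the two result folders subfolder by subfolder.
$SPARK_HOME/bin/spark-submit --master $SPARK_MASTER \
    --class org.apache.comet.fuzz.ComparisonTool \
    target/comet-fuzz-spark3.5_2.12-0.12.0-SNAPSHOT-jar-with-dependencies.jar \
    compareParquet --input-spark-folder=/tmp/tpch/spark --input-comet-folder=/tmp/tpch/comet
```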