Merged
35 changes: 31 additions & 4 deletions dev/benchmarks/tpcbench.py
@@ -21,6 +21,22 @@
from pyspark.sql import SparkSession
import time

# Rename duplicate column names by appending a numeric suffix:
# a, a, b, b -> a, a_1, b, b_1
#
# Important when writing data to formats that require unique column names
def dedup_columns(df):
counts = {}
new_cols = []
for c in df.columns:
if c not in counts:
counts[c] = 0
new_cols.append(c)
else:
counts[c] += 1
new_cols.append(f"{c}_{counts[c]}")
return df.toDF(*new_cols)

def main(benchmark: str, data_path: str, query_path: str, iterations: int, output: str, name: str, query_num: int = None, write_path: str = None):

# Initialize a SparkSession
@@ -91,9 +107,19 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
df.explain()

if write_path is not None:
output_path = f"{write_path}/q{query}"
df.coalesce(1).write.mode("overwrite").parquet(output_path)
print(f"Query {query} results written to {output_path}")
# Skip results with an empty schema; this can happen
# when running DDL statements
if len(df.columns) > 0:
Contributor Author:
Spark complains when saving a DataFrame with an empty schema; this can happen for DDL statements, which came up in the TPC query sets.

output_path = f"{write_path}/q{query}"
# Sort by all columns so the output dataset is deterministic and can be compared
df_sorted = df.orderBy(*df.columns)
# Rename duplicate column names for output;
# the output format does not allow non-unique column names
# a, a, b, b => a, a_1, b, b_1
dedup_columns(df_sorted).coalesce(1).write.mode("overwrite").parquet(output_path)
print(f"Query {query} results written to {output_path}")
else:
print(f"Skipping write: DataFrame has no schema for {output_path}")
else:
rows = df.collect()
print(f"Query {query} returned {len(rows)} rows")
@@ -132,4 +158,5 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
parser.add_argument("--write", required=False, help="Path to save query results to, in Parquet format.")
args = parser.parse_args()

main(args.benchmark, args.data, args.queries, int(args.iterations), args.output, args.name, args.query, args.write)
main(args.benchmark, args.data, args.queries, int(args.iterations), args.output, args.name, args.query, args.write)
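
As a quick illustration of what `dedup_columns` does, here is a minimal sketch (the DataFrame and its column names are made up for the example; it assumes `dedup_columns` from the patch above is in scope and a `SparkSession` is available):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A result with duplicate column names, e.g. from a join that projects the same name twice
df = spark.createDataFrame([(1, 2, 3, 4)], ["a", "a", "b", "b"])

# Duplicates get a numeric suffix so the result can be written out safely
print(dedup_columns(df).columns)  # ['a', 'a_1', 'b', 'b_1']
```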

16 changes: 16 additions & 0 deletions fuzz-testing/README.md
@@ -103,3 +103,19 @@ $SPARK_HOME/bin/spark-submit \
```

Note that the output filename is currently hard-coded as `results-${System.currentTimeMillis()}.md`

### Compare existing datasets

To compare a pair of existing datasets, you can use the comparison tool.
The example below compares TPC-H query results generated by vanilla Spark and by Comet.


```shell
$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER \
--class org.apache.comet.fuzz.ComparisonToolMain \
target/comet-fuzz-spark3.5_2.12-0.12.0-SNAPSHOT-jar-with-dependencies.jar \
compareParquet --input-spark-folder=/tmp/tpch/spark --input-comet-folder=/tmp/tpch/comet
```

The tool takes a pair of existing folders with the same layout and compares the corresponding subfolders, treating each one as a Parquet-based dataset.
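
For illustration, the expected layout looks roughly like this (the paths follow the example above; each `qN` subfolder is the Parquet output that `tpcbench.py --write` produces for one query):

```
/tmp/tpch/spark/
├── q1/   <- Parquet files for query 1 (Spark run)
├── q2/
└── ...
/tmp/tpch/comet/
├── q1/   <- Parquet files for query 1 (Comet run)
├── q2/
└── ...
```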
152 changes: 152 additions & 0 deletions fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.fuzz

import java.io.File

import org.rogach.scallop.{ScallopConf, ScallopOption, Subcommand}

import org.apache.spark.sql.SparkSession

class ComparisonToolConf(arguments: Seq[String]) extends ScallopConf(arguments) {
object compareParquet extends Subcommand("compareParquet") {
val inputSparkFolder: ScallopOption[String] =
opt[String](required = true, descr = "Folder with Spark produced results in Parquet format")
val inputCometFolder: ScallopOption[String] =
opt[String](required = true, descr = "Folder with Comet produced results in Parquet format")
}
addSubcommand(compareParquet)
verify()
}

object ComparisonToolMain {
Member:
Could this just be named ComparisonTool?


lazy val spark: SparkSession = SparkSession
.builder()
.getOrCreate()

def main(args: Array[String]): Unit = {
val conf = new ComparisonToolConf(args.toIndexedSeq)
conf.subcommand match {
case Some(conf.compareParquet) =>
compareParquetFolders(
spark,
conf.compareParquet.inputSparkFolder(),
conf.compareParquet.inputCometFolder())

case _ =>
// scalastyle:off println
println("Invalid subcommand")
// scalastyle:on println
sys.exit(-1)
}
}

private def compareParquetFolders(
spark: SparkSession,
sparkFolderPath: String,
cometFolderPath: String): Unit = {

val output = QueryRunner.createOutputMdFile()

try {
val sparkFolder = new File(sparkFolderPath)
val cometFolder = new File(cometFolderPath)

if (!sparkFolder.exists() || !sparkFolder.isDirectory) {
throw new IllegalArgumentException(
s"Spark folder does not exist or is not a directory: $sparkFolderPath")
}

if (!cometFolder.exists() || !cometFolder.isDirectory) {
throw new IllegalArgumentException(
s"Comet folder does not exist or is not a directory: $cometFolderPath")
}

// Get all subdirectories from the Spark folder
val sparkSubfolders = sparkFolder
.listFiles()
.filter(_.isDirectory)
.map(_.getName)
.sorted

output.write("# Comparing Parquet Folders\n\n")
output.write(s"Spark folder: $sparkFolderPath\n")
output.write(s"Comet folder: $cometFolderPath\n")
output.write(s"Found ${sparkSubfolders.length} subfolders to compare\n\n")

// Compare each subfolder
sparkSubfolders.foreach { subfolderName =>
val sparkSubfolderPath = new File(sparkFolder, subfolderName)
val cometSubfolderPath = new File(cometFolder, subfolderName)

if (!cometSubfolderPath.exists() || !cometSubfolderPath.isDirectory) {
output.write(s"## Subfolder: $subfolderName\n")
output.write(
s"[WARNING] Comet subfolder not found: ${cometSubfolderPath.getAbsolutePath}\n\n")
} else {
output.write(s"## Comparing subfolder: $subfolderName\n\n")

try {
// Read Spark parquet files
spark.conf.set("spark.comet.enabled", "false")
val sparkDf = spark.read.parquet(sparkSubfolderPath.getAbsolutePath)
val sparkRows = sparkDf.collect()
val sparkPlan = sparkDf.queryExecution.executedPlan.toString

// Read Comet parquet files
val cometDf = spark.read.parquet(cometSubfolderPath.getAbsolutePath)
val cometRows = cometDf.collect()
val cometPlan = cometDf.queryExecution.executedPlan.toString
Member:
I'm not sure why we need to do anything with the plans for reading the Parquet files. Shouldn't we just be comparing the data in the Parquet files?

Contributor Author:
The comparison itself has nothing to do with the plans, that's true. The plans are needed for display when an assertion fails down the road. Let me think about whether I can get rid of them.


// Compare the results
QueryComparison.assertSameRows(
sparkRows,
cometRows,
sqlText = s"Reading parquet from subfolder: $subfolderName",
sparkPlan,
cometPlan,
output)

output.write(s"Subfolder $subfolderName: ${sparkRows.length} rows matched\n\n")

} catch {
case e: Exception =>
output.write(
s"[ERROR] Failed to compare subfolder $subfolderName: ${e.getMessage}\n")
val sw = new java.io.StringWriter()
val p = new java.io.PrintWriter(sw)
e.printStackTrace(p)
p.close()
output.write(s"```\n${sw.toString}\n```\n\n")
}
}

output.flush()
}

output.write("\n# Comparison Complete\n")
output.write(s"Compared ${sparkSubfolders.length} subfolders\n")

} finally {
output.close()
}
}
}