
Commit 2ed0967

chore: extract comparison into separate tool (#2632)
1 parent 0b33b05 commit 2ed0967

File tree

4 files changed: +258, -48 lines

dev/benchmarks/tpcbench.py

Lines changed: 31 additions & 4 deletions

@@ -21,6 +21,22 @@
 from pyspark.sql import SparkSession
 import time
 
+# Rename duplicate column aliases:
+# a, a, b, b -> a, a_1, b, b_1
+#
+# Important when writing data, where column name uniqueness is required.
+def dedup_columns(df):
+    counts = {}
+    new_cols = []
+    for c in df.columns:
+        if c not in counts:
+            counts[c] = 0
+            new_cols.append(c)
+        else:
+            counts[c] += 1
+            new_cols.append(f"{c}_{counts[c]}")
+    return df.toDF(*new_cols)
+
 def main(benchmark: str, data_path: str, query_path: str, iterations: int, output: str, name: str, query_num: int = None, write_path: str = None):
 
     # Initialize a SparkSession

@@ -91,9 +107,19 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
         df.explain()
 
         if write_path is not None:
-            output_path = f"{write_path}/q{query}"
-            df.coalesce(1).write.mode("overwrite").parquet(output_path)
-            print(f"Query {query} results written to {output_path}")
+            # Skip results with an empty schema,
+            # which can happen when running DDL statements.
+            if len(df.columns) > 0:
+                output_path = f"{write_path}/q{query}"
+                # Rename duplicate column names for the output (a, a, b, b => a, a_1, b, b_1);
+                # the output does not allow non-unique column names.
+                deduped = dedup_columns(df)
+                # Sort by all columns to get a predictable output dataset for comparison.
+                deduped.orderBy(*deduped.columns).coalesce(1).write.mode("overwrite").parquet(output_path)
+                print(f"Query {query} results written to {output_path}")
+            else:
+                print(f"Skipping write: DataFrame has no schema for query {query}")
         else:
             rows = df.collect()
             print(f"Query {query} returned {len(rows)} rows")

@@ -132,4 +158,5 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
     parser.add_argument("--write", required=False, help="Path to save query results to, in Parquet format.")
     args = parser.parse_args()
 
-    main(args.benchmark, args.data, args.queries, int(args.iterations), args.output, args.name, args.query, args.write)
+    main(args.benchmark, args.data, args.queries, int(args.iterations), args.output, args.name, args.query, args.write)
+
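As a quick illustration of the dedup logic above, here is a minimal pyspark sketch, assuming `dedup_columns` from the diff is in scope; the local-mode session and the duplicate aliases in the query are hypothetical, purely for the demo:

```python
from pyspark.sql import SparkSession

# Hypothetical local session for demonstration only;
# tpcbench.py builds its own SparkSession.
spark = SparkSession.builder.master("local[1]").appName("dedup-demo").getOrCreate()

# A projection that repeats aliases, as benchmark queries sometimes do.
df = spark.sql("SELECT 1 AS a, 2 AS a, 3 AS b, 4 AS b")
print(df.columns)                 # ['a', 'a', 'b', 'b']
print(dedup_columns(df).columns)  # ['a', 'a_1', 'b', 'b_1']

spark.stop()
```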

fuzz-testing/README.md

Lines changed: 16 additions & 0 deletions

@@ -103,3 +103,19 @@ $SPARK_HOME/bin/spark-submit \
 ```
 
 Note that the output filename is currently hard-coded as `results-${System.currentTimeMillis()}.md`
+
+### Compare existing datasets
+
+To compare a pair of existing datasets, you can use the comparison tool.
+The example below compares TPC-H query results generated by plain Spark and by Comet.
+
+```shell
+$SPARK_HOME/bin/spark-submit \
+    --master $SPARK_MASTER \
+    --class org.apache.comet.fuzz.ComparisonTool \
+    target/comet-fuzz-spark3.5_2.12-0.12.0-SNAPSHOT-jar-with-dependencies.jar \
+    compareParquet --input-spark-folder=/tmp/tpch/spark --input-comet-folder=/tmp/tpch/comet
+```
+
+The tool takes a pair of existing folders with the same layout and compares matching subfolders,
+treating each as a Parquet-based dataset.
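The expected layout is the one `tpcbench.py --write` produces: one `q{query}` subfolder per query under each root, each holding a single coalesced Parquet file. A hypothetical pair of inputs might look like this (the part-file names are illustrative; Spark picks the actual names):

```
/tmp/tpch/spark/q1/part-00000-<uuid>.snappy.parquet
/tmp/tpch/comet/q1/part-00000-<uuid>.snappy.parquet
/tmp/tpch/spark/q2/part-00000-<uuid>.snappy.parquet
/tmp/tpch/comet/q2/part-00000-<uuid>.snappy.parquet
```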
fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala

Lines changed: 143 additions & 0 deletions
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.fuzz
+
+import java.io.File
+
+import org.rogach.scallop.{ScallopConf, ScallopOption, Subcommand}
+
+import org.apache.spark.sql.{functions, SparkSession}
+
+class ComparisonToolConf(arguments: Seq[String]) extends ScallopConf(arguments) {
+  object compareParquet extends Subcommand("compareParquet") {
+    val inputSparkFolder: ScallopOption[String] =
+      opt[String](required = true, descr = "Folder with Spark produced results in Parquet format")
+    val inputCometFolder: ScallopOption[String] =
+      opt[String](required = true, descr = "Folder with Comet produced results in Parquet format")
+  }
+  addSubcommand(compareParquet)
+  verify()
+}
+
+object ComparisonTool {
+
+  lazy val spark: SparkSession = SparkSession
+    .builder()
+    .getOrCreate()
+
+  def main(args: Array[String]): Unit = {
+    val conf = new ComparisonToolConf(args.toIndexedSeq)
+    conf.subcommand match {
+      case Some(conf.compareParquet) =>
+        compareParquetFolders(
+          spark,
+          conf.compareParquet.inputSparkFolder(),
+          conf.compareParquet.inputCometFolder())
+
+      case _ =>
+        // scalastyle:off println
+        println("Invalid subcommand")
+        // scalastyle:on println
+        sys.exit(-1)
+    }
+  }
+
+  private def compareParquetFolders(
+      spark: SparkSession,
+      sparkFolderPath: String,
+      cometFolderPath: String): Unit = {
+
+    val output = QueryRunner.createOutputMdFile()
+
+    try {
+      val sparkFolder = new File(sparkFolderPath)
+      val cometFolder = new File(cometFolderPath)
+
+      if (!sparkFolder.exists() || !sparkFolder.isDirectory) {
+        throw new IllegalArgumentException(
+          s"Spark folder does not exist or is not a directory: $sparkFolderPath")
+      }
+
+      if (!cometFolder.exists() || !cometFolder.isDirectory) {
+        throw new IllegalArgumentException(
+          s"Comet folder does not exist or is not a directory: $cometFolderPath")
+      }
+
+      // Get all subdirectories from the Spark folder
+      val sparkSubfolders = sparkFolder
+        .listFiles()
+        .filter(_.isDirectory)
+        .map(_.getName)
+        .sorted
+
+      output.write("# Comparing Parquet Folders\n\n")
+      output.write(s"Spark folder: $sparkFolderPath\n")
+      output.write(s"Comet folder: $cometFolderPath\n")
+      output.write(s"Found ${sparkSubfolders.length} subfolders to compare\n\n")
+
+      // Compare each subfolder
+      sparkSubfolders.foreach { subfolderName =>
+        val sparkSubfolderPath = new File(sparkFolder, subfolderName)
+        val cometSubfolderPath = new File(cometFolder, subfolderName)
+
+        if (!cometSubfolderPath.exists() || !cometSubfolderPath.isDirectory) {
+          output.write(s"## Subfolder: $subfolderName\n")
+          output.write(
+            s"[WARNING] Comet subfolder not found: ${cometSubfolderPath.getAbsolutePath}\n\n")
+        } else {
+          output.write(s"## Comparing subfolder: $subfolderName\n\n")
+
+          try {
+            // Read Spark parquet files (with Comet disabled, so vanilla Spark does the reading)
+            spark.conf.set("spark.comet.enabled", "false")
+            val sparkDf = spark.read.parquet(sparkSubfolderPath.getAbsolutePath)
+            val sparkRows = sparkDf.orderBy(sparkDf.columns.map(functions.col): _*).collect()
+
+            // Read Comet parquet files
+            val cometDf = spark.read.parquet(cometSubfolderPath.getAbsolutePath)
+            val cometRows = cometDf.orderBy(cometDf.columns.map(functions.col): _*).collect()
+
+            // Compare the results
+            if (QueryComparison.assertSameRows(sparkRows, cometRows, output)) {
+              output.write(s"Subfolder $subfolderName: ${sparkRows.length} rows matched\n\n")
+            }
+          } catch {
+            case e: Exception =>
+              output.write(
+                s"[ERROR] Failed to compare subfolder $subfolderName: ${e.getMessage}\n")
+              val sw = new java.io.StringWriter()
+              val p = new java.io.PrintWriter(sw)
+              e.printStackTrace(p)
+              p.close()
+              output.write(s"```\n${sw.toString}\n```\n\n")
+          }
+        }
+
+        output.flush()
+      }
+
+      output.write("\n# Comparison Complete\n")
+      output.write(s"Compared ${sparkSubfolders.length} subfolders\n")
+
+    } finally {
+      output.close()
+    }
+  }
+}
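For orientation, a run over two matching folders would produce a Markdown report along these lines, following the `output.write` calls above. This assumes `QueryComparison.assertSameRows` (not shown in this commit) returns true when the row sets match; the subfolder names and row counts are made up:

```
# Comparing Parquet Folders

Spark folder: /tmp/tpch/spark
Comet folder: /tmp/tpch/comet
Found 2 subfolders to compare

## Comparing subfolder: q1

Subfolder q1: 4 rows matched

## Comparing subfolder: q2

Subfolder q2: 100 rows matched

# Comparison Complete
Compared 2 subfolders
```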
