Skip to content

Commit 0b33b05

Browse files
authored
fix: Mark SortOrder with floating-point as incompatible (#2650)
1 parent 8f4e047 commit 0b33b05

File tree

6 files changed

+140
-30
lines changed

6 files changed

+140
-30
lines changed

docs/source/user-guide/latest/compatibility.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ because they are handled well in Spark (e.g., `SQLOrderingUtil.compareFloats`).
9797
functions of arrow-rs used by DataFusion do not normalize NaN and zero (e.g., [arrow::compute::kernels::cmp::eq](https://docs.rs/arrow/latest/arrow/compute/kernels/cmp/fn.eq.html#)).
9898
So Comet will add additional normalization expression of NaN and zero for comparison.
9999

100+
Sorting on floating-point data types (or complex types containing floating-point values) is not compatible with
101+
Spark if the data contains both zero and negative zero. This is likely an edge case that is not of concern for many users,
102+
and sorting on floating-point data can be enabled by setting `spark.comet.expression.SortOrder.allowIncompatible=true`.
103+
100104
There is a known bug with using count(distinct) within aggregate queries, where each NaN value will be counted
101105
separately [#1824](https://github.com/apache/datafusion-comet/issues/1824).
102106

docs/source/user-guide/latest/tuning.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,12 @@ Comet Performance
100100

101101
It may be possible to reduce Comet's memory overhead by reducing batch sizes or increasing number of partitions.
102102

103+
## Optimizing Sorting on Floating-Point Values
104+
105+
Sorting on floating-point data types (or complex types containing floating-point values) is not compatible with
106+
Spark if the data contains both zero and negative zero. This is likely an edge case that is not of concern for many users,
107+
and sorting on floating-point data can be enabled by setting `spark.comet.expression.SortOrder.allowIncompatible=true`.
108+
103109
## Optimizing Joins
104110

105111
Spark often chooses `SortMergeJoin` over `ShuffledHashJoin` for stability reasons. If the build-side of a

spark/src/main/scala/org/apache/comet/serde/CometSort.scala

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,70 @@ package org.apache.comet.serde
2121

2222
import scala.jdk.CollectionConverters._
2323

24+
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Descending, NullsFirst, NullsLast, SortOrder}
2425
import org.apache.spark.sql.execution.SortExec
26+
import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, StructType}
2527

2628
import org.apache.comet.{CometConf, ConfigEntry}
2729
import org.apache.comet.CometSparkSessionExtensions.withInfo
2830
import org.apache.comet.serde.OperatorOuterClass.Operator
29-
import org.apache.comet.serde.QueryPlanSerde.{exprToProto, supportedSortType}
31+
import org.apache.comet.serde.QueryPlanSerde.{exprToProto, exprToProtoInternal, supportedSortType}
32+
33+
/**
 * Serde handler for Spark's `SortOrder` expression.
 *
 * `SortOrder` wraps a child expression together with a sort direction and a
 * null-ordering. Comet marks sorts over floating-point data as incompatible
 * because Spark normalizes zero / negative zero (and NaN) during comparison
 * while the arrow-rs comparison kernels used by DataFusion do not, so results
 * can differ when the data contains both 0.0 and -0.0.
 */
object CometSortOrder extends CometExpressionSerde[SortOrder] {

  /**
   * Returns [[Incompatible]] when the sort key is, or recursively contains, a
   * Float or Double (including inside arrays, structs, and map keys/values);
   * otherwise [[Compatible]].
   */
  override def getSupportLevel(expr: SortOrder): SupportLevel = {

    // Recursively detect Float/Double anywhere inside a (possibly nested) type.
    def containsFloatingPoint(dt: DataType): Boolean = {
      dt match {
        case DataTypes.FloatType | DataTypes.DoubleType => true
        case ArrayType(elementType, _) => containsFloatingPoint(elementType)
        case StructType(fields) => fields.exists(f => containsFloatingPoint(f.dataType))
        case MapType(keyType, valueType, _) =>
          containsFloatingPoint(keyType) || containsFloatingPoint(valueType)
        case _ => false
      }
    }

    if (containsFloatingPoint(expr.child.dataType)) {
      Incompatible(Some(
        s"Sorting on floating-point is not 100% compatible with Spark. ${CometConf.COMPAT_GUIDE}"))
    } else {
      Compatible()
    }
  }

  /**
   * Serializes this `SortOrder` to its protobuf representation.
   *
   * @param expr the Spark `SortOrder` to convert
   * @param inputs attributes used to resolve bound references in the child
   * @param binding whether the child expression should be bound to `inputs`
   * @return the protobuf expression, or `None` (with fallback info attached via
   *         `withInfo`) when the child expression cannot be converted
   */
  override def convert(
      expr: SortOrder,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    // Pattern match instead of isDefined/get: same behavior, no partial Option access.
    exprToProtoInternal(expr.child, inputs, binding) match {
      case Some(childProto) =>
        val sortOrderBuilder = ExprOuterClass.SortOrder.newBuilder()
        sortOrderBuilder.setChild(childProto)

        // Enum wire values mirror the protobuf definition: 0 = ascending/nulls-first.
        expr.direction match {
          case Ascending => sortOrderBuilder.setDirectionValue(0)
          case Descending => sortOrderBuilder.setDirectionValue(1)
        }

        expr.nullOrdering match {
          case NullsFirst => sortOrderBuilder.setNullOrderingValue(0)
          case NullsLast => sortOrderBuilder.setNullOrderingValue(1)
        }

        Some(
          ExprOuterClass.Expr
            .newBuilder()
            .setSortOrder(sortOrderBuilder)
            .build())
      case None =>
        // Record the unconvertible child so the fallback reason is surfaced.
        withInfo(expr, expr.child)
        None
    }
  }
}
3088

3189
object CometSort extends CometOperatorSerde[SortExec] {
3290

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
221221
classOf[Cast] -> CometCast)
222222

223223
private val miscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
224-
// TODO SortOrder (?)
225224
// TODO PromotePrecision
226225
// TODO KnownFloatingPointNormalized
227226
// TODO ScalarSubquery
@@ -235,7 +234,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
235234
classOf[Coalesce] -> CometCoalesce,
236235
classOf[Literal] -> CometLiteral,
237236
classOf[MonotonicallyIncreasingID] -> CometMonotonicallyIncreasingId,
238-
classOf[SparkPartitionID] -> CometSparkPartitionId)
237+
classOf[SparkPartitionID] -> CometSparkPartitionId,
238+
classOf[SortOrder] -> CometSortOrder)
239239

240240
/**
241241
* Mapping of Spark expression class to Comet expression handler.
@@ -698,33 +698,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
698698

699699
versionSpecificExprToProtoInternal(expr, inputs, binding).orElse(expr match {
700700

701-
case SortOrder(child, direction, nullOrdering, _) =>
702-
val childExpr = exprToProtoInternal(child, inputs, binding)
703-
704-
if (childExpr.isDefined) {
705-
val sortOrderBuilder = ExprOuterClass.SortOrder.newBuilder()
706-
sortOrderBuilder.setChild(childExpr.get)
707-
708-
direction match {
709-
case Ascending => sortOrderBuilder.setDirectionValue(0)
710-
case Descending => sortOrderBuilder.setDirectionValue(1)
711-
}
712-
713-
nullOrdering match {
714-
case NullsFirst => sortOrderBuilder.setNullOrderingValue(0)
715-
case NullsLast => sortOrderBuilder.setNullOrderingValue(1)
716-
}
717-
718-
Some(
719-
ExprOuterClass.Expr
720-
.newBuilder()
721-
.setSortOrder(sortOrderBuilder)
722-
.build())
723-
} else {
724-
withInfo(expr, child)
725-
None
726-
}
727-
728701
case UnaryExpression(child) if expr.prettyName == "promote_precision" =>
729702
// `UnaryExpression` includes `PromotePrecision` for Spark 3.3
730703
// `PromotePrecision` is just a wrapper, don't need to serialize it.

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
4242
import org.apache.spark.sql.types._
4343

4444
import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
45+
import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}
4546

4647
class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
4748
import testImplicits._
@@ -60,6 +61,70 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
6061
val DIVIDE_BY_ZERO_EXCEPTION_MSG =
6162
"""Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead"""
6263

64+
test("sort floating point with negative zero") {
  // Two columns covering both primitive floating-point types.
  val schema = StructType(
    Seq(
      StructField("c0", DataTypes.FloatType, true),
      StructField("c1", DataTypes.DoubleType, true)))
  // Fuzz data seeded for reproducibility; -0.0 values exercise the incompatibility.
  val options = DataGenOptions(generateNegativeZero = true)
  val input = FuzzDataGenerator.generateDataFrame(new Random(42), spark, schema, 1000, options)
  input.createOrReplaceTempView("tbl")

  // With the incompat flag disabled, the sort must fall back to Spark.
  withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false") {
    checkSparkAnswerAndFallbackReason(
      "select * from tbl order by 1, 2",
      "unsupported range partitioning sort order")
  }
}
83+
84+
test("sort array of floating point with negative zero") {
  // Arrays of Float and Double: the support check must recurse into element types.
  val schema = StructType(
    Seq(
      StructField("c0", DataTypes.createArrayType(DataTypes.FloatType), true),
      StructField("c1", DataTypes.createArrayType(DataTypes.DoubleType), true)))
  // Fuzz data seeded for reproducibility; -0.0 values exercise the incompatibility.
  val options = DataGenOptions(generateNegativeZero = true)
  val input = FuzzDataGenerator.generateDataFrame(new Random(42), spark, schema, 1000, options)
  input.createOrReplaceTempView("tbl")

  // With the incompat flag disabled, the sort must fall back to Spark.
  withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false") {
    checkSparkAnswerAndFallbackReason(
      "select * from tbl order by 1, 2",
      "unsupported range partitioning sort order")
  }
}
103+
104+
test("sort struct containing floating point with negative zero") {
  // Structs wrapping Float and Double: the support check must recurse into fields.
  val floatStruct = StructType(Seq(StructField("c0", DataTypes.FloatType, true)))
  val doubleStruct = StructType(Seq(StructField("c0", DataTypes.DoubleType, true)))
  val schema = StructType(
    Seq(
      StructField("float_struct", floatStruct),
      StructField("float_double", doubleStruct)))
  // Fuzz data seeded for reproducibility; -0.0 values exercise the incompatibility.
  val options = DataGenOptions(generateNegativeZero = true)
  val input = FuzzDataGenerator.generateDataFrame(new Random(42), spark, schema, 1000, options)
  input.createOrReplaceTempView("tbl")

  // With the incompat flag disabled, the sort must fall back to Spark.
  withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false") {
    checkSparkAnswerAndFallbackReason(
      "select * from tbl order by 1, 2",
      "unsupported range partitioning sort order")
  }
}
127+
63128
test("compare true/false to negative zero") {
64129
Seq(false, true).foreach { dictionary =>
65130
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {

spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ abstract class CometTestBase
8686
conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true")
8787
conf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "2g")
8888
conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
89+
// SortOrder is incompatible for mixed zero and negative zero floating point values, but
90+
// this is an edge case, and we expect most users to allow sorts on floating point, so we
91+
// enable this for the tests
92+
conf.set(CometConf.getExprAllowIncompatConfigKey("SortOrder"), "true")
8993
conf
9094
}
9195

0 commit comments

Comments
 (0)