From 72ae3556bf22d7f6e66c3913b3913eb0601949c7 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Oct 2025 13:07:18 -0600
Subject: [PATCH 01/12] failing test

---
 .../org/apache/comet/CometExpressionSuite.scala    | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index ddbe7d14e2..37d25e87fe 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -20,14 +20,11 @@ package org.apache.comet
 
 import java.time.{Duration, Period}
-
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.Random
-
 import org.scalactic.source.Position
 import org.scalatest.Tag
-
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
@@ -40,8 +37,8 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
 import org.apache.spark.sql.types._
-
 import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
+import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}
 
 class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   import testImplicits._
@@ -60,6 +57,16 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   val DIVIDE_BY_ZERO_EXCEPTION_MSG =
     """Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead"""
 
+  test("sort floating point with negative zero") {
+    val schema = StructType(Seq(
+      StructField("c0", DataTypes.FloatType, true),
+      StructField("c0", DataTypes.DoubleType, true)
+    ))
+    val df = FuzzDataGenerator.generateDataFrame(new Random(42), spark, schema, 1000, DataGenOptions(generateNegativeZero = true))
+    df.createOrReplaceTempView("tbl")
+    checkSparkAnswerAndOperator("select * from tbl order by 1, 2")
+  }
+
   test("compare true/false to negative zero") {
     Seq(false, true).foreach { dictionary =>
       withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {

From 19cadc6f1f5e861ffdf58a5d77a7a3e982d2d939 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Oct 2025 13:18:22 -0600
Subject: [PATCH 02/12] make floating-point sort compatible with Spark

---
 .../org/apache/comet/serde/CometSort.scala         | 20 +++++++++++++++++--
 .../apache/comet/CometExpressionSuite.scala        | 19 +++++++++++++-----
 2 files changed, 32 insertions(+), 7 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/CometSort.scala b/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
index 2dec25c0dd..8994e184f8 100644
--- a/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
@@ -22,11 +22,12 @@ package org.apache.comet.serde
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.execution.SortExec
+import org.apache.spark.sql.types.DataTypes
 
 import org.apache.comet.{CometConf, ConfigEntry}
 import org.apache.comet.CometSparkSessionExtensions.withInfo
 import org.apache.comet.serde.OperatorOuterClass.Operator
-import org.apache.comet.serde.QueryPlanSerde.{exprToProto, supportedSortType}
+import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType, supportedSortType}
 
 object CometSort extends CometOperatorSerde[SortExec] {
 
@@ -42,7 +43,22 @@ object CometSort extends CometOperatorSerde[SortExec] {
       return None
     }
 
-    val sortOrders = op.sortOrder.map(exprToProto(_, op.child.output))
+    val sortOrders: Seq[Option[ExprOuterClass.Expr]] = op.sortOrder.map {
+      case expr
+          if expr.dataType == DataTypes.FloatType || expr.dataType == DataTypes.DoubleType =>
+        // handle negative zero correctly
+        exprToProto(expr, op.child.output).map(exprProto =>
+          ExprOuterClass.Expr
+            .newBuilder()
+            .setNormalizeNanAndZero(
+              ExprOuterClass.NormalizeNaNAndZero
+                .newBuilder()
+                .setChild(exprProto)
+                .setDatatype(serializeDataType(expr.dataType).get))
+            .build())
+      case expr =>
+        exprToProto(expr, op.child.output)
+    }
 
     if (sortOrders.forall(_.isDefined) && childOp.nonEmpty) {
       val sortBuilder = OperatorOuterClass.Sort
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 37d25e87fe..e0a3e1d363 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -20,11 +20,14 @@ package org.apache.comet
 
 import java.time.{Duration, Period}
+
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.Random
+
 import org.scalactic.source.Position
 import org.scalatest.Tag
+
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
@@ -37,6 +40,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
 import org.apache.spark.sql.types._
+
 import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
 import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}
 
@@ -58,11 +62,16 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     """Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead"""
 
   test("sort floating point with negative zero") {
-    val schema = StructType(Seq(
-      StructField("c0", DataTypes.FloatType, true),
-      StructField("c0", DataTypes.DoubleType, true)
-    ))
-    val df = FuzzDataGenerator.generateDataFrame(new Random(42), spark, schema, 1000, DataGenOptions(generateNegativeZero = true))
+    val schema = StructType(
+      Seq(
+        StructField("c0", DataTypes.FloatType, true),
+        StructField("c0", DataTypes.DoubleType, true)))
+    val df = FuzzDataGenerator.generateDataFrame(
+      new Random(42),
+      spark,
+      schema,
+      1000,
+      DataGenOptions(generateNegativeZero = true))
     df.createOrReplaceTempView("tbl")
     checkSparkAnswerAndOperator("select * from tbl order by 1, 2")
   }

From 2def18f8296873a49b7350e1901e611387b5f170 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Oct 2025 14:51:35 -0600
Subject: [PATCH 03/12] fall back

---
 .../org/apache/comet/serde/CometSort.scala         | 79 ++++++++++++++-----
 .../apache/comet/serde/QueryPlanSerde.scala        | 31 +-------
 .../apache/comet/CometExpressionSuite.scala        | 44 ++++++++++-
 3 files changed, 105 insertions(+), 49 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/CometSort.scala b/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
index 8994e184f8..67b5044dfd 100644
--- a/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
@@ -21,13 +21,71 @@ package org.apache.comet.serde
 
 import scala.jdk.CollectionConverters._
 
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Descending, NullsFirst, NullsLast, SortOrder}
 import org.apache.spark.sql.execution.SortExec
-import org.apache.spark.sql.types.DataTypes
+import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, StructType}
 
 import org.apache.comet.{CometConf, ConfigEntry}
 import org.apache.comet.CometSparkSessionExtensions.withInfo
 import org.apache.comet.serde.OperatorOuterClass.Operator
-import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType, supportedSortType}
+import org.apache.comet.serde.QueryPlanSerde.{exprToProto, exprToProtoInternal, supportedSortType}
+
+object CometSortOrder extends CometExpressionSerde[SortOrder] {
+
+  val floatFallbackMessage = "Complex types containing floating-point not supported"
+
+  override def getSupportLevel(expr: SortOrder): SupportLevel = {
+
+    def containsFloatingPoint(dt: DataType): Boolean = {
+      dt match {
+        case DataTypes.FloatType | DataTypes.DoubleType => true
+        case ArrayType(elementType, _) => containsFloatingPoint(elementType)
+        case StructType(fields) => fields.exists(f => containsFloatingPoint(f.dataType))
+        case MapType(keyType, valueType, _) =>
+          containsFloatingPoint(keyType) || containsFloatingPoint(valueType)
+        case _ => false
+      }
+    }
+
+    if (containsFloatingPoint(expr.child.dataType)) {
+      Incompatible(Some(floatFallbackMessage))
+    } else {
+      Compatible()
+    }
+  }
+
+  override def convert(
+      expr: SortOrder,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val childExpr = exprToProtoInternal(expr.child, inputs, binding)
+
+    if (childExpr.isDefined) {
+      val sortOrderBuilder = ExprOuterClass.SortOrder.newBuilder()
+
+      sortOrderBuilder.setChild(childExpr.get)
+
+      expr.direction match {
+        case Ascending => sortOrderBuilder.setDirectionValue(0)
+        case Descending => sortOrderBuilder.setDirectionValue(1)
+      }
+
+      expr.nullOrdering match {
+        case NullsFirst => sortOrderBuilder.setNullOrderingValue(0)
+        case NullsLast => sortOrderBuilder.setNullOrderingValue(1)
+      }
+
+      Some(
+        ExprOuterClass.Expr
+          .newBuilder()
+          .setSortOrder(sortOrderBuilder)
+          .build())
+    } else {
+      withInfo(expr, expr.child)
+      None
+    }
+  }
+}
 
 object CometSort extends CometOperatorSerde[SortExec] {
 
@@ -43,22 +101,7 @@ object CometSort extends CometOperatorSerde[SortExec] {
       return None
     }
 
-    val sortOrders: Seq[Option[ExprOuterClass.Expr]] = op.sortOrder.map {
-      case expr
-          if expr.dataType == DataTypes.FloatType || expr.dataType == DataTypes.DoubleType =>
-        // handle negative zero correctly
-        exprToProto(expr, op.child.output).map(exprProto =>
-          ExprOuterClass.Expr
-            .newBuilder()
-            .setNormalizeNanAndZero(
-              ExprOuterClass.NormalizeNaNAndZero
-                .newBuilder()
-                .setChild(exprProto)
-                .setDatatype(serializeDataType(expr.dataType).get))
-            .build())
-      case expr =>
-        exprToProto(expr, op.child.output)
-    }
+    val sortOrders = op.sortOrder.map(exprToProto(_, op.child.output))
 
     if (sortOrders.forall(_.isDefined) && childOp.nonEmpty) {
       val sortBuilder = OperatorOuterClass.Sort
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 9f418e3068..8361f1e958 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -221,7 +221,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[Cast] -> CometCast)
 
   private val miscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
-    // TODO SortOrder (?)
     // TODO PromotePrecision
     // TODO KnownFloatingPointNormalized
     // TODO ScalarSubquery
@@ -235,7 +234,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[Coalesce] -> CometCoalesce,
     classOf[Literal] -> CometLiteral,
    classOf[MonotonicallyIncreasingID] -> CometMonotonicallyIncreasingId,
-    classOf[SparkPartitionID] -> CometSparkPartitionId)
+    classOf[SparkPartitionID] -> CometSparkPartitionId,
+    classOf[SortOrder] -> CometSortOrder)
 
   /**
    * Mapping of Spark expression class to Comet expression handler.
@@ -698,33 +698,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
 
     versionSpecificExprToProtoInternal(expr, inputs, binding).orElse(expr match {
-      case SortOrder(child, direction, nullOrdering, _) =>
-        val childExpr = exprToProtoInternal(child, inputs, binding)
-
-        if (childExpr.isDefined) {
-          val sortOrderBuilder = ExprOuterClass.SortOrder.newBuilder()
-          sortOrderBuilder.setChild(childExpr.get)
-
-          direction match {
-            case Ascending => sortOrderBuilder.setDirectionValue(0)
-            case Descending => sortOrderBuilder.setDirectionValue(1)
-          }
-
-          nullOrdering match {
-            case NullsFirst => sortOrderBuilder.setNullOrderingValue(0)
-            case NullsLast => sortOrderBuilder.setNullOrderingValue(1)
-          }
-
-          Some(
-            ExprOuterClass.Expr
-              .newBuilder()
-              .setSortOrder(sortOrderBuilder)
-              .build())
-        } else {
-          withInfo(expr, child)
-          None
-        }
-
       case UnaryExpression(child) if expr.prettyName == "promote_precision" =>
         // `UnaryExpression` includes `PromotePrecision` for Spark 3.3
        // `PromotePrecision` is just a wrapper, don't need to serialize it.
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index e0a3e1d363..dfd2769d02 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -65,7 +65,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     val schema = StructType(
       Seq(
         StructField("c0", DataTypes.FloatType, true),
-        StructField("c0", DataTypes.DoubleType, true)))
+        StructField("c1", DataTypes.DoubleType, true)))
     val df = FuzzDataGenerator.generateDataFrame(
       new Random(42),
       spark,
@@ -73,7 +73,47 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       1000,
       DataGenOptions(generateNegativeZero = true))
     df.createOrReplaceTempView("tbl")
-    checkSparkAnswerAndOperator("select * from tbl order by 1, 2")
+
+    // TODO check fallback reason
+    checkSparkAnswer("select * from tbl order by 1, 2")
+  }
+
+  test("sort array of floating point with negative zero") {
+    val schema = StructType(
+      Seq(
+        StructField("c0", DataTypes.createArrayType(DataTypes.FloatType), true),
+        StructField("c1", DataTypes.createArrayType(DataTypes.DoubleType), true)))
+    val df = FuzzDataGenerator.generateDataFrame(
+      new Random(42),
+      spark,
+      schema,
+      1000,
+      DataGenOptions(generateNegativeZero = true))
+    df.createOrReplaceTempView("tbl")
+
+    // TODO check fallback reason
+    checkSparkAnswer("select * from tbl order by 1, 2")
+  }
+
+  test("sort struct containing floating point with negative zero") {
+    val schema = StructType(
+      Seq(
+        StructField(
+          "float_struct",
+          StructType(Seq(StructField("c0", DataTypes.FloatType, true)))),
+        StructField(
+          "float_double",
+          StructType(Seq(StructField("c0", DataTypes.DoubleType, true))))))
+    val df = FuzzDataGenerator.generateDataFrame(
+      new Random(42),
+      spark,
+      schema,
+      1000,
+      DataGenOptions(generateNegativeZero = true))
+    df.createOrReplaceTempView("tbl")
+
+    // TODO check fallback reason
+    checkSparkAnswer("select * from tbl order by 1, 2")
+  }
 
   test("compare true/false to negative zero") {
     Seq(false, true).foreach { dictionary =>
       withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {

From 901866def5ec52db7a546f7713f1de8fa722322f Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Oct 2025 14:54:39 -0600
Subject: [PATCH 04/12] fix

---
 .../src/test/scala/org/apache/comet/CometFuzzTestSuite.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
index 006112d2b0..bf6faad03c 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
@@ -144,7 +144,9 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
       val randomSize = Random.nextInt(shuffledPrimitiveCols.length) + 1
       val randomColsSubset = shuffledPrimitiveCols.take(randomSize).toArray.mkString(",")
       val sql = s"SELECT $randomColsSubset FROM t1 ORDER BY $randomColsSubset"
-      checkSparkAnswerAndOperator(sql)
+
+      // TODO
+      checkSparkAnswer/*AndOperator*/(sql)
     }
   }
 

From 05a4de03fb37eda6149e7b87efe1b896c775f99a Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Oct 2025 15:00:59 -0600
Subject: [PATCH 05/12] update

---
 docs/source/user-guide/latest/compatibility.md              | 3 +++
 spark/src/main/scala/org/apache/comet/serde/CometSort.scala | 4 +---
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/docs/source/user-guide/latest/compatibility.md b/docs/source/user-guide/latest/compatibility.md
index 562baabfdc..866fe3d3aa 100644
--- a/docs/source/user-guide/latest/compatibility.md
+++ b/docs/source/user-guide/latest/compatibility.md
@@ -97,6 +97,9 @@ because they are handled well in Spark (e.g., `SQLOrderingUtil.compareFloats`).
 functions of arrow-rs used by DataFusion do not normalize NaN and zero (e.g.,
 [arrow::compute::kernels::cmp::eq](https://docs.rs/arrow/latest/arrow/compute/kernels/cmp/fn.eq.html#)).
 So Comet will add additional normalization expression of NaN and zero for comparison.
 
+Sorting on floating-point data types (or complex types containing floating-point values) is not compatible with
+Spark if the data contains both zero and negative zero.
+
 There is a known bug with using count(distinct) within aggregate queries, where each NaN value will be counted
 separately [#1824](https://github.com/apache/datafusion-comet/issues/1824).
diff --git a/spark/src/main/scala/org/apache/comet/serde/CometSort.scala b/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
index 67b5044dfd..3a7f0d9372 100644
--- a/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
@@ -32,8 +32,6 @@ import org.apache.comet.serde.QueryPlanSerde.{exprToProto, exprToProtoInternal,
 
 object CometSortOrder extends CometExpressionSerde[SortOrder] {
 
-  val floatFallbackMessage = "Complex types containing floating-point not supported"
-
   override def getSupportLevel(expr: SortOrder): SupportLevel = {
 
     def containsFloatingPoint(dt: DataType): Boolean = {
@@ -48,7 +46,7 @@ object CometSortOrder extends CometExpressionSerde[SortOrder] {
     }
 
     if (containsFloatingPoint(expr.child.dataType)) {
-      Incompatible(Some(floatFallbackMessage))
+      Incompatible(Some(s"Sorting on floating-point is not 100% compatible with Spark. ${CometConf.COMPAT_GUIDE}"))
     } else {
       Compatible()
     }

From 777046578078fe41f1783db9a17b1b47433e94cc Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Oct 2025 15:02:08 -0600
Subject: [PATCH 06/12] format

---
 spark/src/main/scala/org/apache/comet/serde/CometSort.scala    | 3 ++-
 spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/CometSort.scala b/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
index 3a7f0d9372..4a1063458c 100644
--- a/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
@@ -46,7 +46,8 @@ object CometSortOrder extends CometExpressionSerde[SortOrder] {
     }
 
     if (containsFloatingPoint(expr.child.dataType)) {
-      Incompatible(Some(s"Sorting on floating-point is not 100% compatible with Spark. ${CometConf.COMPAT_GUIDE}"))
+      Incompatible(Some(
+        s"Sorting on floating-point is not 100% compatible with Spark. ${CometConf.COMPAT_GUIDE}"))
     } else {
       Compatible()
     }
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
index bf6faad03c..52ba55ade9 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
@@ -146,7 +146,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
       val sql = s"SELECT $randomColsSubset FROM t1 ORDER BY $randomColsSubset"
 
       // TODO
-      checkSparkAnswer/*AndOperator*/(sql)
+      checkSparkAnswer /*AndOperator*/ (sql)
     }
   }
 

From 8f3cbc5e1e66129226be9b02ecd26cd11e393865 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Oct 2025 16:24:31 -0600
Subject: [PATCH 07/12] enable

---
 spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 2308858f61..219d15e234 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -86,6 +86,7 @@ abstract class CometTestBase
     conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true")
     conf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "2g")
     conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
+    conf.set(CometConf.getExprAllowIncompatConfigKey("SortOrder"), "true")
     conf
   }
 

From 12bd291a4b777ece208f51b65ff55e29310f771a Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sun, 26 Oct 2025 09:37:05 -0600
Subject: [PATCH 08/12] fix

---
 .../apache/comet/CometExpressionSuite.scala        | 21 +++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index dfd2769d02..9aecf55b3d 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -74,8 +74,11 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       DataGenOptions(generateNegativeZero = true))
     df.createOrReplaceTempView("tbl")
 
-    // TODO check fallback reason
-    checkSparkAnswer("select * from tbl order by 1, 2")
+    withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false") {
+      checkSparkAnswerAndFallbackReason(
+        "select * from tbl order by 1, 2",
+        "unsupported range partitioning sort order")
+    }
   }
 
   test("sort array of floating point with negative zero") {
@@ -91,8 +94,11 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       DataGenOptions(generateNegativeZero = true))
     df.createOrReplaceTempView("tbl")
 
-    // TODO check fallback reason
-    checkSparkAnswer("select * from tbl order by 1, 2")
+    withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false") {
+      checkSparkAnswerAndFallbackReason(
+        "select * from tbl order by 1, 2",
+        "unsupported range partitioning sort order")
+    }
   }
 
   test("sort struct containing floating point with negative zero") {
@@ -112,8 +118,11 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       DataGenOptions(generateNegativeZero = true))
     df.createOrReplaceTempView("tbl")
 
-    // TODO check fallback reason
-    checkSparkAnswer("select * from tbl order by 1, 2")
+    withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false") {
+      checkSparkAnswerAndFallbackReason(
+        "select * from tbl order by 1, 2",
+        "unsupported range partitioning sort order")
+    }
   }
 
   test("compare true/false to negative zero") {
     Seq(false, true).foreach { dictionary =>
       withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {

From d2c7f941791750552f240d61e51df5420703a4d0 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sun, 26 Oct 2025 09:39:54 -0600
Subject: [PATCH 09/12] tuning guide

---
 docs/source/user-guide/latest/tuning.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md
index cc01095269..a29cfcb4d2 100644
--- a/docs/source/user-guide/latest/tuning.md
+++ b/docs/source/user-guide/latest/tuning.md
@@ -100,6 +100,12 @@ Comet Performance
 
 It may be possible to reduce Comet's memory overhead by reducing batch sizes or increasing number of partitions.
 
+## Optimizing Sorting on Floating-Point
+
+Sorting on floating point values is disabled by default, because of known differences when the data
+contains both positive and negative zero. This is likely an edge case that is not of concern for many users
+and sorting on floating-point data can be enabled by setting `spark.comet.expression.SortOrder.allowIncompatible=true`.
+
 ## Optimizing Joins
 
 Spark often chooses `SortMergeJoin` over `ShuffledHashJoin` for stability reasons. If the build-side of a

From 370793edb313db4a7446990cebd46107214424eb Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sun, 26 Oct 2025 11:54:51 -0600
Subject: [PATCH 10/12] docs

---
 docs/source/user-guide/latest/compatibility.md | 3 ++-
 docs/source/user-guide/latest/tuning.md        | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/docs/source/user-guide/latest/compatibility.md b/docs/source/user-guide/latest/compatibility.md
index 866fe3d3aa..6c3bab59df 100644
--- a/docs/source/user-guide/latest/compatibility.md
+++ b/docs/source/user-guide/latest/compatibility.md
@@ -98,7 +98,8 @@ functions of arrow-rs used by DataFusion do not normalize NaN and zero (e.g., [a
 So Comet will add additional normalization expression of NaN and zero for comparison.
 
 Sorting on floating-point data types (or complex types containing floating-point values) is not compatible with
-Spark if the data contains both zero and negative zero.
+Spark if the data contains both zero and negative zero. This is likely an edge case that is not of concern for many users
+and sorting on floating-point data can be enabled by setting `spark.comet.expression.SortOrder.allowIncompatible=true`.
 
 There is a known bug with using count(distinct) within aggregate queries, where each NaN value will be counted
 separately [#1824](https://github.com/apache/datafusion-comet/issues/1824).
diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md
index a29cfcb4d2..09adf36203 100644
--- a/docs/source/user-guide/latest/tuning.md
+++ b/docs/source/user-guide/latest/tuning.md
@@ -100,7 +100,7 @@ Comet Performance
 
 It may be possible to reduce Comet's memory overhead by reducing batch sizes or increasing number of partitions.
 
-## Optimizing Sorting on Floating-Point
+## Optimizing Sorting on Floating-Point Values
 
 Sorting on floating point values is disabled by default, because of known differences when the data
 contains both positive and negative zero. This is likely an edge case that is not of concern for many users

From 937d514667236f3e1ab4464bbdf1bd08f1efa0c3 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sun, 26 Oct 2025 11:55:43 -0600
Subject: [PATCH 11/12] docs

---
 docs/source/user-guide/latest/tuning.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md
index 09adf36203..21b1df6522 100644
--- a/docs/source/user-guide/latest/tuning.md
+++ b/docs/source/user-guide/latest/tuning.md
@@ -102,8 +102,8 @@ It may be possible to reduce Comet's memory overhead by reducing batch sizes or
 
 ## Optimizing Sorting on Floating-Point Values
 
-Sorting on floating point values is disabled by default, because of known differences when the data
-contains both positive and negative zero. This is likely an edge case that is not of concern for many users
+Sorting on floating-point data types (or complex types containing floating-point values) is not compatible with
+Spark if the data contains both zero and negative zero. This is likely an edge case that is not of concern for many users
 and sorting on floating-point data can be enabled by setting `spark.comet.expression.SortOrder.allowIncompatible=true`.
 
 ## Optimizing Joins

From 2ce637d742ead526855340130aca59c3eb629b60 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sun, 26 Oct 2025 12:14:04 -0600
Subject: [PATCH 12/12] prep for review

---
 .../src/test/scala/org/apache/comet/CometFuzzTestSuite.scala  | 4 +---
 spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala | 3 +++
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
index 52ba55ade9..006112d2b0 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
@@ -144,9 +144,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
       val randomSize = Random.nextInt(shuffledPrimitiveCols.length) + 1
       val randomColsSubset = shuffledPrimitiveCols.take(randomSize).toArray.mkString(",")
       val sql = s"SELECT $randomColsSubset FROM t1 ORDER BY $randomColsSubset"
-
-      // TODO
-      checkSparkAnswer /*AndOperator*/ (sql)
+      checkSparkAnswerAndOperator(sql)
     }
   }
 
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 219d15e234..fe2edc7056 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -86,6 +86,9 @@ abstract class CometTestBase
     conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true")
     conf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "2g")
     conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
+    // SortOrder is incompatible for mixed zero and negative zero floating point values, but
+    // this is an edge case, and we expect most users to allow sorts on floating point, so we
+    // enable this for the tests
+    conf.set(CometConf.getExprAllowIncompatConfigKey("SortOrder"), "true")
     conf
   }
 
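Reviewer note (appended to this series for context; not part of any commit above):

The incompatibility that motivates the new fallback is a tie-breaking difference
between the two engines. Spark's SQLOrderingUtil (cited in compatibility.md)
compares floating-point values with primitive equality first, and because
0.0 == -0.0 is true on the JVM, the two zeros compare as equal and a stable sort
leaves them in their input order. The arrow-rs sort kernels that DataFusion
relies on apply an IEEE-754 total order instead, which places -0.0 strictly
before 0.0. The standalone sketch below illustrates the divergence; it was
written for this note (the object and value names are made up), and the
sparkStyle comparator is modeled on, not copied from, SQLOrderingUtil.

    object NegativeZeroOrderingDemo extends App {

      // Modeled on Spark's comparison semantics: primitive == treats 0.0 and
      // -0.0 as equal, so a stable sort never reorders them.
      val sparkStyle: Ordering[Double] = new Ordering[Double] {
        def compare(x: Double, y: Double): Int =
          if (x == y) 0 else java.lang.Double.compare(x, y)
      }

      // IEEE-754 total order, as used by the arrow-rs sort kernels backing
      // DataFusion: -0.0 sorts strictly before 0.0.
      val totalOrder: Ordering[Double] = new Ordering[Double] {
        def compare(x: Double, y: Double): Int = java.lang.Double.compare(x, y)
      }

      val data = Seq(0.0, -0.0, 1.0, -0.0, 0.0)

      println(data.sorted(sparkStyle).mkString(", ")) // 0.0, -0.0, -0.0, 0.0, 1.0
      println(data.sorted(totalOrder).mkString(", ")) // -0.0, -0.0, 0.0, 0.0, 1.0
    }

The two printed orderings differ only in where the signed zeros land, which is
exactly the row-order difference the new tests exercise. Users who accept that
difference can keep native sort enabled through the key built by
CometConf.getExprAllowIncompatConfigKey("SortOrder"), i.e.
spark.comet.expression.SortOrder.allowIncompatible=true, as patch 12 does for
the test suite.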