Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/source/user-guide/latest/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ because they are handled well in Spark (e.g., `SQLOrderingUtil.compareFloats`).
functions of arrow-rs used by DataFusion do not normalize NaN and zero (e.g., [arrow::compute::kernels::cmp::eq](https://docs.rs/arrow/latest/arrow/compute/kernels/cmp/fn.eq.html#)).
So Comet will add additional normalization expression of NaN and zero for comparison.

Sorting on floating-point data types (or complex types containing floating-point values) is not compatible with
Spark if the data contains both zero and negative zero. This is likely an edge case that is not of concern for many users
and sorting on floating-point data can be enabled by setting `spark.comet.expression.SortOrder.allowIncompatible=true`.

There is a known bug with using count(distinct) within aggregate queries, where each NaN value will be counted
separately [#1824](https://github.com/apache/datafusion-comet/issues/1824).

Expand Down
6 changes: 6 additions & 0 deletions docs/source/user-guide/latest/tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ Comet Performance

It may be possible to reduce Comet's memory overhead by reducing batch sizes or increasing number of partitions.

## Optimizing Sorting on Floating-Point Values

Sorting on floating-point data types (or complex types containing floating-point values) is not compatible with
Spark if the data contains both zero and negative zero. This is likely an edge case that is not of concern for many users
and sorting on floating-point data can be enabled by setting `spark.comet.expression.SortOrder.allowIncompatible=true`.

## Optimizing Joins

Spark often chooses `SortMergeJoin` over `ShuffledHashJoin` for stability reasons. If the build-side of a
Expand Down
60 changes: 59 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/CometSort.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,70 @@ package org.apache.comet.serde

import scala.jdk.CollectionConverters._

import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Descending, NullsFirst, NullsLast, SortOrder}
import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, StructType}

import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde.{exprToProto, supportedSortType}
import org.apache.comet.serde.QueryPlanSerde.{exprToProto, exprToProtoInternal, supportedSortType}

object CometSortOrder extends CometExpressionSerde[SortOrder] {

override def getSupportLevel(expr: SortOrder): SupportLevel = {

def containsFloatingPoint(dt: DataType): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def containsFloatingPoint(dt: DataType): Boolean = {
def dataTypeContainsFloatingPoint(dt: DataType): Boolean = {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method takes a DataType parameter, so it seems redundant to include that in the method name as well?

dt match {
case DataTypes.FloatType | DataTypes.DoubleType => true
case ArrayType(elementType, _) => containsFloatingPoint(elementType)
case StructType(fields) => fields.exists(f => containsFloatingPoint(f.dataType))
case MapType(keyType, valueType, _) =>
containsFloatingPoint(keyType) || containsFloatingPoint(valueType)
case _ => false
}
}

if (containsFloatingPoint(expr.child.dataType)) {
Incompatible(Some(
s"Sorting on floating-point is not 100% compatible with Spark. ${CometConf.COMPAT_GUIDE}"))
} else {
Compatible()
}
}

override def convert(
expr: SortOrder,
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
val childExpr = exprToProtoInternal(expr.child, inputs, binding)

if (childExpr.isDefined) {
val sortOrderBuilder = ExprOuterClass.SortOrder.newBuilder()

sortOrderBuilder.setChild(childExpr.get)

expr.direction match {
case Ascending => sortOrderBuilder.setDirectionValue(0)
case Descending => sortOrderBuilder.setDirectionValue(1)
}

expr.nullOrdering match {
case NullsFirst => sortOrderBuilder.setNullOrderingValue(0)
case NullsLast => sortOrderBuilder.setNullOrderingValue(1)
}

Some(
ExprOuterClass.Expr
.newBuilder()
.setSortOrder(sortOrderBuilder)
.build())
} else {
withInfo(expr, expr.child)
None
}
}
}

object CometSort extends CometOperatorSerde[SortExec] {

Expand Down
31 changes: 2 additions & 29 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
classOf[Cast] -> CometCast)

private val miscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
// TODO SortOrder (?)
// TODO PromotePrecision
// TODO KnownFloatingPointNormalized
// TODO ScalarSubquery
Expand All @@ -235,7 +234,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
classOf[Coalesce] -> CometCoalesce,
classOf[Literal] -> CometLiteral,
classOf[MonotonicallyIncreasingID] -> CometMonotonicallyIncreasingId,
classOf[SparkPartitionID] -> CometSparkPartitionId)
classOf[SparkPartitionID] -> CometSparkPartitionId,
classOf[SortOrder] -> CometSortOrder)

/**
* Mapping of Spark expression class to Comet expression handler.
Expand Down Expand Up @@ -698,33 +698,6 @@ object QueryPlanSerde extends Logging with CometExprShim {

versionSpecificExprToProtoInternal(expr, inputs, binding).orElse(expr match {

case SortOrder(child, direction, nullOrdering, _) =>
val childExpr = exprToProtoInternal(child, inputs, binding)

if (childExpr.isDefined) {
val sortOrderBuilder = ExprOuterClass.SortOrder.newBuilder()
sortOrderBuilder.setChild(childExpr.get)

direction match {
case Ascending => sortOrderBuilder.setDirectionValue(0)
case Descending => sortOrderBuilder.setDirectionValue(1)
}

nullOrdering match {
case NullsFirst => sortOrderBuilder.setNullOrderingValue(0)
case NullsLast => sortOrderBuilder.setNullOrderingValue(1)
}

Some(
ExprOuterClass.Expr
.newBuilder()
.setSortOrder(sortOrderBuilder)
.build())
} else {
withInfo(expr, child)
None
}

case UnaryExpression(child) if expr.prettyName == "promote_precision" =>
// `UnaryExpression` includes `PromotePrecision` for Spark 3.3
// `PromotePrecision` is just a wrapper, don't need to serialize it.
Expand Down
65 changes: 65 additions & 0 deletions spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
import org.apache.spark.sql.types._

import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}

class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
import testImplicits._
Expand All @@ -60,6 +61,70 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
val DIVIDE_BY_ZERO_EXCEPTION_MSG =
"""Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead"""

test("sort floating point with negative zero") {
val schema = StructType(
Seq(
StructField("c0", DataTypes.FloatType, true),
StructField("c1", DataTypes.DoubleType, true)))
val df = FuzzDataGenerator.generateDataFrame(
new Random(42),
spark,
schema,
1000,
DataGenOptions(generateNegativeZero = true))
df.createOrReplaceTempView("tbl")

withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false") {
checkSparkAnswerAndFallbackReason(
"select * from tbl order by 1, 2",
"unsupported range partitioning sort order")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I filed a follow-up issue for improving the fallback reason to show the root cause

#2651

}
}

test("sort array of floating point with negative zero") {
val schema = StructType(
Seq(
StructField("c0", DataTypes.createArrayType(DataTypes.FloatType), true),
StructField("c1", DataTypes.createArrayType(DataTypes.DoubleType), true)))
val df = FuzzDataGenerator.generateDataFrame(
new Random(42),
spark,
schema,
1000,
DataGenOptions(generateNegativeZero = true))
df.createOrReplaceTempView("tbl")

withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false") {
checkSparkAnswerAndFallbackReason(
"select * from tbl order by 1, 2",
"unsupported range partitioning sort order")
}
}

test("sort struct containing floating point with negative zero") {
val schema = StructType(
Seq(
StructField(
"float_struct",
StructType(Seq(StructField("c0", DataTypes.FloatType, true)))),
StructField(
"float_double",
StructType(Seq(StructField("c0", DataTypes.DoubleType, true))))))
val df = FuzzDataGenerator.generateDataFrame(
new Random(42),
spark,
schema,
1000,
DataGenOptions(generateNegativeZero = true))
df.createOrReplaceTempView("tbl")

withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false") {
checkSparkAnswerAndFallbackReason(
"select * from tbl order by 1, 2",
"unsupported range partitioning sort order")
}
}

test("compare true/false to negative zero") {
Seq(false, true).foreach { dictionary =>
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
Expand Down
4 changes: 4 additions & 0 deletions spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ abstract class CometTestBase
conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true")
conf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "2g")
conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
// SortOrder is incompatible for mixed zero and negative zero floating point values, but
// this is an edge case, and we expect most users to allow sorts on floating point, so we
// enable this for the tests
conf.set(CometConf.getExprAllowIncompatConfigKey("SortOrder"), "true")
conf
}

Expand Down
Loading