Skip to content

Commit e1a5cdf

Browse files
findepialamb
andauthored
Support multiple ordered array_agg aggregations (#16625)
* Validate states shape in merge_batch Due to `..` in the pattern, the `OrderSensitiveArrayAggAccumulator::merge_batch` did not validate it's not receiving additional states columns it ignores. Update the code to check number of inputs. * Support multiple ordered array_agg Before the change, `array_agg` with ordering would depend on input being ordered. As a result, it was impossible to do two or more `array_agg(x ORDER BY ...)` with incompatible ordering. This change moves ordering responsibility into `OrderSensitiveArrayAggAccumulator`. When input is pre-ordered (beneficial ordering), no additional work is done. However, when it's not, `array_agg` accumulator will order the data on its own. * Generate sorts based on aggregations soft requirements The sorting consideration before aggregations did respect only ordered aggregation functions with `AggregateOrderSensitivity::HardRequirement`. This change includes sorting expectations from `AggregateOrderSensitivity::Beneficial` functions. When beneficial ordered function requirements are not satisfied, no error is raised, they are considered in the second pass only. * Fix reversing first_value, last_value Upon reversing, a schema and field mismatch would happen. * Revert "Fix reversing first_value, last_value" This reverts commit 9b7e94d. * sort array_agg input the old way whenever possible * revert some now unnecessary change * Improve doc for SoftRequiement Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> * Add comment for include_soft_requirement Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> * Document include_soft_requirement param * fmt * doc fix --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent c6d5520 commit e1a5cdf

File tree

6 files changed

+222
-62
lines changed

6 files changed

+222
-62
lines changed

datafusion/ffi/src/udaf/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,7 @@ impl AggregateUDFImpl for ForeignAggregateUDF {
589589
pub enum FFI_AggregateOrderSensitivity {
590590
Insensitive,
591591
HardRequirement,
592+
SoftRequirement,
592593
Beneficial,
593594
}
594595

@@ -597,6 +598,7 @@ impl From<FFI_AggregateOrderSensitivity> for AggregateOrderSensitivity {
597598
match value {
598599
FFI_AggregateOrderSensitivity::Insensitive => Self::Insensitive,
599600
FFI_AggregateOrderSensitivity::HardRequirement => Self::HardRequirement,
601+
FFI_AggregateOrderSensitivity::SoftRequirement => Self::SoftRequirement,
600602
FFI_AggregateOrderSensitivity::Beneficial => Self::Beneficial,
601603
}
602604
}
@@ -607,6 +609,7 @@ impl From<AggregateOrderSensitivity> for FFI_AggregateOrderSensitivity {
607609
match value {
608610
AggregateOrderSensitivity::Insensitive => Self::Insensitive,
609611
AggregateOrderSensitivity::HardRequirement => Self::HardRequirement,
612+
AggregateOrderSensitivity::SoftRequirement => Self::SoftRequirement,
610613
AggregateOrderSensitivity::Beneficial => Self::Beneficial,
611614
}
612615
}
@@ -748,6 +751,7 @@ mod tests {
748751
fn test_round_trip_all_order_sensitivities() {
749752
test_round_trip_order_sensitivity(AggregateOrderSensitivity::Insensitive);
750753
test_round_trip_order_sensitivity(AggregateOrderSensitivity::HardRequirement);
754+
test_round_trip_order_sensitivity(AggregateOrderSensitivity::SoftRequirement);
751755
test_round_trip_order_sensitivity(AggregateOrderSensitivity::Beneficial);
752756
}
753757
}

datafusion/functions-aggregate-common/src/order.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,20 @@ pub enum AggregateOrderSensitivity {
2222
/// Ordering at the input is not important for the result of the aggregator.
2323
Insensitive,
2424
/// Indicates that the aggregate expression has a hard requirement on ordering.
25-
/// The aggregator can not produce a correct result unless its ordering
25+
/// The aggregator cannot produce a correct result unless its ordering
2626
/// requirement is satisfied.
2727
HardRequirement,
28+
/// Indicates that the aggregator is more efficient when the input is ordered
29+
/// but can still produce its result correctly regardless of the input ordering.
30+
/// This is similar to, but stronger than, [`Self::Beneficial`].
31+
///
32+
/// Similarly to [`Self::HardRequirement`], when possible DataFusion will insert
33+
/// a `SortExec`, to reorder the input to match the SoftRequirement. However,
34+
/// when such a `SortExec` cannot be inserted, (for example, due to conflicting
35+
/// [`Self::HardRequirement`] with other ordered aggregates in the query),
36+
/// the aggregate function will still execute, without the preferred order, unlike
37+
/// with [`Self::HardRequirement`]
38+
SoftRequirement,
2839
/// Indicates that ordering is beneficial for the aggregate expression in terms
2940
/// of evaluation efficiency. The aggregator can produce its result efficiently
3041
/// when its required ordering is satisfied; however, it can still produce the
@@ -38,7 +49,7 @@ impl AggregateOrderSensitivity {
3849
}
3950

4051
pub fn is_beneficial(&self) -> bool {
41-
self.eq(&AggregateOrderSensitivity::Beneficial)
52+
matches!(self, Self::SoftRequirement | Self::Beneficial)
4253
}
4354

4455
pub fn hard_requires(&self) -> bool {

datafusion/functions-aggregate/src/array_agg.rs

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
2020
use std::cmp::Ordering;
2121
use std::collections::{HashSet, VecDeque};
22-
use std::mem::{size_of, size_of_val};
22+
use std::mem::{size_of, size_of_val, take};
2323
use std::sync::Arc;
2424

2525
use arrow::array::{
@@ -31,14 +31,17 @@ use arrow::datatypes::{DataType, Field, FieldRef, Fields};
3131

3232
use datafusion_common::cast::as_list_array;
3333
use datafusion_common::scalar::copy_array_data;
34-
use datafusion_common::utils::{get_row_at_idx, SingleRowListArrayBuilder};
34+
use datafusion_common::utils::{
35+
compare_rows, get_row_at_idx, take_function_args, SingleRowListArrayBuilder,
36+
};
3537
use datafusion_common::{exec_err, internal_err, Result, ScalarValue};
3638
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
3739
use datafusion_expr::utils::format_state_name;
3840
use datafusion_expr::{
3941
Accumulator, AggregateUDFImpl, Documentation, Signature, Volatility,
4042
};
4143
use datafusion_functions_aggregate_common::merge_arrays::merge_ordered_arrays;
44+
use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;
4245
use datafusion_functions_aggregate_common::utils::ordering_fields;
4346
use datafusion_macros::user_doc;
4447
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
@@ -78,12 +81,14 @@ This aggregation function can only mix DISTINCT and ORDER BY if the ordering exp
7881
/// ARRAY_AGG aggregate expression
7982
pub struct ArrayAgg {
8083
signature: Signature,
84+
is_input_pre_ordered: bool,
8185
}
8286

8387
impl Default for ArrayAgg {
8488
fn default() -> Self {
8589
Self {
8690
signature: Signature::any(1, Volatility::Immutable),
91+
is_input_pre_ordered: false,
8792
}
8893
}
8994
}
@@ -144,6 +149,20 @@ impl AggregateUDFImpl for ArrayAgg {
144149
Ok(fields)
145150
}
146151

152+
fn order_sensitivity(&self) -> AggregateOrderSensitivity {
153+
AggregateOrderSensitivity::SoftRequirement
154+
}
155+
156+
fn with_beneficial_ordering(
157+
self: Arc<Self>,
158+
beneficial_ordering: bool,
159+
) -> Result<Option<Arc<dyn AggregateUDFImpl>>> {
160+
Ok(Some(Arc::new(Self {
161+
signature: self.signature.clone(),
162+
is_input_pre_ordered: beneficial_ordering,
163+
})))
164+
}
165+
147166
fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
148167
let data_type = acc_args.exprs[0].data_type(acc_args.schema)?;
149168
let ignore_nulls =
@@ -196,6 +215,7 @@ impl AggregateUDFImpl for ArrayAgg {
196215
&data_type,
197216
&ordering_dtypes,
198217
ordering,
218+
self.is_input_pre_ordered,
199219
acc_args.is_reversed,
200220
ignore_nulls,
201221
)
@@ -518,6 +538,8 @@ pub(crate) struct OrderSensitiveArrayAggAccumulator {
518538
datatypes: Vec<DataType>,
519539
/// Stores the ordering requirement of the `Accumulator`.
520540
ordering_req: LexOrdering,
541+
/// Whether the input is known to be pre-ordered
542+
is_input_pre_ordered: bool,
521543
/// Whether the aggregation is running in reverse.
522544
reverse: bool,
523545
/// Whether the aggregation should ignore null values.
@@ -531,6 +553,7 @@ impl OrderSensitiveArrayAggAccumulator {
531553
datatype: &DataType,
532554
ordering_dtypes: &[DataType],
533555
ordering_req: LexOrdering,
556+
is_input_pre_ordered: bool,
534557
reverse: bool,
535558
ignore_nulls: bool,
536559
) -> Result<Self> {
@@ -541,11 +564,34 @@ impl OrderSensitiveArrayAggAccumulator {
541564
ordering_values: vec![],
542565
datatypes,
543566
ordering_req,
567+
is_input_pre_ordered,
544568
reverse,
545569
ignore_nulls,
546570
})
547571
}
548572

573+
fn sort(&mut self) {
574+
let sort_options = self
575+
.ordering_req
576+
.iter()
577+
.map(|sort_expr| sort_expr.options)
578+
.collect::<Vec<_>>();
579+
let mut values = take(&mut self.values)
580+
.into_iter()
581+
.zip(take(&mut self.ordering_values))
582+
.collect::<Vec<_>>();
583+
let mut delayed_cmp_err = Ok(());
584+
values.sort_by(|(_, left_ordering), (_, right_ordering)| {
585+
compare_rows(left_ordering, right_ordering, &sort_options).unwrap_or_else(
586+
|err| {
587+
delayed_cmp_err = Err(err);
588+
Ordering::Equal
589+
},
590+
)
591+
});
592+
(self.values, self.ordering_values) = values.into_iter().unzip();
593+
}
594+
549595
fn evaluate_orderings(&self) -> Result<ScalarValue> {
550596
let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
551597

@@ -616,9 +662,8 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
616662
// inside `ARRAY_AGG` list, we will receive an `Array` that stores values
617663
// received from its ordering requirement expression. (This information
618664
// is necessary for during merging).
619-
let [array_agg_values, agg_orderings, ..] = &states else {
620-
return exec_err!("State should have two elements");
621-
};
665+
let [array_agg_values, agg_orderings] =
666+
take_function_args("OrderSensitiveArrayAggAccumulator::merge_batch", states)?;
622667
let Some(agg_orderings) = agg_orderings.as_list_opt::<i32>() else {
623668
return exec_err!("Expects to receive a list array");
624669
};
@@ -629,8 +674,11 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
629674
let mut partition_ordering_values = vec![];
630675

631676
// Existing values should be merged also.
632-
partition_values.push(self.values.clone().into());
633-
partition_ordering_values.push(self.ordering_values.clone().into());
677+
if !self.is_input_pre_ordered {
678+
self.sort();
679+
}
680+
partition_values.push(take(&mut self.values).into());
681+
partition_ordering_values.push(take(&mut self.ordering_values).into());
634682

635683
// Convert array to Scalars to sort them easily. Convert back to array at evaluation.
636684
let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?;
@@ -679,13 +727,21 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
679727
}
680728

681729
fn state(&mut self) -> Result<Vec<ScalarValue>> {
730+
if !self.is_input_pre_ordered {
731+
self.sort();
732+
}
733+
682734
let mut result = vec![self.evaluate()?];
683735
result.push(self.evaluate_orderings()?);
684736

685737
Ok(result)
686738
}
687739

688740
fn evaluate(&mut self) -> Result<ScalarValue> {
741+
if !self.is_input_pre_ordered {
742+
self.sort();
743+
}
744+
689745
if self.values.is_empty() {
690746
return Ok(ScalarValue::new_null_list(
691747
self.datatypes[0].clone(),

0 commit comments

Comments
 (0)