Skip to content

Commit 5554794

Browse files
berkaysynnadaadriangb
authored andcommitted
Update enforce_distribution.rs (apache#16913)
1 parent 5a19105 commit 5554794

File tree

1 file changed

+12
-10
lines changed

1 file changed

+12
-10
lines changed

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -925,32 +925,34 @@ fn add_hash_on_top(
925925
Ok(input)
926926
}
927927

928-
/// Adds a [`SortPreservingMergeExec`] operator on top of input executor
929-
/// to satisfy single distribution requirement.
928+
/// Adds a [`SortPreservingMergeExec`] or a [`CoalescePartitionsExec`] operator
929+
/// on top of the given plan node to satisfy a single partition requirement
930+
/// while preserving ordering constraints.
930931
///
931-
/// # Arguments
932+
/// # Parameters
932933
///
933934
/// * `input`: Current node.
934935
///
935936
/// # Returns
936937
///
937-
/// Updated node with an execution plan, where desired single
938-
/// distribution is satisfied by adding [`SortPreservingMergeExec`].
939-
fn add_spm_on_top(input: DistributionContext) -> DistributionContext {
940-
// Add SortPreservingMerge only when partition count is larger than 1.
938+
/// Updated node with an execution plan, where the desired single distribution
939+
/// requirement is satisfied.
940+
fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
941+
// Apply only when the partition count is larger than one.
941942
if input.plan.output_partitioning().partition_count() > 1 {
942943
// When there is an existing ordering, we preserve ordering
943944
// when decreasing partitions. This will be un-done in the future
944945
// if any of the following conditions is true
945946
// - Preserving ordering is not helpful in terms of satisfying ordering requirements
946947
// - Usage of order preserving variants is not desirable
947948
// (determined by flag `config.optimizer.prefer_existing_sort`)
948-
let new_plan = if let Some(ordering) = input.plan.output_ordering() {
949+
let new_plan = if let Some(req) = input.plan.output_ordering() {
949950
Arc::new(SortPreservingMergeExec::new(
950-
ordering.clone(),
951+
req.clone(),
951952
Arc::clone(&input.plan),
952953
)) as _
953954
} else {
955+
// If there is no input order, we can simply coalesce partitions:
954956
Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
955957
};
956958

@@ -1259,7 +1261,7 @@ pub fn ensure_distribution(
12591261
// Satisfy the distribution requirement if it is unmet.
12601262
match &requirement {
12611263
Distribution::SinglePartition => {
1262-
child = add_spm_on_top(child);
1264+
child = add_merge_on_top(child);
12631265
}
12641266
Distribution::HashPartitioned(exprs) => {
12651267
if add_roundrobin {

0 commit comments

Comments
 (0)