Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/common/exception/src/exception_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::error::Error;
use std::fmt::Debug;
use std::fmt::Display;
use std::fmt::Formatter;
use std::sync::PoisonError;

use geozero::error::GeozeroError;

Expand Down Expand Up @@ -436,3 +437,9 @@ impl From<redis::RedisError> for ErrorCode {
ErrorCode::DictionarySourceError(format!("Dictionary Redis Error, cause: {}", error))
}
}

impl<T> From<PoisonError<T>> for ErrorCode {
fn from(error: PoisonError<T>) -> Self {
ErrorCode::Internal(format!("{error}"))
}
}
12 changes: 12 additions & 0 deletions src/query/expression/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,18 @@ impl BlockEntry {
BlockEntry::Column(column) => Ok(ColumnView::Column(T::try_downcast_column(column)?)),
}
}

pub fn into_nullable(self) -> BlockEntry {
match self {
BlockEntry::Const(scalar, data_type, n) if !data_type.is_nullable_or_null() => {
BlockEntry::Const(scalar, DataType::Nullable(Box::new(data_type)), n)
}
entry @ BlockEntry::Const(_, _, _)
| entry @ BlockEntry::Column(Column::Nullable(_))
| entry @ BlockEntry::Column(Column::Null { .. }) => entry,
BlockEntry::Column(column) => column.wrap_nullable(None).into(),
}
}
}

impl From<Column> for BlockEntry {
Expand Down
4 changes: 4 additions & 0 deletions src/query/expression/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,10 @@ impl DataSchema {
}
}

pub fn new_ref(fields: Vec<DataField>) -> Arc<Self> {
Self::new(fields).into()
}

pub fn new_from(fields: Vec<DataField>, metadata: BTreeMap<String, String>) -> Self {
Self { fields, metadata }
}
Expand Down
125 changes: 125 additions & 0 deletions src/query/service/src/physical_plans/format/format_nested_loop_join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_ast::ast::FormatTreeNode;
use databend_common_exception::Result;
use databend_common_functions::BUILTIN_FUNCTIONS;

use crate::physical_plans::format::append_output_rows_info;
use crate::physical_plans::format::format_output_columns;
use crate::physical_plans::format::plan_stats_info_to_format_tree;
use crate::physical_plans::format::FormatContext;
use crate::physical_plans::format::PhysicalFormat;
use crate::physical_plans::IPhysicalPlan;
use crate::physical_plans::NestedLoopJoin;
use crate::physical_plans::PhysicalPlanMeta;

pub struct NestedLoopJoinFormatter<'a> {
inner: &'a NestedLoopJoin,
}

impl<'a> NestedLoopJoinFormatter<'a> {
pub fn create(inner: &'a NestedLoopJoin) -> Box<dyn PhysicalFormat + 'a> {
Box::new(NestedLoopJoinFormatter { inner })
}
}

impl<'a> PhysicalFormat for NestedLoopJoinFormatter<'a> {
fn get_meta(&self) -> &PhysicalPlanMeta {
self.inner.get_meta()
}

#[recursive::recursive]
fn format(&self, ctx: &mut FormatContext<'_>) -> Result<FormatTreeNode<String>> {
let conditions = self
.inner
.conditions
.iter()
.map(|expr| expr.as_expr(&BUILTIN_FUNCTIONS).sql_display())
.collect::<Vec<_>>()
.join(", ");

let mut node_children = vec![
FormatTreeNode::new(format!(
"output columns: [{}]",
format_output_columns(self.inner.output_schema()?, ctx.metadata, true)
)),
FormatTreeNode::new(format!("join type: {}", self.inner.join_type)),
FormatTreeNode::new(format!("conditions: [{conditions}]")),
];

if let Some(info) = &self.inner.stat_info {
let items = plan_stats_info_to_format_tree(info);
node_children.extend(items);
}

let left_formatter = self.inner.left.formatter()?;
let mut left_child = left_formatter.dispatch(ctx)?;
left_child.payload = format!("{}(Left)", left_child.payload);

let right_formatter = self.inner.right.formatter()?;
let mut right_child = right_formatter.dispatch(ctx)?;
right_child.payload = format!("{}(Right)", right_child.payload);

node_children.push(left_child);
node_children.push(right_child);

Ok(FormatTreeNode::with_children(
"NestedLoopJoin".to_string(),
node_children,
))
}

#[recursive::recursive]
fn format_join(&self, ctx: &mut FormatContext<'_>) -> Result<FormatTreeNode<String>> {
let left_child = self.inner.left.formatter()?.format_join(ctx)?;
let right_child = self.inner.right.formatter()?.format_join(ctx)?;

let children = vec![
FormatTreeNode::with_children("Left".to_string(), vec![left_child]),
FormatTreeNode::with_children("Right".to_string(), vec![right_child]),
];

Ok(FormatTreeNode::with_children(
format!("NestedLoopJoin: {}", self.inner.join_type),
children,
))
}

#[recursive::recursive]
fn partial_format(&self, ctx: &mut FormatContext<'_>) -> Result<FormatTreeNode<String>> {
let left_child = self.inner.left.formatter()?.partial_format(ctx)?;
let right_child = self.inner.right.formatter()?.partial_format(ctx)?;

let mut children = vec![];
if let Some(info) = &self.inner.stat_info {
let items = plan_stats_info_to_format_tree(info);
children.extend(items);
}

append_output_rows_info(&mut children, &ctx.profs, self.inner.get_id());

children.push(FormatTreeNode::with_children("Left".to_string(), vec![
left_child,
]));
children.push(FormatTreeNode::with_children("Right".to_string(), vec![
right_child,
]));

Ok(FormatTreeNode::with_children(
format!("NestedLoopJoin: {}", self.inner.join_type),
children,
))
}
}
2 changes: 2 additions & 0 deletions src/query/service/src/physical_plans/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ mod format_mutation_into_organize;
mod format_mutation_into_split;
mod format_mutation_manipulate;
mod format_mutation_source;
mod format_nested_loop_join;
mod format_project_set;
mod format_range_join;
mod format_replace_into;
Expand Down Expand Up @@ -93,6 +94,7 @@ pub use format_mutation_into_organize::*;
pub use format_mutation_into_split::*;
pub use format_mutation_manipulate::*;
pub use format_mutation_source::*;
pub use format_nested_loop_join::*;
pub use format_project_set::*;
pub use format_range_join::*;
pub use format_replace_into::*;
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/physical_plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ mod physical_mutation_into_organize;
mod physical_mutation_into_split;
mod physical_mutation_manipulate;
mod physical_mutation_source;
mod physical_nested_loop_join;
mod physical_project_set;
mod physical_r_cte_scan;
mod physical_range_join;
Expand Down Expand Up @@ -90,6 +91,7 @@ pub use physical_mutation_into_organize::MutationOrganize;
pub use physical_mutation_into_split::MutationSplit;
pub use physical_mutation_manipulate::MutationManipulate;
pub use physical_mutation_source::*;
pub use physical_nested_loop_join::NestedLoopJoin;
pub use physical_project_set::ProjectSet;
pub use physical_r_cte_scan::RecursiveCteScan;
pub use physical_range_join::*;
Expand Down
14 changes: 8 additions & 6 deletions src/query/service/src/physical_plans/physical_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,13 +527,15 @@ impl PhysicalPlanBuilder {
required: &mut ColumnSet,
others_required: &mut ColumnSet,
) -> (Vec<IndexType>, Vec<IndexType>) {
let retained_columns = self.metadata.read().get_retained_column().clone();
*required = required.union(&retained_columns).cloned().collect();
let column_projections = required.clone().into_iter().collect::<Vec<_>>();

*others_required = others_required.union(&retained_columns).cloned().collect();
let pre_column_projections = others_required.clone().into_iter().collect::<Vec<_>>();
{
let metadata = self.metadata.read();
let retained_columns = metadata.get_retained_column();
required.extend(retained_columns);
others_required.extend(retained_columns);
}

let column_projections = required.iter().copied().collect();
let pre_column_projections = others_required.iter().copied().collect();
(column_projections, pre_column_projections)
}

Expand Down
88 changes: 66 additions & 22 deletions src/query/service/src/physical_plans/physical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_settings::Settings;
use databend_common_sql::binder::is_range_join_condition;
use databend_common_sql::optimizer::ir::RelExpr;
use databend_common_sql::optimizer::ir::SExpr;
Expand All @@ -30,17 +31,52 @@ use crate::physical_plans::PhysicalPlanBuilder;
enum PhysicalJoinType {
Hash,
// The first arg is range conditions, the second arg is other conditions
RangeJoin(Vec<ScalarExpr>, Vec<ScalarExpr>),
RangeJoin {
range: Vec<ScalarExpr>,
other: Vec<ScalarExpr>,
},
LoopJoin {
conditions: Vec<ScalarExpr>,
},
}

// Choose physical join type by join conditions
fn physical_join(join: &Join, s_expr: &SExpr) -> Result<PhysicalJoinType> {
fn physical_join(join: &Join, s_expr: &SExpr, settings: &Settings) -> Result<PhysicalJoinType> {
if join.equi_conditions.is_empty() && join.join_type.is_any_join() {
return Err(ErrorCode::SemanticError(
"ANY JOIN only supports equality-based hash joins",
));
}

let left_rel_expr = RelExpr::with_s_expr(s_expr.left_child());
let right_rel_expr = RelExpr::with_s_expr(s_expr.right_child());
let right_stat_info = right_rel_expr.derive_cardinality()?;
let nested_loop_join_threshold = settings.get_nested_loop_join_threshold()?;
if matches!(join.join_type, JoinType::Inner | JoinType::Cross)
&& (right_stat_info
.statistics
.precise_cardinality
.map(|n| n < nested_loop_join_threshold)
.unwrap_or(false)
|| right_stat_info.cardinality < nested_loop_join_threshold as _)
{
let conditions = join
.non_equi_conditions
.iter()
.cloned()
.chain(join.equi_conditions.iter().cloned().map(|condition| {
FunctionCall {
span: condition.left.span(),
func_name: "eq".to_string(),
params: vec![],
arguments: vec![condition.left, condition.right],
}
.into()
}))
.collect();
return Ok(PhysicalJoinType::LoopJoin { conditions });
};

if !join.equi_conditions.is_empty() {
// Contain equi condition, use hash join
return Ok(PhysicalJoinType::Hash);
Expand All @@ -51,32 +87,29 @@ fn physical_join(join: &Join, s_expr: &SExpr) -> Result<PhysicalJoinType> {
return Ok(PhysicalJoinType::Hash);
}

let left_rel_expr = RelExpr::with_s_expr(s_expr.child(0)?);
let right_rel_expr = RelExpr::with_s_expr(s_expr.child(1)?);
let right_stat_info = right_rel_expr.derive_cardinality()?;
if matches!(right_stat_info.statistics.precise_cardinality, Some(1))
|| right_stat_info.cardinality == 1.0
{
// If the output rows of build side is equal to 1, we use CROSS JOIN + FILTER instead of RANGE JOIN.
return Ok(PhysicalJoinType::Hash);
}

let left_prop = left_rel_expr.derive_relational_prop()?;
let right_prop = right_rel_expr.derive_relational_prop()?;
let (range_conditions, other_conditions) = join
.non_equi_conditions
.iter()
.cloned()
.partition::<Vec<_>, _>(|condition| {
is_range_join_condition(condition, &left_prop, &right_prop).is_some()
});

if !range_conditions.is_empty() && matches!(join.join_type, JoinType::Inner | JoinType::Cross) {
return Ok(PhysicalJoinType::RangeJoin(
range_conditions,
other_conditions,
));
if matches!(join.join_type, JoinType::Inner | JoinType::Cross) {
let left_prop = left_rel_expr.derive_relational_prop()?;
let right_prop = right_rel_expr.derive_relational_prop()?;
let (range, other) = join
.non_equi_conditions
.iter()
.cloned()
.partition::<Vec<_>, _>(|condition| {
is_range_join_condition(condition, &left_prop, &right_prop).is_some()
});

if !range.is_empty() {
return Ok(PhysicalJoinType::RangeJoin { range, other });
}
}

// Leverage hash join to execute nested loop join
Ok(PhysicalJoinType::Hash)
}
Expand Down Expand Up @@ -157,7 +190,8 @@ impl PhysicalPlanBuilder {
)
.await
} else {
match physical_join(join, s_expr)? {
let settings = self.ctx.get_settings();
match physical_join(join, s_expr, &settings)? {
PhysicalJoinType::Hash => {
self.build_hash_join(
join,
Expand All @@ -170,7 +204,7 @@ impl PhysicalPlanBuilder {
)
.await
}
PhysicalJoinType::RangeJoin(range, other) => {
PhysicalJoinType::RangeJoin { range, other } => {
self.build_range_join(
join.join_type,
s_expr,
Expand All @@ -181,6 +215,16 @@ impl PhysicalPlanBuilder {
)
.await
}
PhysicalJoinType::LoopJoin { conditions } => {
self.build_loop_join(
join.join_type,
s_expr,
left_required,
right_required,
conditions,
)
.await
}
}
}
}
Expand Down
Loading
Loading