diff --git a/src/common/exception/src/exception_into.rs b/src/common/exception/src/exception_into.rs
index df84a23c7ee79..574191b4c73b1 100644
--- a/src/common/exception/src/exception_into.rs
+++ b/src/common/exception/src/exception_into.rs
@@ -16,6 +16,7 @@ use std::error::Error;
 use std::fmt::Debug;
 use std::fmt::Display;
 use std::fmt::Formatter;
+use std::sync::PoisonError;
 
 use geozero::error::GeozeroError;
 
@@ -436,3 +437,9 @@ impl From<redis::RedisError> for ErrorCode {
         ErrorCode::DictionarySourceError(format!("Dictionary Redis Error, cause: {}", error))
     }
 }
+
+impl<T> From<PoisonError<T>> for ErrorCode {
+    fn from(error: PoisonError<T>) -> Self {
+        ErrorCode::Internal(format!("{error}"))
+    }
+}
diff --git a/src/query/expression/src/block.rs b/src/query/expression/src/block.rs
index cbeede2fec333..c8322beebd795 100644
--- a/src/query/expression/src/block.rs
+++ b/src/query/expression/src/block.rs
@@ -191,6 +191,18 @@ impl BlockEntry {
             BlockEntry::Column(column) => Ok(ColumnView::Column(T::try_downcast_column(column)?)),
         }
     }
+
+    pub fn into_nullable(self) -> BlockEntry {
+        match self {
+            BlockEntry::Const(scalar, data_type, n) if !data_type.is_nullable_or_null() => {
+                BlockEntry::Const(scalar, DataType::Nullable(Box::new(data_type)), n)
+            }
+            entry @ BlockEntry::Const(_, _, _)
+            | entry @ BlockEntry::Column(Column::Nullable(_))
+            | entry @ BlockEntry::Column(Column::Null { .. }) => entry,
+            BlockEntry::Column(column) => column.wrap_nullable(None).into(),
+        }
+    }
 }
 
 impl From<Column> for BlockEntry {
diff --git a/src/query/expression/src/schema.rs b/src/query/expression/src/schema.rs
index 63a3bd00008b6..ca227df6bea42 100644
--- a/src/query/expression/src/schema.rs
+++ b/src/query/expression/src/schema.rs
@@ -362,6 +362,10 @@ impl DataSchema {
         }
     }
 
+    pub fn new_ref(fields: Vec<DataField>) -> Arc<Self> {
+        Self::new(fields).into()
+    }
+
     pub fn new_from(fields: Vec<DataField>, metadata: BTreeMap<String, String>) -> Self {
         Self { fields, metadata }
     }
@@ -416,8 +420,7 @@ impl DataSchema {
         let mut valid_fields: Vec<String> = self.fields.iter().map(|f| f.name().clone()).collect();
         valid_fields.truncate(16);
         Err(ErrorCode::BadArguments(format!(
-            "Unable to get field named \"{}\". Valid fields: {:?} ...",
-            name, valid_fields
+            "Unable to get field named {name:?}. Valid fields: {valid_fields:?} ...",
         )))
     }
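Editor's note: the blanket `From<PoisonError<T>>` impl above is what lets callers surface a poisoned `std::sync::Mutex` as an internal error with plain `?` propagation. A minimal standalone sketch of the pattern (the `ErrorCode` stand-in and the `bump` helper are illustrative, not part of the patch):

```rust
use std::sync::{Mutex, PoisonError};

// Stand-in for the crate's ErrorCode; only the conversion shape matters here.
#[derive(Debug)]
struct ErrorCode(String);

impl<T> From<PoisonError<T>> for ErrorCode {
    fn from(error: PoisonError<T>) -> Self {
        ErrorCode(format!("{error}"))
    }
}

fn bump(counter: &Mutex<usize>) -> Result<usize, ErrorCode> {
    // `lock()` returns Err(PoisonError<MutexGuard<usize>>) if a holder panicked;
    // the blanket impl lets `?` convert that into an ErrorCode directly.
    let mut guard = counter.lock()?;
    *guard += 1;
    Ok(*guard)
}

fn main() {
    let counter = Mutex::new(0);
    println!("{:?}", bump(&counter));
}
```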
diff --git a/src/query/service/src/lib.rs b/src/query/service/src/lib.rs
index d7684a3d87788..949b678e04414 100644
--- a/src/query/service/src/lib.rs
+++ b/src/query/service/src/lib.rs
@@ -41,6 +41,7 @@
 #![allow(clippy::diverging_sub_expression)]
 #![allow(clippy::arc_with_non_send_sync)]
 #![feature(debug_closure_helpers)]
+#![feature(stmt_expr_attributes)]
 
 extern crate core;
 
diff --git a/src/query/service/src/physical_plans/mod.rs b/src/query/service/src/physical_plans/mod.rs
index a6008acbace85..9a0d5f7e4a2e4 100644
--- a/src/query/service/src/physical_plans/mod.rs
+++ b/src/query/service/src/physical_plans/mod.rs
@@ -81,7 +81,7 @@ pub use physical_exchange::Exchange;
 pub use physical_exchange_sink::ExchangeSink;
 pub use physical_exchange_source::ExchangeSource;
 pub use physical_filter::Filter;
-pub use physical_hash_join::HashJoin;
+pub use physical_hash_join::*;
 pub use physical_limit::Limit;
 pub use physical_materialized_cte::*;
 pub use physical_multi_table_insert::*;
diff --git a/src/query/service/src/physical_plans/physical_hash_join.rs b/src/query/service/src/physical_plans/physical_hash_join.rs
index 553a462ba688e..fdeacfae28117 100644
--- a/src/query/service/src/physical_plans/physical_hash_join.rs
+++ b/src/query/service/src/physical_plans/physical_hash_join.rs
@@ -25,6 +25,7 @@ use databend_common_expression::types::DataType;
 use databend_common_expression::ConstantFolder;
 use databend_common_expression::DataBlock;
 use databend_common_expression::DataField;
+use databend_common_expression::DataSchema;
 use databend_common_expression::DataSchemaRef;
 use databend_common_expression::DataSchemaRefExt;
 use databend_common_expression::RemoteExpr;
@@ -35,7 +36,9 @@ use databend_common_pipeline::core::Pipe;
 use databend_common_pipeline::core::PipeItem;
 use databend_common_pipeline::core::ProcessorPtr;
 use databend_common_sql::optimizer::ir::SExpr;
+use databend_common_sql::plans::FunctionCall;
 use databend_common_sql::plans::Join;
+use databend_common_sql::plans::JoinEquiCondition;
 use databend_common_sql::plans::JoinType;
 use databend_common_sql::ColumnEntry;
 use databend_common_sql::ColumnSet;
@@ -52,6 +55,7 @@ use crate::physical_plans::format::PhysicalFormat;
 use crate::physical_plans::physical_plan::IPhysicalPlan;
 use crate::physical_plans::physical_plan::PhysicalPlan;
 use crate::physical_plans::physical_plan::PhysicalPlanMeta;
+use crate::physical_plans::resolve_scalar;
 use crate::physical_plans::runtime_filter::build_runtime_filter;
 use crate::physical_plans::Exchange;
 use crate::physical_plans::PhysicalPlanBuilder;
@@ -99,6 +103,12 @@ type MergedFieldsResult = (
     Vec<(usize, (bool, bool))>,
 );
 
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
+pub struct NestedLoopFilterInfo {
+    pub predicates: Vec<RemoteExpr>,
+    pub projection: Vec<usize>,
+}
+
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 pub struct HashJoin {
     pub meta: PhysicalPlanMeta,
@@ -140,6 +150,7 @@ pub struct HashJoin {
 
     pub runtime_filter: PhysicalRuntimeFilters,
     pub broadcast_id: Option<u32>,
+    pub nested_loop_filter: Option<NestedLoopFilterInfo>,
 }
 
 #[typetag::serde]
 impl IPhysicalPlan for HashJoin {
@@ -261,6 +272,7 @@ impl IPhysicalPlan for HashJoin {
             build_side_cache_info: self.build_side_cache_info.clone(),
             runtime_filter: self.runtime_filter.clone(),
             broadcast_id: self.broadcast_id,
+            nested_loop_filter: self.nested_loop_filter.clone(),
         })
     }
@@ -527,13 +539,15 @@ impl PhysicalPlanBuilder {
         required: &mut ColumnSet,
         others_required: &mut ColumnSet,
     ) -> (Vec<usize>, Vec<usize>) {
-        let retained_columns = self.metadata.read().get_retained_column().clone();
-        *required = required.union(&retained_columns).cloned().collect();
-        let column_projections = required.clone().into_iter().collect::<Vec<_>>();
-
-        *others_required = others_required.union(&retained_columns).cloned().collect();
-        let pre_column_projections = others_required.clone().into_iter().collect::<Vec<_>>();
+        {
+            let metadata = self.metadata.read();
+            let retained_columns = metadata.get_retained_column();
+            required.extend(retained_columns);
+            others_required.extend(retained_columns);
+        }
+        let column_projections = required.iter().copied().collect();
+        let pre_column_projections = others_required.iter().copied().collect();
 
         (column_projections, pre_column_projections)
     }
@@ -1029,7 +1043,7 @@ impl PhysicalPlanBuilder {
         }
 
         // Add tail fields
-        build_fields.extend(tail_fields.clone());
+        build_fields.extend_from_slice(&tail_fields);
         merged_fields.extend(tail_fields);
 
         Ok((merged_fields, probe_fields, build_fields, probe_to_build))
@@ -1137,7 +1151,7 @@ impl PhysicalPlanBuilder {
 
         // Create projections and output schema
         let mut projections = ColumnSet::new();
-        let projected_schema = DataSchemaRefExt::create(merged_fields.clone());
+        let projected_schema = DataSchema::new(merged_fields.clone());
 
         for column in column_projections.iter() {
             if let Some((index, _)) = projected_schema.column_with_name(&column.to_string()) {
@@ -1182,79 +1196,75 @@ impl PhysicalPlanBuilder {
             .collect::<Vec<_>>()
     }
 
-    /// Creates a HashJoin physical plan
-    ///
-    /// # Arguments
-    /// * `join` - Join operation
-    /// * `probe_side` - Probe side physical plan
-    /// * `build_side` - Build side physical plan
-    /// * `is_broadcast` - Whether this is a broadcast join
-    /// * `projections` - Column projections
-    /// * `probe_projections` - Probe side projections
-    /// * `build_projections` - Build side projections
-    /// * `left_join_conditions` - Left join conditions
-    /// * `right_join_conditions` - Right join conditions
-    /// * `is_null_equal` - Null equality flags
-    /// * `non_equi_conditions` - Non-equi conditions
-    /// * `probe_to_build` - Probe to build mapping
-    /// * `output_schema` - Output schema
-    /// * `build_side_cache_info` - Build side cache info
-    /// * `runtime_filter` - Runtime filter
-    /// * `stat_info` - Statistics info
-    ///
-    /// # Returns
-    /// * `Result<PhysicalPlan>` - The HashJoin physical plan
-    #[allow(clippy::too_many_arguments)]
-    fn create_hash_join(
+    fn build_nested_loop_filter_info(
         &self,
-        s_expr: &SExpr,
         join: &Join,
-        probe_side: PhysicalPlan,
-        build_side: PhysicalPlan,
-        projections: ColumnSet,
-        probe_projections: ColumnSet,
-        build_projections: ColumnSet,
-        left_join_conditions: Vec<RemoteExpr>,
-        right_join_conditions: Vec<RemoteExpr>,
-        is_null_equal: Vec<bool>,
-        non_equi_conditions: Vec<RemoteExpr>,
-        probe_to_build: Vec<(usize, (bool, bool))>,
-        output_schema: DataSchemaRef,
-        build_side_cache_info: Option<(usize, HashMap<IndexType, usize>)>,
-        runtime_filter: PhysicalRuntimeFilters,
-        stat_info: PlanStatsInfo,
-    ) -> Result<PhysicalPlan> {
-        let build_side_data_distribution = s_expr.build_side_child().get_data_distribution()?;
-        let broadcast_id = if build_side_data_distribution
-            .as_ref()
-            .is_some_and(|e| matches!(e, databend_common_sql::plans::Exchange::NodeToNodeHash(_)))
-        {
-            Some(self.ctx.get_next_broadcast_id())
-        } else {
-            None
-        };
-        Ok(PhysicalPlan::new(HashJoin {
-            projections,
-            build_projections,
-            probe_projections,
-            build: build_side,
-            probe: probe_side,
-            join_type: join.join_type,
-            build_keys: right_join_conditions,
-            probe_keys: left_join_conditions,
-            is_null_equal,
-            non_equi_conditions,
-            marker_index: join.marker_index,
-            meta: PhysicalPlanMeta::new("HashJoin"),
-            from_correlated_subquery: join.from_correlated_subquery,
-            probe_to_build,
-            output_schema,
-            need_hold_hash_table: join.need_hold_hash_table,
-            stat_info: Some(stat_info),
-            single_to_inner: join.single_to_inner,
-            build_side_cache_info,
-            runtime_filter,
-            broadcast_id,
+        probe_schema: &DataSchema,
+        build_schema: &DataSchema,
+        target_schema: &DataSchema,
+    ) -> Result<Option<NestedLoopFilterInfo>> {
+        if !matches!(join.join_type, JoinType::Inner) {
+            return Ok(None);
+        }
+
+        let merged = DataSchema::new(
+            probe_schema
+                .fields
+                .iter()
+                .cloned()
+                .chain(build_schema.fields.iter().cloned())
+                .collect(),
+        );
+
+        let mut predicates =
+            Vec::with_capacity(join.equi_conditions.len() + join.non_equi_conditions.len());
+
+        for condition in &join.equi_conditions {
+            let scalar = condition_to_expr(condition)?;
+            match resolve_scalar(&scalar, &merged) {
+                Ok(expr) => predicates.push(expr),
+                Err(_)
+                    if condition
+                        .left
+                        .data_type()
+                        .map(|data_type| data_type.remove_nullable().is_bitmap())
+                        .unwrap_or_default() =>
+                {
+                    // no function matches signature `eq(Bitmap NULL, Bitmap NULL)`
+                    return Ok(None);
+                }
+                Err(err) => {
+                    return Err(err.add_message(format!(
+                        "Failed to build nested loop filter schema: {merged:#?} equi_conditions: {:#?}",
+                        join.equi_conditions
+                    )))
+                }
+            }
+        }
+
+        for scalar in &join.non_equi_conditions {
+            predicates.push(resolve_scalar(scalar, &merged).map_err(|err| {
+                err.add_message(format!(
+                    "Failed to build nested loop filter schema: {merged:#?} non_equi_conditions: {:#?}",
+                    join.non_equi_conditions
+                ))
+            })?);
+        }
+
+        let projection = target_schema
+            .fields
+            .iter()
+            .map(|column| merged.index_of(column.name()))
+            .collect::<Result<Vec<_>>>()
+            .map_err(|err| {
+                err.add_message(format!(
+                    "Failed to build nested loop filter schema: {merged:#?} target: {target_schema:#?}",
+                ))
+            })?;
+
+        Ok(Some(NestedLoopFilterInfo {
+            predicates,
+            projection,
         }))
     }
@@ -1342,24 +1352,67 @@ impl PhysicalPlanBuilder {
         )
         .await?;
 
+        let nested_loop_filter =
+            self.build_nested_loop_filter_info(join, &probe_schema, &build_schema, &merged_schema)?;
+
         // Step 12: Create and return the HashJoin
-        self.create_hash_join(
-            s_expr,
-            join,
-            probe_side,
-            build_side,
+        let build_side_data_distribution = s_expr.build_side_child().get_data_distribution()?;
+        let broadcast_id = if build_side_data_distribution
+            .as_ref()
+            .is_some_and(|e| matches!(e, databend_common_sql::plans::Exchange::NodeToNodeHash(_)))
+        {
+            Some(self.ctx.get_next_broadcast_id())
+        } else {
+            None
+        };
+        Ok(PhysicalPlan::new(HashJoin {
             projections,
-            probe_projections,
             build_projections,
-            left_join_conditions,
-            right_join_conditions,
+            probe_projections,
+            build: build_side,
+            probe: probe_side,
+            join_type: join.join_type,
+            build_keys: right_join_conditions,
+            probe_keys: left_join_conditions,
             is_null_equal,
             non_equi_conditions,
+            marker_index: join.marker_index,
+            meta: PhysicalPlanMeta::new("HashJoin"),
+            from_correlated_subquery: join.from_correlated_subquery,
             probe_to_build,
             output_schema,
+            need_hold_hash_table: join.need_hold_hash_table,
+            stat_info: Some(stat_info),
+            single_to_inner: join.single_to_inner,
             build_side_cache_info,
             runtime_filter,
-            stat_info,
-        )
+            broadcast_id,
+            nested_loop_filter,
+        }))
     }
 }
+
+fn condition_to_expr(condition: &JoinEquiCondition) -> Result<ScalarExpr> {
+    let left_type = condition.left.data_type()?;
+    let right_type = condition.right.data_type()?;
+
+    let arguments = match (&left_type, &right_type) {
+        (DataType::Nullable(box left), right) if left == right => vec![
+            condition.left.clone(),
+            condition.right.clone().unify_to_data_type(&left_type),
+        ],
+        (left, DataType::Nullable(box right)) if left == right => vec![
+            condition.left.clone().unify_to_data_type(&right_type),
+            condition.right.clone(),
+        ],
+        _ => vec![condition.left.clone(), condition.right.clone()],
+    };
+
+    Ok(FunctionCall {
+        span: condition.left.span(),
+        func_name: "eq".to_string(),
+        params: vec![],
+        arguments,
+    }
+    .into())
+}
diff --git a/src/query/service/src/physical_plans/physical_join.rs b/src/query/service/src/physical_plans/physical_join.rs
index bc48246312e38..b8910faf02664 100644
--- a/src/query/service/src/physical_plans/physical_join.rs
+++ b/src/query/service/src/physical_plans/physical_join.rs
@@ -30,7 +30,10 @@ use crate::physical_plans::PhysicalPlanBuilder;
 enum PhysicalJoinType {
     Hash,
     // The first arg is range conditions, the second arg is other conditions
-    RangeJoin(Vec<ScalarExpr>, Vec<ScalarExpr>),
+    RangeJoin {
+        range: Vec<ScalarExpr>,
+        other: Vec<ScalarExpr>,
+    },
 }
 
 // Choose physical join type by join conditions
@@ -41,6 +44,10 @@ fn physical_join(join: &Join, s_expr: &SExpr) -> Result<PhysicalJoinType> {
         ));
     }
 
+    let left_rel_expr = RelExpr::with_s_expr(s_expr.left_child());
+    let right_rel_expr = RelExpr::with_s_expr(s_expr.right_child());
+    let right_stat_info = right_rel_expr.derive_cardinality()?;
+
     if !join.equi_conditions.is_empty() {
         // Contain equi condition, use hash join
         return Ok(PhysicalJoinType::Hash);
@@ -51,9 +58,6 @@ fn physical_join(join: &Join, s_expr: &SExpr) -> Result<PhysicalJoinType> {
         return Ok(PhysicalJoinType::Hash);
     }
 
-    let left_rel_expr = RelExpr::with_s_expr(s_expr.child(0)?);
-    let right_rel_expr = RelExpr::with_s_expr(s_expr.child(1)?);
-    let right_stat_info = right_rel_expr.derive_cardinality()?;
     if matches!(right_stat_info.statistics.precise_cardinality, Some(1))
         || right_stat_info.cardinality == 1.0
     {
@@ -61,22 +65,22 @@ fn physical_join(join: &Join, s_expr: &SExpr) -> Result<PhysicalJoinType> {
         return Ok(PhysicalJoinType::Hash);
     }
 
-    let left_prop = left_rel_expr.derive_relational_prop()?;
-    let right_prop = right_rel_expr.derive_relational_prop()?;
-    let (range_conditions, other_conditions) = join
-        .non_equi_conditions
-        .iter()
-        .cloned()
-        .partition::<Vec<_>, _>(|condition| {
-            is_range_join_condition(condition, &left_prop, &right_prop).is_some()
-        });
-
-    if !range_conditions.is_empty() && matches!(join.join_type, JoinType::Inner | JoinType::Cross) {
-        return Ok(PhysicalJoinType::RangeJoin(
-            range_conditions,
-            other_conditions,
-        ));
+    if matches!(join.join_type, JoinType::Inner | JoinType::Cross) {
+        let left_prop = left_rel_expr.derive_relational_prop()?;
+        let right_prop = right_rel_expr.derive_relational_prop()?;
+        let (range, other) = join
+            .non_equi_conditions
+            .iter()
+            .cloned()
+            .partition::<Vec<_>, _>(|condition| {
+                is_range_join_condition(condition, &left_prop, &right_prop).is_some()
+            });
+
+        if !range.is_empty() {
+            return Ok(PhysicalJoinType::RangeJoin { range, other });
+        }
     }
 
+    // Leverage hash join to execute nested loop join
     Ok(PhysicalJoinType::Hash)
 }
 
@@ -170,7 +174,7 @@ impl PhysicalPlanBuilder {
                 )
                 .await
             }
-            PhysicalJoinType::RangeJoin(range, other) => {
+            PhysicalJoinType::RangeJoin { range, other } => {
                 self.build_range_join(
                     join.join_type,
                     s_expr,
diff --git a/src/query/service/src/physical_plans/physical_range_join.rs b/src/query/service/src/physical_plans/physical_range_join.rs
index 9c9723edf6e73..1be812e3f26d0 100644
--- a/src/query/service/src/physical_plans/physical_range_join.rs
+++ b/src/query/service/src/physical_plans/physical_range_join.rs
@@ -20,7 +20,6 @@ use databend_common_exception::Result;
 use databend_common_expression::type_check::common_super_type;
 use databend_common_expression::DataSchema;
 use databend_common_expression::DataSchemaRef;
-use databend_common_expression::DataSchemaRefExt;
 use databend_common_expression::RemoteExpr;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 use databend_common_pipeline::core::ProcessorPtr;
@@ -232,10 +231,7 @@ impl PhysicalPlanBuilder {
         let left_schema = self.prepare_probe_schema(join_type, &left_side)?;
         let right_schema = self.prepare_build_schema(join_type, &right_side)?;
 
-        let mut output_schema = Vec::clone(left_schema.fields());
-        output_schema.extend_from_slice(right_schema.fields());
-
-        let merged_schema = DataSchemaRefExt::create(
+        let output_schema = DataSchema::new_ref(
             left_schema
                 .fields()
                 .iter()
@@ -262,11 +258,11 @@ impl PhysicalPlanBuilder {
                 .collect::<Result<_>>()?,
             other_conditions: other_conditions
                 .iter()
-                .map(|scalar| resolve_scalar(scalar, &merged_schema))
+                .map(|scalar| resolve_scalar(scalar, &output_schema))
                 .collect::<Result<_>>()?,
             join_type,
             range_join_type,
-            output_schema: Arc::new(DataSchema::new(output_schema)),
+            output_schema,
             stat_info: Some(self.build_plan_stat_info(s_expr)?),
         }))
     }
@@ -343,9 +339,9 @@ fn resolve_range_condition(
         }
     }
 }
 
-fn resolve_scalar(scalar: &ScalarExpr, schema: &DataSchemaRef) -> Result<RemoteExpr> {
+pub fn resolve_scalar(scalar: &ScalarExpr, schema: &DataSchema) -> Result<RemoteExpr> {
     let expr = scalar
-        .type_check(schema.as_ref())?
+        .type_check(schema)?
         .project_column_ref(|index| schema.index_of(&index.to_string()))?;
     Ok(expr.as_remote_expr())
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs
index ae2a25d06733b..a0b09a97e49bf 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs
@@ -22,14 +22,18 @@ use databend_common_expression::DataBlock;
 use databend_common_expression::DataSchemaRef;
 use databend_common_expression::Evaluator;
 use databend_common_expression::Expr;
+use databend_common_expression::FieldIndex;
+use databend_common_expression::FilterExecutor;
 use databend_common_expression::FunctionContext;
 use databend_common_expression::RemoteExpr;
 use databend_common_functions::BUILTIN_FUNCTIONS;
+use databend_common_settings::Settings;
 use databend_common_sql::executor::cast_expr_to_non_null_boolean;
 use databend_common_sql::ColumnSet;
 use parking_lot::RwLock;
 
 use crate::physical_plans::HashJoin;
+use crate::physical_plans::NestedLoopFilterInfo;
 use crate::physical_plans::PhysicalRuntimeFilter;
 use crate::physical_plans::PhysicalRuntimeFilters;
 use crate::pipelines::processors::transforms::wrap_true_validity;
@@ -60,9 +64,10 @@ pub struct HashJoinDesc {
     pub(crate) runtime_filter: RuntimeFiltersDesc,
 
     pub(crate) build_projection: ColumnSet,
-    pub(crate) probe_projections: ColumnSet,
+    pub(crate) probe_projection: ColumnSet,
     pub(crate) probe_to_build: Vec<(usize, (bool, bool))>,
     pub(crate) build_schema: DataSchemaRef,
+    pub(crate) nested_loop_filter: Option<NestedLoopFilterInfo>,
 }
 
 #[derive(Debug, Clone)]
@@ -76,7 +81,7 @@ pub struct RuntimeFilterDesc {
     pub enable_min_max_runtime_filter: bool,
 }
 
-pub struct RuntimeFiltersDesc {
+pub(crate) struct RuntimeFiltersDesc {
     pub filters: Vec<RuntimeFilterDesc>,
 }
 
@@ -136,8 +141,9 @@ impl HashJoinDesc {
             runtime_filter: (&join.runtime_filter).into(),
             probe_to_build: join.probe_to_build.clone(),
             build_projection: join.build_projections.clone(),
-            probe_projections: join.probe_projections.clone(),
+            probe_projection: join.probe_projections.clone(),
             build_schema: join.build.output_schema()?,
+            nested_loop_filter: join.nested_loop_filter.clone(),
         })
     }
@@ -259,4 +265,63 @@ impl HashJoinDesc {
             }
         }
     }
+
+    pub fn create_nested_loop_desc(
+        &self,
+        settings: &Settings,
+        function_ctx: &FunctionContext,
+    ) -> Result<Option<NestedLoopDesc>> {
+        let nested_loop_join_threshold = settings.get_nested_loop_join_threshold()? as usize;
+        let block_size = settings.get_max_block_size()? as usize;
+        if nested_loop_join_threshold == 0 {
+            return Ok(None);
+        }
+
+        let Some(NestedLoopFilterInfo {
+            predicates,
+            projection,
+        }) = &self.nested_loop_filter
+        else {
+            return Ok(None);
+        };
+
+        let predicates = predicates
+            .iter()
+            .map(|x| Ok(x.as_expr(&BUILTIN_FUNCTIONS)))
+            .reduce(|lhs, rhs| {
+                check_function(None, "and_filters", &[], &[lhs?, rhs?], &BUILTIN_FUNCTIONS)
+            })
+            .transpose()?;
+        let Some(predicates) = predicates else {
+            return Ok(None);
+        };
+
+        let field_reorder = if !projection.is_sorted() {
+            let mut mapper = projection.iter().cloned().enumerate().collect::<Vec<_>>();
+            mapper.sort_by_key(|(_, field)| *field);
+            let reorder = mapper.iter().map(|(i, _)| *i).collect::<Vec<_>>();
+            Some(reorder)
+        } else {
+            None
+        };
+
+        Ok(Some(NestedLoopDesc {
+            filter: FilterExecutor::new(
+                predicates,
+                function_ctx.clone(),
+                block_size,
+                Some(projection.iter().copied().collect()),
+                &BUILTIN_FUNCTIONS,
+                false,
+            ),
+            field_reorder,
+            nested_loop_join_threshold,
+        }))
+    }
 }
+
+pub struct NestedLoopDesc {
+    pub filter: FilterExecutor,
+    pub field_reorder: Option<Vec<FieldIndex>>,
+    pub nested_loop_join_threshold: usize,
+}
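Editor's note: in `create_nested_loop_desc`, folding the predicate list with `reduce` plus `transpose` keeps the empty-list case as `None` while short-circuiting on the first error — the same shape as chaining `and_filters` over `Expr`s above. A sketch with plain integers standing in for expressions (illustrative only):

```rust
fn fold_all(values: &[i32]) -> Result<Option<i32>, String> {
    values
        .iter()
        .map(|v| if *v >= 0 { Ok(*v) } else { Err(format!("negative: {v}")) })
        // Combine pairwise, like `check_function(.., "and_filters", ..)` does for exprs.
        .reduce(|lhs, rhs| Ok(lhs? + rhs?))
        // Option<Result<i32, _>> -> Result<Option<i32>, _>
        .transpose()
}

fn main() {
    assert_eq!(fold_all(&[]), Ok(None)); // empty predicate list stays None
    assert_eq!(fold_all(&[1, 2, 3]), Ok(Some(6)));
    assert!(fold_all(&[1, -2]).is_err());
}
```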
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs
index 380977cee104e..78380489e3fdf 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs
@@ -744,6 +744,7 @@ impl HashJoinBuildState {
                     "Aborted query, because the hash table is uninitialized.",
                 ));
             }
+            HashJoinHashTable::NestedLoop(_) => unreachable!(),
            HashJoinHashTable::SkipDuplicatesSerializer(table) => insert_binary_key! {
              &mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces,
            },
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs
index 1632c7dab8371..0dd15dd68df6f 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs
@@ -165,6 +165,7 @@ impl HashJoinProbeState {
                 // Continue to probe hash table and process data blocks.
                 self.result_blocks(probe_state, keys, &table.hash_table)
             }
+            HashJoinHashTable::NestedLoop(_) => unreachable!(),
             HashJoinHashTable::Null => Err(ErrorCode::AbortedQuery(
                 "Aborted query, because the hash table is uninitialized.",
             )),
@@ -376,6 +377,7 @@ impl HashJoinProbeState {
                 // Continue to probe hash table and process data blocks.
                 self.result_blocks(probe_state, keys, &table.hash_table)
             }
+            HashJoinHashTable::NestedLoop(_) => unreachable!(),
             HashJoinHashTable::Null => Err(ErrorCode::AbortedQuery(
                 "Aborted query, because the hash table is uninitialized.",
             )),
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs
index d696ca2304e7b..9107d9ea12965 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs
@@ -78,6 +78,7 @@ pub struct FixedKeyHashJoinHashTable<
 
 pub enum HashJoinHashTable {
     Null,
+    NestedLoop(Vec<DataBlock>),
     Serializer(SerializerHashJoinHashTable),
     SkipDuplicatesSerializer(SkipDuplicatesSerializerHashJoinHashTable),
     SingleBinary(SingleBinaryHashJoinHashTable),
@@ -348,9 +349,9 @@ impl HashJoinState {
 }
 
 impl HashJoinHashTable {
-    pub fn len(&self) -> usize {
-        match self {
-            HashJoinHashTable::Null => 0,
+    pub fn size(&self) -> Option<usize> {
+        let n = match self {
+            HashJoinHashTable::Null | HashJoinHashTable::NestedLoop(_) => return None,
             HashJoinHashTable::Serializer(table) => table.hash_table.len(),
             HashJoinHashTable::SingleBinary(table) => table.hash_table.len(),
             HashJoinHashTable::KeysU8(table) => table.hash_table.len(),
@@ -367,11 +368,7 @@ impl HashJoinHashTable {
             HashJoinHashTable::SkipDuplicatesKeysU64(table) => table.hash_table.len(),
             HashJoinHashTable::SkipDuplicatesKeysU128(table) => table.hash_table.len(),
             HashJoinHashTable::SkipDuplicatesKeysU256(table) => table.hash_table.len(),
-        }
-    }
-
-    #[allow(dead_code)]
-    pub fn is_empty(&self) -> bool {
-        self.len() == 0
+        };
+        Some(n)
     }
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs
index 32b46295ccb1f..2f5caba0e8474 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs
@@ -32,6 +32,7 @@ mod util;
 
 pub use common::wrap_true_validity;
 pub use desc::HashJoinDesc;
+pub use desc::NestedLoopDesc;
 pub use desc::RuntimeFilterDesc;
 pub use hash_join_build_state::HashJoinBuildState;
 pub use hash_join_probe_state::HashJoinProbeState;
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs
index 114f019669287..f7950e3948366 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs
@@ -54,7 +54,9 @@ impl HashJoinProbeState {
             JoinType::InnerAny | JoinType::RightAny
         ) {
             let hash_table = unsafe { &*self.hash_join_state.hash_table.get() };
-            probe_state.used_once = Some(MutableBitmap::from_len_zeroed(hash_table.len()))
+            probe_state.used_once = Some(MutableBitmap::from_len_zeroed(
+                hash_table.size().unwrap_or_default(),
+            ))
         }
         let no_other_predicate = self
             .hash_join_state
diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs
index bc7aad3e4a36f..c643526d15156 100644
--- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs
+++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs
@@ -26,9 +26,10 @@ use databend_common_expression::FunctionContext;
 use databend_common_expression::HashMethodKind;
 use databend_common_sql::plans::JoinType;
 
-use crate::pipelines::processors::transforms::memory::outer_left_join::OuterLeftHashJoin;
-use crate::pipelines::processors::transforms::new_hash_join::common::CStyleCell;
-use crate::pipelines::processors::transforms::new_hash_join::grace::GraceHashJoinState;
+use super::common::CStyleCell;
+use super::grace::GraceHashJoinState;
+use super::memory::outer_left_join::OuterLeftHashJoin;
+use super::memory::NestedLoopJoin;
 use crate::pipelines::processors::transforms::BasicHashJoinState;
 use crate::pipelines::processors::transforms::GraceHashJoin;
 use crate::pipelines::processors::transforms::InnerHashJoin;
@@ -126,13 +127,29 @@ impl HashJoinFactory {
         }
 
         match typ {
-            JoinType::Inner => Ok(Box::new(InnerHashJoin::create(
-                &self.ctx,
-                self.function_ctx.clone(),
-                self.hash_method.clone(),
-                self.desc.clone(),
-                self.create_basic_state(id)?,
-            )?)),
+            JoinType::Inner => {
+                let state = self.create_basic_state(id)?;
+                let nested_loop_desc = self
+                    .desc
+                    .create_nested_loop_desc(&settings, &self.function_ctx)?;
+
+                let inner = InnerHashJoin::create(
+                    &settings,
+                    self.function_ctx.clone(),
+                    self.hash_method.clone(),
+                    self.desc.clone(),
+                    state.clone(),
+                    nested_loop_desc
+                        .as_ref()
+                        .map(|desc| desc.nested_loop_join_threshold)
+                        .unwrap_or_default(),
+                )?;
+
+                match nested_loop_desc {
+                    Some(desc) => Ok(Box::new(NestedLoopJoin::create(inner, state, desc))),
+                    None => Ok(Box::new(inner)),
+                }
+            }
             JoinType::Left => Ok(Box::new(OuterLeftHashJoin::create(
                 &self.ctx,
                 self.function_ctx.clone(),
@@ -148,11 +165,12 @@ impl HashJoinFactory {
         match typ {
             JoinType::Inner => {
                 let inner_hash_join = InnerHashJoin::create(
-                    &self.ctx,
+                    &self.ctx.get_settings(),
                     self.function_ctx.clone(),
                     self.hash_method.clone(),
                    self.desc.clone(),
                     self.create_basic_state(id)?,
+                    0,
                 )?;
 
                 Ok(Box::new(GraceHashJoin::create(
diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/join.rs
index d1a44c0992b28..a4006838900f7 100644
--- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/join.rs
+++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/join.rs
@@ -24,16 +24,22 @@ pub trait JoinStream: Send + Sync {
 }
 
 pub trait Join: Send + Sync + 'static {
+    /// Push one block into the build side. `None` signals the end of input.
     fn add_block(&mut self, data: Option<DataBlock>) -> Result<()>;
 
+    /// Finalize the build phase in chunks; each call processes the next pending build batch and
+    /// returns its progress. Once all batches are consumed it returns `None` to signal completion.
     fn final_build(&mut self) -> Result<Option<ProgressValues>>;
 
+    /// Generate the runtime filter packet for the given filter description.
     fn build_runtime_filter(&self, _: &RuntimeFiltersDesc) -> Result<JoinRuntimeFilterPacket> {
         Ok(JoinRuntimeFilterPacket::default())
     }
 
+    /// Probe with a single block and return a streaming iterator over results.
     fn probe_block(&mut self, data: DataBlock) -> Result<Box<dyn JoinStream + '_>>;
 
+    /// Final steps after probing all blocks; used when more output is pending.
     fn final_probe(&mut self) -> Result<Option<Box<dyn JoinStream + '_>>> {
         Ok(None)
     }
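Editor's note: the doc comments added to `Join` describe a strict phase order. A sketch of a driver that honors it, written against the trait as declared above (illustrative, not compiled against the crate; it assumes `JoinStream::next(&mut self) -> Result<Option<DataBlock>>`, which matches the implementations in this patch):

```rust
fn drive(
    join: &mut dyn Join,
    build: Vec<DataBlock>,
    probe: Vec<DataBlock>,
) -> Result<Vec<DataBlock>> {
    // Build phase: push every block, then signal end-of-input with None.
    for block in build {
        join.add_block(Some(block))?;
    }
    join.add_block(None)?;
    // Finalize in chunks until the join reports completion.
    while join.final_build()?.is_some() {}

    // Probe phase: each block yields a stream of result blocks.
    let mut output = vec![];
    for block in probe {
        let mut stream = join.probe_block(block)?;
        while let Some(result) = stream.next()? {
            output.push(result);
        }
    }
    // Drain any pending output (e.g. unmatched rows for outer joins).
    while let Some(mut stream) = join.final_probe()? {
        while let Some(result) = stream.next()? {
            output.push(result);
        }
    }
    Ok(output)
}
```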
as _, + ); Ok(BasicHashJoin { desc, state, method, function_ctx, - squash_block: SquashBlocks::new(block_size, block_bytes), + squash_block, + nested_loop_join_threshold, }) } + pub(crate) fn add_block(&mut self, mut data: Option) -> Result<()> { let mut squashed_block = match data.take() { None => self.squash_block.finalize()?, @@ -78,22 +82,18 @@ impl BasicHashJoin { }; if let Some(squashed_block) = squashed_block.take() { - let locked = self.state.mutex.lock(); - let _locked = locked.unwrap_or_else(PoisonError::into_inner); - - *self.state.build_rows.as_mut() += squashed_block.num_rows(); - let chunk_index = self.state.chunks.len(); - self.state.chunks.as_mut().push(squashed_block); - self.state.build_queue.as_mut().push_back(chunk_index); + self.state.push_chunk(squashed_block); } Ok(()) } pub(crate) fn final_build(&mut self) -> Result> { - self.init_memory_hash_table(); + if let Some(true) = self.init_memory_hash_table() { + return Ok(Some(self.build_nested_loop())); + }; - let Some(chunk_index) = self.steal_chunk_index() else { + let Some(chunk_index) = self.state.steal_chunk_index() else { return Ok(None); }; @@ -135,20 +135,19 @@ impl BasicHashJoin { bytes: num_bytes, })) } -} - -impl BasicHashJoin { - fn steal_chunk_index(&self) -> Option { - let locked = self.state.mutex.lock(); - let _locked = locked.unwrap_or_else(PoisonError::into_inner); - self.state.build_queue.as_mut().pop_front() - } pub(crate) fn finalize_chunks(&mut self) { if self.desc.build_projection.is_empty() || !self.state.columns.is_empty() { return; } + if matches!( + self.state.hash_table.deref(), + HashJoinHashTable::NestedLoop(_) + ) { + return; + } + let locked = self.state.mutex.lock(); let _locked = locked.unwrap_or_else(PoisonError::into_inner); @@ -175,135 +174,139 @@ impl BasicHashJoin { columns.push(Column::take_downcast_column_vec(&full_columns)); } - std::mem::swap(&mut columns, self.state.columns.as_mut()); + *self.state.columns.as_mut() = columns; } - fn init_memory_hash_table(&mut self) { + fn init_memory_hash_table(&mut self) -> Option { if !matches!(self.state.hash_table.deref(), HashJoinHashTable::Null) { - return; + return None; } let skip_duplicates = matches!(self.desc.join_type, JoinType::InnerAny | JoinType::LeftAny); let locked = self.state.mutex.lock(); let _locked = locked.unwrap_or_else(PoisonError::into_inner); - if matches!(self.state.hash_table.deref(), HashJoinHashTable::Null) { - let build_num_rows = *self.state.build_rows.deref(); - *self.state.hash_table.as_mut() = match (self.method.clone(), skip_duplicates) { - (HashMethodKind::Serializer(_), false) => { - HashJoinHashTable::Serializer(SerializerHashJoinHashTable::new( - BinaryHashJoinHashMap::with_build_row_num(build_num_rows), - HashMethodSerializer::default(), - )) - } - (HashMethodKind::Serializer(_), true) => { - HashJoinHashTable::SkipDuplicatesSerializer( - SkipDuplicatesSerializerHashJoinHashTable::new( - BinaryHashJoinHashMap::with_build_row_num(build_num_rows), - HashMethodSerializer::default(), - ), - ) - } - (HashMethodKind::SingleBinary(_), false) => { - HashJoinHashTable::SingleBinary(SingleBinaryHashJoinHashTable::new( + if !matches!(self.state.hash_table.deref(), HashJoinHashTable::Null) { + return None; + } + + let build_num_rows = *self.state.build_rows.deref(); + if build_num_rows < self.nested_loop_join_threshold { + *self.state.hash_table.as_mut() = HashJoinHashTable::NestedLoop(vec![]); + return Some(true); + } + + *self.state.hash_table.as_mut() = match (self.method.clone(), skip_duplicates) { + 
(HashMethodKind::Serializer(_), false) => { + HashJoinHashTable::Serializer(SerializerHashJoinHashTable::new( + BinaryHashJoinHashMap::with_build_row_num(build_num_rows), + HashMethodSerializer::default(), + )) + } + (HashMethodKind::Serializer(_), true) => HashJoinHashTable::SkipDuplicatesSerializer( + SkipDuplicatesSerializerHashJoinHashTable::new( + BinaryHashJoinHashMap::with_build_row_num(build_num_rows), + HashMethodSerializer::default(), + ), + ), + (HashMethodKind::SingleBinary(_), false) => { + HashJoinHashTable::SingleBinary(SingleBinaryHashJoinHashTable::new( + BinaryHashJoinHashMap::with_build_row_num(build_num_rows), + HashMethodSingleBinary::default(), + )) + } + (HashMethodKind::SingleBinary(_), true) => { + HashJoinHashTable::SkipDuplicatesSingleBinary( + SkipDuplicatesSingleBinaryHashJoinHashTable::new( BinaryHashJoinHashMap::with_build_row_num(build_num_rows), HashMethodSingleBinary::default(), - )) - } - (HashMethodKind::SingleBinary(_), true) => { - HashJoinHashTable::SkipDuplicatesSingleBinary( - SkipDuplicatesSingleBinaryHashJoinHashTable::new( - BinaryHashJoinHashMap::with_build_row_num(build_num_rows), - HashMethodSingleBinary::default(), - ), - ) - } - (HashMethodKind::KeysU8(hash_method), false) => { - HashJoinHashTable::KeysU8(FixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), - hash_method, - )) - } - (HashMethodKind::KeysU8(hash_method), true) => { - HashJoinHashTable::SkipDuplicatesKeysU8( - SkipDuplicatesFixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), - hash_method, - ), - ) - } - (HashMethodKind::KeysU16(hash_method), false) => { - HashJoinHashTable::KeysU16(FixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), + ), + ) + } + (HashMethodKind::KeysU8(hash_method), false) => { + HashJoinHashTable::KeysU8(FixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), + hash_method, + )) + } + (HashMethodKind::KeysU8(hash_method), true) => HashJoinHashTable::SkipDuplicatesKeysU8( + SkipDuplicatesFixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), + hash_method, + ), + ), + (HashMethodKind::KeysU16(hash_method), false) => { + HashJoinHashTable::KeysU16(FixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), + hash_method, + )) + } + (HashMethodKind::KeysU16(hash_method), true) => { + HashJoinHashTable::SkipDuplicatesKeysU16( + SkipDuplicatesFixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), hash_method, - )) - } - (HashMethodKind::KeysU16(hash_method), true) => { - HashJoinHashTable::SkipDuplicatesKeysU16( - SkipDuplicatesFixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), - hash_method, - ), - ) - } - (HashMethodKind::KeysU32(hash_method), false) => { - HashJoinHashTable::KeysU32(FixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), + ), + ) + } + (HashMethodKind::KeysU32(hash_method), false) => { + HashJoinHashTable::KeysU32(FixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), + hash_method, + )) + } + (HashMethodKind::KeysU32(hash_method), true) => { + HashJoinHashTable::SkipDuplicatesKeysU32( + SkipDuplicatesFixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), hash_method, - )) - } - (HashMethodKind::KeysU32(hash_method), true) => { - 
HashJoinHashTable::SkipDuplicatesKeysU32( - SkipDuplicatesFixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), - hash_method, - ), - ) - } - (HashMethodKind::KeysU64(hash_method), false) => { - HashJoinHashTable::KeysU64(FixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), + ), + ) + } + (HashMethodKind::KeysU64(hash_method), false) => { + HashJoinHashTable::KeysU64(FixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), + hash_method, + )) + } + (HashMethodKind::KeysU64(hash_method), true) => { + HashJoinHashTable::SkipDuplicatesKeysU64( + SkipDuplicatesFixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), hash_method, - )) - } - (HashMethodKind::KeysU64(hash_method), true) => { - HashJoinHashTable::SkipDuplicatesKeysU64( - SkipDuplicatesFixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), - hash_method, - ), - ) - } - (HashMethodKind::KeysU128(hash_method), false) => { - HashJoinHashTable::KeysU128(FixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), + ), + ) + } + (HashMethodKind::KeysU128(hash_method), false) => { + HashJoinHashTable::KeysU128(FixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), + hash_method, + )) + } + (HashMethodKind::KeysU128(hash_method), true) => { + HashJoinHashTable::SkipDuplicatesKeysU128( + SkipDuplicatesFixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), hash_method, - )) - } - (HashMethodKind::KeysU128(hash_method), true) => { - HashJoinHashTable::SkipDuplicatesKeysU128( - SkipDuplicatesFixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), - hash_method, - ), - ) - } - (HashMethodKind::KeysU256(hash_method), false) => { - HashJoinHashTable::KeysU256(FixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), + ), + ) + } + (HashMethodKind::KeysU256(hash_method), false) => { + HashJoinHashTable::KeysU256(FixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), + hash_method, + )) + } + (HashMethodKind::KeysU256(hash_method), true) => { + HashJoinHashTable::SkipDuplicatesKeysU256( + SkipDuplicatesFixedKeyHashJoinHashTable::new( + HashJoinHashMap::::with_build_row_num(build_num_rows), hash_method, - )) - } - (HashMethodKind::KeysU256(hash_method), true) => { - HashJoinHashTable::SkipDuplicatesKeysU256( - SkipDuplicatesFixedKeyHashJoinHashTable::new( - HashJoinHashMap::::with_build_row_num(build_num_rows), - hash_method, - ), - ) - } - }; - } + ), + ) + } + }; + Some(false) } fn build_hash_table(&self, keys: DataBlock, chunk_idx: usize) -> Result<()> { @@ -311,6 +314,7 @@ impl BasicHashJoin { match self.state.hash_table.deref() { HashJoinHashTable::Null => (), + HashJoinHashTable::NestedLoop(_) => unreachable!(), HashJoinHashTable::Serializer(v) => v.insert(keys, chunk_idx, &mut arena)?, HashJoinHashTable::SingleBinary(v) => v.insert(keys, chunk_idx, &mut arena)?, HashJoinHashTable::KeysU8(v) => v.insert(keys, chunk_idx, &mut arena)?, @@ -345,4 +349,21 @@ impl BasicHashJoin { Ok(()) } + + fn build_nested_loop(&self) -> ProgressValues { + let mut progress = ProgressValues::default(); + let mut plain = vec![]; + while let Some(chunk_index) = self.state.steal_chunk_index() { + let chunk_block = &self.state.chunks[chunk_index]; + progress.rows += chunk_block.num_rows(); + progress.bytes += 
chunk_block.memory_size(); + plain.push(chunk_block.clone()); + } + debug_assert!(matches!( + *self.state.hash_table, + HashJoinHashTable::NestedLoop(_) + )); + *self.state.hash_table.as_mut() = HashJoinHashTable::NestedLoop(plain); + progress + } } diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic_state.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic_state.rs index 897ed42c27eee..40fc675bc7834 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic_state.rs @@ -15,6 +15,7 @@ use std::collections::VecDeque; use std::sync::Arc; use std::sync::Mutex; +use std::sync::PoisonError; use databend_common_expression::types::DataType; use databend_common_expression::ColumnVec; @@ -54,6 +55,22 @@ impl BasicHashJoinState { hash_table: CStyleCell::new(HashJoinHashTable::Null), } } + + pub(super) fn push_chunk(&self, chunk: DataBlock) { + let locked = self.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + + *self.build_rows.as_mut() += chunk.num_rows(); + let chunk_index = self.chunks.len(); + self.chunks.as_mut().push(chunk); + self.build_queue.as_mut().push_back(chunk_index); + } + + pub(super) fn steal_chunk_index(&self) -> Option { + let locked = self.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + self.build_queue.as_mut().pop_front() + } } impl Drop for BasicHashJoinState { diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs index 3b1dab84b39a7..a5bebc663062f 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs @@ -16,7 +16,6 @@ use std::ops::Deref; use std::sync::Arc; use databend_common_base::base::ProgressValues; -use databend_common_catalog::table_context::TableContext; use databend_common_column::bitmap::Bitmap; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -27,7 +26,10 @@ use databend_common_expression::DataBlock; use databend_common_expression::FilterExecutor; use databend_common_expression::FunctionContext; use databend_common_expression::HashMethodKind; +use databend_common_settings::Settings; +use super::basic::BasicHashJoin; +use super::basic_state::BasicHashJoinState; use crate::pipelines::processors::transforms::build_runtime_filter_packet; use crate::pipelines::processors::transforms::new_hash_join::hashtable::basic::ProbeStream; use crate::pipelines::processors::transforms::new_hash_join::hashtable::basic::ProbedRows; @@ -35,14 +37,11 @@ use crate::pipelines::processors::transforms::new_hash_join::hashtable::ProbeDat use crate::pipelines::processors::transforms::new_hash_join::join::EmptyJoinStream; use crate::pipelines::processors::transforms::new_hash_join::join::Join; use crate::pipelines::processors::transforms::new_hash_join::join::JoinStream; -use crate::pipelines::processors::transforms::new_hash_join::memory::basic::BasicHashJoin; -use crate::pipelines::processors::transforms::new_hash_join::memory::basic_state::BasicHashJoinState; use crate::pipelines::processors::transforms::new_hash_join::performance::PerformanceContext; use 
crate::pipelines::processors::transforms::HashJoinHashTable; use crate::pipelines::processors::transforms::JoinRuntimeFilterPacket; use crate::pipelines::processors::transforms::RuntimeFiltersDesc; use crate::pipelines::processors::HashJoinDesc; -use crate::sessions::QueryContext; pub struct InnerHashJoin { pub(crate) basic_hash_join: BasicHashJoin, @@ -55,23 +54,24 @@ pub struct InnerHashJoin { impl InnerHashJoin { pub fn create( - ctx: &QueryContext, + settings: &Settings, function_ctx: FunctionContext, method: HashMethodKind, desc: Arc, state: Arc, + nested_loop_join_threshold: usize, ) -> Result { - let settings = ctx.get_settings(); let block_size = settings.get_max_block_size()? as usize; let context = PerformanceContext::create(block_size, desc.clone(), function_ctx.clone()); let basic_hash_join = BasicHashJoin::create( - ctx, + settings, function_ctx.clone(), method, desc.clone(), state.clone(), + nested_loop_join_threshold, )?; Ok(InnerHashJoin { @@ -123,35 +123,38 @@ impl Join for InnerHashJoin { }; self.desc.remove_keys_nullable(&mut keys); - let probe_block = data.project(&self.desc.probe_projections); - - let joined_stream = - with_join_hash_method!(|T| match self.basic_state.hash_table.deref() { - HashJoinHashTable::T(table) => { - let probe_hash_statistics = &mut self.performance_context.probe_hash_statistics; - probe_hash_statistics.clear(probe_block.num_rows()); - - let probe_data = ProbeData::new(keys, valids, probe_hash_statistics); - let probe_keys_stream = table.probe_matched(probe_data)?; - - Ok(InnerHashJoinStream::create( - probe_block, - self.basic_state.clone(), - probe_keys_stream, - self.desc.clone(), - &mut self.performance_context.probe_result, - )) - } - HashJoinHashTable::Null => Err(ErrorCode::AbortedQuery( + let probe_block = data.project(&self.desc.probe_projection); + + let joined_stream = with_join_hash_method!(|T| match self.basic_state.hash_table.deref() { + HashJoinHashTable::T(table) => { + let probe_hash_statistics = &mut self.performance_context.probe_hash_statistics; + probe_hash_statistics.clear(probe_block.num_rows()); + + let probe_data = ProbeData::new(keys, valids, probe_hash_statistics); + let probe_keys_stream = table.probe_matched(probe_data)?; + + InnerHashJoinStream::create( + probe_block, + self.basic_state.clone(), + probe_keys_stream, + self.desc.clone(), + &mut self.performance_context.probe_result, + ) + } + HashJoinHashTable::Null => { + return Err(ErrorCode::AbortedQuery( "Aborted query, because the hash table is uninitialized.", - )), - })?; + )); + } + HashJoinHashTable::NestedLoop(_) => unreachable!(), + }); match &mut self.performance_context.filter_executor { None => Ok(joined_stream), Some(filter_executor) => Ok(InnerHashJoinFilterStream::create( joined_stream, filter_executor, + None, )), } } @@ -232,31 +235,27 @@ impl<'a> JoinStream for InnerHashJoinStream<'a> { (None, None) => DataBlock::new(vec![], self.probed_rows.matched_build.len()), }; - if !self.desc.probe_to_build.is_empty() { - for (index, (is_probe_nullable, is_build_nullable)) in - self.desc.probe_to_build.iter() - { - let entry = match (is_probe_nullable, is_build_nullable) { - (true, true) | (false, false) => result_block.get_by_offset(*index).clone(), - (true, false) => { - result_block.get_by_offset(*index).clone().remove_nullable() + for (index, (is_probe_nullable, is_build_nullable)) in + self.desc.probe_to_build.iter().cloned() + { + let entry = match (is_probe_nullable, is_build_nullable) { + (true, true) | (false, false) => 
result_block.get_by_offset(index).clone(), + (true, false) => result_block.get_by_offset(index).clone().remove_nullable(), + (false, true) => { + let entry = result_block.get_by_offset(index); + let col = entry.to_column(); + + match col.is_null() || col.is_nullable() { + true => entry.clone(), + false => BlockEntry::from(NullableColumn::new_column( + col, + Bitmap::new_constant(true, result_block.num_rows()), + )), } - (false, true) => { - let entry = result_block.get_by_offset(*index); - let col = entry.to_column(); - - match col.is_null() || col.is_nullable() { - true => entry.clone(), - false => BlockEntry::from(NullableColumn::new_column( - col, - Bitmap::new_constant(true, result_block.num_rows()), - )), - } - } - }; + } + }; - result_block.add_entry(entry); - } + result_block.add_entry(entry); } return Ok(Some(result_block)); @@ -264,19 +263,22 @@ impl<'a> JoinStream for InnerHashJoinStream<'a> { } } -struct InnerHashJoinFilterStream<'a> { +pub(super) struct InnerHashJoinFilterStream<'a> { inner: Box, filter_executor: &'a mut FilterExecutor, + field_reorder: Option<&'a [usize]>, } impl<'a> InnerHashJoinFilterStream<'a> { pub fn create( inner: Box, filter_executor: &'a mut FilterExecutor, + field_reorder: Option<&'a [usize]>, ) -> Box { Box::new(InnerHashJoinFilterStream { inner, filter_executor, + field_reorder, }) } } @@ -298,6 +300,16 @@ impl<'a> JoinStream for InnerHashJoinFilterStream<'a> { continue; } + let data_block = if let Some(field_reorder) = self.field_reorder { + DataBlock::from_iter( + field_reorder + .iter() + .map(|offset| data_block.get_by_offset(*offset).clone()), + data_block.num_rows(), + ) + } else { + data_block + }; return Ok(Some(data_block)); } } diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/mod.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/mod.rs index 4979c37245fca..e8eb9fba13b79 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/mod.rs @@ -15,7 +15,9 @@ mod basic; mod basic_state; mod inner_join; +mod nested_loop; pub mod outer_left_join; pub use basic_state::BasicHashJoinState; pub use inner_join::InnerHashJoin; +pub use nested_loop::*; diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/nested_loop.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/nested_loop.rs new file mode 100644 index 0000000000000..ce157e2622516 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/nested_loop.rs @@ -0,0 +1,146 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
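Editor's note: the new file below implements the fallback this PR is about — when the build side stays under `nested_loop_join_threshold` rows, probe rows are paired with build blocks and the join predicates run through the ordinary `FilterExecutor` instead of a hash table. The semantics are those of a classic nested-loop inner join; a toy version over integer rows (illustrative only, the real code streams `BlockEntry::Const` blocks):

```rust
// Core idea of the fallback: with a tiny build side, emitting probe_row x build_row
// pairs and filtering them costs less than building and probing a hash table.
fn nested_loop(probe: &[i32], build: &[i32], pred: impl Fn(i32, i32) -> bool) -> Vec<(i32, i32)> {
    let mut out = vec![];
    for &p in probe {
        for &b in build {
            if pred(p, b) {
                out.push((p, b));
            }
        }
    }
    out
}

fn main() {
    // The predicate plays the role of the serialized `eq` exprs in NestedLoopFilterInfo.
    let matched = nested_loop(&[1, 2, 3], &[2, 3], |p, b| p == b);
    assert_eq!(matched, vec![(2, 2), (3, 3)]);
}
```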
diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/nested_loop.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/nested_loop.rs
new file mode 100644
index 0000000000000..ce157e2622516
--- /dev/null
+++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/nested_loop.rs
@@ -0,0 +1,146 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::VecDeque;
+use std::sync::Arc;
+
+use databend_common_base::base::ProgressValues;
+use databend_common_exception::Result;
+use databend_common_expression::types::DataType;
+use databend_common_expression::BlockEntry;
+use databend_common_expression::DataBlock;
+use databend_common_expression::Scalar;
+
+use super::inner_join::InnerHashJoinFilterStream;
+use crate::pipelines::processors::transforms::new_hash_join::join::EmptyJoinStream;
+use crate::pipelines::processors::transforms::BasicHashJoinState;
+use crate::pipelines::processors::transforms::HashJoinHashTable;
+use crate::pipelines::processors::transforms::Join;
+use crate::pipelines::processors::transforms::JoinRuntimeFilterPacket;
+use crate::pipelines::processors::transforms::JoinStream;
+use crate::pipelines::processors::transforms::NestedLoopDesc;
+use crate::pipelines::processors::transforms::RuntimeFiltersDesc;
+
+pub struct NestedLoopJoin<T: Join> {
+    inner: T,
+    basic_state: Arc<BasicHashJoinState>,
+    desc: NestedLoopDesc,
+}
+
+impl<T: Join> NestedLoopJoin<T> {
+    pub fn create(inner: T, basic_state: Arc<BasicHashJoinState>, desc: NestedLoopDesc) -> Self {
+        Self {
+            inner,
+            basic_state,
+            desc,
+        }
+    }
+}
+
+impl<T: Join> Join for NestedLoopJoin<T> {
+    fn add_block(&mut self, data: Option<DataBlock>) -> Result<()> {
+        self.inner.add_block(data)
+    }
+
+    fn final_build(&mut self) -> Result<Option<ProgressValues>> {
+        self.inner.final_build()
+    }
+
+    fn build_runtime_filter(&self, desc: &RuntimeFiltersDesc) -> Result<JoinRuntimeFilterPacket> {
+        self.inner.build_runtime_filter(desc)
+    }
+
+    fn probe_block(&mut self, data: DataBlock) -> Result<Box<dyn JoinStream + '_>> {
+        if data.is_empty() || *self.basic_state.build_rows == 0 {
+            return Ok(Box::new(EmptyJoinStream));
+        }
+        let HashJoinHashTable::NestedLoop(build_blocks) = &*self.basic_state.hash_table else {
+            return self.inner.probe_block(data);
+        };
+
+        let nested = Box::new(LoopJoinStream::new(data, build_blocks));
+        Ok(InnerHashJoinFilterStream::create(
+            nested,
+            &mut self.desc.filter,
+            self.desc.field_reorder.as_deref(),
+        ))
+    }
+}
+
+struct LoopJoinStream<'a> {
+    probe_rows: VecDeque<Vec<Scalar>>,
+    probe_types: Vec<DataType>,
+    build_blocks: &'a [DataBlock],
+    build_index: usize,
+}
+
+impl<'a> LoopJoinStream<'a> {
+    pub fn new(probe: DataBlock, build_blocks: &'a [DataBlock]) -> Self {
+        let mut probe_rows = vec![Vec::new(); probe.num_rows()];
+        for entry in probe.columns().iter() {
+            match entry {
+                BlockEntry::Const(scalar, _, _) => {
+                    for row in &mut probe_rows {
+                        row.push(scalar.to_owned());
+                    }
+                }
+                BlockEntry::Column(column) => {
+                    for (row, scalar) in probe_rows.iter_mut().zip(column.iter()) {
+                        row.push(scalar.to_owned());
+                    }
+                }
+            }
+        }
+
+        let left_types = probe
+            .columns()
+            .iter()
+            .map(|entry| entry.data_type())
+            .collect();
+
+        LoopJoinStream {
+            probe_rows: probe_rows.into(),
+            probe_types: left_types,
+            build_blocks,
+            build_index: 0,
+        }
+    }
+}
+
+impl<'a> JoinStream for LoopJoinStream<'a> {
+    fn next(&mut self) -> Result<Option<DataBlock>> {
+        let Some(probe_entries) = self.probe_rows.front() else {
+            return Ok(None);
+        };
+
+        let build_block = &self.build_blocks[self.build_index];
+
+        let entries = probe_entries
+            .iter()
+            .zip(self.probe_types.iter())
+            .map(|(scalar, data_type)| {
+                BlockEntry::Const(scalar.clone(), data_type.clone(), build_block.num_rows())
+            })
+            .chain(build_block.columns().iter().cloned())
+            .collect();
+
+        self.build_index += 1;
+        if self.build_index >= self.build_blocks.len() {
+            self.build_index = 0;
+            self.probe_rows.pop_front();
+        }
+
+        Ok(Some(DataBlock::new(entries, build_block.num_rows())))
+    }
+}
diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/outer_left_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/outer_left_join.rs
index 8210b495bfca3..e548178a06646 100644
--- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/outer_left_join.rs
+++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/outer_left_join.rs
@@ -68,11 +68,12 @@ impl OuterLeftHashJoin {
         let context = PerformanceContext::create(block_size, desc.clone(), function_ctx.clone());
 
         let basic_hash_join = BasicHashJoin::create(
-            ctx,
+            &settings,
             function_ctx.clone(),
             method,
             desc.clone(),
             state.clone(),
+            0,
         )?;
 
         Ok(OuterLeftHashJoin {
@@ -111,7 +112,7 @@ impl Join for OuterLeftHashJoin {
                 .collect::<Vec<_>>();
 
             let build_block = null_build_block(&types, data.num_rows());
-            let probe_block = Some(data.project(&self.desc.probe_projections));
+            let probe_block = Some(data.project(&self.desc.probe_projection));
             let result_block = final_result_block(&self.desc, probe_block, build_block, num_rows);
             return Ok(Box::new(OneBlockJoinStream(Some(result_block))));
         }
@@ -127,7 +128,7 @@ impl Join for OuterLeftHashJoin {
         };
 
         self.desc.remove_keys_nullable(&mut keys);
-        let probe_block = data.project(&self.desc.probe_projections);
+        let probe_block = data.project(&self.desc.probe_projection);
 
         let probe_stream = with_join_hash_method!(|T| match self.basic_state.hash_table.deref() {
             HashJoinHashTable::T(table) => {
@@ -137,6 +138,9 @@ impl Join for OuterLeftHashJoin {
                 let probe_data = ProbeData::new(keys, valids, probe_hash_statistics);
                 table.probe(probe_data)
             }
+            HashJoinHashTable::NestedLoop(_) => {
+                todo!()
+            }
             HashJoinHashTable::Null => Err(ErrorCode::AbortedQuery(
                 "Aborted query, because the hash table is uninitialized.",
             )),
diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/mod.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/mod.rs
index 002e21b578204..30a71a4a0ee72 100644
--- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/mod.rs
+++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/mod.rs
@@ -26,6 +26,7 @@ mod transform_hash_join;
 pub use grace::GraceHashJoin;
 pub use hash_join_factory::HashJoinFactory;
 pub use join::Join;
+pub use join::JoinStream;
 pub use memory::BasicHashJoinState;
 pub use memory::InnerHashJoin;
 pub use runtime_filter::RuntimeFiltersDesc;
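Editor's note: the next file's explicit `transmute` annotations and the `PhantomPinned` marker both document the same hazard — `state.stream` borrows `self.join`, making `TransformHashJoin` self-referential, so the stream's lifetime is erased to `'static` and the struct opts out of `Unpin`. A reduced model of that hazard (all names here are illustrative, not from the patch):

```rust
use std::marker::PhantomPinned;

struct Owner {
    data: String,
    // Borrows `data`; only valid while `Owner` is neither moved nor dropped.
    view: Option<&'static str>,
    // Opting out of Unpin documents that moving `Owner` would invalidate `view`.
    _p: PhantomPinned,
}

impl Owner {
    fn set_view(&mut self) {
        // SAFETY: the reference lives as long as `self.data`, and `Owner` is
        // never moved afterwards - the same invariant the processor maintains
        // for `state.stream` borrowing `self.join`.
        let view = unsafe { std::mem::transmute::<&str, &'static str>(&self.data) };
        self.view = Some(view);
    }
}

fn main() {
    let mut owner = Owner {
        data: "probe".to_string(),
        view: None,
        _p: PhantomPinned,
    };
    owner.set_view();
    println!("{}", owner.view.unwrap());
}
```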
         match &mut self.stage {
             Stage::Finished => Ok(()),
             Stage::Build(state) => {
@@ -144,7 +146,9 @@ impl Processor for TransformHashJoin {
                 if let Some(probe_data) = state.input_data.take() {
                     let stream = self.join.probe_block(probe_data)?;
                     // This is safe because both join and stream are properties of the struct.
-                    state.stream = Some(unsafe { std::mem::transmute(stream) });
+                    state.stream = Some(unsafe {
+                        std::mem::transmute::<Box<dyn JoinStream + '_>, Box<dyn JoinStream + 'static>>(stream)
+                    });
                 }
 
                 if let Some(mut stream) = state.stream.take() {
@@ -161,7 +165,11 @@ impl Processor for TransformHashJoin {
                 if let Some(final_stream) = self.join.final_probe()? {
                     state.initialize = true;
                     // This is safe because both join and stream are properties of the struct.
-                    state.stream = Some(unsafe { std::mem::transmute(final_stream) });
+                    state.stream = Some(unsafe {
+                        std::mem::transmute::<Box<dyn JoinStream + '_>, Box<dyn JoinStream + 'static>>(
+                            final_stream,
+                        )
+                    });
                 } else {
                     state.finished = true;
                 }
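Editor's note on the two transmutes above: the explicit type arguments replace the blanket `#[allow(clippy::missing_transmute_annotations)]` and make the erased lifetime visible: a `Box<dyn JoinStream + '_>` that borrows from `self.join` is stored as `'static` inside the same struct, and the new `PhantomPinned` field opts the processor out of `Unpin` as an extra guard. A reduced sketch of the same pattern, with hypothetical `Source`/`Stream`/`Owner` types standing in for the processor internals, not Databend's actual API:

```rust
use std::marker::PhantomPinned;

trait Stream {
    fn next(&mut self) -> Option<u32>;
}

struct Source {
    data: Vec<u32>,
}

// A stream that borrows the source for 'a.
struct SourceStream<'a> {
    src: &'a Source,
    pos: usize,
}

impl<'a> Stream for SourceStream<'a> {
    fn next(&mut self) -> Option<u32> {
        let v = self.src.data.get(self.pos).copied();
        self.pos += 1;
        v
    }
}

// Owns both the source and a stream that borrows it. The `'static` on the
// boxed stream is upheld manually: the stream is dropped or replaced before
// `source` is, and the source lives behind a Box so moving `Owner` does not
// move it. `PhantomPinned` opts the struct out of `Unpin` as a further guard.
struct Owner {
    source: Box<Source>,
    stream: Option<Box<dyn Stream + 'static>>,
    _pin: PhantomPinned,
}

impl Owner {
    fn start(&mut self) {
        let stream: Box<dyn Stream + '_> = Box::new(SourceStream {
            src: &*self.source,
            pos: 0,
        });
        // SAFETY: the stream only borrows `self.source`, which outlives it
        // within this struct; the explicit types document the erasure.
        self.stream = Some(unsafe {
            std::mem::transmute::<Box<dyn Stream + '_>, Box<dyn Stream + 'static>>(stream)
        });
    }
}

fn main() {
    let mut owner = Owner {
        source: Box::new(Source { data: vec![1, 2, 3] }),
        stream: None,
        _pin: PhantomPinned,
    };
    owner.start();
    let mut s = owner.stream.take().unwrap();
    assert_eq!(s.next(), Some(1));
}
```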
diff --git a/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs b/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs
index 45b5acea21e5e..19b88f044cd7d 100644
--- a/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs
+++ b/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs
@@ -17,12 +17,12 @@ use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
 
 use databend_common_catalog::table_context::TableContext;
-use databend_common_column::bitmap::Bitmap;
 use databend_common_column::bitmap::MutableBitmap;
 use databend_common_exception::Result;
 use databend_common_expression::types::DataType;
 use databend_common_expression::types::NumberDataType;
 use databend_common_expression::types::NumberScalar;
+use databend_common_expression::BlockEntry;
 use databend_common_expression::ColumnBuilder;
 use databend_common_expression::DataBlock;
 use databend_common_expression::Evaluator;
@@ -39,7 +39,6 @@ use crate::physical_plans::RangeJoinCondition;
 use crate::physical_plans::RangeJoinType;
 use crate::pipelines::executor::WatchNotify;
 use crate::pipelines::processors::transforms::range_join::IEJoinState;
-use crate::pipelines::processors::transforms::wrap_true_validity;
 use crate::sessions::QueryContext;
 
 pub struct RangeJoinState {
@@ -109,16 +108,17 @@ impl RangeJoinState {
     pub(crate) fn sink_right(&self, block: DataBlock) -> Result<()> {
         // Sink block to right table
         let mut right_table = self.right_table.write();
-        let mut right_block = block;
-        if matches!(self.join_type, JoinType::Left | JoinType::LeftAsof) {
-            let validity = Bitmap::new_constant(true, right_block.num_rows());
-            let nullable_right_columns = right_block
-                .columns()
-                .iter()
-                .map(|c| wrap_true_validity(c, right_block.num_rows(), &validity))
-                .collect::<Vec<_>>();
-            right_block = DataBlock::new(nullable_right_columns, right_block.num_rows());
-        }
+        let right_block = if matches!(self.join_type, JoinType::Left | JoinType::LeftAsof) {
+            let rows = block.num_rows();
+            let nullable_right_columns = block
+                .take_columns()
+                .into_iter()
+                .map(BlockEntry::into_nullable)
+                .collect();
+            DataBlock::new(nullable_right_columns, rows)
+        } else {
+            block
+        };
         right_table.push(right_block);
         Ok(())
     }
@@ -126,16 +126,17 @@ impl RangeJoinState {
     pub(crate) fn sink_left(&self, block: DataBlock) -> Result<()> {
         // Sink block to left table
        let mut left_table = self.left_table.write();
-        let mut left_block = block;
-        if matches!(self.join_type, JoinType::Right | JoinType::RightAsof) {
-            let validity = Bitmap::new_constant(true, left_block.num_rows());
-            let nullable_left_columns = left_block
-                .columns()
-                .iter()
-                .map(|c| wrap_true_validity(c, left_block.num_rows(), &validity))
-                .collect::<Vec<_>>();
-            left_block = DataBlock::new(nullable_left_columns, left_block.num_rows());
-        }
+        let left_block = if matches!(self.join_type, JoinType::Right | JoinType::RightAsof) {
+            let rows = block.num_rows();
+            let nullable_left_columns = block
+                .take_columns()
+                .into_iter()
+                .map(BlockEntry::into_nullable)
+                .collect();
+            DataBlock::new(nullable_left_columns, rows)
+        } else {
+            block
+        };
         left_table.push(left_block);
         Ok(())
     }
diff --git a/src/query/service/src/pipelines/processors/transforms/range_join/transform_range_join.rs b/src/query/service/src/pipelines/processors/transforms/range_join/transform_range_join.rs
index bda89435842c7..d62382fcd3e05 100644
--- a/src/query/service/src/pipelines/processors/transforms/range_join/transform_range_join.rs
+++ b/src/query/service/src/pipelines/processors/transforms/range_join/transform_range_join.rs
@@ -24,7 +24,7 @@ use databend_common_pipeline::core::OutputPort;
 use databend_common_pipeline::core::Processor;
 use databend_common_pipeline::sinks::Sink;
 
-use crate::pipelines::processors::transforms::range_join::RangeJoinState;
+use super::RangeJoinState;
 
 enum RangeJoinStep {
     Sink,
diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs
index 4a341ba6e2597..9089ced456a96 100644
--- a/src/query/settings/src/settings_default.rs
+++ b/src/query/settings/src/settings_default.rs
@@ -495,6 +495,13 @@ impl DefaultSettings {
                 scope: SettingScope::Both,
                 range: Some(SettingRange::Numeric(0..=u64::MAX)),
             }),
+            ("nested_loop_join_threshold", DefaultSettingValue {
+                value: UserSettingValue::UInt64(1024),
+                desc: "Sets the threshold for using nested loop join. Setting it to 0 disables nested loop join.",
+                mode: SettingMode::Both,
+                scope: SettingScope::Both,
+                range: Some(SettingRange::Numeric(0..=u64::MAX)),
+            }),
             ("enable_bloom_runtime_filter", DefaultSettingValue {
                 value: UserSettingValue::UInt64(1),
                 desc: "Enables bloom runtime filter optimization for JOIN.",
diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs
index 6057058270e25..e0cb508822177 100644
--- a/src/query/settings/src/settings_getter_setter.rs
+++ b/src/query/settings/src/settings_getter_setter.rs
@@ -380,6 +380,10 @@ impl Settings {
         Ok(self.try_get_u64("inlist_to_join_threshold")? as usize)
     }
 
+    pub fn get_nested_loop_join_threshold(&self) -> Result<u64> {
+        self.try_get_u64("nested_loop_join_threshold")
+    }
+
     pub fn get_bloom_runtime_filter(&self) -> Result<bool> {
         Ok(self.try_get_u64("enable_bloom_runtime_filter")? != 0)
     }
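Editor's note on the new setting: it defaults to 1024, and 0 disables the nested-loop path (e.g. `SET nested_loop_join_threshold = 0`). The check below is a hypothetical illustration of how `get_nested_loop_join_threshold` might be consumed to gate the optimization on build-side size; the planner's real call site is not part of this diff, and `use_nested_loop` is an invented name.

```rust
// Hypothetical planner-side gate (not the PR's actual call site): prefer the
// nested loop path only when the build side is small enough.
fn use_nested_loop(build_rows: u64, threshold: u64) -> bool {
    // threshold == 0 disables nested loop join entirely.
    threshold != 0 && build_rows <= threshold
}

fn main() {
    assert!(use_nested_loop(512, 1024)); // small build side: nested loop
    assert!(!use_nested_loop(4096, 1024)); // large build side: hash join
    assert!(!use_nested_loop(1, 0)); // 0 disables the optimization
}
```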
diff --git a/src/query/sql/src/planner/plans/join.rs b/src/query/sql/src/planner/plans/join.rs
index 7cc07b8b3909d..c7d6147379bc8 100644
--- a/src/query/sql/src/planner/plans/join.rs
+++ b/src/query/sql/src/planner/plans/join.rs
@@ -222,7 +222,11 @@ impl JoinEquiCondition {
         left.into_iter()
             .zip(right)
             .enumerate()
-            .map(|(index, (l, r))| JoinEquiCondition::new(l, r, is_null_equal.contains(&index)))
+            .map(|(index, (left, right))| Self {
+                left,
+                right,
+                is_null_equal: is_null_equal.contains(&index),
+            })
             .collect()
     }
 }
diff --git a/tests/sqllogictests/suites/duckdb/join/inner/equality_join_limits.test b/tests/sqllogictests/suites/duckdb/join/inner/equality_join_limits.test
index 70be64638f413..1ef504ff4fe9d 100644
--- a/tests/sqllogictests/suites/duckdb/join/inner/equality_join_limits.test
+++ b/tests/sqllogictests/suites/duckdb/join/inner/equality_join_limits.test
@@ -1,18 +1,12 @@
-statement ok
-drop table if exists t;
-
-statement ok
-drop table if exists u;
-
 # TINYINT limits
 statement ok
-CREATE TABLE t(t_k0 TINYINT);
+CREATE OR REPLACE TABLE t(t_k0 TINYINT);
 
 statement ok
 INSERT INTO t VALUES (-128), (127);
 
 statement ok
-CREATE TABLE u(u_k0 TINYINT);
+CREATE OR REPLACE TABLE u(u_k0 TINYINT);
 
 statement ok
 INSERT INTO u VALUES (-128), (127);