From 4852889c07408d65d25805d61c0c5fb36ecdc987 Mon Sep 17 00:00:00 2001 From: Kould Date: Fri, 3 Oct 2025 23:41:14 +0900 Subject: [PATCH 1/2] refactor: optimizing `HashJoin` and `RadixSort` --- README.md | 12 +- src/catalog/column.rs | 3 +- src/execution/dql/join/hash/full_join.rs | 107 +++++++ src/execution/dql/join/hash/inner_join.rs | 45 +++ src/execution/dql/join/hash/left_anti_join.rs | 71 +++++ src/execution/dql/join/hash/left_join.rs | 90 ++++++ src/execution/dql/join/hash/left_semi_join.rs | 92 ++++++ src/execution/dql/join/hash/mod.rs | 105 +++++++ src/execution/dql/join/hash/right_join.rs | 59 ++++ src/execution/dql/join/hash_join.rs | 278 +++++++---------- src/execution/dql/join/mod.rs | 1 + src/execution/dql/join/nested_loop_join.rs | 10 +- src/execution/dql/sort.rs | 294 ++++++++++-------- src/expression/evaluator.rs | 15 +- src/expression/function/scala.rs | 3 +- src/expression/range_detacher.rs | 4 +- src/function/char_length.rs | 3 +- src/function/current_date.rs | 3 +- src/function/current_timestamp.rs | 3 +- src/function/lower.rs | 3 +- src/function/numbers.rs | 2 +- src/function/octet_length.rs | 3 +- src/function/upper.rs | 3 +- src/macros/mod.rs | 4 +- src/optimizer/core/histogram.rs | 14 +- src/types/tuple.rs | 6 + src/types/value.rs | 2 - tests/slt/crdb/join.slt | 3 +- 28 files changed, 899 insertions(+), 339 deletions(-) create mode 100644 src/execution/dql/join/hash/full_join.rs create mode 100644 src/execution/dql/join/hash/inner_join.rs create mode 100644 src/execution/dql/join/hash/left_anti_join.rs create mode 100644 src/execution/dql/join/hash/left_join.rs create mode 100644 src/execution/dql/join/hash/left_semi_join.rs create mode 100644 src/execution/dql/join/hash/mod.rs create mode 100644 src/execution/dql/join/hash/right_join.rs diff --git a/README.md b/README.md index ca772d91..e4e58a2d 100755 --- a/README.md +++ b/README.md @@ -67,13 +67,13 @@ run `cargo run -p tpcc --release` to run tpcc - Tips: TPC-C currently only supports single thread ```shell <90th Percentile RT (MaxRT)> - New-Order : 0.002 (0.025) - Payment : 0.001 (0.013) -Order-Status : 0.054 (0.159) - Delivery : 0.020 (0.034) - Stock-Level : 0.003 (0.004) + New-Order : 0.002 (0.018) + Payment : 0.001 (0.024) +Order-Status : 0.050 (0.067) + Delivery : 0.021 (0.030) + Stock-Level : 0.003 (0.005) -7892 Tpmc +8101 Tpmc ``` #### 👉[check more](tpcc/README.md) diff --git a/src/catalog/column.rs b/src/catalog/column.rs index aa1289cb..08998613 100644 --- a/src/catalog/column.rs +++ b/src/catalog/column.rs @@ -1,6 +1,7 @@ use crate::catalog::TableName; use crate::errors::DatabaseError; use crate::expression::ScalarExpression; +use crate::types::tuple::Tuple; use crate::types::value::DataValue; use crate::types::{ColumnId, LogicalType}; use kite_sql_serde_macros::ReferenceSerialization; @@ -169,7 +170,7 @@ impl ColumnCatalog { self.desc .default .as_ref() - .map(|expr| expr.eval(None)) + .map(|expr| expr.eval::<&Tuple>(None)) .transpose() } diff --git a/src/execution/dql/join/hash/full_join.rs b/src/execution/dql/join/hash/full_join.rs new file mode 100644 index 00000000..8c8530f7 --- /dev/null +++ b/src/execution/dql/join/hash/full_join.rs @@ -0,0 +1,107 @@ +use crate::execution::dql::join::hash::{filter, FilterArgs, JoinProbeState, ProbeArgs}; +use crate::execution::dql::join::hash_join::BuildState; +use crate::execution::dql::sort::BumpVec; +use crate::execution::Executor; +use crate::throw; +use crate::types::tuple::Tuple; +use crate::types::value::DataValue; +use ahash::HashMap; +use fixedbitset::FixedBitSet; + +pub(crate) struct FullJoinState { + pub(crate) left_schema_len: usize, + pub(crate) right_schema_len: usize, + pub(crate) bits: FixedBitSet, +} + +impl<'a> JoinProbeState<'a> for FullJoinState { + fn probe( + &mut self, + probe_args: ProbeArgs<'a>, + filter_args: Option<&'a FilterArgs>, + ) -> Executor<'a> { + let left_schema_len = self.left_schema_len; + let bits_ptr: *mut FixedBitSet = &mut self.bits; + + Box::new( + #[coroutine] + move || { + let ProbeArgs { probe_tuple, .. } = probe_args; + + if let ProbeArgs { + is_keys_has_null: false, + build_state: Some(build_state), + .. + } = probe_args + { + let mut has_filtered = false; + for (i, Tuple { values, pk }) in build_state.tuples.iter() { + let full_values = + Vec::from_iter(values.iter().chain(probe_tuple.values.iter()).cloned()); + + match &filter_args { + None => (), + Some(filter_args) => { + if !throw!(filter(&full_values, filter_args)) { + has_filtered = true; + unsafe { + (*bits_ptr).set(*i, true); + } + yield Ok(Self::full_right_row(left_schema_len, &probe_tuple)); + continue; + } + } + } + yield Ok(Tuple::new(pk.clone(), full_values)); + } + build_state.is_used = !has_filtered; + build_state.has_filted = has_filtered; + return; + } + + yield Ok(Self::full_right_row(left_schema_len, &probe_tuple)); + }, + ) + } + + fn left_drop( + &mut self, + _build_map: HashMap, BuildState>, + _filter_args: Option<&'a FilterArgs>, + ) -> Option> { + let full_schema_len = self.right_schema_len + self.left_schema_len; + let bits_ptr: *mut FixedBitSet = &mut self.bits; + + Some(Box::new( + #[coroutine] + move || { + for (_, state) in _build_map { + if state.is_used { + continue; + } + for (i, mut left_tuple) in state.tuples { + unsafe { + if !(*bits_ptr).contains(i) && state.has_filted { + continue; + } + } + left_tuple.values.resize(full_schema_len, DataValue::Null); + yield Ok(left_tuple); + } + } + }, + )) + } +} + +impl FullJoinState { + pub(crate) fn full_right_row(left_schema_len: usize, probe_tuple: &Tuple) -> Tuple { + let full_values = Vec::from_iter( + (0..left_schema_len) + .map(|_| DataValue::Null) + .chain(probe_tuple.values.iter().cloned()), + ); + + Tuple::new(probe_tuple.pk.clone(), full_values) + } +} diff --git a/src/execution/dql/join/hash/inner_join.rs b/src/execution/dql/join/hash/inner_join.rs new file mode 100644 index 00000000..2b0f3298 --- /dev/null +++ b/src/execution/dql/join/hash/inner_join.rs @@ -0,0 +1,45 @@ +use crate::execution::dql::join::hash::{filter, FilterArgs, JoinProbeState, ProbeArgs}; +use crate::execution::Executor; +use crate::throw; +use crate::types::tuple::Tuple; + +pub(crate) struct InnerJoinState; + +impl<'a> JoinProbeState<'a> for InnerJoinState { + fn probe( + &mut self, + probe_args: ProbeArgs<'a>, + filter_args: Option<&'a FilterArgs>, + ) -> Executor<'a> { + Box::new( + #[coroutine] + move || { + let ProbeArgs { + is_keys_has_null: false, + probe_tuple, + build_state: Some(build_state), + .. + } = probe_args + else { + return; + }; + + build_state.is_used = true; + for (_, Tuple { values, pk }) in build_state.tuples.iter() { + let full_values = + Vec::from_iter(values.iter().chain(probe_tuple.values.iter()).cloned()); + + match &filter_args { + None => (), + Some(filter_args) => { + if !throw!(filter(&full_values, filter_args)) { + continue; + } + } + } + yield Ok(Tuple::new(pk.clone(), full_values)); + } + }, + ) + } +} diff --git a/src/execution/dql/join/hash/left_anti_join.rs b/src/execution/dql/join/hash/left_anti_join.rs new file mode 100644 index 00000000..fe1d5b1b --- /dev/null +++ b/src/execution/dql/join/hash/left_anti_join.rs @@ -0,0 +1,71 @@ +use crate::execution::dql::join::hash::left_semi_join::LeftSemiJoinState; +use crate::execution::dql::join::hash::{filter, FilterArgs, JoinProbeState, ProbeArgs}; +use crate::execution::dql::join::hash_join::BuildState; +use crate::execution::dql::sort::BumpVec; +use crate::execution::Executor; +use crate::throw; +use crate::types::value::DataValue; +use ahash::HashMap; +use fixedbitset::FixedBitSet; + +pub(crate) struct LeftAntiJoinState { + pub(crate) right_schema_len: usize, + pub(crate) inner: LeftSemiJoinState, +} + +impl<'a> JoinProbeState<'a> for LeftAntiJoinState { + fn probe( + &mut self, + probe_args: ProbeArgs<'a>, + filter_args: Option<&'a FilterArgs>, + ) -> Executor<'a> { + self.inner.probe(probe_args, filter_args) + } + + fn left_drop( + &mut self, + _build_map: HashMap, BuildState>, + filter_args: Option<&'a FilterArgs>, + ) -> Option> { + let bits_ptr: *mut FixedBitSet = &mut self.inner.bits; + let right_schema_len = self.right_schema_len; + Some(Box::new( + #[coroutine] + move || { + for ( + _, + BuildState { + tuples, + is_used, + has_filted, + }, + ) in _build_map + { + if is_used { + continue; + } + for (i, tuple) in tuples { + unsafe { + if (*bits_ptr).contains(i) && has_filted { + continue; + } + } + if let Some(filter_args) = filter_args { + let full_values = Vec::from_iter( + tuple + .values + .iter() + .cloned() + .chain((0..right_schema_len).map(|_| DataValue::Null)), + ); + if !throw!(filter(&full_values, filter_args)) { + continue; + } + } + yield Ok(tuple); + } + } + }, + )) + } +} diff --git a/src/execution/dql/join/hash/left_join.rs b/src/execution/dql/join/hash/left_join.rs new file mode 100644 index 00000000..3e727508 --- /dev/null +++ b/src/execution/dql/join/hash/left_join.rs @@ -0,0 +1,90 @@ +use crate::execution::dql::join::hash::{filter, FilterArgs, JoinProbeState, ProbeArgs}; +use crate::execution::dql::join::hash_join::BuildState; +use crate::execution::dql::sort::BumpVec; +use crate::execution::Executor; +use crate::throw; +use crate::types::tuple::Tuple; +use crate::types::value::DataValue; +use ahash::HashMap; +use fixedbitset::FixedBitSet; + +pub(crate) struct LeftJoinState { + pub(crate) left_schema_len: usize, + pub(crate) right_schema_len: usize, + pub(crate) bits: FixedBitSet, +} + +impl<'a> JoinProbeState<'a> for LeftJoinState { + fn probe( + &mut self, + probe_args: ProbeArgs<'a>, + filter_args: Option<&'a FilterArgs>, + ) -> Executor<'a> { + let bits_ptr: *mut FixedBitSet = &mut self.bits; + Box::new( + #[coroutine] + move || { + let ProbeArgs { + is_keys_has_null: false, + probe_tuple, + build_state: Some(build_state), + .. + } = probe_args + else { + return; + }; + + let mut has_filted = false; + for (i, Tuple { values, pk }) in build_state.tuples.iter() { + let full_values = + Vec::from_iter(values.iter().chain(probe_tuple.values.iter()).cloned()); + + match &filter_args { + None => (), + Some(filter_args) => { + if !throw!(filter(&full_values, filter_args)) { + has_filted = true; + unsafe { + (*bits_ptr).set(*i, true); + } + continue; + } + } + } + yield Ok(Tuple::new(pk.clone(), full_values)); + } + build_state.is_used = !has_filted; + build_state.has_filted = has_filted; + }, + ) + } + + fn left_drop( + &mut self, + _build_map: HashMap, BuildState>, + _filter_args: Option<&'a FilterArgs>, + ) -> Option> { + let full_schema_len = self.right_schema_len + self.left_schema_len; + let bits_ptr: *mut FixedBitSet = &mut self.bits; + + Some(Box::new( + #[coroutine] + move || { + for (_, state) in _build_map { + if state.is_used { + continue; + } + for (i, mut left_tuple) in state.tuples { + unsafe { + if !(*bits_ptr).contains(i) && state.has_filted { + continue; + } + } + left_tuple.values.resize(full_schema_len, DataValue::Null); + yield Ok(left_tuple); + } + } + }, + )) + } +} diff --git a/src/execution/dql/join/hash/left_semi_join.rs b/src/execution/dql/join/hash/left_semi_join.rs new file mode 100644 index 00000000..6a98a249 --- /dev/null +++ b/src/execution/dql/join/hash/left_semi_join.rs @@ -0,0 +1,92 @@ +use crate::execution::dql::join::hash::{filter, FilterArgs, JoinProbeState, ProbeArgs}; +use crate::execution::dql::join::hash_join::BuildState; +use crate::execution::dql::sort::BumpVec; +use crate::execution::Executor; +use crate::throw; +use crate::types::tuple::Tuple; +use crate::types::value::DataValue; +use ahash::HashMap; +use fixedbitset::FixedBitSet; + +pub(crate) struct LeftSemiJoinState { + pub(crate) bits: FixedBitSet, +} + +impl<'a> JoinProbeState<'a> for LeftSemiJoinState { + fn probe( + &mut self, + probe_args: ProbeArgs<'a>, + filter_args: Option<&'a FilterArgs>, + ) -> Executor<'a> { + let bits_ptr: *mut FixedBitSet = &mut self.bits; + Box::new( + #[coroutine] + move || { + let ProbeArgs { + is_keys_has_null: false, + probe_tuple, + build_state: Some(build_state), + .. + } = probe_args + else { + return; + }; + + let mut has_filted = false; + for (i, Tuple { values, .. }) in build_state.tuples.iter() { + let full_values = + Vec::from_iter(values.iter().chain(probe_tuple.values.iter()).cloned()); + + match &filter_args { + None => (), + Some(filter_args) => { + if !throw!(filter(&full_values, filter_args)) { + has_filted = true; + unsafe { + (*bits_ptr).set(*i, true); + } + continue; + } + } + } + } + build_state.is_used = true; + build_state.has_filted = has_filted; + }, + ) + } + + fn left_drop( + &mut self, + _build_map: HashMap, BuildState>, + _filter_args: Option<&'a FilterArgs>, + ) -> Option> { + let bits_ptr: *mut FixedBitSet = &mut self.bits; + Some(Box::new( + #[coroutine] + move || { + for ( + _, + BuildState { + tuples, + is_used, + has_filted, + }, + ) in _build_map + { + if !is_used { + continue; + } + for (i, tuple) in tuples { + unsafe { + if (*bits_ptr).contains(i) && has_filted { + continue; + } + } + yield Ok(tuple); + } + } + }, + )) + } +} diff --git a/src/execution/dql/join/hash/mod.rs b/src/execution/dql/join/hash/mod.rs new file mode 100644 index 00000000..16289d9f --- /dev/null +++ b/src/execution/dql/join/hash/mod.rs @@ -0,0 +1,105 @@ +pub(crate) mod full_join; +pub(crate) mod inner_join; +pub(crate) mod left_anti_join; +pub(crate) mod left_join; +pub(crate) mod left_semi_join; +pub(crate) mod right_join; + +use crate::errors::DatabaseError; +use crate::execution::dql::join::hash::full_join::FullJoinState; +use crate::execution::dql::join::hash::inner_join::InnerJoinState; +use crate::execution::dql::join::hash::left_anti_join::LeftAntiJoinState; +use crate::execution::dql::join::hash::left_join::LeftJoinState; +use crate::execution::dql::join::hash::left_semi_join::LeftSemiJoinState; +use crate::execution::dql::join::hash::right_join::RightJoinState; +use crate::execution::dql::join::hash_join::BuildState; +use crate::execution::dql::sort::BumpVec; +use crate::execution::Executor; +use crate::expression::ScalarExpression; +use crate::types::tuple::{SchemaRef, Tuple}; +use crate::types::value::DataValue; +use ahash::HashMap; + +#[derive(Debug)] +pub(crate) struct ProbeArgs<'a> { + pub(crate) is_keys_has_null: bool, + pub(crate) probe_tuple: Tuple, + pub(crate) build_state: Option<&'a mut BuildState>, +} + +pub(crate) struct FilterArgs { + pub(crate) full_schema: SchemaRef, + pub(crate) filter_expr: ScalarExpression, +} + +pub(crate) trait JoinProbeState<'a> { + fn probe( + &mut self, + probe_args: ProbeArgs<'a>, + filter_args: Option<&'a FilterArgs>, + ) -> Executor<'a>; + + #[allow(clippy::mutable_key_type)] + fn left_drop( + &mut self, + _build_map: HashMap, BuildState>, + _filter_args: Option<&'a FilterArgs>, + ) -> Option> { + None + } +} + +pub(crate) enum JoinProbeStateImpl { + Inner(InnerJoinState), + Left(LeftJoinState), + Right(RightJoinState), + Full(FullJoinState), + LeftSemi(LeftSemiJoinState), + LeftAnti(LeftAntiJoinState), +} + +impl<'a> JoinProbeState<'a> for JoinProbeStateImpl { + fn probe( + &mut self, + probe_args: ProbeArgs<'a>, + filter_args: Option<&'a FilterArgs>, + ) -> Executor<'a> { + match self { + JoinProbeStateImpl::Inner(state) => state.probe(probe_args, filter_args), + JoinProbeStateImpl::Left(state) => state.probe(probe_args, filter_args), + JoinProbeStateImpl::Right(state) => state.probe(probe_args, filter_args), + JoinProbeStateImpl::Full(state) => state.probe(probe_args, filter_args), + JoinProbeStateImpl::LeftSemi(state) => state.probe(probe_args, filter_args), + JoinProbeStateImpl::LeftAnti(state) => state.probe(probe_args, filter_args), + } + } + + fn left_drop( + &mut self, + _build_map: HashMap, BuildState>, + filter_args: Option<&'a FilterArgs>, + ) -> Option> { + match self { + JoinProbeStateImpl::Inner(state) => state.left_drop(_build_map, filter_args), + JoinProbeStateImpl::Left(state) => state.left_drop(_build_map, filter_args), + JoinProbeStateImpl::Right(state) => state.left_drop(_build_map, filter_args), + JoinProbeStateImpl::Full(state) => state.left_drop(_build_map, filter_args), + JoinProbeStateImpl::LeftSemi(state) => state.left_drop(_build_map, filter_args), + JoinProbeStateImpl::LeftAnti(state) => state.left_drop(_build_map, filter_args), + } + } +} + +pub(crate) fn filter(values: &[DataValue], filter_arg: &FilterArgs) -> Result { + let FilterArgs { + full_schema, + filter_expr, + .. + } = filter_arg; + + match &filter_expr.eval(Some((values, full_schema)))? { + DataValue::Boolean(false) | DataValue::Null => Ok(false), + DataValue::Boolean(true) => Ok(true), + _ => Err(DatabaseError::InvalidType), + } +} diff --git a/src/execution/dql/join/hash/right_join.rs b/src/execution/dql/join/hash/right_join.rs new file mode 100644 index 00000000..f84cab97 --- /dev/null +++ b/src/execution/dql/join/hash/right_join.rs @@ -0,0 +1,59 @@ +use crate::execution::dql::join::hash::full_join::FullJoinState; +use crate::execution::dql::join::hash::{filter, FilterArgs, JoinProbeState, ProbeArgs}; +use crate::execution::Executor; +use crate::throw; +use crate::types::tuple::Tuple; + +pub(crate) struct RightJoinState { + pub(crate) left_schema_len: usize, +} + +impl<'a> JoinProbeState<'a> for RightJoinState { + fn probe( + &mut self, + probe_args: ProbeArgs<'a>, + filter_args: Option<&'a FilterArgs>, + ) -> Executor<'a> { + let left_schema_len = self.left_schema_len; + + Box::new( + #[coroutine] + move || { + let ProbeArgs { probe_tuple, .. } = probe_args; + + if let ProbeArgs { + is_keys_has_null: false, + build_state: Some(build_state), + .. + } = probe_args + { + let mut has_filtered = false; + for (_, Tuple { values, pk }) in build_state.tuples.iter() { + let full_values = + Vec::from_iter(values.iter().chain(probe_tuple.values.iter()).cloned()); + + match &filter_args { + None => (), + Some(filter_args) => { + if !throw!(filter(&full_values, filter_args)) { + has_filtered = true; + yield Ok(FullJoinState::full_right_row( + left_schema_len, + &probe_tuple, + )); + continue; + } + } + } + yield Ok(Tuple::new(pk.clone(), full_values)); + } + build_state.is_used = !has_filtered; + build_state.has_filted = has_filtered; + return; + } + + yield Ok(FullJoinState::full_right_row(left_schema_len, &probe_tuple)); + }, + ) + } +} diff --git a/src/execution/dql/join/hash_join.rs b/src/execution/dql/join/hash_join.rs index 731b1b5d..d2cb9419 100644 --- a/src/execution/dql/join/hash_join.rs +++ b/src/execution/dql/join/hash_join.rs @@ -1,26 +1,38 @@ use crate::catalog::ColumnRef; use crate::errors::DatabaseError; +use crate::execution::dql::join::hash::full_join::FullJoinState; +use crate::execution::dql::join::hash::inner_join::InnerJoinState; +use crate::execution::dql::join::hash::left_anti_join::LeftAntiJoinState; +use crate::execution::dql::join::hash::left_join::LeftJoinState; +use crate::execution::dql::join::hash::left_semi_join::LeftSemiJoinState; +use crate::execution::dql::join::hash::right_join::RightJoinState; +use crate::execution::dql::join::hash::{ + FilterArgs, JoinProbeState, JoinProbeStateImpl, ProbeArgs, +}; use crate::execution::dql::join::joins_nullable; +use crate::execution::dql::sort::BumpVec; use crate::execution::{build_read, Executor, ReadExecutor}; use crate::expression::ScalarExpression; use crate::planner::operator::join::{JoinCondition, JoinOperator, JoinType}; use crate::planner::LogicalPlan; use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; use crate::throw; -use crate::types::tuple::{Schema, Tuple}; -use crate::types::value::{DataValue, NULL_VALUE}; +use crate::types::tuple::Tuple; +use crate::types::value::DataValue; use ahash::{HashMap, HashMapExt}; +use bumpalo::Bump; use fixedbitset::FixedBitSet; -use itertools::Itertools; use std::ops::Coroutine; use std::ops::CoroutineState; use std::pin::Pin; +use std::sync::Arc; pub struct HashJoin { on: JoinCondition, ty: JoinType, left_input: LogicalPlan, right_input: LogicalPlan, + bump: Bump, } impl From<(JoinOperator, LogicalPlan, LogicalPlan)> for HashJoin { @@ -36,6 +48,7 @@ impl From<(JoinOperator, LogicalPlan, LogicalPlan)> for HashJoin { ty: join_type, left_input, right_input, + bump: Default::default(), } } } @@ -45,51 +58,25 @@ impl HashJoin { on_keys: &[ScalarExpression], tuple: &Tuple, schema: &[ColumnRef], - ) -> Result, DatabaseError> { - let mut values = Vec::with_capacity(on_keys.len()); - + build_buf: &mut BumpVec, + ) -> Result<(), DatabaseError> { + build_buf.clear(); for expr in on_keys { - values.push(expr.eval(Some((tuple, schema)))?); + build_buf.push(expr.eval(Some((tuple, schema)))?); } - Ok(values) + Ok(()) } +} - pub(crate) fn filter( - mut tuple: Tuple, - schema: &Schema, - filter: &Option, - join_ty: &JoinType, - left_schema_len: usize, - ) -> Result, DatabaseError> { - if let (Some(expr), false) = (filter, matches!(join_ty, JoinType::Full | JoinType::Cross)) { - match &expr.eval(Some((&tuple, schema)))? { - DataValue::Boolean(false) | DataValue::Null => { - let full_schema_len = schema.len(); - - match join_ty { - JoinType::LeftOuter => { - for i in left_schema_len..full_schema_len { - tuple.values[i] = NULL_VALUE.clone(); - } - } - JoinType::RightOuter => { - for i in 0..left_schema_len { - tuple.values[i] = NULL_VALUE.clone(); - } - } - _ => return Ok(None), - } - } - DataValue::Boolean(true) => (), - _ => return Err(DatabaseError::InvalidType), - } - } - - Ok(Some(tuple)) - } +#[derive(Default, Debug)] +pub(crate) struct BuildState { + pub(crate) tuples: Vec<(usize, Tuple)>, + pub(crate) is_used: bool, + pub(crate) has_filted: bool, } impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashJoin { + #[allow(clippy::mutable_key_type)] fn execute( self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), @@ -103,6 +90,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashJoin { ty, mut left_input, mut right_input, + mut bump, } = self; if ty == JoinType::Cross { @@ -131,6 +119,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashJoin { } } }; + let (left_force_nullable, right_force_nullable) = joins_nullable(&ty); let mut full_schema_ref = Vec::clone(left_input.output_schema()); @@ -142,31 +131,52 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashJoin { &mut full_schema_ref[left_schema_len..], right_force_nullable, ); + let right_schema_len = full_schema_ref.len() - left_schema_len; + let full_schema_ref = Arc::new(full_schema_ref); // build phase: // 1.construct hashtable, one hash key may contains multiple rows indices. // 2.merged all left tuples. let mut coroutine = build_read(left_input, cache, transaction); let mut build_map = HashMap::new(); - let build_map_ptr: *mut HashMap, (Vec, bool, bool)> = - &mut build_map; + let bump_ptr: *mut Bump = &mut bump; + let build_map_ptr: *mut HashMap, BuildState> = &mut build_map; + + let mut buf_row = + BumpVec::with_capacity_in(on_left_keys.len(), unsafe { &mut (*bump_ptr) }); + let mut build_count = 0; while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { let tuple: Tuple = throw!(tuple); - let values = throw!(Self::eval_keys( + throw!(Self::eval_keys( &on_left_keys, &tuple, - &full_schema_ref[0..left_schema_len] + &full_schema_ref[0..left_schema_len], + &mut buf_row, )); - unsafe { - (*build_map_ptr) - .entry(values) - .or_insert_with(|| (Vec::new(), false, false)) - .0 - .push(tuple); + let build_map_ref = unsafe { &mut (*build_map_ptr) }; + match build_map_ref.get_mut(&buf_row) { + None => { + build_map_ref.insert( + buf_row.clone(), + BuildState { + tuples: vec![(build_count, tuple)], + ..Default::default() + }, + ); + } + Some(BuildState { tuples, .. }) => tuples.push((build_count, tuple)), } + build_count += 1; } + let mut join_impl = + Self::create_join_impl(self.ty, left_schema_len, right_schema_len, build_count); + let mut filter_arg = filter.map(|expr| FilterArgs { + full_schema: full_schema_ref.clone(), + filter_expr: expr, + }); + let filter_arg_ptr: *mut Option = &mut filter_arg; // probe phase let mut coroutine = build_read(right_input, cache, transaction); @@ -174,131 +184,71 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashJoin { while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { let tuple: Tuple = throw!(tuple); - let right_cols_len = tuple.values.len(); - let values = throw!(Self::eval_keys( + throw!(Self::eval_keys( &on_right_keys, &tuple, - &full_schema_ref[left_schema_len..] + &full_schema_ref[left_schema_len..], + &mut buf_row )); - let has_null = values.iter().any(|value| value.is_null()); - let build_value = unsafe { (*build_map_ptr).get_mut(&values) }; - drop(values); - - if let (false, Some((tuples, is_used, is_filtered))) = (has_null, build_value) { - let mut bits_option = None; - *is_used = true; - - match ty { - JoinType::LeftSemi => { - if *is_filtered { - continue; - } else { - bits_option = Some(FixedBitSet::with_capacity(tuples.len())); - } - } - JoinType::LeftAnti => continue, - _ => (), - } - for (i, Tuple { values, pk }) in tuples.iter().enumerate() { - let full_values = values - .iter() - .chain(tuple.values.iter()) - .cloned() - .collect_vec(); - let tuple = Tuple::new(pk.clone(), full_values); - if let Some(tuple) = throw!(Self::filter( - tuple, - &full_schema_ref, - &filter, - &ty, - left_schema_len - )) { - if let Some(bits) = bits_option.as_mut() { - bits.insert(i); - } else { - yield Ok(tuple); - } - } - } - if let Some(bits) = bits_option { - let mut cnt = 0; - tuples.retain(|_| { - let res = bits.contains(cnt); - cnt += 1; - res - }); - *is_filtered = true - } - } else if matches!(ty, JoinType::RightOuter | JoinType::Full) { - let empty_len = full_schema_ref.len() - right_cols_len; - let values = (0..empty_len) - .map(|_| NULL_VALUE.clone()) - .chain(tuple.values) - .collect_vec(); - let tuple = Tuple::new(tuple.pk, values); - if let Some(tuple) = throw!(Self::filter( - tuple, - &full_schema_ref, - &filter, - &ty, - left_schema_len - )) { - yield Ok(tuple); - } + let build_value = unsafe { (*build_map_ptr).get_mut(&buf_row) }; + + let probe_args = ProbeArgs { + is_keys_has_null: buf_row.iter().any(|value| value.is_null()), + probe_tuple: tuple, + build_state: build_value, + }; + let mut executor = + join_impl.probe(probe_args, unsafe { &mut (*filter_arg_ptr) }.as_ref()); + while let CoroutineState::Yielded(tuple) = Pin::new(&mut executor).resume(()) { + yield tuple; } } - - // left drop - match ty { - JoinType::LeftOuter | JoinType::Full => { - for (_, (left_tuples, is_used, _)) in build_map { - if is_used { - continue; - } - for mut tuple in left_tuples { - while tuple.values.len() != full_schema_ref.len() { - tuple.values.push(NULL_VALUE.clone()); - } - yield Ok(tuple); - } - } - } - JoinType::LeftSemi | JoinType::LeftAnti => { - let is_left_semi = matches!(ty, JoinType::LeftSemi); - - for (_, (left_tuples, mut is_used, is_filtered)) in build_map { - if is_left_semi { - is_used = !is_used; - } - if is_used { - continue; - } - if is_filtered { - for tuple in left_tuples { - yield Ok(tuple); - } - continue; - } - for tuple in left_tuples { - if let Some(tuple) = throw!(Self::filter( - tuple, - &full_schema_ref, - &filter, - &ty, - left_schema_len - )) { - yield Ok(tuple); - } - } - } + let executor = + join_impl.left_drop(build_map, unsafe { &mut (*filter_arg_ptr) }.as_ref()); + if let Some(mut executor) = executor { + while let CoroutineState::Yielded(tuple) = Pin::new(&mut executor).resume(()) { + yield tuple; } - _ => (), } }, ) } } +impl HashJoin { + fn create_join_impl( + ty: JoinType, + left_schema_len: usize, + right_schema_len: usize, + build_count: usize, + ) -> JoinProbeStateImpl { + match ty { + JoinType::Inner => JoinProbeStateImpl::Inner(InnerJoinState), + JoinType::LeftOuter => JoinProbeStateImpl::Left(LeftJoinState { + left_schema_len, + right_schema_len, + bits: FixedBitSet::with_capacity(build_count), + }), + JoinType::LeftSemi => JoinProbeStateImpl::LeftSemi(LeftSemiJoinState { + bits: FixedBitSet::with_capacity(build_count), + }), + JoinType::LeftAnti => JoinProbeStateImpl::LeftAnti(LeftAntiJoinState { + right_schema_len, + inner: LeftSemiJoinState { + bits: FixedBitSet::with_capacity(build_count), + }, + }), + JoinType::RightOuter => JoinProbeStateImpl::Right(RightJoinState { left_schema_len }), + JoinType::Full => JoinProbeStateImpl::Full(FullJoinState { + left_schema_len, + right_schema_len, + bits: FixedBitSet::with_capacity(build_count), + }), + JoinType::Cross => unreachable!(), + } + } +} + #[cfg(test)] mod test { use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef}; diff --git a/src/execution/dql/join/mod.rs b/src/execution/dql/join/mod.rs index 907dddcb..59ce4982 100644 --- a/src/execution/dql/join/mod.rs +++ b/src/execution/dql/join/mod.rs @@ -1,5 +1,6 @@ use crate::planner::operator::join::JoinType; +mod hash; pub(crate) mod hash_join; pub(crate) mod nested_loop_join; diff --git a/src/execution/dql/join/nested_loop_join.rs b/src/execution/dql/join/nested_loop_join.rs index d5c17ed2..1da1b466 100644 --- a/src/execution/dql/join/nested_loop_join.rs +++ b/src/execution/dql/join/nested_loop_join.rs @@ -12,7 +12,7 @@ use crate::planner::LogicalPlan; use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; use crate::throw; use crate::types::tuple::{Schema, SchemaRef, Tuple}; -use crate::types::value::{DataValue, NULL_VALUE}; +use crate::types::value::DataValue; use fixedbitset::FixedBitSet; use itertools::Itertools; use std::borrow::Cow; @@ -253,7 +253,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for NestedLoopJoin { if !has_matched => { let right_tuple = - Tuple::new(None, vec![NULL_VALUE.clone(); right_schema_len]); + Tuple::new(None, vec![DataValue::Null; right_schema_len]); if matches!(ty, JoinType::RightOuter) { Self::emit_tuple(&right_tuple, &left_tuple, ty, false) } else { @@ -279,7 +279,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for NestedLoopJoin { { if !bitmap.as_ref().unwrap().contains(idx) { let mut right_tuple: Tuple = throw!(right_tuple); - let mut values = vec![NULL_VALUE.clone(); right_schema_len]; + let mut values = vec![DataValue::Null; right_schema_len]; values.append(&mut right_tuple.values); yield Ok(Tuple::new(right_tuple.pk, values)) @@ -318,11 +318,11 @@ impl NestedLoopJoin { values .iter_mut() .skip(left_len) - .for_each(|v| *v = NULL_VALUE.clone()); + .for_each(|v| *v = DataValue::Null); } JoinType::RightOuter if !is_matched => { (0..left_len).for_each(|i| { - values[i] = NULL_VALUE.clone(); + values[i] = DataValue::Null; }); } JoinType::LeftSemi => values.truncate(left_len), diff --git a/src/execution/dql/sort.rs b/src/execution/dql/sort.rs index 188e027e..aa896ad3 100644 --- a/src/execution/dql/sort.rs +++ b/src/execution/dql/sort.rs @@ -8,14 +8,15 @@ use crate::throw; use crate::types::tuple::{Schema, Tuple}; use bumpalo::Bump; use std::cmp::Ordering; +use std::mem::MaybeUninit; use std::ops::Coroutine; use std::ops::CoroutineState; use std::pin::Pin; pub(crate) type BumpVec<'bump, T> = bumpalo::collections::Vec<'bump, T>; -#[derive(Clone)] -pub(crate) struct NullableVec<'a, T>(pub(crate) BumpVec<'a, Option>); +#[derive(Debug)] +pub(crate) struct NullableVec<'a, T>(pub(crate) BumpVec<'a, MaybeUninit>); impl<'a, T> NullableVec<'a, T> { #[inline] @@ -28,58 +29,81 @@ impl<'a, T> NullableVec<'a, T> { NullableVec(BumpVec::with_capacity_in(capacity, arena)) } + #[inline] + pub(crate) fn fill_capacity(capacity: usize, arena: &'a Bump) -> NullableVec<'a, T> { + let mut data = BumpVec::with_capacity_in(capacity, arena); + for _ in 0..capacity { + data.push(MaybeUninit::uninit()); + } + NullableVec(data) + } + #[inline] pub(crate) fn put(&mut self, item: T) { - self.0.push(Some(item)); + self.0.push(MaybeUninit::new(item)); + } + + #[inline] + pub(crate) fn set(&mut self, pos: usize, item: T) { + self.0[pos] = MaybeUninit::new(item); } #[inline] pub(crate) fn take(&mut self, offset: usize) -> T { - self.0[offset].take().unwrap() + unsafe { self.0[offset].assume_init_read() } } #[inline] pub(crate) fn get(&self, offset: usize) -> &T { - self.0[offset].as_ref().unwrap() + unsafe { self.0[offset].assume_init_ref() } } #[inline] pub(crate) fn len(&self) -> usize { self.0.len() } + + #[inline] + pub(crate) fn is_empty(&self) -> bool { + self.0.is_empty() + } + + #[inline] + pub(crate) fn iter(&self) -> impl Iterator { + self.0.iter().map(|item| unsafe { item.assume_init_ref() }) + } + + #[inline] + pub(crate) fn take_iter(&mut self) -> impl Iterator + '_ { + self.0 + .iter_mut() + .map(|item| unsafe { item.assume_init_read() }) + } + + #[inline] + pub(crate) fn into_iter(self) -> impl Iterator + 'a { + self.0 + .into_iter() + .map(|item| unsafe { item.assume_init_read() }) + } } -pub struct RemappingIterator<'a> { - pos: usize, +pub struct RemappingIterator<'a, T> { tuples: NullableVec<'a, (usize, Tuple)>, - indices: BumpVec<'a, usize>, + indices: T, } -impl RemappingIterator<'_> { - pub fn new<'a>( - pos: usize, - tuples: NullableVec<'a, (usize, Tuple)>, - indices: BumpVec<'a, usize>, - ) -> RemappingIterator<'a> { - RemappingIterator { - pos, - tuples, - indices, - } +impl> RemappingIterator<'_, T> { + pub fn new(tuples: NullableVec<(usize, Tuple)>, indices: T) -> RemappingIterator { + RemappingIterator { tuples, indices } } } -impl Iterator for RemappingIterator<'_> { +impl> Iterator for RemappingIterator<'_, T> { type Item = Tuple; fn next(&mut self) -> Option { - if self.pos > self.indices.len() - 1 { - return None; - } - let (_, tuple) = self.tuples.take(self.indices[self.pos]); - self.pos += 1; - - Some(tuple) + self.indices.next().map(|pos| self.tuples.take(pos).1) } } @@ -87,33 +111,50 @@ const BUCKET_SIZE: usize = u8::MAX as usize + 1; // LSD Radix Sort pub(crate) fn radix_sort<'a, T, A: AsRef<[u8]>>( - mut tuples: BumpVec<'a, (T, A)>, + tuples: &mut NullableVec<'a, (T, A)>, arena: &'a Bump, -) -> BumpVec<'a, T> { - if let Some(max_len) = tuples.iter().map(|(_, bytes)| bytes.as_ref().len()).max() { - // init buckets - let mut temp_buckets = BumpVec::with_capacity_in(BUCKET_SIZE, arena); - for _ in 0..BUCKET_SIZE { - temp_buckets.push(BumpVec::new_in(arena)); - } +) { + if tuples.is_empty() { + return; + } + let max_len = tuples + .iter() + .map(|(_, bytes)| bytes.as_ref().len()) + .max() + .unwrap(); - for i in (0..max_len).rev() { - for (t, value) in tuples.drain(..) { - let bytes = value.as_ref(); - let index = if bytes.len() > i { bytes[i] } else { 0 }; + let mut buf = NullableVec::fill_capacity(tuples.len(), arena); - temp_buckets[index as usize].push((t, value)); - } - for bucket in temp_buckets.iter_mut() { - tuples.append(bucket); + let mut count = [0usize; BUCKET_SIZE]; + let mut pos = [0usize; BUCKET_SIZE]; + + for i in (0..max_len).rev() { + count.fill(0); + + for (_, value) in tuples.iter() { + let bytes = value.as_ref(); + let idx = if bytes.len() > i { bytes[i] } else { 0 }; + count[idx as usize] += 1; + } + + { + let mut sum = 0; + for j in 0..BUCKET_SIZE { + let c = count[j]; + pos[j] = sum; + sum += c; } } + + for (t, value) in tuples.take_iter() { + let bytes = value.as_ref(); + let idx = if bytes.len() > i { bytes[i] } else { 0 }; + let p = pos[idx as usize]; + buf.set(p, (t, value)); + pos[idx as usize] += 1; + } + std::mem::swap(tuples, &mut buf); } - let mut result = BumpVec::with_capacity_in(tuples.len(), arena); - for (item, _) in tuples { - result.push(item); - } - result } pub enum SortBy { @@ -131,11 +172,9 @@ impl SortBy { ) -> Result + 'a>, DatabaseError> { match self { SortBy::Radix => { - let mut sort_keys = BumpVec::with_capacity_in(tuples.len(), arena); - - for (i, tuple) in tuples.0.iter().enumerate() { - debug_assert!(tuple.is_some()); + let mut sort_keys = NullableVec::with_capacity(tuples.len(), arena); + for (i, (_, tuple)) in tuples.iter().enumerate() { let mut full_key = BumpVec::new_in(arena); for SortField { @@ -145,7 +184,6 @@ impl SortBy { } in sort_fields { let mut key = BumpBytes::new_in(arena); - let tuple = tuple.as_ref().map(|(_, tuple)| tuple).unwrap(); expr.eval(Some((tuple, schema)))? .memcomparable_encode(&mut key)?; @@ -157,11 +195,14 @@ impl SortBy { key.push(if *nulls_first { u8::MIN } else { u8::MAX }); full_key.extend(key); } - sort_keys.push((i, full_key)) + sort_keys.put((i, full_key)) } - let indices = radix_sort(sort_keys, arena); + radix_sort(&mut sort_keys, arena); - Ok(Box::new(RemappingIterator::new(0, tuples, indices))) + Ok(Box::new(RemappingIterator::new( + tuples, + sort_keys.into_iter().map(|(i, _)| i), + ))) } SortBy::Fast => { let fn_nulls_first = |nulls_first: bool| { @@ -176,20 +217,14 @@ impl SortBy { let mut eval_values = vec![Vec::with_capacity(sort_fields.len()); tuples.len()]; for (x, SortField { expr, .. }) in sort_fields.iter().enumerate() { - for tuple in tuples.0.iter() { - debug_assert!(tuple.is_some()); - - let (_, tuple) = tuple.as_ref().unwrap(); + for (_, tuple) in tuples.iter() { eval_values[x].push(expr.eval(Some((tuple, schema)))?); } } tuples.0.sort_by(|tuple_1, tuple_2| { - debug_assert!(tuple_1.is_some()); - debug_assert!(tuple_2.is_some()); - - let (i_1, _) = tuple_1.as_ref().unwrap(); - let (i_2, _) = tuple_2.as_ref().unwrap(); + let (i_1, _) = unsafe { tuple_1.assume_init_ref() }; + let (i_2, _) = unsafe { tuple_2.assume_init_ref() }; let mut ordering = Ordering::Equal; for ( @@ -223,12 +258,7 @@ impl SortBy { }); drop(eval_values); - Ok(Box::new( - tuples - .0 - .into_iter() - .map(|tuple| tuple.map(|(_, tuple)| tuple).unwrap()), - )) + Ok(Box::new(tuples.into_iter().map(|(_, tuple)| tuple))) } } } @@ -317,15 +347,20 @@ mod test { fn test_radix_sort() { let arena = Bump::new(); { - let mut indices = BumpVec::new_in(&arena); - indices.push((0, "abc".as_bytes().to_vec())); - indices.push((1, "abz".as_bytes().to_vec())); - indices.push((2, "abe".as_bytes().to_vec())); - indices.push((3, "abcd".as_bytes().to_vec())); - - let indices = radix_sort(indices, &arena); - assert_eq!(indices.as_slice(), &[0, 3, 2, 1]); - drop(indices) + let mut indices = NullableVec::with_capacity(4, &arena); + indices.put((0usize, "abc".as_bytes().to_vec())); + indices.put((1, "abz".as_bytes().to_vec())); + indices.put((2, "abe".as_bytes().to_vec())); + indices.put((3, "abcd".as_bytes().to_vec())); + + radix_sort(&mut indices, &arena); + + let mut iter = indices.iter(); + + assert_eq!(Some(&(0, "abc".as_bytes().to_vec())), iter.next()); + assert_eq!(Some(&(3, "abcd".as_bytes().to_vec())), iter.next()); + assert_eq!(Some(&(2, "abe".as_bytes().to_vec())), iter.next()); + assert_eq!(Some(&(1, "abz".as_bytes().to_vec())), iter.next()); } } @@ -352,11 +387,13 @@ mod test { ))]); let arena = Bump::new(); - let mut inner = BumpVec::new_in(&arena); - inner.push(Some((0_usize, Tuple::new(None, vec![DataValue::Null])))); - inner.push(Some((1_usize, Tuple::new(None, vec![DataValue::Int32(0)])))); - inner.push(Some((2_usize, Tuple::new(None, vec![DataValue::Int32(1)])))); - let tuples = NullableVec(inner); + let fn_tuples = || { + let mut vec = NullableVec::new(&arena); + vec.put((0_usize, Tuple::new(None, vec![DataValue::Null]))); + vec.put((1_usize, Tuple::new(None, vec![DataValue::Int32(0)]))); + vec.put((2_usize, Tuple::new(None, vec![DataValue::Int32(1)]))); + vec + }; let fn_asc_and_nulls_last_eq = |mut iter: Box>| { if let Some(tuple) = iter.next() { @@ -432,25 +469,25 @@ mod test { &arena, &schema, &fn_sort_fields(true, true), - tuples.clone(), + fn_tuples(), )?); fn_asc_and_nulls_last_eq(SortBy::Radix.sorted_tuples( &arena, &schema, &fn_sort_fields(true, false), - tuples.clone(), + fn_tuples(), )?); fn_desc_and_nulls_first_eq(SortBy::Radix.sorted_tuples( &arena, &schema, &fn_sort_fields(false, true), - tuples.clone(), + fn_tuples(), )?); fn_desc_and_nulls_last_eq(SortBy::Radix.sorted_tuples( &arena, &schema, &fn_sort_fields(false, false), - tuples.clone(), + fn_tuples(), )?); // FastSort @@ -458,25 +495,25 @@ mod test { &arena, &schema, &fn_sort_fields(true, true), - tuples.clone(), + fn_tuples(), )?); fn_asc_and_nulls_last_eq(SortBy::Fast.sorted_tuples( &arena, &schema, &fn_sort_fields(true, false), - tuples.clone(), + fn_tuples(), )?); fn_desc_and_nulls_first_eq(SortBy::Fast.sorted_tuples( &arena, &schema, &fn_sort_fields(false, true), - tuples.clone(), + fn_tuples(), )?); fn_desc_and_nulls_last_eq(SortBy::Fast.sorted_tuples( &arena, &schema, &&fn_sort_fields(false, false), - tuples.clone(), + fn_tuples(), )?); Ok(()) @@ -528,32 +565,35 @@ mod test { )), ]); let arena = Bump::new(); - let mut inner = BumpVec::new_in(&arena); - inner.push(Some(( - 0_usize, - Tuple::new(None, vec![DataValue::Null, DataValue::Null]), - ))); - inner.push(Some(( - 1_usize, - Tuple::new(None, vec![DataValue::Int32(0), DataValue::Null]), - ))); - inner.push(Some(( - 2_usize, - Tuple::new(None, vec![DataValue::Int32(1), DataValue::Null]), - ))); - inner.push(Some(( - 3_usize, - Tuple::new(None, vec![DataValue::Null, DataValue::Int32(0)]), - ))); - inner.push(Some(( - 4_usize, - Tuple::new(None, vec![DataValue::Int32(0), DataValue::Int32(0)]), - ))); - inner.push(Some(( - 5_usize, - Tuple::new(None, vec![DataValue::Int32(1), DataValue::Int32(0)]), - ))); - let tuples = NullableVec(inner); + + let fn_tuples = || { + let mut vec = NullableVec::new(&arena); + vec.put(( + 0_usize, + Tuple::new(None, vec![DataValue::Null, DataValue::Null]), + )); + vec.put(( + 1_usize, + Tuple::new(None, vec![DataValue::Int32(0), DataValue::Null]), + )); + vec.put(( + 2_usize, + Tuple::new(None, vec![DataValue::Int32(1), DataValue::Null]), + )); + vec.put(( + 3_usize, + Tuple::new(None, vec![DataValue::Null, DataValue::Int32(0)]), + )); + vec.put(( + 4_usize, + Tuple::new(None, vec![DataValue::Int32(0), DataValue::Int32(0)]), + )); + vec.put(( + 5_usize, + Tuple::new(None, vec![DataValue::Int32(1), DataValue::Int32(0)]), + )); + vec + }; let fn_asc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq = |mut iter: Box>| { if let Some(tuple) = iter.next() { @@ -692,25 +732,25 @@ mod test { &arena, &schema, &fn_sort_fields(true, true, true, true), - tuples.clone(), + fn_tuples(), )?); fn_asc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq(SortBy::Radix.sorted_tuples( &arena, &schema, &fn_sort_fields(true, false, true, true), - tuples.clone(), + fn_tuples(), )?); fn_desc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq(SortBy::Radix.sorted_tuples( &arena, &schema, &fn_sort_fields(false, true, true, true), - tuples.clone(), + fn_tuples(), )?); fn_desc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq(SortBy::Radix.sorted_tuples( &arena, &schema, &fn_sort_fields(false, false, true, true), - tuples.clone(), + fn_tuples(), )?); // FastSort @@ -718,25 +758,25 @@ mod test { &arena, &schema, &fn_sort_fields(true, true, true, true), - tuples.clone(), + fn_tuples(), )?); fn_asc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq(SortBy::Fast.sorted_tuples( &arena, &schema, &fn_sort_fields(true, false, true, true), - tuples.clone(), + fn_tuples(), )?); fn_desc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq(SortBy::Fast.sorted_tuples( &arena, &schema, &fn_sort_fields(false, true, true, true), - tuples.clone(), + fn_tuples(), )?); fn_desc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq(SortBy::Fast.sorted_tuples( &arena, &schema, &fn_sort_fields(false, false, true, true), - tuples.clone(), + fn_tuples(), )?); Ok(()) diff --git a/src/expression/evaluator.rs b/src/expression/evaluator.rs index b1c6ce03..3ac9b536 100644 --- a/src/expression/evaluator.rs +++ b/src/expression/evaluator.rs @@ -3,7 +3,6 @@ use crate::errors::DatabaseError; use crate::expression::function::scala::ScalarFunction; use crate::expression::{AliasType, BinaryOperator, ScalarExpression}; use crate::types::evaluator::EvaluatorFactory; -use crate::types::tuple::Tuple; use crate::types::value::{DataValue, Utf8Type}; use crate::types::LogicalType; use regex::Regex; @@ -22,7 +21,10 @@ macro_rules! eval_to_num { } impl ScalarExpression { - pub fn eval(&self, tuple: Option<(&Tuple, &[ColumnRef])>) -> Result { + pub fn eval<'a, T: Into<&'a [DataValue]> + Copy>( + &self, + tuple: Option<(T, &[ColumnRef])>, + ) -> Result { let check_cast = |value: DataValue, return_type: &LogicalType| { if value.logical_type() != *return_type { return value.cast(return_type); @@ -38,8 +40,7 @@ impl ScalarExpression { }; let position = position .ok_or_else(|| DatabaseError::UnbindExpressionPosition(self.clone()))?; - - Ok(tuple.values[position].clone()) + Ok(tuple.into()[position].clone()) } ScalarExpression::Alias { expr, alias } => { let Some((tuple, schema)) = tuple else { @@ -247,9 +248,9 @@ impl ScalarExpression { } Ok(DataValue::Tuple(values, false)) } - ScalarExpression::ScalaFunction(ScalarFunction { inner, args, .. }) => { - inner.eval(args, tuple)?.cast(inner.return_type()) - } + ScalarExpression::ScalaFunction(ScalarFunction { inner, args, .. }) => inner + .eval(args, tuple.map(|(a, b)| (a.into(), b)))? + .cast(inner.return_type()), ScalarExpression::Empty => unreachable!(), ScalarExpression::If { condition, diff --git a/src/expression/function/scala.rs b/src/expression/function/scala.rs index cbc0f744..1fa4df46 100644 --- a/src/expression/function/scala.rs +++ b/src/expression/function/scala.rs @@ -2,7 +2,6 @@ use crate::catalog::ColumnRef; use crate::errors::DatabaseError; use crate::expression::function::FunctionSummary; use crate::expression::ScalarExpression; -use crate::types::tuple::Tuple; use crate::types::value::DataValue; use crate::types::LogicalType; use kite_sql_serde_macros::ReferenceSerialization; @@ -54,7 +53,7 @@ pub trait ScalarFunctionImpl: Debug + Send + Sync { fn eval( &self, args: &[ScalarExpression], - tuple: Option<(&Tuple, &[ColumnRef])>, + tuple: Option<(&[DataValue], &[ColumnRef])>, ) -> Result; // TODO: Exploiting monotonicity when optimizing `ScalarFunctionImpl::monotonicity()` diff --git a/src/expression/range_detacher.rs b/src/expression/range_detacher.rs index 0858c1ba..bd475837 100644 --- a/src/expression/range_detacher.rs +++ b/src/expression/range_detacher.rs @@ -1,7 +1,7 @@ use crate::catalog::ColumnRef; use crate::errors::DatabaseError; use crate::expression::{BinaryOperator, ScalarExpression}; -use crate::types::value::{DataValue, NULL_VALUE}; +use crate::types::value::DataValue; use crate::types::ColumnId; use itertools::Itertools; use kite_sql_serde_macros::ReferenceSerialization; @@ -223,7 +223,7 @@ impl<'a> RangeDetacher<'a> { // Range::NotEq(NULL_VALUE.clone()) Ok(None) } else { - Ok(Some(Range::Eq(NULL_VALUE.clone()))) + Ok(Some(Range::Eq(DataValue::Null))) }; } } diff --git a/src/function/char_length.rs b/src/function/char_length.rs index 817f591a..724887ec 100644 --- a/src/function/char_length.rs +++ b/src/function/char_length.rs @@ -4,7 +4,6 @@ use crate::expression::function::scala::FuncMonotonicity; use crate::expression::function::scala::ScalarFunctionImpl; use crate::expression::function::FunctionSummary; use crate::expression::ScalarExpression; -use crate::types::tuple::Tuple; use crate::types::value::DataValue; use crate::types::LogicalType; use serde::Deserialize; @@ -35,7 +34,7 @@ impl ScalarFunctionImpl for CharLength { fn eval( &self, exprs: &[ScalarExpression], - tuples: Option<(&Tuple, &[ColumnRef])>, + tuples: Option<(&[DataValue], &[ColumnRef])>, ) -> Result { let mut value = exprs[0].eval(tuples)?; if !matches!(value.logical_type(), LogicalType::Varchar(_, _)) { diff --git a/src/function/current_date.rs b/src/function/current_date.rs index f48e128b..293e4541 100644 --- a/src/function/current_date.rs +++ b/src/function/current_date.rs @@ -4,7 +4,6 @@ use crate::expression::function::scala::FuncMonotonicity; use crate::expression::function::scala::ScalarFunctionImpl; use crate::expression::function::FunctionSummary; use crate::expression::ScalarExpression; -use crate::types::tuple::Tuple; use crate::types::value::DataValue; use crate::types::LogicalType; use chrono::{Datelike, Local}; @@ -37,7 +36,7 @@ impl ScalarFunctionImpl for CurrentDate { fn eval( &self, _: &[ScalarExpression], - _: Option<(&Tuple, &[ColumnRef])>, + _: Option<(&[DataValue], &[ColumnRef])>, ) -> Result { Ok(DataValue::Date32(Local::now().num_days_from_ce())) } diff --git a/src/function/current_timestamp.rs b/src/function/current_timestamp.rs index 96a4fe07..333fa0a9 100644 --- a/src/function/current_timestamp.rs +++ b/src/function/current_timestamp.rs @@ -4,7 +4,6 @@ use crate::expression::function::scala::FuncMonotonicity; use crate::expression::function::scala::ScalarFunctionImpl; use crate::expression::function::FunctionSummary; use crate::expression::ScalarExpression; -use crate::types::tuple::Tuple; use crate::types::value::DataValue; use crate::types::LogicalType; use chrono::Utc; @@ -37,7 +36,7 @@ impl ScalarFunctionImpl for CurrentTimeStamp { fn eval( &self, _: &[ScalarExpression], - _: Option<(&Tuple, &[ColumnRef])>, + _: Option<(&[DataValue], &[ColumnRef])>, ) -> Result { Ok(DataValue::Time64(Utc::now().timestamp(), 0, false)) } diff --git a/src/function/lower.rs b/src/function/lower.rs index 91bcae26..cc3505ce 100644 --- a/src/function/lower.rs +++ b/src/function/lower.rs @@ -4,7 +4,6 @@ use crate::expression::function::scala::FuncMonotonicity; use crate::expression::function::scala::ScalarFunctionImpl; use crate::expression::function::FunctionSummary; use crate::expression::ScalarExpression; -use crate::types::tuple::Tuple; use crate::types::value::DataValue; use crate::types::LogicalType; use serde::Deserialize; @@ -37,7 +36,7 @@ impl ScalarFunctionImpl for Lower { fn eval( &self, exprs: &[ScalarExpression], - tuples: Option<(&Tuple, &[ColumnRef])>, + tuples: Option<(&[DataValue], &[ColumnRef])>, ) -> Result { let mut value = exprs[0].eval(tuples)?; if !matches!(value.logical_type(), LogicalType::Varchar(_, _)) { diff --git a/src/function/numbers.rs b/src/function/numbers.rs index c0847dc1..4aceca9d 100644 --- a/src/function/numbers.rs +++ b/src/function/numbers.rs @@ -52,7 +52,7 @@ impl TableFunctionImpl for Numbers { &self, args: &[ScalarExpression], ) -> Result>>, DatabaseError> { - let mut value = args[0].eval(None)?; + let mut value = args[0].eval::<&Tuple>(None)?; if value.logical_type() != LogicalType::Integer { value = value.cast(&LogicalType::Integer)?; diff --git a/src/function/octet_length.rs b/src/function/octet_length.rs index b712ee3c..dafacaf9 100644 --- a/src/function/octet_length.rs +++ b/src/function/octet_length.rs @@ -4,7 +4,6 @@ use crate::expression::function::scala::FuncMonotonicity; use crate::expression::function::scala::ScalarFunctionImpl; use crate::expression::function::FunctionSummary; use crate::expression::ScalarExpression; -use crate::types::tuple::Tuple; use crate::types::value::DataValue; use crate::types::LogicalType; use serde::Deserialize; @@ -36,7 +35,7 @@ impl ScalarFunctionImpl for OctetLength { fn eval( &self, exprs: &[ScalarExpression], - tuples: Option<(&Tuple, &[ColumnRef])>, + tuples: Option<(&[DataValue], &[ColumnRef])>, ) -> Result { let mut value = exprs[0].eval(tuples)?; if !matches!(value.logical_type(), LogicalType::Varchar(_, _)) { diff --git a/src/function/upper.rs b/src/function/upper.rs index d41200b8..71adcd4f 100644 --- a/src/function/upper.rs +++ b/src/function/upper.rs @@ -4,7 +4,6 @@ use crate::expression::function::scala::FuncMonotonicity; use crate::expression::function::scala::ScalarFunctionImpl; use crate::expression::function::FunctionSummary; use crate::expression::ScalarExpression; -use crate::types::tuple::Tuple; use crate::types::value::DataValue; use crate::types::LogicalType; use serde::Deserialize; @@ -37,7 +36,7 @@ impl ScalarFunctionImpl for Upper { fn eval( &self, exprs: &[ScalarExpression], - tuples: Option<(&Tuple, &[ColumnRef])>, + tuples: Option<(&[DataValue], &[ColumnRef])>, ) -> Result { let mut value = exprs[0].eval(tuples)?; if !matches!(value.logical_type(), LogicalType::Varchar(_, _)) { diff --git a/src/macros/mod.rs b/src/macros/mod.rs index 66cdb4df..6a271540 100644 --- a/src/macros/mod.rs +++ b/src/macros/mod.rs @@ -93,7 +93,7 @@ macro_rules! scala_function { #[typetag::serde] impl ::kite_sql::expression::function::scala::ScalarFunctionImpl for $struct_name { #[allow(unused_variables, clippy::redundant_closure_call)] - fn eval(&self, args: &[::kite_sql::expression::ScalarExpression], tuple: Option<(&::kite_sql::types::tuple::Tuple, &[::kite_sql::catalog::column::ColumnRef])>) -> Result<::kite_sql::types::value::DataValue, ::kite_sql::errors::DatabaseError> { + fn eval(&self, args: &[::kite_sql::expression::ScalarExpression], tuple: Option<(&[::kite_sql::types::value::DataValue], &[::kite_sql::catalog::column::ColumnRef])>) -> Result<::kite_sql::types::value::DataValue, ::kite_sql::errors::DatabaseError> { let mut _index = 0; $closure($({ @@ -184,7 +184,7 @@ macro_rules! table_function { let mut _index = 0; $closure($({ - let mut value = args[_index].eval(None)?; + let mut value = args[_index].eval::<&::kite_sql::types::tuple::Tuple>(None)?; _index += 1; if value.logical_type() != $arg_ty { diff --git a/src/optimizer/core/histogram.rs b/src/optimizer/core/histogram.rs index 69319edd..60512ab4 100644 --- a/src/optimizer/core/histogram.rs +++ b/src/optimizer/core/histogram.rs @@ -22,7 +22,7 @@ pub struct HistogramBuilder { null_count: usize, values: Option>, - sort_keys: Option)>>, + sort_keys: Option)>>, value_index: usize, } @@ -61,7 +61,7 @@ impl HistogramBuilder { .capacity .map(|capacity| { ( - NullableVec::<(usize, DataValue)>::with_capacity(capacity, &self.arena), + NullableVec::<(usize, DataValue)>::fill_capacity(capacity, &self.arena), BumpVec::<(usize, BumpBytes<'static>)>::with_capacity_in(capacity, &self.arena), ) }) @@ -101,7 +101,7 @@ impl HistogramBuilder { self.sort_keys .as_mut() .unwrap() - .push((self.value_index, unsafe { mem::transmute::<_, _>(bytes) })) + .put((self.value_index, unsafe { mem::transmute::<_, _>(bytes) })) } self.value_index += 1; @@ -129,21 +129,21 @@ impl HistogramBuilder { .. } = self; let mut values = values.unwrap(); - let sort_keys = sort_keys.unwrap(); + let mut sort_keys = sort_keys.unwrap(); let mut buckets = Vec::with_capacity(number_of_buckets); let bucket_len = if values_len % number_of_buckets == 0 { values_len / number_of_buckets } else { (values_len + number_of_buckets) / number_of_buckets }; - let sorted_indices = radix_sort(sort_keys, &arena); + radix_sort(&mut sort_keys, &arena); for i in 0..number_of_buckets { let mut bucket = Bucket::empty(); let j = (i + 1) * bucket_len; bucket.upper = values - .get(sorted_indices[cmp::min(j, values_len) - 1]) + .get(sort_keys.get(cmp::min(j, values_len) - 1).0) .1 .clone(); buckets.push(bucket); @@ -152,7 +152,7 @@ impl HistogramBuilder { let mut number_of_distinct_value = 0; let mut last_value: Option = None; - for (i, index) in sorted_indices.into_iter().enumerate() { + for (i, (index, _)) in sort_keys.into_iter().enumerate() { let (ordinal, value) = values.take(index); sketch.increment(&value); diff --git a/src/types/tuple.rs b/src/types/tuple.rs index af2850a3..5cfdc020 100644 --- a/src/types/tuple.rs +++ b/src/types/tuple.rs @@ -22,6 +22,12 @@ pub struct Tuple { pub values: Vec, } +impl<'a> From<&'a Tuple> for &'a [DataValue] { + fn from(val: &'a Tuple) -> Self { + val.values.as_slice() + } +} + impl Tuple { pub fn new(pk: Option, values: Vec) -> Self { Tuple { pk, values } diff --git a/src/types/value.rs b/src/types/value.rs index bb19254d..8c95d3cc 100644 --- a/src/types/value.rs +++ b/src/types/value.rs @@ -16,8 +16,6 @@ use std::str::FromStr; use std::sync::LazyLock; use std::{cmp, fmt, mem}; -pub static NULL_VALUE: LazyLock = LazyLock::new(|| DataValue::Null); - static UNIX_DATETIME: LazyLock = LazyLock::new(|| DateTime::from_timestamp(0, 0).unwrap().naive_utc()); diff --git a/tests/slt/crdb/join.slt b/tests/slt/crdb/join.slt index 1246ee03..d9c13f42 100644 --- a/tests/slt/crdb/join.slt +++ b/tests/slt/crdb/join.slt @@ -373,8 +373,9 @@ query III SELECT * FROM a FULL OUTER JOIN b ON (a.i = b.i and a.i>2) ORDER BY a.i, b.i ---- 0 1 null null null -1 2 0 2 true +1 2 null null null 2 3 1 3 true +null null 0 2 true null null 2 4 false statement ok From ec83c122295f41f7bdb76673dbfddbf2f8a5110f Mon Sep 17 00:00:00 2001 From: Kould Date: Sun, 5 Oct 2025 17:11:12 +0800 Subject: [PATCH 2/2] chore: fix histogram init --- src/execution/dql/sort.rs | 2 +- src/optimizer/core/histogram.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/execution/dql/sort.rs b/src/execution/dql/sort.rs index aa896ad3..b2c82412 100644 --- a/src/execution/dql/sort.rs +++ b/src/execution/dql/sort.rs @@ -334,7 +334,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Sort { mod test { use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef}; use crate::errors::DatabaseError; - use crate::execution::dql::sort::{radix_sort, BumpVec, NullableVec, SortBy}; + use crate::execution::dql::sort::{radix_sort, NullableVec, SortBy}; use crate::expression::ScalarExpression; use crate::planner::operator::sort::SortField; use crate::types::tuple::Tuple; diff --git a/src/optimizer/core/histogram.rs b/src/optimizer/core/histogram.rs index 60512ab4..a8011bf6 100644 --- a/src/optimizer/core/histogram.rs +++ b/src/optimizer/core/histogram.rs @@ -61,7 +61,7 @@ impl HistogramBuilder { .capacity .map(|capacity| { ( - NullableVec::<(usize, DataValue)>::fill_capacity(capacity, &self.arena), + NullableVec::<(usize, DataValue)>::with_capacity(capacity, &self.arena), BumpVec::<(usize, BumpBytes<'static>)>::with_capacity_in(capacity, &self.arena), ) })