From e5c67a59cd1b3d4ee96725044c2774a4df2fb30e Mon Sep 17 00:00:00 2001 From: wszhdshys <1925792291@qq.com> Date: Tue, 5 Aug 2025 00:48:04 +0800 Subject: [PATCH 1/6] The topk algorithm is implemented to optimize the use of both order by and limit --- src/db.rs | 5 + src/execution/dql/mod.rs | 1 + src/execution/dql/sort.rs | 26 ++- src/execution/dql/top_k.rs | 153 ++++++++++++++++++ src/execution/mod.rs | 6 + .../rule/normalization/column_pruning.rs | 3 +- .../normalization/compilation_in_advance.rs | 10 ++ src/optimizer/rule/normalization/mod.rs | 7 +- src/optimizer/rule/normalization/top_k.rs | 46 ++++++ src/planner/mod.rs | 2 +- src/planner/operator/mod.rs | 12 +- src/planner/operator/top_k.rs | 49 ++++++ 12 files changed, 308 insertions(+), 12 deletions(-) create mode 100644 src/execution/dql/top_k.rs create mode 100644 src/optimizer/rule/normalization/top_k.rs create mode 100644 src/planner/operator/top_k.rs diff --git a/src/db.rs b/src/db.rs index 4b0a3359..6d52d882 100644 --- a/src/db.rs +++ b/src/db.rs @@ -227,6 +227,11 @@ impl State { NormalizationRuleImpl::CombineFilter, ], ) + .batch( + "TopK".to_string(), + HepBatchStrategy::once_topdown(), + vec![NormalizationRuleImpl::TopK], + ) .batch( "Expression Remapper".to_string(), HepBatchStrategy::once_topdown(), diff --git a/src/execution/dql/mod.rs b/src/execution/dql/mod.rs index 40540248..d59b5dd8 100644 --- a/src/execution/dql/mod.rs +++ b/src/execution/dql/mod.rs @@ -13,6 +13,7 @@ pub(crate) mod seq_scan; pub(crate) mod show_table; pub(crate) mod show_view; pub(crate) mod sort; +pub(crate) mod top_k; pub(crate) mod union; pub(crate) mod values; diff --git a/src/execution/dql/sort.rs b/src/execution/dql/sort.rs index e253c446..fbc6f740 100644 --- a/src/execution/dql/sort.rs +++ b/src/execution/dql/sort.rs @@ -15,7 +15,7 @@ use std::pin::Pin; pub(crate) type BumpVec<'bump, T> = bumpalo::collections::Vec<'bump, T>; #[derive(Clone)] -pub(crate) struct NullableVec<'a, T>(BumpVec<'a, Option>); +pub(crate) struct NullableVec<'a, T>(pub(crate) BumpVec<'a, Option>); impl<'a, T> NullableVec<'a, T> { #[inline] @@ -49,17 +49,31 @@ impl<'a, T> NullableVec<'a, T> { } } -struct RemappingIterator<'a> { +pub struct RemappingIterator<'a> { pos: usize, tuples: NullableVec<'a, (usize, Tuple)>, indices: BumpVec<'a, usize>, } +impl RemappingIterator<'_> { + pub fn new<'a>( + pos: usize, + tuples: NullableVec<'a, (usize, Tuple)>, + indices: BumpVec<'a, usize>, + ) -> RemappingIterator<'a> { + RemappingIterator { + pos, + tuples, + indices, + } + } +} + impl Iterator for RemappingIterator<'_> { type Item = Tuple; fn next(&mut self) -> Option { - if self.pos > self.tuples.len() - 1 { + if self.pos > self.indices.len() - 1 { return None; } let (_, tuple) = self.tuples.take(self.indices[self.pos]); @@ -147,11 +161,7 @@ impl SortBy { } let indices = radix_sort(sort_keys, arena); - Ok(Box::new(RemappingIterator { - pos: 0, - tuples, - indices, - })) + Ok(Box::new(RemappingIterator::new(0, tuples, indices))) } SortBy::Fast => { let fn_nulls_first = |nulls_first: bool| { diff --git a/src/execution/dql/top_k.rs b/src/execution/dql/top_k.rs new file mode 100644 index 00000000..05b847f0 --- /dev/null +++ b/src/execution/dql/top_k.rs @@ -0,0 +1,153 @@ +use crate::errors::DatabaseError; +use crate::execution::dql::sort::{BumpVec, NullableVec, RemappingIterator}; +use crate::execution::{build_read, Executor, ReadExecutor}; +use crate::planner::operator::sort::SortField; +use crate::planner::operator::top_k::TopKOperator; +use crate::planner::LogicalPlan; +use crate::storage::table_codec::BumpBytes; +use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; +use crate::throw; +use crate::types::tuple::{Schema, Tuple}; +use bumpalo::Bump; +use std::cmp::Reverse; +use std::collections::BinaryHeap; +use std::ops::Coroutine; +use std::ops::CoroutineState; +use std::pin::Pin; + +fn top_sort<'a>( + arena: &'a Bump, + schema: &Schema, + sort_fields: &[SortField], + tuples: NullableVec<'a, (usize, Tuple)>, + limit: Option, + offset: Option, +) -> Result + 'a>, DatabaseError> { + let mut sort_keys = BumpVec::with_capacity_in(tuples.len(), arena); + for (i, tuple) in tuples.0.iter().enumerate() { + let mut full_key = BumpVec::new_in(arena); + for SortField { + expr, + nulls_first, + asc, + } in sort_fields + { + let mut key = BumpBytes::new_in(arena); + let tuple = tuple.as_ref().map(|(_, tuple)| tuple).unwrap(); + expr.eval(Some((tuple, &**schema)))? + .memcomparable_encode(&mut key)?; + if *asc { + for byte in key.iter_mut() { + *byte ^= 0xFF; + } + } + key.push(if *nulls_first { u8::MIN } else { u8::MAX }); + full_key.extend(key); + } + //full_key.extend_from_slice(&(i as u64).to_be_bytes()); + sort_keys.push((i, full_key)) + } + + let keep_count = offset.unwrap_or(0) + limit.unwrap_or(sort_keys.len()); + + let mut heap: BinaryHeap> = BinaryHeap::with_capacity(keep_count); + for (i, key) in sort_keys.iter() { + let key = key.as_slice(); + if heap.len() < keep_count { + heap.push(Reverse((key, *i))); + } else if let Some(&Reverse((min_key, _))) = heap.peek() { + if key > min_key { + heap.pop(); + heap.push(Reverse((key, *i))); + } + } + } + + let mut topk: Vec<(Vec, usize)> = heap + .into_iter() + .map(|Reverse((key, i))| (key.to_vec(), i)) + .collect(); + topk.sort_by(|(k1, i1), (k2, i2)| k1.cmp(k2).then_with(|| i1.cmp(i2).reverse())); + topk.reverse(); + + let mut bumped_indices = + BumpVec::with_capacity_in(topk.len().saturating_sub(offset.unwrap_or(0)), arena); + for (_, idx) in topk.into_iter().skip(offset.unwrap_or(0)) { + bumped_indices.push(idx); + } + Ok(Box::new(RemappingIterator::new(0, tuples, bumped_indices))) +} + +pub struct TopK { + arena: Bump, + sort_fields: Vec, + limit: Option, + offset: Option, + input: LogicalPlan, +} + +impl From<(TopKOperator, LogicalPlan)> for TopK { + fn from( + ( + TopKOperator { + sort_fields, + limit, + offset, + }, + input, + ): (TopKOperator, LogicalPlan), + ) -> Self { + TopK { + arena: Default::default(), + sort_fields, + limit, + offset, + input, + } + } +} + +impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for TopK { + fn execute( + self, + cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), + transaction: *mut T, + ) -> Executor<'a> { + Box::new( + #[coroutine] + move || { + let TopK { + arena, + sort_fields, + limit, + offset, + mut input, + } = self; + + let arena: *const Bump = &arena; + + let mut tuples = NullableVec::new(unsafe { &*arena }); + let schema = input.output_schema().clone(); + let mut tuple_offset = 0; + + let mut coroutine = build_read(input, cache, transaction); + + while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { + tuples.put((tuple_offset, throw!(tuple))); + tuple_offset += 1; + } + + for tuple in throw!(top_sort( + unsafe { &*arena }, + &schema, + &sort_fields, + tuples, + limit, + offset + )) { + yield Ok(tuple) + } + }, + ) + } +} diff --git a/src/execution/mod.rs b/src/execution/mod.rs index 85a868a9..202a878d 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -36,6 +36,7 @@ use crate::execution::dql::seq_scan::SeqScan; use crate::execution::dql::show_table::ShowTables; use crate::execution::dql::show_view::ShowViews; use crate::execution::dql::sort::Sort; +use crate::execution::dql::top_k::TopK; use crate::execution::dql::union::Union; use crate::execution::dql::values::Values; use crate::planner::operator::join::JoinCondition; @@ -133,6 +134,11 @@ pub fn build_read<'a, T: Transaction + 'a>( Limit::from((op, input)).execute(cache, transaction) } + Operator::TopK(op) => { + let input = childrens.pop_only(); + + TopK::from((op, input)).execute(cache, transaction) + } Operator::Values(op) => Values::from(op).execute(cache, transaction), Operator::ShowTable => ShowTables.execute(cache, transaction), Operator::ShowView => ShowViews.execute(cache, transaction), diff --git a/src/optimizer/rule/normalization/column_pruning.rs b/src/optimizer/rule/normalization/column_pruning.rs index e76dd1fc..b5e8be7a 100644 --- a/src/optimizer/rule/normalization/column_pruning.rs +++ b/src/optimizer/rule/normalization/column_pruning.rs @@ -111,7 +111,8 @@ impl ColumnPruning { | Operator::Join(_) | Operator::Filter(_) | Operator::Union(_) - | Operator::Except(_) => { + | Operator::Except(_) + | Operator::TopK(_) => { let temp_columns = operator.referenced_columns(false); // why? let mut column_references = column_references; diff --git a/src/optimizer/rule/normalization/compilation_in_advance.rs b/src/optimizer/rule/normalization/compilation_in_advance.rs index db48a303..39a60360 100644 --- a/src/optimizer/rule/normalization/compilation_in_advance.rs +++ b/src/optimizer/rule/normalization/compilation_in_advance.rs @@ -76,6 +76,11 @@ impl ExpressionRemapper { TryReference::new(output_exprs).visit(&mut sort_field.expr)?; } } + Operator::TopK(op) => { + for sort_field in op.sort_fields.iter_mut() { + TryReference::new(output_exprs).visit(&mut sort_field.expr)?; + } + } Operator::FunctionScan(op) => { for expr in op.table_function.args.iter_mut() { TryReference::new(output_exprs).visit(expr)?; @@ -186,6 +191,11 @@ impl EvaluatorBind { BindEvaluator.visit(&mut sort_field.expr)?; } } + Operator::TopK(op) => { + for sort_field in op.sort_fields.iter_mut() { + BindEvaluator.visit(&mut sort_field.expr)?; + } + } Operator::FunctionScan(op) => { for expr in op.table_function.args.iter_mut() { BindEvaluator.visit(expr)?; diff --git a/src/optimizer/rule/normalization/mod.rs b/src/optimizer/rule/normalization/mod.rs index 8c30fd4a..01ce3705 100644 --- a/src/optimizer/rule/normalization/mod.rs +++ b/src/optimizer/rule/normalization/mod.rs @@ -10,6 +10,7 @@ use crate::optimizer::rule::normalization::combine_operators::{ use crate::optimizer::rule::normalization::compilation_in_advance::{ EvaluatorBind, ExpressionRemapper, }; + use crate::optimizer::rule::normalization::pushdown_limit::{ LimitProjectTranspose, PushLimitIntoScan, PushLimitThroughJoin, }; @@ -17,13 +18,14 @@ use crate::optimizer::rule::normalization::pushdown_predicates::PushPredicateInt use crate::optimizer::rule::normalization::pushdown_predicates::PushPredicateThroughJoin; use crate::optimizer::rule::normalization::simplification::ConstantCalculation; use crate::optimizer::rule::normalization::simplification::SimplifyFilter; - +use crate::optimizer::rule::normalization::top_k::TopK; mod column_pruning; mod combine_operators; mod compilation_in_advance; mod pushdown_limit; mod pushdown_predicates; mod simplification; +mod top_k; #[derive(Debug, Copy, Clone)] pub enum NormalizationRuleImpl { @@ -46,6 +48,7 @@ pub enum NormalizationRuleImpl { // CompilationInAdvance ExpressionRemapper, EvaluatorBind, + TopK, } impl MatchPattern for NormalizationRuleImpl { @@ -64,6 +67,7 @@ impl MatchPattern for NormalizationRuleImpl { NormalizationRuleImpl::ConstantCalculation => ConstantCalculation.pattern(), NormalizationRuleImpl::ExpressionRemapper => ExpressionRemapper.pattern(), NormalizationRuleImpl::EvaluatorBind => EvaluatorBind.pattern(), + NormalizationRuleImpl::TopK => TopK.pattern(), } } } @@ -94,6 +98,7 @@ impl NormalizationRule for NormalizationRuleImpl { NormalizationRuleImpl::ConstantCalculation => ConstantCalculation.apply(node_id, graph), NormalizationRuleImpl::ExpressionRemapper => ExpressionRemapper.apply(node_id, graph), NormalizationRuleImpl::EvaluatorBind => EvaluatorBind.apply(node_id, graph), + NormalizationRuleImpl::TopK => TopK.apply(node_id, graph), } } } diff --git a/src/optimizer/rule/normalization/top_k.rs b/src/optimizer/rule/normalization/top_k.rs new file mode 100644 index 00000000..cad55b60 --- /dev/null +++ b/src/optimizer/rule/normalization/top_k.rs @@ -0,0 +1,46 @@ +use crate::errors::DatabaseError; +use crate::optimizer::core::pattern::Pattern; +use crate::optimizer::core::pattern::PatternChildrenPredicate; +use crate::optimizer::core::rule::{MatchPattern, NormalizationRule}; +use crate::optimizer::heuristic::graph::{HepGraph, HepNodeId}; +use crate::planner::operator::top_k::TopKOperator; +use crate::planner::operator::Operator; +use std::sync::LazyLock; + +static TOP_K_RULE: LazyLock = LazyLock::new(|| Pattern { + predicate: |op| matches!(op, Operator::Limit(_)), + children: PatternChildrenPredicate::Predicate(vec![Pattern { + predicate: |op| matches!(op, Operator::Sort(_)), + children: PatternChildrenPredicate::None, + }]), +}); + +pub struct TopK; + +impl MatchPattern for TopK { + fn pattern(&self) -> &Pattern { + &TOP_K_RULE + } +} + +impl NormalizationRule for TopK { + fn apply(&self, node_id: HepNodeId, graph: &mut HepGraph) -> Result<(), DatabaseError> { + if let Operator::Limit(op) = graph.operator(node_id) { + if let Some(child_id) = graph.eldest_child_at(node_id) { + if let Operator::Sort(child_op) = graph.operator(child_id) { + graph.replace_node( + node_id, + Operator::TopK(TopKOperator { + sort_fields: child_op.sort_fields.clone(), + limit: op.limit, + offset: op.offset, + }), + ); + graph.remove_node(child_id, false); + } + } + } + + Ok(()) + } +} diff --git a/src/planner/mod.rs b/src/planner/mod.rs index c8f12976..c23d196b 100644 --- a/src/planner/mod.rs +++ b/src/planner/mod.rs @@ -131,7 +131,7 @@ impl LogicalPlan { mut childrens_iter: ChildrensIter, ) -> SchemaOutput { match operator { - Operator::Filter(_) | Operator::Sort(_) | Operator::Limit(_) => { + Operator::Filter(_) | Operator::Sort(_) | Operator::Limit(_) | Operator::TopK(_) => { childrens_iter.next().unwrap().output_schema_direct() } Operator::Aggregate(op) => SchemaOutput::Schema( diff --git a/src/planner/operator/mod.rs b/src/planner/operator/mod.rs index 67ddf593..611b0c9c 100644 --- a/src/planner/operator/mod.rs +++ b/src/planner/operator/mod.rs @@ -20,6 +20,7 @@ pub mod limit; pub mod project; pub mod sort; pub mod table_scan; +pub mod top_k; pub mod truncate; pub mod union; pub mod update; @@ -48,6 +49,7 @@ use crate::planner::operator::except::ExceptOperator; use crate::planner::operator::function_scan::FunctionScanOperator; use crate::planner::operator::insert::InsertOperator; use crate::planner::operator::join::JoinCondition; +use crate::planner::operator::top_k::TopKOperator; use crate::planner::operator::truncate::TruncateOperator; use crate::planner::operator::union::UnionOperator; use crate::planner::operator::update::UpdateOperator; @@ -70,6 +72,7 @@ pub enum Operator { FunctionScan(FunctionScanOperator), Sort(SortOperator), Limit(LimitOperator), + TopK(TopKOperator), Values(ValuesOperator), ShowTable, ShowView, @@ -145,7 +148,7 @@ impl Operator { .map(|column| ScalarExpression::ColumnRef(column.clone())) .collect_vec(), ), - Operator::Sort(_) | Operator::Limit(_) => None, + Operator::Sort(_) | Operator::Limit(_) | Operator::TopK(_) => None, Operator::Values(ValuesOperator { schema_ref, .. }) | Operator::Union(UnionOperator { left_schema_ref: schema_ref, @@ -233,6 +236,12 @@ impl Operator { .map(|field| &field.expr) .flat_map(|expr| expr.referenced_columns(only_column_ref)) .collect_vec(), + Operator::TopK(op) => op + .sort_fields + .iter() + .map(|field| &field.expr) + .flat_map(|expr| expr.referenced_columns(only_column_ref)) + .collect_vec(), Operator::Values(ValuesOperator { schema_ref, .. }) => Vec::clone(schema_ref), Operator::Union(UnionOperator { left_schema_ref, @@ -283,6 +292,7 @@ impl fmt::Display for Operator { Operator::FunctionScan(op) => write!(f, "{}", op), Operator::Sort(op) => write!(f, "{}", op), Operator::Limit(op) => write!(f, "{}", op), + Operator::TopK(op) => write!(f, "{}", op), Operator::Values(op) => write!(f, "{}", op), Operator::ShowTable => write!(f, "Show Tables"), Operator::ShowView => write!(f, "Show Views"), diff --git a/src/planner/operator/top_k.rs b/src/planner/operator/top_k.rs new file mode 100644 index 00000000..66b103c8 --- /dev/null +++ b/src/planner/operator/top_k.rs @@ -0,0 +1,49 @@ +use super::Operator; +use crate::planner::operator::sort::SortField; +use crate::planner::{Childrens, LogicalPlan}; +use itertools::Itertools; +use kite_sql_serde_macros::ReferenceSerialization; +use std::fmt; +use std::fmt::Formatter; + +#[derive(Debug, PartialEq, Eq, Clone, Hash, ReferenceSerialization)] +pub struct TopKOperator { + pub sort_fields: Vec, + pub limit: Option, + pub offset: Option, +} + +impl TopKOperator { + pub fn build( + sort_fields: Vec, + limit: Option, + offset: Option, + children: LogicalPlan, + ) -> LogicalPlan { + LogicalPlan::new( + Operator::TopK(TopKOperator { + sort_fields, + limit, + offset, + }), + Childrens::Only(children), + ) + } +} + +impl fmt::Display for TopKOperator { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + if let Some(limit) = self.limit { + write!(f, "Top {}, ", limit)?; + } + + let sort_fields = self + .sort_fields + .iter() + .map(|sort_field| format!("{}", sort_field)) + .join(", "); + write!(f, "Sort By {}", sort_fields)?; + + Ok(()) + } +} From 1f4ef86cb8255a9f14c7eb7b954d2a4165abcbcf Mon Sep 17 00:00:00 2001 From: wszhdshys <1925792291@qq.com> Date: Tue, 5 Aug 2025 16:08:25 +0800 Subject: [PATCH 2/6] Optimized logic --- src/execution/dql/top_k.rs | 134 +++++++++++----------- src/optimizer/rule/normalization/top_k.rs | 12 +- src/planner/operator/top_k.rs | 10 +- 3 files changed, 81 insertions(+), 75 deletions(-) diff --git a/src/execution/dql/top_k.rs b/src/execution/dql/top_k.rs index 05b847f0..cc7f102f 100644 --- a/src/execution/dql/top_k.rs +++ b/src/execution/dql/top_k.rs @@ -1,5 +1,4 @@ use crate::errors::DatabaseError; -use crate::execution::dql::sort::{BumpVec, NullableVec, RemappingIterator}; use crate::execution::{build_read, Executor, ReadExecutor}; use crate::planner::operator::sort::SortField; use crate::planner::operator::top_k::TopKOperator; @@ -8,8 +7,8 @@ use crate::storage::table_codec::BumpBytes; use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; use crate::throw; use crate::types::tuple::{Schema, Tuple}; +use ahash::{HashMap, HashMapExt}; use bumpalo::Bump; -use std::cmp::Reverse; use std::collections::BinaryHeap; use std::ops::Coroutine; use std::ops::CoroutineState; @@ -19,69 +18,64 @@ fn top_sort<'a>( arena: &'a Bump, schema: &Schema, sort_fields: &[SortField], - tuples: NullableVec<'a, (usize, Tuple)>, - limit: Option, - offset: Option, -) -> Result + 'a>, DatabaseError> { - let mut sort_keys = BumpVec::with_capacity_in(tuples.len(), arena); - for (i, tuple) in tuples.0.iter().enumerate() { - let mut full_key = BumpVec::new_in(arena); - for SortField { - expr, - nulls_first, - asc, - } in sort_fields - { - let mut key = BumpBytes::new_in(arena); - let tuple = tuple.as_ref().map(|(_, tuple)| tuple).unwrap(); - expr.eval(Some((tuple, &**schema)))? - .memcomparable_encode(&mut key)?; - if *asc { - for byte in key.iter_mut() { - *byte ^= 0xFF; - } + indices: &mut BinaryHeap<(Vec, usize)>, + tuples: &mut HashMap, + tuple: Tuple, + keep_count: usize, + index: usize, +) -> Result<(), DatabaseError> { + let mut full_key = vec![]; + for SortField { + expr, + nulls_first, + asc, + } in sort_fields + { + let mut key = BumpBytes::new_in(arena); + expr.eval(Some((&tuple, &**schema)))? + .memcomparable_encode(&mut key)?; + if !asc { + for byte in key.iter_mut() { + *byte ^= 0xFF; } - key.push(if *nulls_first { u8::MIN } else { u8::MAX }); - full_key.extend(key); } - //full_key.extend_from_slice(&(i as u64).to_be_bytes()); - sort_keys.push((i, full_key)) + key.push(if *nulls_first { u8::MIN } else { u8::MAX }); + full_key.extend(key); } - let keep_count = offset.unwrap_or(0) + limit.unwrap_or(sort_keys.len()); - - let mut heap: BinaryHeap> = BinaryHeap::with_capacity(keep_count); - for (i, key) in sort_keys.iter() { - let key = key.as_slice(); - if heap.len() < keep_count { - heap.push(Reverse((key, *i))); - } else if let Some(&Reverse((min_key, _))) = heap.peek() { - if key > min_key { - heap.pop(); - heap.push(Reverse((key, *i))); - } + if indices.len() < keep_count { + indices.push((full_key, index)); + tuples.insert(index, tuple); + } else if let Some((min_key, i)) = indices.peek() { + let pop_index = *i; + if full_key.as_slice() < min_key.as_slice() { + indices.pop(); + indices.push((full_key, index)); + tuples.remove(&pop_index); + tuples.insert(index, tuple); } } + Ok(()) +} - let mut topk: Vec<(Vec, usize)> = heap - .into_iter() - .map(|Reverse((key, i))| (key.to_vec(), i)) - .collect(); - topk.sort_by(|(k1, i1), (k2, i2)| k1.cmp(k2).then_with(|| i1.cmp(i2).reverse())); +fn final_sort( + indices: BinaryHeap<(Vec, usize)>, + mut tuples: HashMap, +) -> Vec { + let mut topk: Vec<(Vec, usize)> = indices.into_iter().map(|(key, i)| (key, i)).collect(); + topk.sort_by(|(k1, i1), (k2, i2)| k2.cmp(k1).then_with(|| i1.cmp(i2).reverse())); topk.reverse(); - - let mut bumped_indices = - BumpVec::with_capacity_in(topk.len().saturating_sub(offset.unwrap_or(0)), arena); - for (_, idx) in topk.into_iter().skip(offset.unwrap_or(0)) { - bumped_indices.push(idx); - } - Ok(Box::new(RemappingIterator::new(0, tuples, bumped_indices))) + Vec::from( + topk.into_iter() + .map(|(_, index)| tuples.remove(&index).unwrap()) + .collect::>(), + ) } pub struct TopK { arena: Bump, sort_fields: Vec, - limit: Option, + limit: usize, offset: Option, input: LogicalPlan, } @@ -126,26 +120,36 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for TopK { let arena: *const Bump = &arena; - let mut tuples = NullableVec::new(unsafe { &*arena }); let schema = input.output_schema().clone(); - let mut tuple_offset = 0; + let keep_count = offset.unwrap_or(0) + limit; + let mut indices: BinaryHeap<(Vec, usize)> = + BinaryHeap::with_capacity(keep_count); + let mut tuples: HashMap = HashMap::with_capacity(keep_count); let mut coroutine = build_read(input, cache, transaction); + let mut i: usize = 0; while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { - tuples.put((tuple_offset, throw!(tuple))); - tuple_offset += 1; + throw!(top_sort( + unsafe { &*arena }, + &schema, + &sort_fields, + &mut indices, + &mut tuples, + throw!(tuple), + keep_count, + i + )); + i += 1; } - for tuple in throw!(top_sort( - unsafe { &*arena }, - &schema, - &sort_fields, - tuples, - limit, - offset - )) { - yield Ok(tuple) + i = 0; + for tuple in final_sort(indices, tuples) { + i += 1; + if i - 1 < offset.unwrap_or(0) { + continue; + } + yield Ok(tuple); } }, ) diff --git a/src/optimizer/rule/normalization/top_k.rs b/src/optimizer/rule/normalization/top_k.rs index cad55b60..7950c498 100644 --- a/src/optimizer/rule/normalization/top_k.rs +++ b/src/optimizer/rule/normalization/top_k.rs @@ -26,21 +26,21 @@ impl MatchPattern for TopK { impl NormalizationRule for TopK { fn apply(&self, node_id: HepNodeId, graph: &mut HepGraph) -> Result<(), DatabaseError> { if let Operator::Limit(op) = graph.operator(node_id) { - if let Some(child_id) = graph.eldest_child_at(node_id) { - if let Operator::Sort(child_op) = graph.operator(child_id) { + if let Some(limit) = op.limit { + let sort_id = graph.eldest_child_at(node_id).unwrap(); + if let Operator::Sort(sort_op) = graph.operator(sort_id) { graph.replace_node( node_id, Operator::TopK(TopKOperator { - sort_fields: child_op.sort_fields.clone(), - limit: op.limit, + sort_fields: sort_op.sort_fields.clone(), + limit, offset: op.offset, }), ); - graph.remove_node(child_id, false); + graph.remove_node(sort_id, false); } } } - Ok(()) } } diff --git a/src/planner/operator/top_k.rs b/src/planner/operator/top_k.rs index 66b103c8..3d9788c7 100644 --- a/src/planner/operator/top_k.rs +++ b/src/planner/operator/top_k.rs @@ -9,14 +9,14 @@ use std::fmt::Formatter; #[derive(Debug, PartialEq, Eq, Clone, Hash, ReferenceSerialization)] pub struct TopKOperator { pub sort_fields: Vec, - pub limit: Option, + pub limit: usize, pub offset: Option, } impl TopKOperator { pub fn build( sort_fields: Vec, - limit: Option, + limit: usize, offset: Option, children: LogicalPlan, ) -> LogicalPlan { @@ -33,8 +33,10 @@ impl TopKOperator { impl fmt::Display for TopKOperator { fn fmt(&self, f: &mut Formatter) -> fmt::Result { - if let Some(limit) = self.limit { - write!(f, "Top {}, ", limit)?; + write!(f, "Top {}, ", self.limit)?; + + if let Some(offset) = self.offset { + write!(f, "Offset {}, ", offset)?; } let sort_fields = self From 7d8b4df7a6556cec060800342db157494cec13fb Mon Sep 17 00:00:00 2001 From: wszhdshys <1925792291@qq.com> Date: Tue, 5 Aug 2025 18:23:11 +0800 Subject: [PATCH 3/6] Optimized performance and added TOPK related tests. Improved the display of TOPK's plan --- src/db.rs | 1 + src/execution/dql/top_k.rs | 637 +++++++++++++++++- src/optimizer/rule/implementation/dql/mod.rs | 1 + .../rule/implementation/dql/top_k.rs | 19 + src/optimizer/rule/implementation/mod.rs | 6 + src/planner/operator/mod.rs | 2 + src/planner/operator/top_k.rs | 2 +- 7 files changed, 658 insertions(+), 10 deletions(-) create mode 100644 src/optimizer/rule/implementation/dql/top_k.rs diff --git a/src/db.rs b/src/db.rs index 6d52d882..69ef34f2 100644 --- a/src/db.rs +++ b/src/db.rs @@ -254,6 +254,7 @@ impl State { ImplementationRuleImpl::IndexScan, ImplementationRuleImpl::FunctionScan, ImplementationRuleImpl::Sort, + ImplementationRuleImpl::TopK, ImplementationRuleImpl::Values, // DML ImplementationRuleImpl::Analyze, diff --git a/src/execution/dql/top_k.rs b/src/execution/dql/top_k.rs index cc7f102f..7379ed51 100644 --- a/src/execution/dql/top_k.rs +++ b/src/execution/dql/top_k.rs @@ -59,17 +59,15 @@ fn top_sort<'a>( } fn final_sort( - indices: BinaryHeap<(Vec, usize)>, + mut indices: BinaryHeap<(Vec, usize)>, mut tuples: HashMap, ) -> Vec { - let mut topk: Vec<(Vec, usize)> = indices.into_iter().map(|(key, i)| (key, i)).collect(); - topk.sort_by(|(k1, i1), (k2, i2)| k2.cmp(k1).then_with(|| i1.cmp(i2).reverse())); - topk.reverse(); - Vec::from( - topk.into_iter() - .map(|(_, index)| tuples.remove(&index).unwrap()) - .collect::>(), - ) + let mut result = Vec::with_capacity(indices.len()); + while let Some((_, index)) = indices.pop() { + result.push(tuples.remove(&index).unwrap()); + } + result.reverse(); + result } pub struct TopK { @@ -155,3 +153,624 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for TopK { ) } } + +#[cfg(test)] +mod test { + use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef}; + use crate::errors::DatabaseError; + use crate::execution::dql::top_k::{final_sort, top_sort}; + use crate::expression::ScalarExpression; + use crate::planner::operator::sort::SortField; + use crate::types::tuple::Tuple; + use crate::types::value::DataValue; + use crate::types::LogicalType; + use ahash::{HashMap, HashMapExt}; + use bumpalo::Bump; + use std::collections::BinaryHeap; + use std::sync::Arc; + + #[test] + fn test_top_k_sort() -> Result<(), DatabaseError> { + let fn_sort_fields = |asc: bool, nulls_first: bool| { + vec![SortField { + expr: ScalarExpression::Reference { + expr: Box::new(ScalarExpression::Empty), + pos: 0, + }, + asc, + nulls_first, + }] + }; + let schema = Arc::new(vec![ColumnRef::from(ColumnCatalog::new( + "c1".to_string(), + true, + ColumnDesc::new(LogicalType::Integer, None, false, None).unwrap(), + ))]); + + let arena = Bump::new(); + + let fn_asc_and_nulls_last_eq = |mut iter: Box>| { + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Int32(0)]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Int32(1)]) + } else { + unreachable!() + } + }; + let fn_desc_and_nulls_last_eq = |mut iter: Box>| { + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Int32(1)]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Int32(0)]) + } else { + unreachable!() + } + }; + let fn_asc_and_nulls_first_eq = |mut iter: Box>| { + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Null]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Int32(0)]) + } else { + unreachable!() + } + }; + let fn_desc_and_nulls_first_eq = |mut iter: Box>| { + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Null]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Int32(1)]) + } else { + unreachable!() + } + }; + + let mut indices: BinaryHeap<(Vec, usize)> = BinaryHeap::with_capacity(2); + let mut tuples: HashMap = HashMap::with_capacity(2); + top_sort( + &arena, + &schema, + &*fn_sort_fields(true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Null]), + 2, + 0, + )?; + top_sort( + &arena, + &schema, + &*fn_sort_fields(true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(0)]), + 2, + 1, + )?; + top_sort( + &arena, + &schema, + &*fn_sort_fields(true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(1)]), + 2, + 2, + )?; + fn_asc_and_nulls_first_eq(Box::new(final_sort(indices, tuples).into_iter())); + + let mut indices: BinaryHeap<(Vec, usize)> = BinaryHeap::with_capacity(2); + let mut tuples: HashMap = HashMap::with_capacity(2); + top_sort( + &arena, + &schema, + &*fn_sort_fields(true, false), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Null]), + 2, + 0, + )?; + top_sort( + &arena, + &schema, + &*fn_sort_fields(true, false), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(0)]), + 2, + 1, + )?; + top_sort( + &arena, + &schema, + &*fn_sort_fields(true, false), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(1)]), + 2, + 2, + )?; + fn_asc_and_nulls_last_eq(Box::new(final_sort(indices, tuples).into_iter())); + + let mut indices: BinaryHeap<(Vec, usize)> = BinaryHeap::with_capacity(2); + let mut tuples: HashMap = HashMap::with_capacity(2); + top_sort( + &arena, + &schema, + &*fn_sort_fields(false, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Null]), + 2, + 0, + )?; + top_sort( + &arena, + &schema, + &*fn_sort_fields(false, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(0)]), + 2, + 1, + )?; + top_sort( + &arena, + &schema, + &*fn_sort_fields(false, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(1)]), + 2, + 2, + )?; + fn_desc_and_nulls_first_eq(Box::new(final_sort(indices, tuples).into_iter())); + + let mut indices: BinaryHeap<(Vec, usize)> = BinaryHeap::with_capacity(2); + let mut tuples: HashMap = HashMap::with_capacity(2); + top_sort( + &arena, + &schema, + &*fn_sort_fields(false, false), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Null]), + 2, + 0, + )?; + top_sort( + &arena, + &schema, + &*fn_sort_fields(false, false), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(0)]), + 2, + 1, + )?; + top_sort( + &arena, + &schema, + &*fn_sort_fields(false, false), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(1)]), + 2, + 2, + )?; + fn_desc_and_nulls_last_eq(Box::new(final_sort(indices, tuples).into_iter())); + + Ok(()) + } + + #[test] + fn test_top_k_sort_mix_values() -> Result<(), DatabaseError> { + let fn_sort_fields = + |asc_1: bool, nulls_first_1: bool, asc_2: bool, nulls_first_2: bool| { + vec![ + SortField { + expr: ScalarExpression::Reference { + expr: Box::new(ScalarExpression::Empty), + pos: 0, + }, + asc: asc_1, + nulls_first: nulls_first_1, + }, + SortField { + expr: ScalarExpression::Reference { + expr: Box::new(ScalarExpression::Empty), + pos: 0, + }, + asc: asc_2, + nulls_first: nulls_first_2, + }, + ] + }; + let schema = Arc::new(vec![ + ColumnRef::from(ColumnCatalog::new( + "c1".to_string(), + true, + ColumnDesc::new(LogicalType::Integer, None, false, None).unwrap(), + )), + ColumnRef::from(ColumnCatalog::new( + "c2".to_string(), + true, + ColumnDesc::new(LogicalType::Integer, None, false, None).unwrap(), + )), + ]); + let arena = Bump::new(); + + let fn_asc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq = + |mut iter: Box>| { + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Null, DataValue::Null]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Null, DataValue::Int32(0)]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Int32(0), DataValue::Null]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Int32(0), DataValue::Int32(0)]) + } else { + unreachable!() + } + }; + let fn_asc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq = + |mut iter: Box>| { + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Int32(0), DataValue::Null]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Int32(0), DataValue::Int32(0)]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Int32(1), DataValue::Null]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Int32(1), DataValue::Int32(0)]) + } else { + unreachable!() + } + }; + let fn_desc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq = + |mut iter: Box>| { + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Null, DataValue::Null]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Null, DataValue::Int32(0)]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Int32(1), DataValue::Null]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Int32(1), DataValue::Int32(0)]) + } else { + unreachable!() + } + }; + let fn_desc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq = + |mut iter: Box>| { + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Int32(1), DataValue::Null]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Int32(1), DataValue::Int32(0)]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Int32(0), DataValue::Null]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![DataValue::Int32(0), DataValue::Int32(0)]) + } else { + unreachable!() + } + }; + + let mut indices: BinaryHeap<(Vec, usize)> = BinaryHeap::with_capacity(4); + let mut tuples: HashMap = HashMap::with_capacity(4); + top_sort( + &arena, + &schema, + &fn_sort_fields(true, true, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Null, DataValue::Null]), + 4, + 0, + )?; + top_sort( + &arena, + &schema, + &fn_sort_fields(true, true, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(0), DataValue::Null]), + 4, + 1, + )?; + top_sort( + &arena, + &schema, + &fn_sort_fields(true, true, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(1), DataValue::Null]), + 4, + 2, + )?; + top_sort( + &arena, + &schema, + &fn_sort_fields(true, true, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Null, DataValue::Int32(0)]), + 4, + 3, + )?; + top_sort( + &arena, + &schema, + &fn_sort_fields(true, true, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(0), DataValue::Int32(0)]), + 4, + 5, + )?; + top_sort( + &arena, + &schema, + &fn_sort_fields(true, true, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(1), DataValue::Int32(0)]), + 4, + 6, + )?; + fn_asc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq(Box::new( + final_sort(indices, tuples).into_iter(), + )); + + let mut indices: BinaryHeap<(Vec, usize)> = BinaryHeap::with_capacity(4); + let mut tuples: HashMap = HashMap::with_capacity(4); + top_sort( + &arena, + &schema, + &fn_sort_fields(true, false, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Null, DataValue::Null]), + 4, + 0, + )?; + top_sort( + &arena, + &schema, + &fn_sort_fields(true, false, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(0), DataValue::Null]), + 4, + 1, + )?; + top_sort( + &arena, + &schema, + &fn_sort_fields(true, false, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(1), DataValue::Null]), + 4, + 2, + )?; + top_sort( + &arena, + &schema, + &fn_sort_fields(true, false, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Null, DataValue::Int32(0)]), + 4, + 3, + )?; + top_sort( + &arena, + &schema, + &fn_sort_fields(true, false, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(0), DataValue::Int32(0)]), + 4, + 5, + )?; + top_sort( + &arena, + &schema, + &fn_sort_fields(true, false, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(1), DataValue::Int32(0)]), + 4, + 6, + )?; + fn_asc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq(Box::new( + final_sort(indices, tuples).into_iter(), + )); + + let mut indices: BinaryHeap<(Vec, usize)> = BinaryHeap::with_capacity(4); + let mut tuples: HashMap = HashMap::with_capacity(4); + top_sort( + &arena, + &schema, + &fn_sort_fields(false, true, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Null, DataValue::Null]), + 4, + 0, + )?; + top_sort( + &arena, + &schema, + &fn_sort_fields(false, true, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(0), DataValue::Null]), + 4, + 1, + )?; + top_sort( + &arena, + &schema, + &fn_sort_fields(false, true, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(1), DataValue::Null]), + 4, + 2, + )?; + top_sort( + &arena, + &schema, + &fn_sort_fields(false, true, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Null, DataValue::Int32(0)]), + 4, + 3, + )?; + top_sort( + &arena, + &schema, + &fn_sort_fields(false, true, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(0), DataValue::Int32(0)]), + 4, + 5, + )?; + top_sort( + &arena, + &schema, + &fn_sort_fields(false, true, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(1), DataValue::Int32(0)]), + 4, + 6, + )?; + fn_desc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq(Box::new( + final_sort(indices, tuples).into_iter(), + )); + + let mut indices: BinaryHeap<(Vec, usize)> = BinaryHeap::with_capacity(4); + let mut tuples: HashMap = HashMap::with_capacity(4); + top_sort( + &arena, + &schema, + &fn_sort_fields(false, false, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Null, DataValue::Null]), + 4, + 0, + )?; + top_sort( + &arena, + &schema, + &fn_sort_fields(false, false, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(0), DataValue::Null]), + 4, + 1, + )?; + top_sort( + &arena, + &schema, + &fn_sort_fields(false, false, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(1), DataValue::Null]), + 4, + 2, + )?; + top_sort( + &arena, + &schema, + &fn_sort_fields(false, false, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Null, DataValue::Int32(0)]), + 4, + 3, + )?; + top_sort( + &arena, + &schema, + &fn_sort_fields(false, false, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(0), DataValue::Int32(0)]), + 4, + 5, + )?; + top_sort( + &arena, + &schema, + &fn_sort_fields(false, false, true, true), + &mut indices, + &mut tuples, + Tuple::new(None, vec![DataValue::Int32(1), DataValue::Int32(0)]), + 4, + 6, + )?; + fn_desc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq(Box::new( + final_sort(indices, tuples).into_iter(), + )); + + Ok(()) + } +} diff --git a/src/optimizer/rule/implementation/dql/mod.rs b/src/optimizer/rule/implementation/dql/mod.rs index 23a286e2..2907c60f 100644 --- a/src/optimizer/rule/implementation/dql/mod.rs +++ b/src/optimizer/rule/implementation/dql/mod.rs @@ -7,4 +7,5 @@ pub(crate) mod limit; pub(crate) mod projection; pub(crate) mod sort; pub(crate) mod table_scan; +pub(crate) mod top_k; pub(crate) mod values; diff --git a/src/optimizer/rule/implementation/dql/top_k.rs b/src/optimizer/rule/implementation/dql/top_k.rs new file mode 100644 index 00000000..ddb2f8b8 --- /dev/null +++ b/src/optimizer/rule/implementation/dql/top_k.rs @@ -0,0 +1,19 @@ +use crate::errors::DatabaseError; +use crate::optimizer::core::memo::{Expression, GroupExpression}; +use crate::optimizer::core::pattern::{Pattern, PatternChildrenPredicate}; +use crate::optimizer::core::rule::{ImplementationRule, MatchPattern}; +use crate::optimizer::core::statistics_meta::StatisticMetaLoader; +use crate::planner::operator::{Operator, PhysicalOption}; +use crate::single_mapping; +use crate::storage::Transaction; +use std::sync::LazyLock; + +static TOPK_PATTERN: LazyLock = LazyLock::new(|| Pattern { + predicate: |op| matches!(op, Operator::TopK(_)), + children: PatternChildrenPredicate::None, +}); + +#[derive(Clone)] +pub struct TopKImplementation; + +single_mapping!(TopKImplementation, TOPK_PATTERN, PhysicalOption::TopK); diff --git a/src/optimizer/rule/implementation/mod.rs b/src/optimizer/rule/implementation/mod.rs index a511fe82..09e63837 100644 --- a/src/optimizer/rule/implementation/mod.rs +++ b/src/optimizer/rule/implementation/mod.rs @@ -32,6 +32,7 @@ use crate::optimizer::rule::implementation::dql::sort::SortImplementation; use crate::optimizer::rule::implementation::dql::table_scan::{ IndexScanImplementation, SeqScanImplementation, }; +use crate::optimizer::rule::implementation::dql::top_k::TopKImplementation; use crate::optimizer::rule::implementation::dql::values::ValuesImplementation; use crate::planner::operator::Operator; use crate::storage::Transaction; @@ -50,6 +51,7 @@ pub enum ImplementationRuleImpl { FunctionScan, IndexScan, Sort, + TopK, Values, // DML Analyze, @@ -80,6 +82,7 @@ impl MatchPattern for ImplementationRuleImpl { ImplementationRuleImpl::IndexScan => IndexScanImplementation.pattern(), ImplementationRuleImpl::FunctionScan => FunctionScanImplementation.pattern(), ImplementationRuleImpl::Sort => SortImplementation.pattern(), + ImplementationRuleImpl::TopK => TopKImplementation.pattern(), ImplementationRuleImpl::Values => ValuesImplementation.pattern(), ImplementationRuleImpl::CopyFromFile => CopyFromFileImplementation.pattern(), ImplementationRuleImpl::CopyToFile => CopyToFileImplementation.pattern(), @@ -137,6 +140,9 @@ impl ImplementationRule for ImplementationRuleImpl { ImplementationRuleImpl::Sort => { SortImplementation.to_expression(operator, loader, group_expr)? } + ImplementationRuleImpl::TopK => { + TopKImplementation.to_expression(operator, loader, group_expr)? + } ImplementationRuleImpl::Values => { ValuesImplementation.to_expression(operator, loader, group_expr)? } diff --git a/src/planner/operator/mod.rs b/src/planner/operator/mod.rs index 611b0c9c..79317967 100644 --- a/src/planner/operator/mod.rs +++ b/src/planner/operator/mod.rs @@ -114,6 +114,7 @@ pub enum PhysicalOption { IndexScan(IndexInfo), Sort, Limit, + TopK, Values, Insert, Update, @@ -334,6 +335,7 @@ impl fmt::Display for PhysicalOption { PhysicalOption::IndexScan(index) => write!(f, "IndexScan By {}", index), PhysicalOption::Sort => write!(f, "Sort"), PhysicalOption::Limit => write!(f, "Limit"), + PhysicalOption::TopK => write!(f, "TopK"), PhysicalOption::Values => write!(f, "Values"), PhysicalOption::Insert => write!(f, "Insert"), PhysicalOption::Update => write!(f, "Update"), diff --git a/src/planner/operator/top_k.rs b/src/planner/operator/top_k.rs index 3d9788c7..25f8e3b1 100644 --- a/src/planner/operator/top_k.rs +++ b/src/planner/operator/top_k.rs @@ -33,7 +33,7 @@ impl TopKOperator { impl fmt::Display for TopKOperator { fn fmt(&self, f: &mut Formatter) -> fmt::Result { - write!(f, "Top {}, ", self.limit)?; + write!(f, "Top {}, ", self.limit)?; if let Some(offset) = self.offset { write!(f, "Offset {}, ", offset)?; From 859da09cb1bf3689a3cd4f9f5c8dd6ad9b1c1d32 Mon Sep 17 00:00:00 2001 From: Kould Date: Tue, 5 Aug 2025 23:12:20 +0800 Subject: [PATCH 4/6] chore: codefmt --- src/execution/dql/sort.rs | 2 +- src/execution/dql/top_k.rs | 373 +++++++++++++++++-------------------- src/expression/mod.rs | 6 +- 3 files changed, 173 insertions(+), 208 deletions(-) diff --git a/src/execution/dql/sort.rs b/src/execution/dql/sort.rs index fbc6f740..8a9bb1d7 100644 --- a/src/execution/dql/sort.rs +++ b/src/execution/dql/sort.rs @@ -494,7 +494,7 @@ mod test { SortField { expr: ScalarExpression::Reference { expr: Box::new(ScalarExpression::Empty), - pos: 0, + pos: 1, }, asc: asc_2, nulls_first: nulls_first_2, diff --git a/src/execution/dql/top_k.rs b/src/execution/dql/top_k.rs index 7379ed51..4a5bab88 100644 --- a/src/execution/dql/top_k.rs +++ b/src/execution/dql/top_k.rs @@ -1,4 +1,5 @@ use crate::errors::DatabaseError; +use crate::execution::dql::sort::BumpVec; use crate::execution::{build_read, Executor, ReadExecutor}; use crate::planner::operator::sort::SortField; use crate::planner::operator::top_k::TopKOperator; @@ -7,24 +8,41 @@ use crate::storage::table_codec::BumpBytes; use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; use crate::throw; use crate::types::tuple::{Schema, Tuple}; -use ahash::{HashMap, HashMapExt}; use bumpalo::Bump; -use std::collections::BinaryHeap; +use std::cmp::Ordering; +use std::collections::BTreeSet; use std::ops::Coroutine; use std::ops::CoroutineState; use std::pin::Pin; +#[derive(Eq, PartialEq, Debug)] +struct CmpItem<'a> { + key: BumpVec<'a, u8>, + tuple: Tuple, +} + +impl Ord for CmpItem<'_> { + fn cmp(&self, other: &Self) -> Ordering { + self.key.cmp(&other.key) + } +} + +impl PartialOrd for CmpItem<'_> { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +#[allow(clippy::mutable_key_type)] fn top_sort<'a>( arena: &'a Bump, schema: &Schema, sort_fields: &[SortField], - indices: &mut BinaryHeap<(Vec, usize)>, - tuples: &mut HashMap, + heap: &mut BTreeSet>, tuple: Tuple, keep_count: usize, - index: usize, ) -> Result<(), DatabaseError> { - let mut full_key = vec![]; + let mut full_key = BumpBytes::new_in(arena); for SortField { expr, nulls_first, @@ -43,33 +61,23 @@ fn top_sort<'a>( full_key.extend(key); } - if indices.len() < keep_count { - indices.push((full_key, index)); - tuples.insert(index, tuple); - } else if let Some((min_key, i)) = indices.peek() { - let pop_index = *i; - if full_key.as_slice() < min_key.as_slice() { - indices.pop(); - indices.push((full_key, index)); - tuples.remove(&pop_index); - tuples.insert(index, tuple); + if heap.len() < keep_count { + heap.insert(CmpItem { + key: full_key, + tuple, + }); + } else if let Some(cmp_item) = heap.last() { + if full_key.as_slice() < cmp_item.key.as_slice() { + heap.pop_last(); + heap.insert(CmpItem { + key: full_key, + tuple, + }); } } Ok(()) } -fn final_sort( - mut indices: BinaryHeap<(Vec, usize)>, - mut tuples: HashMap, -) -> Vec { - let mut result = Vec::with_capacity(indices.len()); - while let Some((_, index)) = indices.pop() { - result.push(tuples.remove(&index).unwrap()); - } - result.reverse(); - result -} - pub struct TopK { arena: Bump, sort_fields: Vec, @@ -100,6 +108,7 @@ impl From<(TopKOperator, LogicalPlan)> for TopK { } impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for TopK { + #[allow(clippy::mutable_key_type)] fn execute( self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), @@ -120,34 +129,28 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for TopK { let schema = input.output_schema().clone(); let keep_count = offset.unwrap_or(0) + limit; - let mut indices: BinaryHeap<(Vec, usize)> = - BinaryHeap::with_capacity(keep_count); - let mut tuples: HashMap = HashMap::with_capacity(keep_count); - + let mut set = BTreeSet::new(); let mut coroutine = build_read(input, cache, transaction); - let mut i: usize = 0; while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { throw!(top_sort( unsafe { &*arena }, &schema, &sort_fields, - &mut indices, - &mut tuples, + &mut set, throw!(tuple), keep_count, - i )); - i += 1; } - i = 0; - for tuple in final_sort(indices, tuples) { + let mut i: usize = 0; + + while let Some(item) = set.pop_first() { i += 1; if i - 1 < offset.unwrap_or(0) { continue; } - yield Ok(tuple); + yield Ok(item.tuple); } }, ) @@ -158,15 +161,14 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for TopK { mod test { use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef}; use crate::errors::DatabaseError; - use crate::execution::dql::top_k::{final_sort, top_sort}; + use crate::execution::dql::top_k::{top_sort, CmpItem}; use crate::expression::ScalarExpression; use crate::planner::operator::sort::SortField; use crate::types::tuple::Tuple; use crate::types::value::DataValue; use crate::types::LogicalType; - use ahash::{HashMap, HashMapExt}; use bumpalo::Bump; - use std::collections::BinaryHeap; + use std::collections::BTreeSet; use std::sync::Arc; #[test] @@ -189,190 +191,167 @@ mod test { let arena = Bump::new(); - let fn_asc_and_nulls_last_eq = |mut iter: Box>| { - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Int32(0)]) + let fn_asc_and_nulls_last_eq = |mut heap: BTreeSet>| { + if let Some(reverse) = heap.pop_first() { + assert_eq!(reverse.tuple.values, vec![DataValue::Int32(0)]) } else { unreachable!() } - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Int32(1)]) + if let Some(reverse) = heap.pop_first() { + assert_eq!(reverse.tuple.values, vec![DataValue::Int32(1)]) } else { unreachable!() } }; - let fn_desc_and_nulls_last_eq = |mut iter: Box>| { - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Int32(1)]) + let fn_desc_and_nulls_last_eq = |mut heap: BTreeSet>| { + if let Some(reverse) = heap.pop_first() { + assert_eq!(reverse.tuple.values, vec![DataValue::Int32(1)]) } else { unreachable!() } - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Int32(0)]) + if let Some(reverse) = heap.pop_first() { + assert_eq!(reverse.tuple.values, vec![DataValue::Int32(0)]) } else { unreachable!() } }; - let fn_asc_and_nulls_first_eq = |mut iter: Box>| { - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Null]) + let fn_asc_and_nulls_first_eq = |mut heap: BTreeSet>| { + if let Some(reverse) = heap.pop_first() { + assert_eq!(reverse.tuple.values, vec![DataValue::Null]) } else { unreachable!() } - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Int32(0)]) + if let Some(reverse) = heap.pop_first() { + assert_eq!(reverse.tuple.values, vec![DataValue::Int32(0)]) } else { unreachable!() } }; - let fn_desc_and_nulls_first_eq = |mut iter: Box>| { - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Null]) + let fn_desc_and_nulls_first_eq = |mut heap: BTreeSet>| { + if let Some(reverse) = heap.pop_first() { + assert_eq!(reverse.tuple.values, vec![DataValue::Null]) } else { unreachable!() } - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Int32(1)]) + if let Some(reverse) = heap.pop_first() { + assert_eq!(reverse.tuple.values, vec![DataValue::Int32(1)]) } else { unreachable!() } }; - let mut indices: BinaryHeap<(Vec, usize)> = BinaryHeap::with_capacity(2); - let mut tuples: HashMap = HashMap::with_capacity(2); + let mut indices = BTreeSet::new(); + top_sort( &arena, &schema, &*fn_sort_fields(true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Null]), 2, - 0, )?; top_sort( &arena, &schema, &*fn_sort_fields(true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(0)]), 2, - 1, )?; top_sort( &arena, &schema, &*fn_sort_fields(true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(1)]), 2, - 2, )?; - fn_asc_and_nulls_first_eq(Box::new(final_sort(indices, tuples).into_iter())); + println!("{:#?}", indices); + fn_asc_and_nulls_first_eq(indices); + + let mut indices = BTreeSet::new(); - let mut indices: BinaryHeap<(Vec, usize)> = BinaryHeap::with_capacity(2); - let mut tuples: HashMap = HashMap::with_capacity(2); top_sort( &arena, &schema, &*fn_sort_fields(true, false), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Null]), 2, - 0, )?; top_sort( &arena, &schema, &*fn_sort_fields(true, false), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(0)]), 2, - 1, )?; top_sort( &arena, &schema, &*fn_sort_fields(true, false), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(1)]), 2, - 2, )?; - fn_asc_and_nulls_last_eq(Box::new(final_sort(indices, tuples).into_iter())); + fn_asc_and_nulls_last_eq(indices); + + let mut indices = BTreeSet::new(); - let mut indices: BinaryHeap<(Vec, usize)> = BinaryHeap::with_capacity(2); - let mut tuples: HashMap = HashMap::with_capacity(2); top_sort( &arena, &schema, &*fn_sort_fields(false, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Null]), 2, - 0, )?; top_sort( &arena, &schema, &*fn_sort_fields(false, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(0)]), 2, - 1, )?; top_sort( &arena, &schema, &*fn_sort_fields(false, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(1)]), 2, - 2, )?; - fn_desc_and_nulls_first_eq(Box::new(final_sort(indices, tuples).into_iter())); + fn_desc_and_nulls_first_eq(indices); + + let mut indices = BTreeSet::new(); - let mut indices: BinaryHeap<(Vec, usize)> = BinaryHeap::with_capacity(2); - let mut tuples: HashMap = HashMap::with_capacity(2); top_sort( &arena, &schema, &*fn_sort_fields(false, false), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Null]), 2, - 0, )?; top_sort( &arena, &schema, &*fn_sort_fields(false, false), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(0)]), 2, - 1, )?; top_sort( &arena, &schema, &*fn_sort_fields(false, false), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(1)]), 2, - 2, )?; - fn_desc_and_nulls_last_eq(Box::new(final_sort(indices, tuples).into_iter())); + fn_desc_and_nulls_last_eq(indices); Ok(()) } @@ -393,7 +372,7 @@ mod test { SortField { expr: ScalarExpression::Reference { expr: Box::new(ScalarExpression::Empty), - pos: 0, + pos: 1, }, asc: asc_2, nulls_first: nulls_first_2, @@ -415,361 +394,347 @@ mod test { let arena = Bump::new(); let fn_asc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq = - |mut iter: Box>| { - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Null, DataValue::Null]) + |mut heap: BTreeSet>| { + if let Some(reverse) = heap.pop_first() { + assert_eq!(reverse.tuple.values, vec![DataValue::Null, DataValue::Null]) } else { unreachable!() } - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Null, DataValue::Int32(0)]) + if let Some(reverse) = heap.pop_first() { + assert_eq!( + reverse.tuple.values, + vec![DataValue::Null, DataValue::Int32(0)] + ) } else { unreachable!() } - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Int32(0), DataValue::Null]) + if let Some(reverse) = heap.pop_first() { + assert_eq!( + reverse.tuple.values, + vec![DataValue::Int32(0), DataValue::Null] + ) } else { unreachable!() } - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Int32(0), DataValue::Int32(0)]) + if let Some(reverse) = heap.pop_first() { + assert_eq!( + reverse.tuple.values, + vec![DataValue::Int32(0), DataValue::Int32(0)] + ) } else { unreachable!() } }; let fn_asc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq = - |mut iter: Box>| { - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Int32(0), DataValue::Null]) + |mut heap: BTreeSet>| { + if let Some(reverse) = heap.pop_first() { + assert_eq!( + reverse.tuple.values, + vec![DataValue::Int32(0), DataValue::Null] + ) } else { unreachable!() } - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Int32(0), DataValue::Int32(0)]) + if let Some(reverse) = heap.pop_first() { + assert_eq!( + reverse.tuple.values, + vec![DataValue::Int32(0), DataValue::Int32(0)] + ) } else { unreachable!() } - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Int32(1), DataValue::Null]) + if let Some(reverse) = heap.pop_first() { + assert_eq!( + reverse.tuple.values, + vec![DataValue::Int32(1), DataValue::Null] + ) } else { unreachable!() } - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Int32(1), DataValue::Int32(0)]) + if let Some(reverse) = heap.pop_first() { + assert_eq!( + reverse.tuple.values, + vec![DataValue::Int32(1), DataValue::Int32(0)] + ) } else { unreachable!() } }; let fn_desc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq = - |mut iter: Box>| { - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Null, DataValue::Null]) + |mut heap: BTreeSet>| { + if let Some(reverse) = heap.pop_first() { + assert_eq!(reverse.tuple.values, vec![DataValue::Null, DataValue::Null]) } else { unreachable!() } - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Null, DataValue::Int32(0)]) + if let Some(reverse) = heap.pop_first() { + assert_eq!( + reverse.tuple.values, + vec![DataValue::Null, DataValue::Int32(0)] + ) } else { unreachable!() } - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Int32(1), DataValue::Null]) + if let Some(reverse) = heap.pop_first() { + assert_eq!( + reverse.tuple.values, + vec![DataValue::Int32(1), DataValue::Null] + ) } else { unreachable!() } - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Int32(1), DataValue::Int32(0)]) + if let Some(reverse) = heap.pop_first() { + assert_eq!( + reverse.tuple.values, + vec![DataValue::Int32(1), DataValue::Int32(0)] + ) } else { unreachable!() } }; let fn_desc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq = - |mut iter: Box>| { - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Int32(1), DataValue::Null]) + |mut heap: BTreeSet>| { + if let Some(reverse) = heap.pop_first() { + assert_eq!( + reverse.tuple.values, + vec![DataValue::Int32(1), DataValue::Null] + ) } else { unreachable!() } - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Int32(1), DataValue::Int32(0)]) + if let Some(reverse) = heap.pop_first() { + assert_eq!( + reverse.tuple.values, + vec![DataValue::Int32(1), DataValue::Int32(0)] + ) } else { unreachable!() } - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Int32(0), DataValue::Null]) + if let Some(reverse) = heap.pop_first() { + assert_eq!( + reverse.tuple.values, + vec![DataValue::Int32(0), DataValue::Null] + ) } else { unreachable!() } - if let Some(tuple) = iter.next() { - assert_eq!(tuple.values, vec![DataValue::Int32(0), DataValue::Int32(0)]) + if let Some(reverse) = heap.pop_first() { + assert_eq!( + reverse.tuple.values, + vec![DataValue::Int32(0), DataValue::Int32(0)] + ) } else { unreachable!() } }; - let mut indices: BinaryHeap<(Vec, usize)> = BinaryHeap::with_capacity(4); - let mut tuples: HashMap = HashMap::with_capacity(4); + let mut indices = BTreeSet::new(); + top_sort( &arena, &schema, &fn_sort_fields(true, true, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Null, DataValue::Null]), 4, - 0, )?; top_sort( &arena, &schema, &fn_sort_fields(true, true, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(0), DataValue::Null]), 4, - 1, )?; top_sort( &arena, &schema, &fn_sort_fields(true, true, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(1), DataValue::Null]), 4, - 2, )?; top_sort( &arena, &schema, &fn_sort_fields(true, true, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Null, DataValue::Int32(0)]), 4, - 3, )?; top_sort( &arena, &schema, &fn_sort_fields(true, true, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(0), DataValue::Int32(0)]), 4, - 5, )?; top_sort( &arena, &schema, &fn_sort_fields(true, true, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(1), DataValue::Int32(0)]), 4, - 6, )?; - fn_asc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq(Box::new( - final_sort(indices, tuples).into_iter(), - )); + fn_asc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq(indices); + + let mut indices = BTreeSet::new(); - let mut indices: BinaryHeap<(Vec, usize)> = BinaryHeap::with_capacity(4); - let mut tuples: HashMap = HashMap::with_capacity(4); top_sort( &arena, &schema, &fn_sort_fields(true, false, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Null, DataValue::Null]), 4, - 0, )?; top_sort( &arena, &schema, &fn_sort_fields(true, false, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(0), DataValue::Null]), 4, - 1, )?; top_sort( &arena, &schema, &fn_sort_fields(true, false, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(1), DataValue::Null]), 4, - 2, )?; top_sort( &arena, &schema, &fn_sort_fields(true, false, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Null, DataValue::Int32(0)]), 4, - 3, )?; top_sort( &arena, &schema, &fn_sort_fields(true, false, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(0), DataValue::Int32(0)]), 4, - 5, )?; top_sort( &arena, &schema, &fn_sort_fields(true, false, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(1), DataValue::Int32(0)]), 4, - 6, )?; - fn_asc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq(Box::new( - final_sort(indices, tuples).into_iter(), - )); + fn_asc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq(indices); + + let mut indices = BTreeSet::new(); - let mut indices: BinaryHeap<(Vec, usize)> = BinaryHeap::with_capacity(4); - let mut tuples: HashMap = HashMap::with_capacity(4); top_sort( &arena, &schema, &fn_sort_fields(false, true, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Null, DataValue::Null]), 4, - 0, )?; top_sort( &arena, &schema, &fn_sort_fields(false, true, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(0), DataValue::Null]), 4, - 1, )?; top_sort( &arena, &schema, &fn_sort_fields(false, true, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(1), DataValue::Null]), 4, - 2, )?; top_sort( &arena, &schema, &fn_sort_fields(false, true, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Null, DataValue::Int32(0)]), 4, - 3, )?; top_sort( &arena, &schema, &fn_sort_fields(false, true, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(0), DataValue::Int32(0)]), 4, - 5, )?; top_sort( &arena, &schema, &fn_sort_fields(false, true, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(1), DataValue::Int32(0)]), 4, - 6, )?; - fn_desc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq(Box::new( - final_sort(indices, tuples).into_iter(), - )); + fn_desc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq(indices); + + let mut indices = BTreeSet::new(); - let mut indices: BinaryHeap<(Vec, usize)> = BinaryHeap::with_capacity(4); - let mut tuples: HashMap = HashMap::with_capacity(4); top_sort( &arena, &schema, &fn_sort_fields(false, false, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Null, DataValue::Null]), 4, - 0, )?; top_sort( &arena, &schema, &fn_sort_fields(false, false, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(0), DataValue::Null]), 4, - 1, )?; top_sort( &arena, &schema, &fn_sort_fields(false, false, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(1), DataValue::Null]), 4, - 2, )?; top_sort( &arena, &schema, &fn_sort_fields(false, false, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Null, DataValue::Int32(0)]), 4, - 3, )?; top_sort( &arena, &schema, &fn_sort_fields(false, false, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(0), DataValue::Int32(0)]), 4, - 5, )?; top_sort( &arena, &schema, &fn_sort_fields(false, false, true, true), &mut indices, - &mut tuples, Tuple::new(None, vec![DataValue::Int32(1), DataValue::Int32(0)]), 4, - 6, )?; - fn_desc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq(Box::new( - final_sort(indices, tuples).into_iter(), - )); + fn_desc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq(indices); Ok(()) } diff --git a/src/expression/mod.rs b/src/expression/mod.rs index 3b20f204..912797a2 100644 --- a/src/expression/mod.rs +++ b/src/expression/mod.rs @@ -344,14 +344,14 @@ impl ScalarExpression { pub fn referenced_columns(&self, only_column_ref: bool) -> Vec { struct ColumnRefCollector(Vec); - impl<'a> Visitor<'a> for ColumnRefCollector { + impl Visitor<'_> for ColumnRefCollector { fn visit_column_ref(&mut self, col: &ColumnRef) -> Result<(), DatabaseError> { self.0.push(col.clone()); Ok(()) } } struct OutputColumnCollector(Vec); - impl<'a> Visitor<'a> for OutputColumnCollector { + impl Visitor<'_> for OutputColumnCollector { fn visit(&mut self, expr: &ScalarExpression) -> Result<(), DatabaseError> { self.0.push(expr.output_column()); walk_expr(self, expr) @@ -372,7 +372,7 @@ impl ScalarExpression { struct TableRefChecker { found: bool, } - impl<'a> Visitor<'a> for TableRefChecker { + impl Visitor<'_> for TableRefChecker { fn visit_column_ref(&mut self, col: &ColumnRef) -> Result<(), DatabaseError> { if col.table_name().is_some() && col.id().is_some() { self.found = true; From ddd06ace7f705853d719a076929a99bce6a1dbeb Mon Sep 17 00:00:00 2001 From: wszhdshys <1925792291@qq.com> Date: Wed, 6 Aug 2025 12:23:22 +0800 Subject: [PATCH 5/6] chore: codefmt --- src/execution/dql/top_k.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/execution/dql/top_k.rs b/src/execution/dql/top_k.rs index 4a5bab88..352b9123 100644 --- a/src/execution/dql/top_k.rs +++ b/src/execution/dql/top_k.rs @@ -23,7 +23,10 @@ struct CmpItem<'a> { impl Ord for CmpItem<'_> { fn cmp(&self, other: &Self) -> Ordering { - self.key.cmp(&other.key) + match self.key.cmp(&other.key) { + Ordering::Equal => Ordering::Greater, + order => order, + } } } From edca453b6f7b1357ebe3bd8faba7861b049abdbc Mon Sep 17 00:00:00 2001 From: wszhdshys <1925792291@qq.com> Date: Wed, 6 Aug 2025 12:32:10 +0800 Subject: [PATCH 6/6] chore: codefmt --- src/execution/dql/top_k.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/execution/dql/top_k.rs b/src/execution/dql/top_k.rs index 4a5bab88..39b63613 100644 --- a/src/execution/dql/top_k.rs +++ b/src/execution/dql/top_k.rs @@ -23,7 +23,7 @@ struct CmpItem<'a> { impl Ord for CmpItem<'_> { fn cmp(&self, other: &Self) -> Ordering { - self.key.cmp(&other.key) + self.key.cmp(&other.key).then_with(|| Ordering::Greater) } }