Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 74 additions & 46 deletions src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::execution::dql::join::joins_nullable;
use crate::expression::agg::AggKind;
use crate::expression::{AliasType, BinaryOperator};
use crate::planner::operator::aggregate::AggregateOperator;
use crate::planner::operator::except::ExceptOperator;
use crate::planner::operator::function_scan::FunctionScanOperator;
use crate::planner::operator::insert::InsertOperator;
use crate::planner::operator::join::JoinCondition;
Expand Down Expand Up @@ -181,56 +182,83 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
}
true
};
match (op, is_all) {
(SetOperator::Union, true) => {
let left_schema = left_plan.output_schema();
let right_schema = right_plan.output_schema();

if !fn_eq(left_schema, right_schema) {
return Err(DatabaseError::MisMatch(
"the output types on the left",
"the output types on the right",
));

let left_schema = left_plan.output_schema();
let right_schema = right_plan.output_schema();

if !fn_eq(left_schema, right_schema) {
return Err(DatabaseError::MisMatch(
"the output types on the left",
"the output types on the right",
));
}

match op {
SetOperator::Union => {
if is_all {
Ok(UnionOperator::build(
left_schema.clone(),
right_schema.clone(),
left_plan,
right_plan,
))
} else {
let distinct_exprs = left_schema
.iter()
.cloned()
.map(ScalarExpression::ColumnRef)
.collect_vec();

let union_op = Operator::Union(UnionOperator {
left_schema_ref: left_schema.clone(),
_right_schema_ref: right_schema.clone(),
});

Ok(self.bind_distinct(
LogicalPlan::new(
union_op,
Childrens::Twins {
left: left_plan,
right: right_plan,
},
),
distinct_exprs,
))
}
Ok(UnionOperator::build(
left_schema.clone(),
right_schema.clone(),
left_plan,
right_plan,
))
}
(SetOperator::Union, false) => {
let left_schema = left_plan.output_schema();
let right_schema = right_plan.output_schema();

if !fn_eq(left_schema, right_schema) {
return Err(DatabaseError::MisMatch(
"the output types on the left",
"the output types on the right",
));
SetOperator::Except => {
if is_all {
Ok(ExceptOperator::build(
left_schema.clone(),
right_schema.clone(),
left_plan,
right_plan,
))
} else {
let distinct_exprs = left_schema
.iter()
.cloned()
.map(ScalarExpression::ColumnRef)
.collect_vec();

let except_op = Operator::Except(ExceptOperator {
left_schema_ref: left_schema.clone(),
_right_schema_ref: right_schema.clone(),
});

Ok(self.bind_distinct(
LogicalPlan::new(
except_op,
Childrens::Twins {
left: left_plan,
right: right_plan,
},
),
distinct_exprs,
))
}
let union_op = Operator::Union(UnionOperator {
left_schema_ref: left_schema.clone(),
_right_schema_ref: right_schema.clone(),
});
let distinct_exprs = left_schema
.iter()
.cloned()
.map(ScalarExpression::ColumnRef)
.collect_vec();

Ok(self.bind_distinct(
LogicalPlan::new(
union_op,
Childrens::Twins {
left: left_plan,
right: right_plan,
},
),
distinct_exprs,
))
}
(set_operator, _) => Err(DatabaseError::UnsupportedStmt(format!(
set_operator => Err(DatabaseError::UnsupportedStmt(format!(
"set operator: {:?}",
set_operator
))),
Expand Down
58 changes: 58 additions & 0 deletions src/execution/dql/except.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use crate::execution::{build_read, Executor, ReadExecutor};
use crate::planner::LogicalPlan;
use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
use crate::throw;
use ahash::{HashSet, HashSetExt};
use std::ops::Coroutine;
use std::ops::CoroutineState;
use std::pin::Pin;

pub struct Except {
left_input: LogicalPlan,
right_input: LogicalPlan,
}

impl From<(LogicalPlan, LogicalPlan)> for Except {
fn from((left_input, right_input): (LogicalPlan, LogicalPlan)) -> Self {
Except {
left_input,
right_input,
}
}
}

impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Except {
fn execute(
self,
cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
transaction: *mut T,
) -> Executor<'a> {
Box::new(
#[coroutine]
move || {
let Except {
left_input,
right_input,
} = self;

let mut coroutine = build_read(right_input, cache, transaction);

let mut except_col = HashSet::new();

while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) {
let tuple = throw!(tuple);
except_col.insert(tuple);
}

let mut coroutine = build_read(left_input, cache, transaction);

while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) {
let tuple = throw!(tuple);
if !except_col.contains(&tuple) {
yield Ok(tuple);
}
}
},
)
}
}
1 change: 1 addition & 0 deletions src/execution/dql/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub(crate) mod aggregate;
pub(crate) mod describe;
pub(crate) mod dummy;
pub(crate) mod except;
pub(crate) mod explain;
pub(crate) mod filter;
pub(crate) mod function_scan;
Expand Down
6 changes: 6 additions & 0 deletions src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::execution::dql::aggregate::hash_agg::HashAggExecutor;
use crate::execution::dql::aggregate::simple_agg::SimpleAggExecutor;
use crate::execution::dql::describe::Describe;
use crate::execution::dql::dummy::Dummy;
use crate::execution::dql::except::Except;
use crate::execution::dql::explain::Explain;
use crate::execution::dql::filter::Filter;
use crate::execution::dql::function_scan::FunctionScan;
Expand Down Expand Up @@ -146,6 +147,11 @@ pub fn build_read<'a, T: Transaction + 'a>(

Union::from((left_input, right_input)).execute(cache, transaction)
}
Operator::Except(_) => {
let (left_input, right_input) = childrens.pop_twins();

Except::from((left_input, right_input)).execute(cache, transaction)
}
_ => unreachable!(),
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/optimizer/rule/normalization/column_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ impl ColumnPruning {
| Operator::Limit(_)
| Operator::Join(_)
| Operator::Filter(_)
| Operator::Union(_) => {
| Operator::Union(_)
| Operator::Except(_) => {
let temp_columns = operator.referenced_columns(false);
// why?
let mut column_references = column_references;
Expand Down
6 changes: 4 additions & 2 deletions src/optimizer/rule/normalization/compilation_in_advance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ impl ExpressionRemapper {
| Operator::Truncate(_)
| Operator::CopyFromFile(_)
| Operator::CopyToFile(_)
| Operator::Union(_) => (),
| Operator::Union(_)
| Operator::Except(_) => (),
}
if let Some(exprs) = operator.output_exprs() {
*output_exprs = exprs;
Expand Down Expand Up @@ -217,7 +218,8 @@ impl EvaluatorBind {
| Operator::Truncate(_)
| Operator::CopyFromFile(_)
| Operator::CopyToFile(_)
| Operator::Union(_) => (),
| Operator::Union(_)
| Operator::Except(_) => (),
}

Ok(())
Expand Down
5 changes: 5 additions & 0 deletions src/planner/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod operator;

use crate::catalog::{ColumnCatalog, ColumnRef, TableName};
use crate::planner::operator::except::ExceptOperator;
use crate::planner::operator::join::JoinType;
use crate::planner::operator::union::UnionOperator;
use crate::planner::operator::values::ValuesOperator;
Expand Down Expand Up @@ -169,6 +170,10 @@ impl LogicalPlan {
| Operator::Union(UnionOperator {
left_schema_ref: schema_ref,
..
})
| Operator::Except(ExceptOperator {
left_schema_ref: schema_ref,
..
}) => SchemaOutput::SchemaRef(schema_ref.clone()),
Operator::Dummy => SchemaOutput::Schema(vec![]),
Operator::ShowTable => SchemaOutput::Schema(vec![ColumnRef::from(
Expand Down
48 changes: 48 additions & 0 deletions src/planner/operator/except.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use crate::planner::operator::Operator;
use crate::planner::{Childrens, LogicalPlan};
use crate::types::tuple::SchemaRef;
use itertools::Itertools;
use kite_sql_serde_macros::ReferenceSerialization;
use std::fmt;
use std::fmt::Formatter;

#[derive(Debug, PartialEq, Eq, Clone, Hash, ReferenceSerialization)]
pub struct ExceptOperator {
pub left_schema_ref: SchemaRef,
// mainly use `left_schema` as output and `right_schema` for `column pruning`
pub _right_schema_ref: SchemaRef,
}

impl ExceptOperator {
pub fn build(
left_schema_ref: SchemaRef,
right_schema_ref: SchemaRef,
left_plan: LogicalPlan,
right_plan: LogicalPlan,
) -> LogicalPlan {
LogicalPlan::new(
Operator::Except(ExceptOperator {
left_schema_ref,
_right_schema_ref: right_schema_ref,
}),
Childrens::Twins {
left: left_plan,
right: right_plan,
},
)
}
}

impl fmt::Display for ExceptOperator {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
let schema = self
.left_schema_ref
.iter()
.map(|column| column.name().to_string())
.join(", ");

write!(f, "Except: [{}]", schema)?;

Ok(())
}
}
12 changes: 12 additions & 0 deletions src/planner/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod describe;
pub mod drop_index;
pub mod drop_table;
pub mod drop_view;
pub mod except;
pub mod filter;
pub mod function_scan;
pub mod insert;
Expand Down Expand Up @@ -43,6 +44,7 @@ use crate::planner::operator::describe::DescribeOperator;
use crate::planner::operator::drop_index::DropIndexOperator;
use crate::planner::operator::drop_table::DropTableOperator;
use crate::planner::operator::drop_view::DropViewOperator;
use crate::planner::operator::except::ExceptOperator;
use crate::planner::operator::function_scan::FunctionScanOperator;
use crate::planner::operator::insert::InsertOperator;
use crate::planner::operator::join::JoinCondition;
Expand Down Expand Up @@ -73,6 +75,7 @@ pub enum Operator {
ShowView,
Explain,
Describe(DescribeOperator),
Except(ExceptOperator),
Union(UnionOperator),
// DML
Insert(InsertOperator),
Expand Down Expand Up @@ -147,6 +150,10 @@ impl Operator {
| Operator::Union(UnionOperator {
left_schema_ref: schema_ref,
..
})
| Operator::Except(ExceptOperator {
left_schema_ref: schema_ref,
..
}) => Some(
schema_ref
.iter()
Expand Down Expand Up @@ -230,6 +237,10 @@ impl Operator {
Operator::Union(UnionOperator {
left_schema_ref,
_right_schema_ref,
})
| Operator::Except(ExceptOperator {
left_schema_ref,
_right_schema_ref,
}) => left_schema_ref
.iter()
.chain(_right_schema_ref.iter())
Expand Down Expand Up @@ -293,6 +304,7 @@ impl fmt::Display for Operator {
Operator::CopyFromFile(op) => write!(f, "{}", op),
Operator::CopyToFile(op) => write!(f, "{}", op),
Operator::Union(op) => write!(f, "{}", op),
Operator::Except(op) => write!(f, "{}", op),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/types/tuple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub fn types(schema: &Schema) -> Vec<LogicalType> {
.collect_vec()
}

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Tuple {
pub pk: Option<TupleId>,
pub values: Vec<DataValue>,
Expand Down
Loading
Loading