Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/binder/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
}
}
ScalarExpression::Constant(_) | ScalarExpression::ColumnRef { .. } => (),
ScalarExpression::Reference { .. } | ScalarExpression::Empty => unreachable!(),
ScalarExpression::Empty => unreachable!(),
ScalarExpression::Tuple(args)
| ScalarExpression::ScalaFunction(ScalarFunction { args, .. })
| ScalarExpression::Coalesce { exprs: args, .. } => {
Expand Down Expand Up @@ -390,7 +390,7 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
Ok(())
}
ScalarExpression::Constant(_) => Ok(()),
ScalarExpression::Reference { .. } | ScalarExpression::Empty => unreachable!(),
ScalarExpression::Empty => unreachable!(),
ScalarExpression::Tuple(args)
| ScalarExpression::ScalaFunction(ScalarFunction { args, .. })
| ScalarExpression::Coalesce { exprs: args, .. } => {
Expand Down
2 changes: 1 addition & 1 deletion src/binder/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
for expr in exprs {
// TODO: Expression Index
match self.bind_expr(&expr.expr)? {
ScalarExpression::ColumnRef(column) => columns.push(column),
ScalarExpression::ColumnRef { column, .. } => columns.push(column),
expr => {
return Err(DatabaseError::UnsupportedStmt(format!(
"'CREATE INDEX' by {}",
Expand Down
4 changes: 2 additions & 2 deletions src/binder/create_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
column.set_ref_table(view_name.clone(), Ulid::new(), true);

ScalarExpression::Alias {
expr: Box::new(ScalarExpression::ColumnRef(mapping_column.clone())),
alias: AliasType::Expr(Box::new(ScalarExpression::ColumnRef(ColumnRef::from(
expr: Box::new(ScalarExpression::column_expr(mapping_column.clone())),
alias: AliasType::Expr(Box::new(ScalarExpression::column_expr(ColumnRef::from(
column,
)))),
}
Expand Down
10 changes: 5 additions & 5 deletions src/binder/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T

let alias_expr = ScalarExpression::Alias {
expr: Box::new(expr),
alias: AliasType::Expr(Box::new(ScalarExpression::ColumnRef(ColumnRef::from(
alias: AliasType::Expr(Box::new(ScalarExpression::column_expr(ColumnRef::from(
alias_column,
)))),
};
Expand Down Expand Up @@ -311,13 +311,13 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T

let columns = sub_query_schema
.iter()
.map(|column| ScalarExpression::ColumnRef(column.clone()))
.map(|column| ScalarExpression::column_expr(column.clone()))
.collect::<Vec<_>>();
ScalarExpression::Tuple(columns)
} else {
fn_check(1)?;

ScalarExpression::ColumnRef(sub_query_schema[0].clone())
ScalarExpression::column_expr(sub_query_schema[0].clone())
};
Ok((sub_query, expr))
}
Expand Down Expand Up @@ -371,7 +371,7 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T
let source = self.context.bind_source(&table)?;
let schema_buf = self.table_schema_buf.entry(Arc::new(table)).or_default();

Ok(ScalarExpression::ColumnRef(
Ok(ScalarExpression::column_expr(
source
.column(&full_name.1, schema_buf)
.ok_or_else(|| DatabaseError::ColumnNotFound(full_name.1.to_string()))?,
Expand Down Expand Up @@ -403,7 +403,7 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T
table_schema_buf.entry(table_name.clone()).or_default();
source.column(&full_name.1, schema_buf)
} {
*got_column = Some(ScalarExpression::ColumnRef(column));
*got_column = Some(ScalarExpression::column_expr(column));
}
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/binder/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
slice::from_ref(ident),
Some(table_name.to_string()),
)? {
ScalarExpression::ColumnRef(catalog) => columns.push(catalog),
ScalarExpression::ColumnRef { column, .. } => columns.push(column),
_ => return Err(DatabaseError::UnsupportedStmt(ident.to_string())),
}
}
Expand Down
57 changes: 30 additions & 27 deletions src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::execution::dql::join::joins_nullable;
use crate::expression::agg::AggKind;
use crate::expression::simplify::ConstantCalculator;
use crate::expression::visitor_mut::VisitorMut;
use crate::expression::ScalarExpression::Constant;
use crate::expression::{AliasType, BinaryOperator};
use crate::planner::operator::aggregate::AggregateOperator;
use crate::planner::operator::except::ExceptOperator;
Expand Down Expand Up @@ -103,7 +102,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
}
plan
};
let mut select_list = self.normalize_select_item(&select.projection, &plan)?;
let mut select_list = self.normalize_select_item(&select.projection, &mut plan)?;

if let Some(predicate) = &select.selection {
plan = self.bind_where(plan, predicate)?;
Expand Down Expand Up @@ -159,6 +158,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
Ok(plan)
}

/// FIXME: temp values need to register BindContext.bind_table
fn bind_temp_values(&mut self, expr_rows: &[Vec<Expr>]) -> Result<LogicalPlan, DatabaseError> {
let values_len = expr_rows[0].len();

Expand All @@ -176,7 +176,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
let mut expression = self.bind_expr(expr)?;
ConstantCalculator.visit(&mut expression)?;

if let Constant(value) = expression {
if let ScalarExpression::Constant(value) = expression {
let value_type = value.logical_type();

inferred_types[col_index] = match &inferred_types[col_index] {
Expand Down Expand Up @@ -230,19 +230,19 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
LogicalType::max_logical_type(left_schema.datatype(), right_schema.datatype())?;
if &cast_type != left_schema.datatype() {
left_cast.push(ScalarExpression::TypeCast {
expr: Box::new(ScalarExpression::ColumnRef(left_schema.clone())),
expr: Box::new(ScalarExpression::column_expr(left_schema.clone())),
ty: cast_type.clone(),
});
} else {
left_cast.push(ScalarExpression::ColumnRef(left_schema.clone()));
left_cast.push(ScalarExpression::column_expr(left_schema.clone()));
}
if &cast_type != right_schema.datatype() {
right_cast.push(ScalarExpression::TypeCast {
expr: Box::new(ScalarExpression::ColumnRef(right_schema.clone())),
expr: Box::new(ScalarExpression::column_expr(right_schema.clone())),
ty: cast_type.clone(),
});
} else {
right_cast.push(ScalarExpression::ColumnRef(right_schema.clone()));
right_cast.push(ScalarExpression::column_expr(right_schema.clone()));
}
}

Expand Down Expand Up @@ -312,7 +312,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
let distinct_exprs = left_schema
.iter()
.cloned()
.map(ScalarExpression::ColumnRef)
.map(ScalarExpression::column_expr)
.collect_vec();

let union_op = Operator::Union(UnionOperator {
Expand Down Expand Up @@ -344,7 +344,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
let distinct_exprs = left_schema
.iter()
.cloned()
.map(ScalarExpression::ColumnRef)
.map(ScalarExpression::column_expr)
.collect_vec();

let except_op = Operator::Except(ExceptOperator {
Expand Down Expand Up @@ -492,8 +492,8 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
);

let alias_column_expr = ScalarExpression::Alias {
expr: Box::new(ScalarExpression::ColumnRef(column)),
alias: AliasType::Expr(Box::new(ScalarExpression::ColumnRef(ColumnRef::from(
expr: Box::new(ScalarExpression::column_expr(column)),
alias: AliasType::Expr(Box::new(ScalarExpression::column_expr(ColumnRef::from(
alias_column,
)))),
};
Expand Down Expand Up @@ -548,7 +548,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
fn normalize_select_item(
&mut self,
items: &[SelectItem],
plan: &LogicalPlan,
plan: &mut LogicalPlan,
) -> Result<Vec<ScalarExpression>, DatabaseError> {
let mut select_items = vec![];

Expand Down Expand Up @@ -624,8 +624,8 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
.expr_aliases
.iter()
.filter(|(_, expr)| {
if let ScalarExpression::ColumnRef(col) = expr.unpack_alias_ref() {
if fn_not_on_using(col) {
if let ScalarExpression::ColumnRef { column, .. } = expr.unpack_alias_ref() {
if fn_not_on_using(column) {
exprs.push(ScalarExpression::clone(expr));
return true;
}
Expand All @@ -651,7 +651,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
if !fn_not_on_using(column) {
continue;
}
exprs.push(ScalarExpression::ColumnRef(column.clone()));
exprs.push(ScalarExpression::column_expr(column.clone()));
}
Ok(())
}
Expand Down Expand Up @@ -751,7 +751,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
} else {
BinaryOperator::Eq
},
left_expr: Box::new(ScalarExpression::ColumnRef(
left_expr: Box::new(ScalarExpression::column_expr(
agg.output_schema()[0].clone(),
)),
right_expr: Box::new(ScalarExpression::Constant(DataValue::Int32(
Expand Down Expand Up @@ -928,7 +928,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
}

for column in select_items {
if let ScalarExpression::ColumnRef(col) = column {
if let ScalarExpression::ColumnRef { column, .. } = column {
let _ = table_force_nullable
.iter()
.find(|(table_name, source, _)| {
Expand All @@ -937,11 +937,11 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
.entry((*table_name).clone())
.or_default();

source.column(col.name(), schema_buf).is_some()
source.column(column.name(), schema_buf).is_some()
})
.map(|(_, _, nullable)| {
if let Some(new_column) = col.nullable_for_join(*nullable) {
*col = new_column;
if let Some(new_column) = column.nullable_for_join(*nullable) {
*column = new_column;
}
});
}
Expand Down Expand Up @@ -1003,8 +1003,8 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
};
self.context.add_using(join_type, left_column, right_column);
on_keys.push((
ScalarExpression::ColumnRef(left_column.clone()),
ScalarExpression::ColumnRef(right_column.clone()),
ScalarExpression::column_expr(left_column.clone()),
ScalarExpression::column_expr(right_column.clone()),
));
}
Ok(JoinCondition::On {
Expand All @@ -1024,8 +1024,8 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
left_schema.iter().find(|column| column.name() == *name),
right_schema.iter().find(|column| column.name() == *name),
) {
let left_expr = ScalarExpression::ColumnRef(left_column.clone());
let right_expr = ScalarExpression::ColumnRef(right_column.clone());
let left_expr = ScalarExpression::column_expr(left_column.clone());
let right_expr = ScalarExpression::column_expr(right_column.clone());

self.context.add_using(join_type, left_column, right_column);
on_keys.push((left_expr, right_expr));
Expand Down Expand Up @@ -1077,7 +1077,10 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
BinaryOperator::Eq => {
match (left_expr.unpack_alias_ref(), right_expr.unpack_alias_ref()) {
// example: foo = bar
(ScalarExpression::ColumnRef(l), ScalarExpression::ColumnRef(r)) => {
(
ScalarExpression::ColumnRef { column: l, .. },
ScalarExpression::ColumnRef { column: r, .. },
) => {
// reorder left and right joins keys to pattern: (left, right)
if fn_contains(left_schema, l.summary())
&& fn_contains(right_schema, r.summary())
Expand All @@ -1099,8 +1102,8 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
});
}
}
(ScalarExpression::ColumnRef(column), _)
| (_, ScalarExpression::ColumnRef(column)) => {
(ScalarExpression::ColumnRef { column, .. }, _)
| (_, ScalarExpression::ColumnRef { column, .. }) => {
if fn_or_contains(left_schema, right_schema, column.summary()) {
accum_filter.push(ScalarExpression::Binary {
left_expr,
Expand Down
2 changes: 1 addition & 1 deletion src/binder/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
slice::from_ref(ident),
Some(table_name.to_string()),
)? {
ScalarExpression::ColumnRef(column) => {
ScalarExpression::ColumnRef { column, .. } => {
let mut expr = if matches!(expression, ScalarExpression::Empty) {
let default_value = column
.default_value()?
Expand Down
2 changes: 1 addition & 1 deletion src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ impl<S: Storage> State<S> {
"Expression Remapper".to_string(),
HepBatchStrategy::once_topdown(),
vec![
NormalizationRuleImpl::ExpressionRemapper,
NormalizationRuleImpl::BindExpressionPosition,
// TIPS: This rule is necessary
NormalizationRuleImpl::EvaluatorBind,
],
Expand Down
4 changes: 3 additions & 1 deletion src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::expression::{BinaryOperator, UnaryOperator};
use crate::expression::{BinaryOperator, ScalarExpression, UnaryOperator};
use crate::types::tuple::TupleId;
use crate::types::LogicalType;
use chrono::ParseError;
Expand Down Expand Up @@ -161,6 +161,8 @@ pub enum DatabaseError {
TupleIdNotFound(TupleId),
#[error("there are more buckets: {0} than elements: {1}")]
TooManyBuckets(usize, usize),
#[error("this scalar expression: '{0}' unbind position")]
UnbindExpressionPosition(ScalarExpression),
#[error("unsupported unary operator: {0} cannot support {1} for calculations")]
UnsupportedUnaryOperator(LogicalType, UnaryOperator),
#[error("unsupported binary operator: {0} cannot support {1} for calculations")]
Expand Down
25 changes: 16 additions & 9 deletions src/execution/ddl/create_index.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::execution::dql::projection::Projection;
use crate::execution::DatabaseError;
use crate::execution::{build_read, Executor, WriteExecutor};
use crate::expression::ScalarExpression;
use crate::expression::{BindPosition, ScalarExpression};
use crate::planner::operator::create_index::CreateIndexOperator;
use crate::planner::LogicalPlan;
use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
Expand All @@ -11,6 +11,7 @@ use crate::types::tuple::Tuple;
use crate::types::tuple_builder::TupleBuilder;
use crate::types::value::DataValue;
use crate::types::ColumnId;
use std::borrow::Cow;
use std::ops::Coroutine;
use std::ops::CoroutineState;
use std::pin::Pin;
Expand Down Expand Up @@ -43,15 +44,21 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateIndex {
ty,
} = self.op;

let (column_ids, column_exprs): (Vec<ColumnId>, Vec<ScalarExpression>) = columns
.into_iter()
.filter_map(|column| {
column
.id()
.map(|id| (id, ScalarExpression::ColumnRef(column)))
})
.unzip();
let (column_ids, mut column_exprs): (Vec<ColumnId>, Vec<ScalarExpression>) =
columns
.into_iter()
.filter_map(|column| {
column
.id()
.map(|id| (id, ScalarExpression::column_expr(column)))
})
.unzip();
let schema = self.input.output_schema().clone();
throw!(BindPosition::bind_exprs(
column_exprs.iter_mut(),
|| schema.iter().map(Cow::Borrowed),
|a, b| a == b
));
let index_id = match unsafe { &mut (*transaction) }.add_index_meta(
cache.0,
&table_name,
Expand Down
Loading
Loading