Skip to content

Commit 818a9c5

Browse files
committed
Using the RBO method, the related subquery is implemented
1 parent 591119b commit 818a9c5

File tree

12 files changed

+303
-75
lines changed

12 files changed

+303
-75
lines changed

src/binder/expr.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -369,10 +369,13 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T
369369
try_default!(&full_name.0, full_name.1);
370370
}
371371
if let Some(table) = full_name.0.or(bind_table_name) {
372-
let (source,is_parent) = self.context.bind_source::<A>(self.parent, &table, false)?;
372+
let (source, is_parent) = self.context.bind_source::<A>(self.parent, &table, false)?;
373373

374374
if is_parent {
375-
self.parent_table_col.entry(Arc::new(table.clone())).or_insert(HashSet::new()).insert(full_name.1.clone());
375+
self.parent_table_col
376+
.entry(Arc::new(table.clone()))
377+
.or_default()
378+
.insert(full_name.1.clone());
376379
}
377380

378381
let schema_buf = self.table_schema_buf.entry(Arc::new(table)).or_default();

src/binder/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
2525
use std::sync::Arc;
2626

2727
use crate::catalog::view::View;
28-
use crate::catalog::{ColumnCatalog, ColumnRef, TableCatalog, TableName};
28+
use crate::catalog::{ColumnRef, TableCatalog, TableName};
2929
use crate::db::{ScalaFunctions, TableFunctions};
3030
use crate::errors::DatabaseError;
3131
use crate::expression::ScalarExpression;
@@ -275,9 +275,9 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
275275
Ok(source)
276276
}
277277

278-
pub fn bind_source<'b: 'a, A: AsRef<[(&'static str, DataValue)]> >(
278+
pub fn bind_source<'b: 'a, A: AsRef<[(&'static str, DataValue)]>>(
279279
&self,
280-
parent: Option<&'a Binder<'a,'b,T,A>>,
280+
parent: Option<&'a Binder<'a, 'b, T, A>>,
281281
table_name: &str,
282282
is_parent: bool,
283283
) -> Result<(&'b Source, bool), DatabaseError> {
@@ -287,10 +287,10 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
287287
}) {
288288
Ok((source.1, is_parent))
289289
} else if let Some(binder) = parent {
290-
binder.context.bind_source(binder.parent, table_name,true)
290+
binder.context.bind_source(binder.parent, table_name, true)
291291
} else {
292292
Err(DatabaseError::InvalidTable(table_name.into()))
293-
}
293+
}
294294
}
295295

296296
// Tips: The order of this index is based on Aggregate being bound first.

src/binder/select.rs

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::types::value::Utf8Type;
3636
use crate::types::{ColumnId, LogicalType};
3737
use itertools::Itertools;
3838
use sqlparser::ast::{
39-
CharLengthUnits, Distinct, Expr, Ident, Join, JoinConstraint, JoinOperator, ObjectName, Offset,
39+
CharLengthUnits, Distinct, Expr, Ident, Join, JoinConstraint, JoinOperator, Offset,
4040
OrderByExpr, Query, Select, SelectInto, SelectItem, SetExpr, SetOperator, SetQuantifier,
4141
TableAlias, TableFactor, TableWithJoins,
4242
};
@@ -101,8 +101,6 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
101101
};
102102
let mut select_list = self.normalize_select_item(&select.projection, &plan)?;
103103

104-
plan = self.bind_parent(plan)?;
105-
106104
if let Some(predicate) = &select.selection {
107105
plan = self.bind_where(plan, predicate)?;
108106
}
@@ -536,22 +534,6 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
536534
Ok(())
537535
}
538536

539-
fn bind_parent(
540-
&mut self,
541-
mut plan: LogicalPlan,
542-
) -> Result<LogicalPlan, DatabaseError> {
543-
for (table,columns) in self.parent_table_col.clone().into_iter() {
544-
let parent = self._bind_single_table_ref(None,table.as_str(),None)?;
545-
plan = LJoinOperator::build(
546-
plan,
547-
parent,
548-
JoinCondition::None,
549-
JoinType::Full,
550-
);
551-
}
552-
Ok(plan)
553-
}
554-
555537
fn bind_join(
556538
&mut self,
557539
mut left: LogicalPlan,
@@ -613,8 +595,6 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
613595

614596
let predicate = self.bind_expr(predicate)?;
615597

616-
children = self.bind_parent(children)?;
617-
618598
if let Some(sub_queries) = self.context.sub_queries_at_now() {
619599
for sub_query in sub_queries {
620600
let mut on_keys: Vec<(ScalarExpression, ScalarExpression)> = vec![];

src/db.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,13 +166,18 @@ impl<S: Storage> State<S> {
166166

167167
let best_plan = Self::default_optimizer(source_plan)
168168
.find_best(Some(&transaction.meta_loader(meta_cache)))?;
169-
// println!("best_plan plan: {:#?}", best_plan);
169+
//println!("best_plan plan: {:#?}", best_plan);
170170

171171
Ok(best_plan)
172172
}
173173

174174
pub(crate) fn default_optimizer(source_plan: LogicalPlan) -> HepOptimizer {
175175
HepOptimizer::new(source_plan)
176+
.batch(
177+
"Correlated Subquery".to_string(),
178+
HepBatchStrategy::once_topdown(),
179+
vec![NormalizationRuleImpl::CorrelateSubquery],
180+
)
176181
.batch(
177182
"Column Pruning".to_string(),
178183
HepBatchStrategy::once_topdown(),

src/execution/dql/filter.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,15 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Filter {
3535

3636
let schema = input.output_schema().clone();
3737

38+
//println!("{:#?}114514'\n'1919810{:#?}",predicate,schema);
39+
3840
let mut coroutine = build_read(input, cache, transaction);
3941

4042
while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) {
4143
let tuple = throw!(tuple);
42-
44+
//println!("-> Coroutine returned: {:?}", tuple);
4345
if throw!(throw!(predicate.eval(Some((&tuple, &schema)))).is_true()) {
46+
//println!("-> throw!: {:?}", tuple);
4447
yield Ok(tuple);
4548
}
4649
}

src/execution/dql/join/nested_loop_join.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -505,12 +505,12 @@ mod test {
505505

506506
let filter = ScalarExpression::Binary {
507507
op: crate::expression::BinaryOperator::Gt,
508-
left_expr: Box::new(ScalarExpression::ColumnRef(
509-
ColumnRef::from(ColumnCatalog::new("c1".to_owned(), true, desc.clone())),
510-
)),
511-
right_expr: Box::new(ScalarExpression::ColumnRef(
512-
ColumnRef::from(ColumnCatalog::new("c4".to_owned(), true, desc.clone())),
513-
)),
508+
left_expr: Box::new(ScalarExpression::ColumnRef(ColumnRef::from(
509+
ColumnCatalog::new("c1".to_owned(), true, desc.clone()),
510+
))),
511+
right_expr: Box::new(ScalarExpression::ColumnRef(ColumnRef::from(
512+
ColumnCatalog::new("c4".to_owned(), true, desc.clone()),
513+
))),
514514
evaluator: Some(BinaryEvaluatorBox(Arc::new(Int32GtBinaryEvaluator))),
515515
ty: LogicalType::Boolean,
516516
};

src/expression/mod.rs

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,37 +1105,33 @@ mod test {
11051105
)?;
11061106
fn_assert(
11071107
&mut cursor,
1108-
ScalarExpression::ColumnRef(
1109-
ColumnRef::from(ColumnCatalog::direct_new(
1110-
ColumnSummary {
1111-
name: "c3".to_string(),
1112-
relation: ColumnRelation::Table {
1113-
column_id: c3_column_id,
1114-
table_name: Arc::new("t1".to_string()),
1115-
is_temp: false,
1116-
},
1108+
ScalarExpression::ColumnRef(ColumnRef::from(ColumnCatalog::direct_new(
1109+
ColumnSummary {
1110+
name: "c3".to_string(),
1111+
relation: ColumnRelation::Table {
1112+
column_id: c3_column_id,
1113+
table_name: Arc::new("t1".to_string()),
1114+
is_temp: false,
11171115
},
1118-
false,
1119-
ColumnDesc::new(LogicalType::Integer, None, false, None)?,
1120-
false,
1121-
)),
1122-
),
1116+
},
1117+
false,
1118+
ColumnDesc::new(LogicalType::Integer, None, false, None)?,
1119+
false,
1120+
))),
11231121
Some((&transaction, &table_cache)),
11241122
&mut reference_tables,
11251123
)?;
11261124
fn_assert(
11271125
&mut cursor,
1128-
ScalarExpression::ColumnRef(
1129-
ColumnRef::from(ColumnCatalog::direct_new(
1130-
ColumnSummary {
1131-
name: "c4".to_string(),
1132-
relation: ColumnRelation::None,
1133-
},
1134-
false,
1135-
ColumnDesc::new(LogicalType::Boolean, None, false, None)?,
1136-
false,
1137-
)),
1138-
),
1126+
ScalarExpression::ColumnRef(ColumnRef::from(ColumnCatalog::direct_new(
1127+
ColumnSummary {
1128+
name: "c4".to_string(),
1129+
relation: ColumnRelation::None,
1130+
},
1131+
false,
1132+
ColumnDesc::new(LogicalType::Boolean, None, false, None)?,
1133+
false,
1134+
))),
11391135
Some((&transaction, &table_cache)),
11401136
&mut reference_tables,
11411137
)?;

src/optimizer/heuristic/graph.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ impl HepGraph {
7979
source_id: HepNodeId,
8080
children_option: Option<HepNodeId>,
8181
new_node: Operator,
82-
) {
82+
) -> HepNodeId {
8383
let new_index = self.graph.add_node(new_node);
8484
let mut order = self.graph.edges(source_id).count();
8585

@@ -95,6 +95,7 @@ impl HepGraph {
9595

9696
self.graph.add_edge(source_id, new_index, order);
9797
self.version += 1;
98+
new_index
9899
}
99100

100101
pub fn replace_node(&mut self, source_id: HepNodeId, new_node: Operator) {

0 commit comments

Comments
 (0)