Skip to content

Commit 7ba54ec

Browse files
authored
refactor: Use TupleValueSerializable to simplify DataValue serialization (#289)
1 parent d7d27e5 commit 7ba54ec

File tree

17 files changed

+888
-493
lines changed

17 files changed

+888
-493
lines changed

src/binder/select.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,10 +159,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
159159
Ok(plan)
160160
}
161161

162-
fn bind_temp_values(
163-
&mut self,
164-
expr_rows: &Vec<Vec<Expr>>,
165-
) -> Result<LogicalPlan, DatabaseError> {
162+
fn bind_temp_values(&mut self, expr_rows: &[Vec<Expr>]) -> Result<LogicalPlan, DatabaseError> {
166163
let values_len = expr_rows[0].len();
167164

168165
let mut inferred_types: Vec<Option<LogicalType>> = vec![None; values_len];

src/catalog/table.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,6 @@ impl TableCatalog {
8989
&self.primary_key_indices
9090
}
9191

92-
pub(crate) fn types(&self) -> Vec<LogicalType> {
93-
self.columns()
94-
.map(|column| column.datatype().clone())
95-
.collect_vec()
96-
}
97-
9892
/// Add a column to the table catalog.
9993
pub(crate) fn add_column(
10094
&mut self,

src/execution/ddl/add_column.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::types::value::DataValue;
99
use crate::{
1010
planner::operator::alter_table::add_column::AddColumnOperator, storage::Transaction, throw,
1111
};
12+
use itertools::Itertools;
1213
use std::ops::Coroutine;
1314
use std::ops::CoroutineState;
1415
use std::pin::Pin;
@@ -69,9 +70,14 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn {
6970
}
7071
drop(coroutine);
7172

73+
let serializers = types.iter().map(|ty| ty.serializable()).collect_vec();
7274
for tuple in tuples {
73-
throw!(unsafe { &mut (*transaction) }
74-
.append_tuple(table_name, tuple, &types, true));
75+
throw!(unsafe { &mut (*transaction) }.append_tuple(
76+
table_name,
77+
tuple,
78+
&serializers,
79+
true
80+
));
7581
}
7682
let col_id = throw!(unsafe { &mut (*transaction) }.add_column(
7783
cache.0,

src/execution/ddl/drop_column.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
66
use crate::throw;
77
use crate::types::tuple::Tuple;
88
use crate::types::tuple_builder::TupleBuilder;
9+
use itertools::Itertools;
910
use std::ops::Coroutine;
1011
use std::ops::CoroutineState;
1112
use std::pin::Pin;
@@ -66,11 +67,12 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropColumn {
6667
tuples.push(tuple);
6768
}
6869
drop(coroutine);
70+
let serializers = types.iter().map(|ty| ty.serializable()).collect_vec();
6971
for tuple in tuples {
7072
throw!(unsafe { &mut (*transaction) }.append_tuple(
7173
&table_name,
7274
tuple,
73-
&types,
75+
&serializers,
7476
true
7577
));
7678
}

src/execution/dml/copy_from_file.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ use crate::execution::{Executor, WriteExecutor};
55
use crate::planner::operator::copy_from_file::CopyFromFileOperator;
66
use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
77
use crate::throw;
8-
use crate::types::tuple::{types, Tuple};
8+
use crate::types::tuple::Tuple;
99
use crate::types::tuple_builder::TupleBuilder;
10+
use itertools::Itertools;
1011
use std::fs::File;
1112
use std::io::BufReader;
1213
use std::sync::mpsc;
@@ -33,7 +34,12 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CopyFromFile {
3334
Box::new(
3435
#[coroutine]
3536
move || {
36-
let types = types(&self.op.schema_ref);
37+
let serializers = self
38+
.op
39+
.schema_ref
40+
.iter()
41+
.map(|column| column.datatype().serializable())
42+
.collect_vec();
3743
let (tx, rx) = mpsc::channel();
3844
let (tx1, rx1) = mpsc::channel();
3945
// # Cancellation
@@ -50,7 +56,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CopyFromFile {
5056
throw!(unsafe { &mut (*transaction) }.append_tuple(
5157
table.name(),
5258
chunk,
53-
&types,
59+
&serializers,
5460
false
5561
));
5662
size += 1;

src/execution/dml/insert.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,10 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Insert {
9999
index_metas.push((index_meta, exprs));
100100
}
101101

102-
let types = table_catalog.types();
102+
let serializers = table_catalog
103+
.columns()
104+
.map(|column| column.datatype().serializable())
105+
.collect_vec();
103106
let pk_indices = table_catalog.primary_keys_indices();
104107
let mut coroutine = build_read(input, cache, transaction);
105108

@@ -147,7 +150,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Insert {
147150
throw!(unsafe { &mut (*transaction) }.append_tuple(
148151
&table_name,
149152
tuple,
150-
&types,
153+
&serializers,
151154
is_overwrite
152155
));
153156
inserted_count += 1;

src/execution/dml/update.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ use crate::planner::LogicalPlan;
88
use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
99
use crate::throw;
1010
use crate::types::index::Index;
11-
use crate::types::tuple::types;
1211
use crate::types::tuple::Tuple;
1312
use crate::types::tuple_builder::TupleBuilder;
1413
use crate::types::value::DataValue;
14+
use itertools::Itertools;
1515
use std::collections::HashMap;
1616
use std::ops::Coroutine;
1717
use std::ops::CoroutineState;
@@ -62,7 +62,10 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update {
6262
}
6363

6464
let input_schema = input.output_schema().clone();
65-
let types = types(&input_schema);
65+
let serializers = input_schema
66+
.iter()
67+
.map(|column| column.datatype().serializable())
68+
.collect_vec();
6669
let mut updated_count = 0;
6770

6871
if let Some(table_catalog) =
@@ -133,7 +136,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update {
133136
throw!(unsafe { &mut (*transaction) }.append_tuple(
134137
&table_name,
135138
tuple,
136-
&types,
139+
&serializers,
137140
is_overwrite
138141
));
139142
updated_count += 1;

src/execution/dql/index_scan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for IndexScan {
5050
columns,
5151
self.index_by,
5252
self.ranges,
53-
with_pk,
53+
with_pk
5454
));
5555

5656
while let Some(tuple) = throw!(iter.next_tuple()) {
File renamed without changes.

src/execution/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
pub(crate) mod ddl;
22
pub(crate) mod dml;
33
pub(crate) mod dql;
4-
pub(crate) mod marco;
4+
pub(crate) mod execute_macro;
55

66
use self::ddl::add_column::AddColumn;
77
use self::dql::join::nested_loop_join::NestedLoopJoin;

0 commit comments

Comments
 (0)