Skip to content

Commit 1853027

Browse files
committed
refactor: ColumnId -> Ulid
1 parent a857c8f commit 1853027

File tree

27 files changed

+524
-286
lines changed

27 files changed

+524
-286
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ thiserror = { version = "1" }
6767
tokio = { version = "1.36", features = ["full"], optional = true }
6868
tracing = { version = "0.1" }
6969
typetag = { version = "0.2" }
70+
ulid = { version = "1", features = ["serde"] }
7071

7172
[dev-dependencies]
7273
cargo-tarpaulin = { version = "0.27" }

src/binder/expr.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::expression::{AliasType, ScalarExpression};
1818
use crate::planner::LogicalPlan;
1919
use crate::storage::Transaction;
2020
use crate::types::value::{DataValue, Utf8Type};
21-
use crate::types::LogicalType;
21+
use crate::types::{ColumnId, LogicalType};
2222

2323
macro_rules! try_alias {
2424
($context:expr, $full_name:expr) => {
@@ -231,7 +231,7 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
231231
sub_query: LogicalPlan,
232232
) -> Result<(ScalarExpression, LogicalPlan), DatabaseError> {
233233
let mut alias_column = ColumnCatalog::clone(&column);
234-
alias_column.set_ref_table(self.context.temp_table(), 0);
234+
alias_column.set_ref_table(self.context.temp_table(), ColumnId::new());
235235

236236
let alias_expr = ScalarExpression::Alias {
237237
expr: Box::new(ScalarExpression::ColumnRef(column)),

src/binder/mod.rs

Lines changed: 52 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -382,11 +382,12 @@ pub(crate) fn is_valid_identifier(s: &str) -> bool {
382382
#[cfg(test)]
383383
pub mod test {
384384
use crate::binder::{is_valid_identifier, Binder, BinderContext};
385-
use crate::catalog::{ColumnCatalog, ColumnDesc};
385+
use crate::catalog::{ColumnCatalog, ColumnDesc, TableCatalog};
386386
use crate::errors::DatabaseError;
387387
use crate::planner::LogicalPlan;
388388
use crate::storage::rocksdb::RocksStorage;
389389
use crate::storage::{Storage, TableCache, Transaction};
390+
use crate::types::ColumnId;
390391
use crate::types::LogicalType::Integer;
391392
use crate::utils::lru::ShardingLruCache;
392393
use std::hash::RandomState;
@@ -395,6 +396,56 @@ pub mod test {
395396
use std::sync::Arc;
396397
use tempfile::TempDir;
397398

399+
pub(crate) struct TableState<S: Storage> {
400+
pub(crate) table: TableCatalog,
401+
pub(crate) table_cache: Arc<TableCache>,
402+
pub(crate) storage: S,
403+
}
404+
405+
impl<S: Storage> TableState<S> {
406+
pub(crate) fn plan<T: AsRef<str>>(&self, sql: T) -> Result<LogicalPlan, DatabaseError> {
407+
let scala_functions = Default::default();
408+
let table_functions = Default::default();
409+
let transaction = self.storage.transaction()?;
410+
let mut binder = Binder::new(
411+
BinderContext::new(
412+
&self.table_cache,
413+
&transaction,
414+
&scala_functions,
415+
&table_functions,
416+
Arc::new(AtomicUsize::new(0)),
417+
),
418+
None,
419+
);
420+
let stmt = crate::parser::parse_sql(sql)?;
421+
422+
Ok(binder.bind(&stmt[0])?)
423+
}
424+
425+
pub(crate) fn column_id_by_name(&self, name: &str) -> &ColumnId {
426+
self.table.get_column_id_by_name(name).unwrap()
427+
}
428+
}
429+
430+
pub(crate) fn build_t1_table() -> Result<TableState<RocksStorage>, DatabaseError> {
431+
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
432+
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
433+
let storage = build_test_catalog(&table_cache, temp_dir.path())?;
434+
let table = {
435+
let transaction = storage.transaction()?;
436+
transaction
437+
.table(&table_cache, Arc::new("t1".to_string()))
438+
.unwrap()
439+
.clone()
440+
};
441+
442+
Ok(TableState {
443+
table,
444+
table_cache,
445+
storage,
446+
})
447+
}
448+
398449
pub(crate) fn build_test_catalog(
399450
table_cache: &TableCache,
400451
path: impl Into<PathBuf> + Send,
@@ -443,28 +494,6 @@ pub mod test {
443494
Ok(storage)
444495
}
445496

446-
pub fn select_sql_run<S: AsRef<str>>(sql: S) -> Result<LogicalPlan, DatabaseError> {
447-
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
448-
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
449-
let storage = build_test_catalog(&table_cache, temp_dir.path())?;
450-
let transaction = storage.transaction()?;
451-
let scala_functions = Default::default();
452-
let table_functions = Default::default();
453-
let mut binder = Binder::new(
454-
BinderContext::new(
455-
&table_cache,
456-
&transaction,
457-
&scala_functions,
458-
&table_functions,
459-
Arc::new(AtomicUsize::new(0)),
460-
),
461-
None,
462-
);
463-
let stmt = crate::parser::parse_sql(sql)?;
464-
465-
Ok(binder.bind(&stmt[0])?)
466-
}
467-
468497
#[test]
469498
pub fn test_valid_identifier() {
470499
debug_assert!(is_valid_identifier("valid_table"));

src/binder/select.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use crate::planner::operator::union::UnionOperator;
2828
use crate::planner::LogicalPlan;
2929
use crate::storage::Transaction;
3030
use crate::types::tuple::{Schema, SchemaRef};
31-
use crate::types::LogicalType;
31+
use crate::types::{ColumnId, LogicalType};
3232
use itertools::Itertools;
3333
use sqlparser::ast::{
3434
Distinct, Expr, Ident, Join, JoinConstraint, JoinOperator, Offset, OrderByExpr, Query, Select,
@@ -352,7 +352,7 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> {
352352
for (alias, column) in aliases_with_columns {
353353
let mut alias_column = ColumnCatalog::clone(&column);
354354
alias_column.set_name(alias.clone());
355-
alias_column.set_ref_table(table_alias.clone(), column.id().unwrap_or(0));
355+
alias_column.set_ref_table(table_alias.clone(), column.id().unwrap_or(ColumnId::new()));
356356

357357
let alias_column_expr = ScalarExpression::Alias {
358358
expr: Box::new(ScalarExpression::ColumnRef(column)),
@@ -983,31 +983,34 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> {
983983

984984
#[cfg(test)]
985985
mod tests {
986-
use crate::binder::test::select_sql_run;
986+
use crate::binder::test::build_t1_table;
987987
use crate::errors::DatabaseError;
988988

989989
#[test]
990990
fn test_select_bind() -> Result<(), DatabaseError> {
991-
let plan_1 = select_sql_run("select * from t1")?;
991+
let table_states = build_t1_table()?;
992+
993+
let plan_1 = table_states.plan("select * from t1")?;
992994
println!("just_col:\n {:#?}", plan_1);
993-
let plan_2 = select_sql_run("select t1.c1, t1.c2 from t1")?;
995+
let plan_2 = table_states.plan("select t1.c1, t1.c2 from t1")?;
994996
println!("table_with_col:\n {:#?}", plan_2);
995-
let plan_3 = select_sql_run("select t1.c1, t1.c2 from t1 where c1 > 2")?;
997+
let plan_3 = table_states.plan("select t1.c1, t1.c2 from t1 where c1 > 2")?;
996998
println!("table_with_col_and_c1_compare_constant:\n {:#?}", plan_3);
997-
let plan_4 = select_sql_run("select t1.c1, t1.c2 from t1 where c1 > c2")?;
999+
let plan_4 = table_states.plan("select t1.c1, t1.c2 from t1 where c1 > c2")?;
9981000
println!("table_with_col_and_c1_compare_c2:\n {:#?}", plan_4);
999-
let plan_5 = select_sql_run("select avg(t1.c1) from t1")?;
1001+
let plan_5 = table_states.plan("select avg(t1.c1) from t1")?;
10001002
println!("table_with_col_and_c1_avg:\n {:#?}", plan_5);
1001-
let plan_6 = select_sql_run("select t1.c1, t1.c2 from t1 where (t1.c1 - t1.c2) > 1")?;
1003+
let plan_6 = table_states.plan("select t1.c1, t1.c2 from t1 where (t1.c1 - t1.c2) > 1")?;
10021004
println!("table_with_col_nested:\n {:#?}", plan_6);
10031005

1004-
let plan_7 = select_sql_run("select * from t1 limit 1")?;
1006+
let plan_7 = table_states.plan("select * from t1 limit 1")?;
10051007
println!("limit:\n {:#?}", plan_7);
10061008

1007-
let plan_8 = select_sql_run("select * from t1 offset 2")?;
1009+
let plan_8 = table_states.plan("select * from t1 offset 2")?;
10081010
println!("offset:\n {:#?}", plan_8);
10091011

1010-
let plan_9 = select_sql_run("select c1, c3 from t1 inner join t2 on c1 = c3 and c1 > 1")?;
1012+
let plan_9 =
1013+
table_states.plan("select c1, c3 from t1 inner join t2 on c1 = c3 and c1 > 1")?;
10111014
println!("join:\n {:#?}", plan_9);
10121015

10131016
Ok(())

src/catalog/column.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,17 @@ impl ColumnCatalog {
138138
}
139139
}
140140

141+
#[cfg(test)]
142+
impl ColumnSummary {
143+
pub(crate) fn column_id(&self) -> Option<&ColumnId> {
144+
if let ColumnRelation::Table { column_id, .. } = &self.relation {
145+
Some(column_id)
146+
} else {
147+
None
148+
}
149+
}
150+
}
151+
141152
/// The descriptor of a column.
142153
#[derive(Debug, Clone, PartialEq, Eq, Hash, ReferenceSerialization)]
143154
pub struct ColumnDesc {

src/catalog/table.rs

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
use itertools::Itertools;
2-
use serde::{Deserialize, Serialize};
3-
use std::collections::BTreeMap;
4-
use std::sync::Arc;
5-
use std::{slice, vec};
6-
71
use crate::catalog::{ColumnCatalog, ColumnRef, ColumnRelation};
82
use crate::errors::DatabaseError;
93
use crate::types::index::{IndexMeta, IndexMetaRef, IndexType};
104
use crate::types::tuple::SchemaRef;
115
use crate::types::{ColumnId, LogicalType};
6+
use itertools::Itertools;
7+
use serde::{Deserialize, Serialize};
8+
use std::collections::BTreeMap;
9+
use std::sync::Arc;
10+
use std::{slice, vec};
11+
use ulid::Generator;
1212

1313
pub type TableName = Arc<String>;
1414

@@ -41,9 +41,9 @@ impl TableCatalog {
4141
self.columns.get(id).map(|i| &self.schema_ref[*i])
4242
}
4343

44-
#[allow(dead_code)]
45-
pub(crate) fn get_column_id_by_name(&self, name: &str) -> Option<ColumnId> {
46-
self.column_idxs.get(name).map(|(id, _)| id).cloned()
44+
#[cfg(test)]
45+
pub(crate) fn get_column_id_by_name(&self, name: &str) -> Option<&ColumnId> {
46+
self.column_idxs.get(name).map(|(id, _)| id)
4747
}
4848

4949
pub(crate) fn get_column_by_name(&self, name: &str) -> Option<&ColumnRef> {
@@ -87,17 +87,15 @@ impl TableCatalog {
8787
}
8888

8989
/// Add a column to the table catalog.
90-
pub(crate) fn add_column(&mut self, mut col: ColumnCatalog) -> Result<ColumnId, DatabaseError> {
90+
pub(crate) fn add_column(
91+
&mut self,
92+
mut col: ColumnCatalog,
93+
generator: &mut Generator,
94+
) -> Result<ColumnId, DatabaseError> {
9195
if self.column_idxs.contains_key(col.name()) {
9296
return Err(DatabaseError::DuplicateColumn(col.name().to_string()));
9397
}
94-
95-
let col_id = self
96-
.columns
97-
.iter()
98-
.last()
99-
.map(|(column_id, _)| column_id + 1)
100-
.unwrap_or(0);
98+
let col_id = generator.generate().unwrap();
10199

102100
col.summary.relation = ColumnRelation::Table {
103101
column_id: col_id,
@@ -155,8 +153,11 @@ impl TableCatalog {
155153
indexes: vec![],
156154
schema_ref: Arc::new(vec![]),
157155
};
156+
let mut generator = Generator::new();
158157
for col_catalog in columns.into_iter() {
159-
let _ = table_catalog.add_column(col_catalog)?;
158+
let _ = table_catalog
159+
.add_column(col_catalog, &mut generator)
160+
.unwrap();
160161
}
161162

162163
Ok(table_catalog)

src/db.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,8 @@ pub(crate) mod test {
399399
true,
400400
ColumnDesc::new(LogicalType::Integer, false, false, None).unwrap(),
401401
);
402-
column.set_ref_table(Arc::new("a".to_string()), 0);
402+
let number_column_id = schema[0].summary.column_id().unwrap();
403+
column.set_ref_table(Arc::new("a".to_string()), *number_column_id);
403404

404405
debug_assert_eq!(schema, Arc::new(vec![ColumnRef::from(column)]));
405406
debug_assert_eq!(

src/execution/dml/copy_from_file.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -108,20 +108,20 @@ fn return_result(size: usize, tx: Sender<Tuple>) -> Result<(), DatabaseError> {
108108

109109
#[cfg(test)]
110110
mod tests {
111+
use super::*;
112+
use crate::binder::copy::ExtSource;
111113
use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef, ColumnRelation, ColumnSummary};
112114
use crate::db::DataBaseBuilder;
115+
use crate::errors::DatabaseError;
116+
use crate::storage::Storage;
117+
use crate::types::LogicalType;
113118
use sqlparser::ast::CharLengthUnits;
114119
use std::io::Write;
115120
use std::ops::{Coroutine, CoroutineState};
116121
use std::pin::Pin;
117122
use std::sync::Arc;
118123
use tempfile::TempDir;
119-
120-
use super::*;
121-
use crate::binder::copy::ExtSource;
122-
use crate::errors::DatabaseError;
123-
use crate::storage::Storage;
124-
use crate::types::LogicalType;
124+
use ulid::Ulid;
125125

126126
#[test]
127127
fn read_csv() -> Result<(), DatabaseError> {
@@ -135,7 +135,7 @@ mod tests {
135135
summary: ColumnSummary {
136136
name: "a".to_string(),
137137
relation: ColumnRelation::Table {
138-
column_id: 0,
138+
column_id: Ulid::new(),
139139
table_name: Arc::new("t1".to_string()),
140140
},
141141
},
@@ -146,7 +146,7 @@ mod tests {
146146
summary: ColumnSummary {
147147
name: "b".to_string(),
148148
relation: ColumnRelation::Table {
149-
column_id: 1,
149+
column_id: Ulid::new(),
150150
table_name: Arc::new("t1".to_string()),
151151
},
152152
},
@@ -157,7 +157,7 @@ mod tests {
157157
summary: ColumnSummary {
158158
name: "c".to_string(),
159159
relation: ColumnRelation::Table {
160-
column_id: 2,
160+
column_id: Ulid::new(),
161161
table_name: Arc::new("t1".to_string()),
162162
},
163163
},

src/expression/mod.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1303,7 +1303,7 @@ mod test {
13031303
use crate::function::numbers::Numbers;
13041304
use crate::serdes::{ReferenceSerialization, ReferenceTables};
13051305
use crate::storage::rocksdb::{RocksStorage, RocksTransaction};
1306-
use crate::storage::{Storage, TableCache};
1306+
use crate::storage::{Storage, TableCache, Transaction};
13071307
use crate::types::evaluator::boolean::BooleanNotUnaryEvaluator;
13081308
use crate::types::evaluator::int32::Int32PlusBinaryEvaluator;
13091309
use crate::types::evaluator::{BinaryEvaluatorBox, UnaryEvaluatorBox};
@@ -1345,6 +1345,12 @@ mod test {
13451345

13461346
let mut cursor = Cursor::new(Vec::new());
13471347
let mut reference_tables = ReferenceTables::new();
1348+
let c3_column_id = {
1349+
let table = transaction
1350+
.table(&table_cache, Arc::new("t1".to_string()))
1351+
.unwrap();
1352+
*table.get_column_id_by_name("c3").unwrap()
1353+
};
13481354

13491355
fn_assert(
13501356
&mut cursor,
@@ -1374,7 +1380,7 @@ mod test {
13741380
summary: ColumnSummary {
13751381
name: "c3".to_string(),
13761382
relation: ColumnRelation::Table {
1377-
column_id: 2,
1383+
column_id: c3_column_id,
13781384
table_name: Arc::new("t1".to_string()),
13791385
},
13801386
},

0 commit comments

Comments
 (0)