Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ run `cargo run -p tpcc --release` to run tpcc
- Tips: TPC-C currently only supports single thread
```shell
<90th Percentile RT (MaxRT)>
New-Order : 0.002 (0.025)
Payment : 0.001 (0.013)
Order-Status : 0.054 (0.159)
Delivery : 0.020 (0.034)
Stock-Level : 0.003 (0.004)
New-Order : 0.002 (0.018)
Payment : 0.001 (0.024)
Order-Status : 0.050 (0.067)
Delivery : 0.021 (0.030)
Stock-Level : 0.003 (0.005)
<TpmC>
7892 Tpmc
8101 Tpmc
```
#### 👉[check more](tpcc/README.md)

Expand Down
3 changes: 2 additions & 1 deletion src/catalog/column.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::catalog::TableName;
use crate::errors::DatabaseError;
use crate::expression::ScalarExpression;
use crate::types::tuple::Tuple;
use crate::types::value::DataValue;
use crate::types::{ColumnId, LogicalType};
use kite_sql_serde_macros::ReferenceSerialization;
Expand Down Expand Up @@ -169,7 +170,7 @@ impl ColumnCatalog {
self.desc
.default
.as_ref()
.map(|expr| expr.eval(None))
.map(|expr| expr.eval::<&Tuple>(None))
.transpose()
}

Expand Down
107 changes: 107 additions & 0 deletions src/execution/dql/join/hash/full_join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
use crate::execution::dql::join::hash::{filter, FilterArgs, JoinProbeState, ProbeArgs};
use crate::execution::dql::join::hash_join::BuildState;
use crate::execution::dql::sort::BumpVec;
use crate::execution::Executor;
use crate::throw;
use crate::types::tuple::Tuple;
use crate::types::value::DataValue;
use ahash::HashMap;
use fixedbitset::FixedBitSet;

pub(crate) struct FullJoinState {
pub(crate) left_schema_len: usize,
pub(crate) right_schema_len: usize,
pub(crate) bits: FixedBitSet,
}

impl<'a> JoinProbeState<'a> for FullJoinState {
fn probe(
&mut self,
probe_args: ProbeArgs<'a>,
filter_args: Option<&'a FilterArgs>,
) -> Executor<'a> {
let left_schema_len = self.left_schema_len;
let bits_ptr: *mut FixedBitSet = &mut self.bits;

Box::new(
#[coroutine]
move || {
let ProbeArgs { probe_tuple, .. } = probe_args;

if let ProbeArgs {
is_keys_has_null: false,
build_state: Some(build_state),
..
} = probe_args
{
let mut has_filtered = false;
for (i, Tuple { values, pk }) in build_state.tuples.iter() {
let full_values =
Vec::from_iter(values.iter().chain(probe_tuple.values.iter()).cloned());

match &filter_args {
None => (),
Some(filter_args) => {
if !throw!(filter(&full_values, filter_args)) {
has_filtered = true;
unsafe {
(*bits_ptr).set(*i, true);
}
yield Ok(Self::full_right_row(left_schema_len, &probe_tuple));
continue;
}
}
}
yield Ok(Tuple::new(pk.clone(), full_values));
}
build_state.is_used = !has_filtered;
build_state.has_filted = has_filtered;
return;
}

yield Ok(Self::full_right_row(left_schema_len, &probe_tuple));
},
)
}

fn left_drop(
&mut self,
_build_map: HashMap<BumpVec<'a, DataValue>, BuildState>,
_filter_args: Option<&'a FilterArgs>,
) -> Option<Executor<'a>> {
let full_schema_len = self.right_schema_len + self.left_schema_len;
let bits_ptr: *mut FixedBitSet = &mut self.bits;

Some(Box::new(
#[coroutine]
move || {
for (_, state) in _build_map {
if state.is_used {
continue;
}
for (i, mut left_tuple) in state.tuples {
unsafe {
if !(*bits_ptr).contains(i) && state.has_filted {
continue;
}
}
left_tuple.values.resize(full_schema_len, DataValue::Null);
yield Ok(left_tuple);
}
}
},
))
}
}

impl FullJoinState {
pub(crate) fn full_right_row(left_schema_len: usize, probe_tuple: &Tuple) -> Tuple {
let full_values = Vec::from_iter(
(0..left_schema_len)
.map(|_| DataValue::Null)
.chain(probe_tuple.values.iter().cloned()),
);

Tuple::new(probe_tuple.pk.clone(), full_values)
}
}
45 changes: 45 additions & 0 deletions src/execution/dql/join/hash/inner_join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use crate::execution::dql::join::hash::{filter, FilterArgs, JoinProbeState, ProbeArgs};
use crate::execution::Executor;
use crate::throw;
use crate::types::tuple::Tuple;

pub(crate) struct InnerJoinState;

impl<'a> JoinProbeState<'a> for InnerJoinState {
fn probe(
&mut self,
probe_args: ProbeArgs<'a>,
filter_args: Option<&'a FilterArgs>,
) -> Executor<'a> {
Box::new(
#[coroutine]
move || {
let ProbeArgs {
is_keys_has_null: false,
probe_tuple,
build_state: Some(build_state),
..
} = probe_args
else {
return;
};

build_state.is_used = true;
for (_, Tuple { values, pk }) in build_state.tuples.iter() {
let full_values =
Vec::from_iter(values.iter().chain(probe_tuple.values.iter()).cloned());

match &filter_args {
None => (),
Some(filter_args) => {
if !throw!(filter(&full_values, filter_args)) {
continue;
}
}
}
yield Ok(Tuple::new(pk.clone(), full_values));
}
},
)
}
}
71 changes: 71 additions & 0 deletions src/execution/dql/join/hash/left_anti_join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use crate::execution::dql::join::hash::left_semi_join::LeftSemiJoinState;
use crate::execution::dql::join::hash::{filter, FilterArgs, JoinProbeState, ProbeArgs};
use crate::execution::dql::join::hash_join::BuildState;
use crate::execution::dql::sort::BumpVec;
use crate::execution::Executor;
use crate::throw;
use crate::types::value::DataValue;
use ahash::HashMap;
use fixedbitset::FixedBitSet;

pub(crate) struct LeftAntiJoinState {
pub(crate) right_schema_len: usize,
pub(crate) inner: LeftSemiJoinState,
}

impl<'a> JoinProbeState<'a> for LeftAntiJoinState {
fn probe(
&mut self,
probe_args: ProbeArgs<'a>,
filter_args: Option<&'a FilterArgs>,
) -> Executor<'a> {
self.inner.probe(probe_args, filter_args)
}

fn left_drop(
&mut self,
_build_map: HashMap<BumpVec<'a, DataValue>, BuildState>,
filter_args: Option<&'a FilterArgs>,
) -> Option<Executor<'a>> {
let bits_ptr: *mut FixedBitSet = &mut self.inner.bits;
let right_schema_len = self.right_schema_len;
Some(Box::new(
#[coroutine]
move || {
for (
_,
BuildState {
tuples,
is_used,
has_filted,
},
) in _build_map
{
if is_used {
continue;
}
for (i, tuple) in tuples {
unsafe {
if (*bits_ptr).contains(i) && has_filted {
continue;
}
}
if let Some(filter_args) = filter_args {
let full_values = Vec::from_iter(
tuple
.values
.iter()
.cloned()
.chain((0..right_schema_len).map(|_| DataValue::Null)),
);
if !throw!(filter(&full_values, filter_args)) {
continue;
}
}
yield Ok(tuple);
}
}
},
))
}
}
90 changes: 90 additions & 0 deletions src/execution/dql/join/hash/left_join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use crate::execution::dql::join::hash::{filter, FilterArgs, JoinProbeState, ProbeArgs};
use crate::execution::dql::join::hash_join::BuildState;
use crate::execution::dql::sort::BumpVec;
use crate::execution::Executor;
use crate::throw;
use crate::types::tuple::Tuple;
use crate::types::value::DataValue;
use ahash::HashMap;
use fixedbitset::FixedBitSet;

pub(crate) struct LeftJoinState {
pub(crate) left_schema_len: usize,
pub(crate) right_schema_len: usize,
pub(crate) bits: FixedBitSet,
}

impl<'a> JoinProbeState<'a> for LeftJoinState {
fn probe(
&mut self,
probe_args: ProbeArgs<'a>,
filter_args: Option<&'a FilterArgs>,
) -> Executor<'a> {
let bits_ptr: *mut FixedBitSet = &mut self.bits;
Box::new(
#[coroutine]
move || {
let ProbeArgs {
is_keys_has_null: false,
probe_tuple,
build_state: Some(build_state),
..
} = probe_args
else {
return;
};

let mut has_filted = false;
for (i, Tuple { values, pk }) in build_state.tuples.iter() {
let full_values =
Vec::from_iter(values.iter().chain(probe_tuple.values.iter()).cloned());

match &filter_args {
None => (),
Some(filter_args) => {
if !throw!(filter(&full_values, filter_args)) {
has_filted = true;
unsafe {
(*bits_ptr).set(*i, true);
}
continue;
}
}
}
yield Ok(Tuple::new(pk.clone(), full_values));
}
build_state.is_used = !has_filted;
build_state.has_filted = has_filted;
},
)
}

fn left_drop(
&mut self,
_build_map: HashMap<BumpVec<'a, DataValue>, BuildState>,
_filter_args: Option<&'a FilterArgs>,
) -> Option<Executor<'a>> {
let full_schema_len = self.right_schema_len + self.left_schema_len;
let bits_ptr: *mut FixedBitSet = &mut self.bits;

Some(Box::new(
#[coroutine]
move || {
for (_, state) in _build_map {
if state.is_used {
continue;
}
for (i, mut left_tuple) in state.tuples {
unsafe {
if !(*bits_ptr).contains(i) && state.has_filted {
continue;
}
}
left_tuple.values.resize(full_schema_len, DataValue::Null);
yield Ok(left_tuple);
}
}
},
))
}
}
Loading
Loading