-
Notifications
You must be signed in to change notification settings - Fork 135
feat!(query): RowBinaryWithNamesAndTypes
#221
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 21 commits
31d109a
3a66d7a
cf72759
5a60295
b338d88
8ae3629
acced9e
c20af77
c4a608e
0d416cf
65cb92f
fbfbd99
1d5c01a
227617e
986643f
b26006e
8567200
a1181a0
b77f45d
04c7a20
1f6c9e6
9bafc9a
c53ba74
00ff574
bd71a77
6ba6abf
fb49a24
52d0953
5ffae76
a922d0d
90132cb
49af48c
926213b
da08827
1b893a8
14f8550
5509b12
38d771d
446eb7c
fc9a49b
19760f3
5f51dc7
8f3f3b2
d189a78
ccfac33
1544b7b
b7b45c5
bd99890
bcc1e46
a879945
b094dd0
c449ee2
e1706f4
3c08c77
244d587
d5af0b8
ee8f37e
6d0e771
9f495e2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
#![cfg(feature = "time")] | ||
slvrtrn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
use clickhouse::validation_mode::ValidationMode; | ||
use clickhouse::{Client, Compression, Row}; | ||
use criterion::black_box; | ||
use serde::Deserialize; | ||
use serde_repr::Deserialize_repr; | ||
use time::OffsetDateTime; | ||
|
||
#[derive(Debug, Clone, Deserialize_repr)] | ||
#[repr(i8)] | ||
pub enum PaymentType { | ||
CSH = 1, | ||
CRE = 2, | ||
NOC = 3, | ||
DIS = 4, | ||
UNK = 5, | ||
} | ||
|
||
#[derive(Debug, Clone, Row, Deserialize)] | ||
#[allow(dead_code)] | ||
pub struct TripSmall { | ||
trip_id: u32, | ||
#[serde(with = "clickhouse::serde::time::datetime")] | ||
pickup_datetime: OffsetDateTime, | ||
#[serde(with = "clickhouse::serde::time::datetime")] | ||
dropoff_datetime: OffsetDateTime, | ||
pickup_longitude: Option<f64>, | ||
pickup_latitude: Option<f64>, | ||
dropoff_longitude: Option<f64>, | ||
dropoff_latitude: Option<f64>, | ||
passenger_count: u8, | ||
trip_distance: f32, | ||
fare_amount: f32, | ||
extra: f32, | ||
tip_amount: f32, | ||
tolls_amount: f32, | ||
total_amount: f32, | ||
payment_type: PaymentType, | ||
pickup_ntaname: String, | ||
dropoff_ntaname: String, | ||
} | ||
|
||
async fn bench(name: &str, compression: Compression, validation_mode: ValidationMode) { | ||
slvrtrn marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
let start = std::time::Instant::now(); | ||
let (sum_trip_ids, dec_mbytes, rec_mbytes) = do_bench(compression, validation_mode).await; | ||
assert_eq!(sum_trip_ids, 3630387815532582); | ||
let elapsed = start.elapsed(); | ||
let throughput = dec_mbytes / elapsed.as_secs_f64(); | ||
println!("{name:>8} {validation_mode:>10} {elapsed:>7.3?} {throughput:>4.0} MiB/s {rec_mbytes:>4.0} MiB"); | ||
} | ||
|
||
async fn do_bench(compression: Compression, validation_mode: ValidationMode) -> (u64, f64, f64) { | ||
let client = Client::default() | ||
.with_compression(compression) | ||
.with_validation_mode(validation_mode) | ||
.with_url("http://localhost:8123"); | ||
|
||
let mut cursor = client | ||
.query("SELECT * FROM nyc_taxi.trips_small ORDER BY trip_id DESC") | ||
.fetch::<TripSmall>() | ||
.unwrap(); | ||
|
||
let mut sum = 0; | ||
while let Some(row) = cursor.next().await.unwrap() { | ||
sum += row.trip_id as u64; | ||
black_box(&row); | ||
} | ||
|
||
let dec_bytes = cursor.decoded_bytes(); | ||
let dec_mbytes = dec_bytes as f64 / 1024.0 / 1024.0; | ||
let recv_bytes = cursor.received_bytes(); | ||
let recv_mbytes = recv_bytes as f64 / 1024.0 / 1024.0; | ||
(sum, dec_mbytes, recv_mbytes) | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() { | ||
println!("compress validation elapsed throughput received"); | ||
bench("none", Compression::None, ValidationMode::First(1)).await; | ||
bench("lz4", Compression::Lz4, ValidationMode::First(1)).await; | ||
bench("none", Compression::None, ValidationMode::Each).await; | ||
bench("lz4", Compression::Lz4, ValidationMode::Each).await; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,14 @@ | ||
use crate::validation_mode::ValidationMode; | ||
|
||
use crate::{ | ||
bytes_ext::BytesExt, | ||
cursors::RawCursor, | ||
error::{Error, Result}, | ||
response::Response, | ||
rowbinary, | ||
}; | ||
use clickhouse_types::data_types::Column; | ||
use clickhouse_types::error::TypesError; | ||
use clickhouse_types::parse_rbwnat_columns_header; | ||
use serde::Deserialize; | ||
use std::marker::PhantomData; | ||
|
||
|
@@ -13,15 +17,59 @@ use std::marker::PhantomData; | |
pub struct RowCursor<T> { | ||
raw: RawCursor, | ||
bytes: BytesExt, | ||
columns: Vec<Column>, | ||
rows_to_validate: u64, | ||
|
||
_marker: PhantomData<T>, | ||
} | ||
|
||
impl<T> RowCursor<T> { | ||
pub(crate) fn new(response: Response) -> Self { | ||
pub(crate) fn new(response: Response, validation_mode: ValidationMode) -> Self { | ||
Self { | ||
_marker: PhantomData, | ||
raw: RawCursor::new(response), | ||
bytes: BytesExt::default(), | ||
_marker: PhantomData, | ||
columns: Vec::new(), | ||
rows_to_validate: match validation_mode { | ||
ValidationMode::First(n) => n as u64, | ||
ValidationMode::Each => u64::MAX, | ||
}, | ||
} | ||
} | ||
|
||
#[cold] | ||
#[inline(never)] | ||
async fn read_columns(&mut self) -> Result<()> { | ||
loop { | ||
if self.bytes.remaining() > 0 { | ||
let mut slice = self.bytes.slice(); | ||
match parse_rbwnat_columns_header(&mut slice) { | ||
Ok(columns) if !columns.is_empty() => { | ||
self.bytes.set_remaining(slice.len()); | ||
self.columns = columns; | ||
return Ok(()); | ||
} | ||
Ok(_) => { | ||
// TODO: or panic instead? | ||
slvrtrn marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
return Err(Error::BadResponse( | ||
"Expected at least one column in the header".to_string(), | ||
)); | ||
} | ||
Err(TypesError::NotEnoughData(_)) => {} | ||
Err(err) => { | ||
return Err(Error::ColumnsHeaderParserError(err.into())); | ||
} | ||
} | ||
} | ||
match self.raw.next().await? { | ||
Some(chunk) => self.bytes.extend(chunk), | ||
None if self.columns.is_empty() => { | ||
return Err(Error::BadResponse( | ||
"Could not read columns header".to_string(), | ||
)); | ||
} | ||
// if the result set is empty, there is only the columns header | ||
None => return Ok(()), | ||
} | ||
} | ||
} | ||
|
||
|
@@ -32,20 +80,37 @@ impl<T> RowCursor<T> { | |
/// # Cancel safety | ||
/// | ||
/// This method is cancellation safe. | ||
pub async fn next<'a, 'b: 'a>(&'a mut self) -> Result<Option<T>> | ||
pub async fn next<'cursor, 'data: 'cursor>(&'cursor mut self) -> Result<Option<T>> | ||
where | ||
T: Deserialize<'b>, | ||
T: Deserialize<'data>, | ||
{ | ||
loop { | ||
let mut slice = super::workaround_51132(self.bytes.slice()); | ||
|
||
match rowbinary::deserialize_from(&mut slice) { | ||
Ok(value) => { | ||
self.bytes.set_remaining(slice.len()); | ||
return Ok(Some(value)); | ||
if self.bytes.remaining() > 0 { | ||
if self.columns.is_empty() { | ||
self.read_columns().await?; | ||
if self.bytes.remaining() == 0 { | ||
continue; | ||
} | ||
} | ||
let mut slice = super::workaround_51132(self.bytes.slice()); | ||
let (result, not_enough_data) = match self.rows_to_validate { | ||
0 => rowbinary::deserialize_from::<T>(&mut slice, &[]), | ||
u64::MAX => rowbinary::deserialize_from::<T>(&mut slice, &self.columns), | ||
_ => { | ||
let result = rowbinary::deserialize_from::<T>(&mut slice, &self.columns); | ||
self.rows_to_validate -= 1; | ||
result | ||
} | ||
}; | ||
if !not_enough_data { | ||
return match result { | ||
Ok(value) => { | ||
self.bytes.set_remaining(slice.len()); | ||
Ok(Some(value)) | ||
} | ||
Err(err) => Err(err), | ||
}; | ||
slvrtrn marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
} | ||
Err(Error::NotEnoughData) => {} | ||
Err(err) => return Err(err), | ||
} | ||
|
||
match self.raw.next().await? { | ||
|
@@ -70,8 +135,7 @@ impl<T> RowCursor<T> { | |
self.raw.received_bytes() | ||
} | ||
|
||
/// Returns the total size in bytes decompressed since the cursor was | ||
/// created. | ||
/// Returns the total size in bytes decompressed since the cursor was created. | ||
#[inline] | ||
pub fn decoded_bytes(&self) -> u64 { | ||
self.raw.decoded_bytes() | ||
|
Uh oh!
There was an error while loading. Please reload this page.