diff --git a/src/insert.rs b/src/insert.rs
index 2af4a56b..e01a366f 100644
--- a/src/insert.rs
+++ b/src/insert.rs
@@ -27,18 +27,73 @@ const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 2048;
 const_assert!(BUFFER_SIZE.is_power_of_two()); // to use the whole buffer's capacity
 
-/// Performs one `INSERT`.
-///
-/// The [`Insert::end`] must be called to finalize the `INSERT`.
-/// Otherwise, the whole `INSERT` will be aborted.
-///
-/// Rows are being sent progressively to spread network load.
-#[must_use]
-pub struct Insert<T> {
-    state: InsertState,
+/// Serializes rows to be inserted into an internal buffer.
+pub struct RowsBuilder {
     buffer: BytesMut,
     #[cfg(feature = "lz4")]
     compression: Compression,
+}
+
+impl RowsBuilder {
+    #[cfg(feature = "lz4")]
+    pub fn new(compression: Compression) -> Self {
+        Self {
+            buffer: BytesMut::with_capacity(BUFFER_SIZE),
+            compression,
+        }
+    }
+
+    #[cfg(not(feature = "lz4"))]
+    pub fn new() -> Self {
+        Self {
+            buffer: BytesMut::with_capacity(BUFFER_SIZE),
+        }
+    }
+
+    pub fn add_row<T: Serialize>(&mut self, row: &T) -> Result<usize> {
+        let old_buf_size = self.buffer.len();
+        rowbinary::serialize_into(&mut self.buffer, row)?;
+        let written = self.buffer.len() - old_buf_size;
+        Ok(written)
+    }
+
+    pub fn build(mut self) -> Result<Bytes> {
+        // Hyper uses a non-trivial and inefficient scheme of buffering chunks.
+        // It's difficult to determine when allocations occur.
+        // So, instead we control it manually here and rely on the system allocator.
+        self.take_and_prepare_chunk()
+    }
+
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.buffer.len()
+    }
+
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.buffer.is_empty()
+    }
+
+    #[cfg(feature = "lz4")]
+    fn take_and_prepare_chunk(&mut self) -> Result<Bytes> {
+        Ok(if self.compression.is_lz4() {
+            let compressed = crate::compression::lz4::compress(&self.buffer)?;
+            self.buffer.clear();
+            compressed
+        } else {
+            mem::replace(&mut self.buffer, BytesMut::with_capacity(BUFFER_SIZE)).freeze()
+        })
+    }
+
+    #[cfg(not(feature = "lz4"))]
+    fn take_and_prepare_chunk(&mut self) -> Result<Bytes> {
+        Ok(mem::replace(&mut self.buffer, BytesMut::with_capacity(BUFFER_SIZE)).freeze())
+    }
+}
+
+/// Sends prepared row chunks to the ClickHouse server.
+pub struct RowsSender<T> {
+    state: RowsSenderState,
     send_timeout: Option<Duration>,
     end_timeout: Option<Duration>,
     // Use boxed `Sleep` to reuse a timer entry, it improves performance.
@@ -47,7 +102,214 @@ pub struct Insert<T> {
     _marker: PhantomData<fn() -> T>, // TODO: test contravariance.
 }
 
-enum InsertState {
+// It should be a regular function, but it decreases performance.
+macro_rules! timeout {
+    ($self:expr, $timeout:ident, $fut:expr) => {{
+        if let Some(timeout) = $self.$timeout {
+            $self.sleep.as_mut().reset(Instant::now() + timeout);
+        }
+
+        tokio::select! {
+            res = $fut => Some(res),
+            _ = &mut $self.sleep, if $self.$timeout.is_some() => None,
+        }
+    }};
+}
+
+impl<T> RowsSender<T> {
+    pub fn new(client: &Client, table: &str) -> Self
+    where
+        T: Row,
+    {
+        let fields = row::join_column_names::<T>()
+            .expect("the row type must be a struct or a wrapper around it");
+
+        // TODO: what about escaping a table name?
+        // https://clickhouse.com/docs/en/sql-reference/syntax#identifiers
+        let sql = format!("INSERT INTO {}({}) FORMAT RowBinary", table, fields);
+
+        Self {
+            state: RowsSenderState::NotStarted {
+                client: Box::new(client.clone()),
+                sql,
+            },
+            send_timeout: None,
+            end_timeout: None,
+            sleep: Box::pin(tokio::time::sleep(Duration::new(0, 0))),
+            _marker: PhantomData,
+        }
+    }
+
+    /// Sends one chunk and keeps the sender active.
+    pub async fn send_chunk(&mut self, chunk: Bytes) -> Result<()> {
+        debug_assert!(matches!(self.state, RowsSenderState::Active { .. }));
+
+        let sender = self.state.sender().unwrap(); // checked above
+
+        let is_timed_out = match timeout!(self, send_timeout, sender.send(chunk)) {
+            Some(true) => return Ok(()),
+            Some(false) => false, // an actual error will be returned from `wait_handle`
+            None => true,
+        };
+
+        // Error handling.
+        self.abort();
+
+        // TODO: is it required to wait the handle in the case of timeout?
+        let res = self.wait_handle().await;
+
+        if is_timed_out {
+            Err(Error::TimedOut)
+        } else {
+            res?; // a real error should be here.
+            Err(Error::Network("channel closed".into()))
+        }
+    }
+
+    /// Waits for the server response and terminates the sender.
+    pub async fn terminate(&mut self) -> Result<()> {
+        self.state.terminated();
+        self.wait_handle().await
+    }
+
+    /// Sends everything at once and then terminates the sender.
+    pub async fn send_all(mut self, chunk: Bytes) -> Result<()> {
+        self.send_chunk(chunk).await?;
+        self.terminate().await
+    }
+
+    #[cold]
+    #[track_caller]
+    #[inline(never)]
+    pub fn init(&mut self) -> Result<()> {
+        match self.state {
+            RowsSenderState::Active { .. } => return Ok(()),
+            RowsSenderState::NotStarted { .. } => {}
+            _ => panic!("write() after error"),
+        };
+
+        let (client, sql) = self.state.client_with_sql().unwrap(); // checked above
+
+        let mut url = Url::parse(&client.url).map_err(|err| Error::InvalidParams(err.into()))?;
+        let mut pairs = url.query_pairs_mut();
+        pairs.clear();
+
+        if let Some(database) = &client.database {
+            pairs.append_pair("database", database);
+        }
+
+        pairs.append_pair("query", sql);
+
+        if client.compression.is_lz4() {
+            pairs.append_pair("decompress", "1");
+        }
+
+        for (name, value) in &client.options {
+            pairs.append_pair(name, value);
+        }
+
+        drop(pairs);
+
+        let mut builder = Request::post(url.as_str());
+        builder = with_request_headers(builder, &client.headers, &client.products_info);
+        builder = with_authentication(builder, &client.authentication);
+
+        let (sender, body) = RequestBody::chunked();
+
+        let request = builder
+            .body(body)
+            .map_err(|err| Error::InvalidParams(Box::new(err)))?;
+
+        let future = client.http.request(request);
+        // TODO: introduce `Executor` to allow bookkeeping of spawned tasks.
+        let handle =
+            tokio::spawn(async move { Response::new(future, Compression::None).finish().await });
+
+        self.state = RowsSenderState::Active { handle, sender };
+        Ok(())
+    }
+
+    /// Sets timeouts for different operations.
+    ///
+    /// `send_timeout` restricts time on sending a data chunk to a socket.
+    /// `None` disables the timeout, it's a default.
+    /// It's roughly equivalent to `tokio::time::timeout(insert.write(...))`.
+    ///
+    /// `end_timeout` restricts time on waiting for a response from the CH
+    /// server. Thus, it includes all work needed to handle `INSERT` by the
+    /// CH server, e.g. handling all materialized views and so on.
+    /// `None` disables the timeout, it's a default.
+    /// It's roughly equivalent to `tokio::time::timeout(insert.end(...))`.
+    ///
+    /// These timeouts are much more performant (~x10) than wrapping `write()`
+    /// and `end()` calls into `tokio::time::timeout()`.
+    pub fn with_timeouts(
+        mut self,
+        send_timeout: Option<Duration>,
+        end_timeout: Option<Duration>,
+    ) -> Self {
+        self.set_timeouts(send_timeout, end_timeout);
+        self
+    }
+
+    /// Similar to [`Client::with_option`], but for this particular INSERT
+    /// statement only.
+    ///
+    /// # Panics
+    /// If called after the request is started, e.g., after [`Insert::write`].
+    pub fn with_option(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
+        self.set_option(name, value);
+        self
+    }
+
+    #[track_caller]
+    pub(crate) fn set_option(&mut self, name: impl Into<String>, value: impl Into<String>) {
+        self.state.set_option(name, value);
+    }
+
+    pub(crate) fn set_timeouts(
+        &mut self,
+        send_timeout: Option<Duration>,
+        end_timeout: Option<Duration>,
+    ) {
+        self.send_timeout = send_timeout;
+        self.end_timeout = end_timeout;
+    }
+
+    async fn wait_handle(&mut self) -> Result<()> {
+        match self.state.handle() {
+            Some(handle) => {
+                let result = match timeout!(self, end_timeout, &mut *handle) {
+                    Some(Ok(res)) => res,
+                    Some(Err(err)) if err.is_panic() => panic::resume_unwind(err.into_panic()),
+                    Some(Err(err)) => Err(Error::Custom(format!("unexpected error: {err}"))),
+                    None => {
+                        // We can do nothing useful here, so just shut down the background task.
+                        handle.abort();
+                        Err(Error::TimedOut)
+                    }
+                };
+                self.state = RowsSenderState::Completed;
+                result
+            }
+            _ => Ok(()),
+        }
+    }
+
+    fn abort(&mut self) {
+        if let Some(sender) = self.state.sender() {
+            sender.abort();
+        }
+    }
+}
+
+impl<T> Drop for RowsSender<T> {
+    fn drop(&mut self) {
+        self.abort();
+    }
+}
+
+pub(crate) enum RowsSenderState {
     NotStarted {
         client: Box<Client>,
         sql: String,
@@ -62,60 +324,60 @@ enum InsertState {
     Completed,
 }
 
-impl InsertState {
+impl RowsSenderState {
     fn sender(&mut self) -> Option<&mut ChunkSender> {
         match self {
-            InsertState::Active { sender, .. } => Some(sender),
+            RowsSenderState::Active { sender, .. } => Some(sender),
             _ => None,
         }
     }
 
     fn handle(&mut self) -> Option<&mut JoinHandle<Result<()>>> {
         match self {
-            InsertState::Active { handle, .. } | InsertState::Terminated { handle } => Some(handle),
+            RowsSenderState::Active { handle, .. } | RowsSenderState::Terminated { handle } => {
+                Some(handle)
+            }
             _ => None,
         }
     }
 
     fn client_with_sql(&self) -> Option<(&Client, &str)> {
         match self {
-            InsertState::NotStarted { client, sql } => Some((client, sql)),
+            RowsSenderState::NotStarted { client, sql } => Some((client, sql)),
            _ => None,
        }
     }
 
     fn terminated(&mut self) {
         replace_with_or_abort(self, |_self| match _self {
-            InsertState::NotStarted { .. } => InsertState::Completed, // empty insert
-            InsertState::Active { handle, .. } => InsertState::Terminated { handle },
+            RowsSenderState::NotStarted { .. } => RowsSenderState::Completed, // empty insert
+            RowsSenderState::Active { handle, .. } => RowsSenderState::Terminated { handle },
             _ => unreachable!(),
         });
     }
 
-    fn with_option(&mut self, name: impl Into<String>, value: impl Into<String>) {
-        assert!(matches!(self, InsertState::NotStarted { .. }));
+    fn set_option(&mut self, name: impl Into<String>, value: impl Into<String>) {
+        assert!(matches!(self, RowsSenderState::NotStarted { .. }));
         replace_with_or_abort(self, |_self| match _self {
-            InsertState::NotStarted { mut client, sql } => {
+            RowsSenderState::NotStarted { mut client, sql } => {
                 client.add_option(name, value);
-                InsertState::NotStarted { client, sql }
+                RowsSenderState::NotStarted { client, sql }
             }
             _ => unreachable!(),
         });
     }
 }
 
-// It should be a regular function, but it decreases performance.
-macro_rules! timeout {
-    ($self:expr, $timeout:ident, $fut:expr) => {{
-        if let Some(timeout) = $self.$timeout {
-            $self.sleep.as_mut().reset(Instant::now() + timeout);
-        }
-
-        tokio::select! {
-            res = $fut => Some(res),
-            _ = &mut $self.sleep, if $self.$timeout.is_some() => None,
-        }
-    }};
+/// Performs one `INSERT`.
+///
+/// The [`Insert::end`] must be called to finalize the `INSERT`.
+/// Otherwise, the whole `INSERT` will be aborted.
+///
+/// Rows are being sent progressively to spread network load.
+#[must_use]
+pub struct Insert<T> {
+    builder: RowsBuilder,
+    sender: RowsSender<T>,
 }
 
 impl<T> Insert<T> {
@@ -124,26 +386,14 @@ impl<T> Insert<T> {
     where
         T: Row,
     {
-        let fields = row::join_column_names::<T>()
-            .expect("the row type must be a struct or a wrapper around it");
+        let sender = RowsSender::new(client, table);
 
-        // TODO: what about escaping a table name?
-        // https://clickhouse.com/docs/en/sql-reference/syntax#identifiers
-        let sql = format!("INSERT INTO {}({}) FORMAT RowBinary", table, fields);
+        #[cfg(feature = "lz4")]
+        let builder = RowsBuilder::new(client.compression);
+        #[cfg(not(feature = "lz4"))]
+        let builder = RowsBuilder::new();
 
-        Ok(Self {
-            state: InsertState::NotStarted {
-                client: Box::new(client.clone()),
-                sql,
-            },
-            buffer: BytesMut::with_capacity(BUFFER_SIZE),
-            #[cfg(feature = "lz4")]
-            compression: client.compression,
-            send_timeout: None,
-            end_timeout: None,
-            sleep: Box::pin(tokio::time::sleep(Duration::new(0, 0))),
-            _marker: PhantomData,
-        })
+        Ok(Self { sender, builder })
     }
 
     /// Sets timeouts for different operations.
@@ -169,6 +419,14 @@ impl<T> Insert<T> {
         self
     }
 
+    pub(crate) fn set_timeouts(
+        &mut self,
+        send_timeout: Option<Duration>,
+        end_timeout: Option<Duration>,
+    ) {
+        self.sender.set_timeouts(send_timeout, end_timeout);
+    }
+
     /// Similar to [`Client::with_option`], but for this particular INSERT
     /// statement only.
     ///
@@ -176,19 +434,10 @@ impl<T> Insert<T> {
     /// If called after the request is started, e.g., after [`Insert::write`].
     #[track_caller]
     pub fn with_option(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
-        self.state.with_option(name, value);
+        self.sender.set_option(name, value);
         self
     }
 
-    pub(crate) fn set_timeouts(
-        &mut self,
-        send_timeout: Option<Duration>,
-        end_timeout: Option<Duration>,
-    ) {
-        self.send_timeout = send_timeout;
-        self.end_timeout = end_timeout;
-    }
-
     /// Serializes the provided row into an internal buffer.
     /// Once the buffer is full, it's sent to a background task writing to the
     /// socket.
@@ -214,8 +463,9 @@ impl<T> Insert<T> {
         async move {
             result?;
 
-            if self.buffer.len() >= MIN_CHUNK_SIZE {
-                self.send_chunk().await?;
+            if self.builder.len() >= MIN_CHUNK_SIZE {
+                let chunk = self.builder.take_and_prepare_chunk()?;
+                self.sender.send_chunk(chunk).await?;
             }
             Ok(())
         }
@@ -226,21 +476,14 @@ impl<T> Insert<T> {
     where
         T: Serialize,
     {
-        match self.state {
-            InsertState::NotStarted { .. } => self.init_request(),
-            InsertState::Active { .. } => Ok(()),
-            _ => panic!("write() after error"),
-        }?;
-
-        let old_buf_size = self.buffer.len();
-        let result = rowbinary::serialize_into(&mut self.buffer, row);
-        let written = self.buffer.len() - old_buf_size;
+        // Lazily initialize the sender on the first write.
+        self.sender.init()?;
+        let result = self.builder.add_row(row);
 
         if result.is_err() {
-            self.abort();
+            self.sender.abort();
        }
-
-        result.and(Ok(written))
+        result
     }
 
     /// Ends `INSERT`, the server starts processing the data.
@@ -250,135 +493,10 @@ impl<T> Insert<T> {
     ///
     /// NOTE: If it isn't called, the whole `INSERT` is aborted.
     pub async fn end(mut self) -> Result<()> {
-        if !self.buffer.is_empty() {
-            self.send_chunk().await?;
+        if !self.builder.is_empty() {
+            let chunk = self.builder.take_and_prepare_chunk()?;
+            self.sender.send_chunk(chunk).await?;
         }
-        self.state.terminated();
-        self.wait_handle().await
-    }
-
-    async fn send_chunk(&mut self) -> Result<()> {
-        debug_assert!(matches!(self.state, InsertState::Active { .. }));
-
-        // Hyper uses non-trivial and inefficient schema of buffering chunks.
-        // It's difficult to determine when allocations occur.
-        // So, instead we control it manually here and rely on the system allocator.
-        let chunk = self.take_and_prepare_chunk()?;
-
-        let sender = self.state.sender().unwrap(); // checked above
-
-        let is_timed_out = match timeout!(self, send_timeout, sender.send(chunk)) {
-            Some(true) => return Ok(()),
-            Some(false) => false, // an actual error will be returned from `wait_handle`
-            None => true,
-        };
-
-        // Error handling.
-
-        self.abort();
-
-        // TODO: is it required to wait the handle in the case of timeout?
-        let res = self.wait_handle().await;
-
-        if is_timed_out {
-            Err(Error::TimedOut)
-        } else {
-            res?; // a real error should be here.
-            Err(Error::Network("channel closed".into()))
-        }
-    }
-
-    async fn wait_handle(&mut self) -> Result<()> {
-        match self.state.handle() {
-            Some(handle) => {
-                let result = match timeout!(self, end_timeout, &mut *handle) {
-                    Some(Ok(res)) => res,
-                    Some(Err(err)) if err.is_panic() => panic::resume_unwind(err.into_panic()),
-                    Some(Err(err)) => Err(Error::Custom(format!("unexpected error: {err}"))),
-                    None => {
-                        // We can do nothing useful here, so just shut down the background task.
-                        handle.abort();
-                        Err(Error::TimedOut)
-                    }
-                };
-                self.state = InsertState::Completed;
-                result
-            }
-            _ => Ok(()),
-        }
-    }
-
-    #[cfg(feature = "lz4")]
-    fn take_and_prepare_chunk(&mut self) -> Result<Bytes> {
-        Ok(if self.compression.is_lz4() {
-            let compressed = crate::compression::lz4::compress(&self.buffer)?;
-            self.buffer.clear();
-            compressed
-        } else {
-            mem::replace(&mut self.buffer, BytesMut::with_capacity(BUFFER_SIZE)).freeze()
-        })
-    }
-
-    #[cfg(not(feature = "lz4"))]
-    fn take_and_prepare_chunk(&mut self) -> Result<Bytes> {
-        Ok(mem::replace(&mut self.buffer, BytesMut::with_capacity(BUFFER_SIZE)).freeze())
-    }
-
-    #[cold]
-    #[track_caller]
-    #[inline(never)]
-    fn init_request(&mut self) -> Result<()> {
-        debug_assert!(matches!(self.state, InsertState::NotStarted { .. }));
-        let (client, sql) = self.state.client_with_sql().unwrap(); // checked above
-
-        let mut url = Url::parse(&client.url).map_err(|err| Error::InvalidParams(err.into()))?;
-        let mut pairs = url.query_pairs_mut();
-        pairs.clear();
-
-        if let Some(database) = &client.database {
-            pairs.append_pair("database", database);
-        }
-
-        pairs.append_pair("query", sql);
-
-        if client.compression.is_lz4() {
-            pairs.append_pair("decompress", "1");
-        }
-
-        for (name, value) in &client.options {
-            pairs.append_pair(name, value);
-        }
-
-        drop(pairs);
-
-        let mut builder = Request::post(url.as_str());
-        builder = with_request_headers(builder, &client.headers, &client.products_info);
-        builder = with_authentication(builder, &client.authentication);
-
-        let (sender, body) = RequestBody::chunked();
-
-        let request = builder
-            .body(body)
-            .map_err(|err| Error::InvalidParams(Box::new(err)))?;
-
-        let future = client.http.request(request);
-        // TODO: introduce `Executor` to allow bookkeeping of spawned tasks.
-        let handle =
-            tokio::spawn(async move { Response::new(future, Compression::None).finish().await });
-
-        self.state = InsertState::Active { handle, sender };
-        Ok(())
-    }
-
-    fn abort(&mut self) {
-        if let Some(sender) = self.state.sender() {
-            sender.abort();
-        }
-    }
-}
-
-impl<T> Drop for Insert<T> {
-    fn drop(&mut self) {
-        self.abort();
+        self.sender.terminate().await
     }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 72ba0000..88e3b116 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -308,9 +308,14 @@ impl Client {
         inserter::Inserter::new(self, table)
     }
 
-    /// Starts a new SELECT/DDL query.
+    /// Starts a new SELECT/DDL query with SQL parameter rendering (`?` placeholders are substituted).
     pub fn query(&self, query: &str) -> query::Query {
-        query::Query::new(self, query)
+        query::Query::new(self, query, true)
+    }
+
+    /// Starts a new SELECT/DDL query with raw SQL; placeholders are not rendered.
+    pub fn query_with_raw_sql(&self, query: &str) -> query::Query {
+        query::Query::new(self, query, false)
     }
 
     /// Starts a new WATCH query.
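
Reviewer note: a minimal usage sketch of the builder/sender split introduced above, mirroring what `Insert<T>` now does internally. It assumes `RowsBuilder` and `RowsSender` end up publicly reachable (e.g. via `clickhouse::insert::{RowsBuilder, RowsSender}`), which this patch does not show; `MyRow`, the table name and the URL are invented, and the default `lz4` feature is assumed, so `RowsBuilder::new` takes a `Compression`.

use clickhouse::insert::{RowsBuilder, RowsSender}; // assumed re-export path
use clickhouse::{error::Result, Client, Compression, Row};
use serde::Serialize;

#[derive(Row, Serialize)]
struct MyRow {
    no: u32,
    name: String,
}

async fn insert_manually(rows: &[MyRow]) -> Result<()> {
    // The builder's compression must match the client's: the sender only
    // appends `decompress=1` to the URL when the client uses LZ4.
    let client = Client::default()
        .with_url("http://localhost:8123")
        .with_compression(Compression::None);

    // `RowsSender` owns the HTTP request; `init()` starts it lazily,
    // just as `Insert::write` does after this patch.
    let mut sender = RowsSender::<MyRow>::new(&client, "some_table");
    sender.init()?;

    // `RowsBuilder` only serializes rows into an in-memory buffer.
    let mut builder = RowsBuilder::new(Compression::None);
    for row in rows {
        builder.add_row(row)?;
    }

    // `build()` turns the buffer into a single `Bytes` chunk;
    // `send_all` pushes it and waits for the server response.
    sender.send_all(builder.build()?).await
}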
diff --git a/src/query.rs b/src/query.rs
index 374eebb9..e75206f2 100644
--- a/src/query.rs
+++ b/src/query.rs
@@ -26,10 +26,10 @@ pub struct Query {
 }
 
 impl Query {
-    pub(crate) fn new(client: &Client, template: &str) -> Self {
+    pub(crate) fn new(client: &Client, template: &str, need_render: bool) -> Self {
         Self {
             client: client.clone(),
-            sql: SqlBuilder::new(template),
+            sql: SqlBuilder::new_with_need_render(template, need_render),
         }
     }
 
diff --git a/src/sql/mod.rs b/src/sql/mod.rs
index d4a7e3b8..03c03d66 100644
--- a/src/sql/mod.rs
+++ b/src/sql/mod.rs
@@ -47,24 +47,31 @@ impl fmt::Display for SqlBuilder {
 }
 
 impl SqlBuilder {
+    #[cfg(test)]
     pub(crate) fn new(template: &str) -> Self {
+        Self::new_with_need_render(template, true)
+    }
+
+    pub(crate) fn new_with_need_render(template: &str, need_render: bool) -> Self {
         let mut parts = Vec::new();
         let mut rest = template;
 
-        while let Some(idx) = rest.find('?') {
-            if rest[idx + 1..].starts_with('?') {
-                parts.push(Part::Text(rest[..idx + 1].to_string()));
-                rest = &rest[idx + 2..];
-                continue;
-            } else if idx != 0 {
-                parts.push(Part::Text(rest[..idx].to_string()));
-            }
+        if need_render {
+            while let Some(idx) = rest.find('?') {
+                if rest[idx + 1..].starts_with('?') {
+                    parts.push(Part::Text(rest[..idx + 1].to_string()));
+                    rest = &rest[idx + 2..];
+                    continue;
+                } else if idx != 0 {
+                    parts.push(Part::Text(rest[..idx].to_string()));
+                }
 
-            rest = &rest[idx + 1..];
-            if let Some(restfields) = rest.strip_prefix("fields") {
-                parts.push(Part::Fields);
-                rest = restfields;
-            } else {
-                parts.push(Part::Arg);
+                rest = &rest[idx + 1..];
+                if let Some(restfields) = rest.strip_prefix("fields") {
+                    parts.push(Part::Fields);
+                    rest = restfields;
+                } else {
+                    parts.push(Part::Arg);
+                }
             }
         }
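
Reviewer note: a hedged sketch of how the two query entry points differ after the `SqlBuilder::new_with_need_render` change. `some_table` and the literal values are invented; `bind` and `execute` are the crate's existing `Query` methods.

use clickhouse::{error::Result, Client};

async fn run(client: &Client) -> Result<()> {
    // Usual path: `?` placeholders are rendered by `SqlBuilder` and bound here.
    client
        .query("INSERT INTO some_table SELECT toUInt32(number), toString(number) FROM numbers(?)")
        .bind(10u64)
        .execute()
        .await?;

    // New path: the statement is passed through verbatim, so the literal `?`
    // inside the string constant is not mistaken for a placeholder.
    client
        .query_with_raw_sql("INSERT INTO some_table VALUES (42, 'what?')")
        .execute()
        .await
}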