diff --git a/Cargo.lock b/Cargo.lock index 3032811869e40..7131d2fa4cfe8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4627,6 +4627,7 @@ dependencies = [ "databend-storages-common-stage", "databend-storages-common-table-meta", "enum-as-inner", + "futures", "jsonb", "lexical-core", "log", diff --git a/src/query/storages/stage/Cargo.toml b/src/query/storages/stage/Cargo.toml index 9a0852c1501f8..8d4d9d84d29b4 100644 --- a/src/query/storages/stage/Cargo.toml +++ b/src/query/storages/stage/Cargo.toml @@ -34,6 +34,7 @@ databend-common-storages-parquet = { workspace = true } databend-storages-common-stage = { workspace = true } databend-storages-common-table-meta = { workspace = true } enum-as-inner = { workspace = true } +futures = { workspace = true } jsonb = { workspace = true } lexical-core = { workspace = true } log = { workspace = true } diff --git a/src/query/storages/stage/src/read/row_based/processors/reader.rs b/src/query/storages/stage/src/read/row_based/processors/reader.rs index e82e149718788..5fbc22f841d53 100644 --- a/src/query/storages/stage/src/read/row_based/processors/reader.rs +++ b/src/query/storages/stage/src/read/row_based/processors/reader.rs @@ -23,6 +23,8 @@ use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_pipeline_sources::PrefetchAsyncSource; use databend_storages_common_stage::SingleFilePartition; +use futures::AsyncRead; +use futures::AsyncReadExt; use log::debug; use opendal::Operator; @@ -30,7 +32,7 @@ use crate::read::row_based::batch::BytesBatch; struct FileState { file: SingleFilePartition, - reader: opendal::Reader, + reader: opendal::FuturesAsyncReader, offset: usize, } @@ -60,15 +62,20 @@ impl BytesReader { pub async fn read_batch(&mut self) -> Result { if let Some(state) = &mut self.file_state { - let end = state.file.size.min(self.read_batch_size + state.offset) as u64; - let buffer = state.reader.read(state.offset as u64..end).await?.to_vec(); - let n = buffer.len(); + let end = state.file.size.min(self.read_batch_size + state.offset); + let mut buffer = vec![0u8; end - state.offset]; + let n = read_full(&mut state.reader, &mut buffer[..]).await?; + + // let end = state.file.size.min(self.read_batch_size + state.offset) as u64; + // let buffer = state.reader.read(state.offset as u64..end).await?.to_vec(); + // let n = buffer.len(); if n == 0 { return Err(ErrorCode::BadBytes(format!( "Unexpected EOF {} expect {} bytes, read only {} bytes.", state.file.path, state.file.size, state.offset ))); }; + buffer.truncate(n); Profile::record_usize_profile(ProfileStatisticsName::ScanBytes, n); self.table_ctx @@ -116,7 +123,12 @@ impl PrefetchAsyncSource for BytesReader { }; let file = SingleFilePartition::from_part(&part)?.clone(); - let reader = self.op.reader(&file.path).await?; + let reader = self + .op + .reader(&file.path) + .await? + .into_futures_async_read(0..file.size as u64) + .await?; self.file_state = Some(FileState { file, @@ -130,3 +142,18 @@ impl PrefetchAsyncSource for BytesReader { } } } + +#[async_backtrace::framed] +pub async fn read_full(reader: &mut R, buf: &mut [u8]) -> Result { + let mut buf = &mut buf[0..]; + let mut n = 0; + while !buf.is_empty() { + let read = reader.read(buf).await?; + if read == 0 { + break; + } + n += read; + buf = &mut buf[read..] + } + Ok(n) +}