diff --git a/src/chunked.rs b/src/chunked.rs index 2bb30b90..2e71ce5c 100644 --- a/src/chunked.rs +++ b/src/chunked.rs @@ -28,8 +28,8 @@ use futures::StreamExt; use crate::path::Path; use crate::{ - GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, - PutMultipartOpts, PutOptions, PutResult, + GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, + PutOptions, PutResult, }; use crate::{PutPayload, Result}; @@ -85,57 +85,43 @@ impl ObjectStore for ChunkedStore { async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { let r = self.inner.get_opts(location, options).await?; - let stream = match r.payload { - #[cfg(all(feature = "fs", not(target_arch = "wasm32")))] - GetResultPayload::File(file, path) => { - crate::local::chunked_stream(file, path, r.range.clone(), self.chunk_size) - } - GetResultPayload::Stream(stream) => { - let buffer = BytesMut::new(); - futures::stream::unfold( - (stream, buffer, false, self.chunk_size), - |(mut stream, mut buffer, mut exhausted, chunk_size)| async move { - // Keep accumulating bytes until we reach capacity as long as - // the stream can provide them: - if exhausted { - return None; + let buffer = BytesMut::new(); + let payload = futures::stream::unfold( + (r.payload, buffer, false, self.chunk_size), + |(mut stream, mut buffer, mut exhausted, chunk_size)| async move { + // Keep accumulating bytes until we reach capacity as long as + // the stream can provide them: + if exhausted { + return None; + } + while buffer.len() < chunk_size { + match stream.next().await { + None => { + exhausted = true; + let slice = buffer.split_off(0).freeze(); + return Some((Ok(slice), (stream, buffer, exhausted, chunk_size))); } - while buffer.len() < chunk_size { - match stream.next().await { - None => { - exhausted = true; - let slice = buffer.split_off(0).freeze(); - return Some(( - Ok(slice), - (stream, buffer, exhausted, chunk_size), - )); - } - Some(Ok(bytes)) => { - buffer.put(bytes); - } - Some(Err(e)) => { - return Some(( - Err(crate::Error::Generic { - store: "ChunkedStore", - source: Box::new(e), - }), - (stream, buffer, exhausted, chunk_size), - )) - } - }; + Some(Ok(bytes)) => { + buffer.put(bytes); } - // Return the chunked values as the next value in the stream - let slice = buffer.split_to(chunk_size).freeze(); - Some((Ok(slice), (stream, buffer, exhausted, chunk_size))) - }, - ) - .boxed() - } - }; - Ok(GetResult { - payload: GetResultPayload::Stream(stream), - ..r - }) + Some(Err(e)) => { + return Some(( + Err(crate::Error::Generic { + store: "ChunkedStore", + source: Box::new(e), + }), + (stream, buffer, exhausted, chunk_size), + )) + } + }; + } + // Return the chunked values as the next value in the stream + let slice = buffer.split_to(chunk_size).freeze(); + Some((Ok(slice), (stream, buffer, exhausted, chunk_size))) + }, + ) + .boxed(); + Ok(GetResult { payload, ..r }) } async fn get_range(&self, location: &Path, range: Range) -> Result { @@ -196,10 +182,7 @@ mod tests { for chunk_size in [10, 20, 31] { let store = ChunkedStore::new(Arc::clone(&store), chunk_size); - let mut s = match store.get(&location).await.unwrap().payload { - GetResultPayload::Stream(s) => s, - _ => unreachable!(), - }; + let mut s = store.get(&location).await.unwrap().payload; let mut remaining = 1001; while let Some(next) = s.next().await { diff --git a/src/client/get.rs b/src/client/get.rs index 51d4e1bf..8e5b393d 100644 --- a/src/client/get.rs +++ b/src/client/get.rs @@ -20,8 +20,7 @@ use crate::client::retry::RetryContext; use crate::client::{HttpResponse, HttpResponseBody}; use crate::path::Path; use crate::{ - Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPayload, ObjectMeta, Result, - RetryConfig, + Attribute, Attributes, GetOptions, GetRange, GetResult, ObjectMeta, Result, RetryConfig, }; use async_trait::async_trait; use bytes::Bytes; @@ -185,10 +184,10 @@ impl GetContext { .map_err(Self::err)?; let attributes = get_attributes(T::HEADER_CONFIG, &parts.headers).map_err(Self::err)?; - let stream = self.retry_stream(body, meta.e_tag.clone(), range.clone()); + let payload = self.retry_stream(body, meta.e_tag.clone(), range.clone()); Ok(GetResult { - payload: GetResultPayload::Stream(stream), + payload, meta, range, attributes, diff --git a/src/lib.rs b/src/lib.rs index 80e91d72..4cc172c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -111,6 +111,8 @@ //! //! [`BufReader`]: buffered::BufReader //! [`BufWriter`]: buffered::BufWriter +//! [`Read`]: std::io::Read +//! [`Seek`]: std::io::Seek //! //! # Adapters //! @@ -238,7 +240,7 @@ //! assert_eq!(object.len() as u64, meta.size); //! //! // Alternatively stream the bytes from object storage -//! let stream = object_store.get(&path).await.unwrap().into_stream(); +//! let stream = object_store.get(&path).await.unwrap().payload; //! //! // Count the '0's using `try_fold` from `TryStreamExt` trait //! let num_zeros = stream @@ -575,8 +577,6 @@ use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use std::fmt::{Debug, Formatter}; -#[cfg(all(feature = "fs", not(target_arch = "wasm32")))] -use std::io::{Read, Seek, SeekFrom}; use std::ops::Range; use std::sync::Arc; @@ -1035,10 +1035,9 @@ impl GetOptions { } /// Result for a get request -#[derive(Debug)] pub struct GetResult { - /// The [`GetResultPayload`] - pub payload: GetResultPayload, + /// The payload. + pub payload: BoxStream<'static, Result>, /// The [`ObjectMeta`] for this object pub meta: ObjectMeta, /// The range of bytes returned by this request @@ -1049,25 +1048,21 @@ pub struct GetResult { pub attributes: Attributes, } -/// The kind of a [`GetResult`] -/// -/// This special cases the case of a local file, as some systems may -/// be able to optimise the case of a file already present on local disk -pub enum GetResultPayload { - /// The file, path - #[cfg(all(feature = "fs", not(target_arch = "wasm32")))] - File(std::fs::File, std::path::PathBuf), - /// An opaque stream of bytes - Stream(BoxStream<'static, Result>), -} - -impl Debug for GetResultPayload { +impl std::fmt::Debug for GetResult { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - #[cfg(all(feature = "fs", not(target_arch = "wasm32")))] - Self::File(_, _) => write!(f, "GetResultPayload(File)"), - Self::Stream(_) => write!(f, "GetResultPayload(Stream)"), - } + let Self { + payload: _, + meta, + range, + attributes, + } = self; + + f.debug_struct("GetResult") + .field("payload", &"") + .field("meta", meta) + .field("range", range) + .field("attributes", attributes) + .finish() } } @@ -1075,56 +1070,7 @@ impl GetResult { /// Collects the data into a [`Bytes`] pub async fn bytes(self) -> Result { let len = self.range.end - self.range.start; - match self.payload { - #[cfg(all(feature = "fs", not(target_arch = "wasm32")))] - GetResultPayload::File(mut file, path) => { - maybe_spawn_blocking(move || { - file.seek(SeekFrom::Start(self.range.start as _)) - .map_err(|source| local::Error::Seek { - source, - path: path.clone(), - })?; - - let mut buffer = if let Ok(len) = len.try_into() { - Vec::with_capacity(len) - } else { - Vec::new() - }; - file.take(len as _) - .read_to_end(&mut buffer) - .map_err(|source| local::Error::UnableToReadBytes { source, path })?; - - Ok(buffer.into()) - }) - .await - } - GetResultPayload::Stream(s) => collect_bytes(s, Some(len)).await, - } - } - - /// Converts this into a byte stream - /// - /// If the `self.kind` is [`GetResultPayload::File`] will perform chunked reads of the file, - /// otherwise will return the [`GetResultPayload::Stream`]. - /// - /// # Tokio Compatibility - /// - /// Tokio discourages performing blocking IO on a tokio worker thread, however, - /// no major operating systems have stable async file APIs. Therefore if called from - /// a tokio context, this will use [`tokio::runtime::Handle::spawn_blocking`] to dispatch - /// IO to a blocking thread pool, much like `tokio::fs` does under-the-hood. - /// - /// If not called from a tokio context, this will perform IO on the current thread with - /// no additional complexity or overheads - pub fn into_stream(self) -> BoxStream<'static, Result> { - match self.payload { - #[cfg(all(feature = "fs", not(target_arch = "wasm32")))] - GetResultPayload::File(file, path) => { - const CHUNK_SIZE: usize = 8 * 1024; - local::chunked_stream(file, path, self.range, CHUNK_SIZE) - } - GetResultPayload::Stream(s) => s, - } + collect_bytes(self.payload, Some(len)).await } } diff --git a/src/limit.rs b/src/limit.rs index 330a0da0..24e87f9d 100644 --- a/src/limit.rs +++ b/src/limit.rs @@ -18,9 +18,8 @@ //! An object store that limits the maximum concurrency of the wrapped implementation use crate::{ - BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, - ObjectStore, Path, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, StreamExt, - UploadPart, + BoxStream, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, Path, + PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, StreamExt, UploadPart, }; use async_trait::async_trait; use bytes::Bytes; @@ -200,13 +199,7 @@ impl ObjectStore for LimitStore { } fn permit_get_result(r: GetResult, permit: OwnedSemaphorePermit) -> GetResult { - let payload = match r.payload { - #[cfg(all(feature = "fs", not(target_arch = "wasm32")))] - v @ GetResultPayload::File(_, _) => v, - GetResultPayload::Stream(s) => { - GetResultPayload::Stream(PermitWrapper::new(s, permit).boxed()) - } - }; + let payload = PermitWrapper::new(r.payload, permit).boxed(); GetResult { payload, ..r } } diff --git a/src/local.rs b/src/local.rs index ccf6e34d..b58e009b 100644 --- a/src/local.rs +++ b/src/local.rs @@ -37,8 +37,8 @@ use crate::{ maybe_spawn_blocking, path::{absolute_path_to_url, Path}, util::InvalidGetRange, - Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, - ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart, + Attributes, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart, }; /// A specialized `Error` for filesystem object store-related errors @@ -414,8 +414,11 @@ impl ObjectStore for LocalFileSystem { None => 0..meta.size, }; + const CHUNK_SIZE: usize = 8 * 1024; + let payload = chunked_stream(file, path, range.clone(), CHUNK_SIZE); + Ok(GetResult { - payload: GetResultPayload::File(file, path), + payload, attributes: Attributes::default(), range, meta, diff --git a/src/memory.rs b/src/memory.rs index f03dbc6d..3dd9ee76 100644 --- a/src/memory.rs +++ b/src/memory.rs @@ -29,9 +29,9 @@ use parking_lot::RwLock; use crate::multipart::{MultipartStore, PartId}; use crate::util::InvalidGetRange; use crate::{ - path::Path, Attributes, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, - MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutResult, - Result, UpdateVersion, UploadPart, + path::Path, Attributes, GetRange, GetResult, ListResult, MultipartId, MultipartUpload, + ObjectMeta, ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutResult, Result, + UpdateVersion, UploadPart, }; use crate::{GetOptions, PutPayload}; @@ -262,7 +262,7 @@ impl ObjectStore for InMemory { let stream = futures::stream::once(futures::future::ready(Ok(data))); Ok(GetResult { - payload: GetResultPayload::Stream(stream.boxed()), + payload: stream.boxed(), attributes: entry.attributes, meta, range, diff --git a/src/throttle.rs b/src/throttle.rs index efe29491..1acf407a 100644 --- a/src/throttle.rs +++ b/src/throttle.rs @@ -22,8 +22,8 @@ use std::{convert::TryInto, sync::Arc}; use crate::multipart::{MultipartStore, PartId}; use crate::{ - path::Path, GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, ObjectMeta, - ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, + path::Path, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore, + PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, }; use crate::{GetOptions, UploadPart}; use async_trait::async_trait; @@ -311,22 +311,12 @@ fn usize_to_u32_saturate(x: usize) -> u32 { } fn throttle_get(result: GetResult, wait_get_per_byte: Duration) -> GetResult { - #[allow(clippy::infallible_destructuring_match)] - let s = match result.payload { - GetResultPayload::Stream(s) => s, - #[cfg(all(feature = "fs", not(target_arch = "wasm32")))] - GetResultPayload::File(_, _) => unimplemented!(), - }; - - let stream = throttle_stream(s, move |bytes| { + let payload = throttle_stream(result.payload, move |bytes| { let bytes_len: u32 = usize_to_u32_saturate(bytes.len()); wait_get_per_byte * bytes_len }); - GetResult { - payload: GetResultPayload::Stream(stream), - ..result - } + GetResult { payload, ..result } } fn throttle_stream( @@ -404,7 +394,7 @@ impl MultipartUpload for ThrottledUpload { #[cfg(test)] mod tests { use super::*; - use crate::{integration::*, memory::InMemory, GetResultPayload}; + use crate::{integration::*, memory::InMemory}; use futures::TryStreamExt; use tokio::time::Duration; use tokio::time::Instant; @@ -605,10 +595,7 @@ mod tests { let res = store.get(&path).await; if n_bytes.is_some() { // need to consume bytes to provoke sleep times - let s = match res.unwrap().payload { - GetResultPayload::Stream(s) => s, - GetResultPayload::File(_, _) => unimplemented!(), - }; + let s = res.unwrap().payload; s.map_ok(|b| bytes::BytesMut::from(&b[..])) .try_concat()