Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 38 additions & 55 deletions src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use futures::StreamExt;

use crate::path::Path;
use crate::{
GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOpts, PutOptions, PutResult,
GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts,
PutOptions, PutResult,
};
use crate::{PutPayload, Result};

Expand Down Expand Up @@ -85,57 +85,43 @@ impl ObjectStore for ChunkedStore {

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let r = self.inner.get_opts(location, options).await?;
let stream = match r.payload {
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
GetResultPayload::File(file, path) => {
crate::local::chunked_stream(file, path, r.range.clone(), self.chunk_size)
}
GetResultPayload::Stream(stream) => {
let buffer = BytesMut::new();
futures::stream::unfold(
(stream, buffer, false, self.chunk_size),
|(mut stream, mut buffer, mut exhausted, chunk_size)| async move {
// Keep accumulating bytes until we reach capacity as long as
// the stream can provide them:
if exhausted {
return None;
let buffer = BytesMut::new();
let payload = futures::stream::unfold(
(r.payload, buffer, false, self.chunk_size),
|(mut stream, mut buffer, mut exhausted, chunk_size)| async move {
// Keep accumulating bytes until we reach capacity as long as
// the stream can provide them:
if exhausted {
return None;
}
while buffer.len() < chunk_size {
match stream.next().await {
None => {
exhausted = true;
let slice = buffer.split_off(0).freeze();
return Some((Ok(slice), (stream, buffer, exhausted, chunk_size)));
}
while buffer.len() < chunk_size {
match stream.next().await {
None => {
exhausted = true;
let slice = buffer.split_off(0).freeze();
return Some((
Ok(slice),
(stream, buffer, exhausted, chunk_size),
));
}
Some(Ok(bytes)) => {
buffer.put(bytes);
}
Some(Err(e)) => {
return Some((
Err(crate::Error::Generic {
store: "ChunkedStore",
source: Box::new(e),
}),
(stream, buffer, exhausted, chunk_size),
))
}
};
Some(Ok(bytes)) => {
buffer.put(bytes);
}
// Return the chunked values as the next value in the stream
let slice = buffer.split_to(chunk_size).freeze();
Some((Ok(slice), (stream, buffer, exhausted, chunk_size)))
},
)
.boxed()
}
};
Ok(GetResult {
payload: GetResultPayload::Stream(stream),
..r
})
Some(Err(e)) => {
return Some((
Err(crate::Error::Generic {
store: "ChunkedStore",
source: Box::new(e),
}),
(stream, buffer, exhausted, chunk_size),
))
}
};
}
// Return the chunked values as the next value in the stream
let slice = buffer.split_to(chunk_size).freeze();
Some((Ok(slice), (stream, buffer, exhausted, chunk_size)))
},
)
.boxed();
Ok(GetResult { payload, ..r })
}

async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
Expand Down Expand Up @@ -196,10 +182,7 @@ mod tests {

for chunk_size in [10, 20, 31] {
let store = ChunkedStore::new(Arc::clone(&store), chunk_size);
let mut s = match store.get(&location).await.unwrap().payload {
GetResultPayload::Stream(s) => s,
_ => unreachable!(),
};
let mut s = store.get(&location).await.unwrap().payload;

let mut remaining = 1001;
while let Some(next) = s.next().await {
Expand Down
7 changes: 3 additions & 4 deletions src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ use crate::client::retry::RetryContext;
use crate::client::{HttpResponse, HttpResponseBody};
use crate::path::Path;
use crate::{
Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPayload, ObjectMeta, Result,
RetryConfig,
Attribute, Attributes, GetOptions, GetRange, GetResult, ObjectMeta, Result, RetryConfig,
};
use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -185,10 +184,10 @@ impl<T: GetClient> GetContext<T> {
.map_err(Self::err)?;

let attributes = get_attributes(T::HEADER_CONFIG, &parts.headers).map_err(Self::err)?;
let stream = self.retry_stream(body, meta.e_tag.clone(), range.clone());
let payload = self.retry_stream(body, meta.e_tag.clone(), range.clone());

Ok(GetResult {
payload: GetResultPayload::Stream(stream),
payload,
meta,
range,
attributes,
Expand Down
94 changes: 20 additions & 74 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@
//!
//! [`BufReader`]: buffered::BufReader
//! [`BufWriter`]: buffered::BufWriter
//! [`Read`]: std::io::Read
//! [`Seek`]: std::io::Seek
//!
//! # Adapters
//!
Expand Down Expand Up @@ -238,7 +240,7 @@
//! assert_eq!(object.len() as u64, meta.size);
//!
//! // Alternatively stream the bytes from object storage
//! let stream = object_store.get(&path).await.unwrap().into_stream();
//! let stream = object_store.get(&path).await.unwrap().payload;
//!
//! // Count the '0's using `try_fold` from `TryStreamExt` trait
//! let num_zeros = stream
Expand Down Expand Up @@ -575,8 +577,6 @@ use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use std::fmt::{Debug, Formatter};
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
use std::io::{Read, Seek, SeekFrom};
use std::ops::Range;
use std::sync::Arc;

Expand Down Expand Up @@ -1035,10 +1035,9 @@ impl GetOptions {
}

/// Result for a get request
#[derive(Debug)]
pub struct GetResult {
/// The [`GetResultPayload`]
pub payload: GetResultPayload,
/// The payload.
pub payload: BoxStream<'static, Result<Bytes>>,
/// The [`ObjectMeta`] for this object
pub meta: ObjectMeta,
/// The range of bytes returned by this request
Expand All @@ -1049,82 +1048,29 @@ pub struct GetResult {
pub attributes: Attributes,
}

/// The kind of a [`GetResult`]
///
/// This special cases the case of a local file, as some systems may
/// be able to optimise the case of a file already present on local disk
pub enum GetResultPayload {
/// The file, path
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
File(std::fs::File, std::path::PathBuf),
/// An opaque stream of bytes
Stream(BoxStream<'static, Result<Bytes>>),
}

impl Debug for GetResultPayload {
impl std::fmt::Debug for GetResult {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
Self::File(_, _) => write!(f, "GetResultPayload(File)"),
Self::Stream(_) => write!(f, "GetResultPayload(Stream)"),
}
let Self {
payload: _,
meta,
range,
attributes,
} = self;

f.debug_struct("GetResult")
.field("payload", &"<STREAM>")
.field("meta", meta)
.field("range", range)
.field("attributes", attributes)
.finish()
}
}

impl GetResult {
/// Collects the data into a [`Bytes`]
pub async fn bytes(self) -> Result<Bytes> {
let len = self.range.end - self.range.start;
match self.payload {
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
GetResultPayload::File(mut file, path) => {
maybe_spawn_blocking(move || {
file.seek(SeekFrom::Start(self.range.start as _))
.map_err(|source| local::Error::Seek {
source,
path: path.clone(),
})?;

let mut buffer = if let Ok(len) = len.try_into() {
Vec::with_capacity(len)
} else {
Vec::new()
};
file.take(len as _)
.read_to_end(&mut buffer)
.map_err(|source| local::Error::UnableToReadBytes { source, path })?;

Ok(buffer.into())
})
.await
}
GetResultPayload::Stream(s) => collect_bytes(s, Some(len)).await,
}
}

/// Converts this into a byte stream
///
/// If the `self.kind` is [`GetResultPayload::File`] will perform chunked reads of the file,
/// otherwise will return the [`GetResultPayload::Stream`].
///
/// # Tokio Compatibility
///
/// Tokio discourages performing blocking IO on a tokio worker thread, however,
/// no major operating systems have stable async file APIs. Therefore if called from
/// a tokio context, this will use [`tokio::runtime::Handle::spawn_blocking`] to dispatch
/// IO to a blocking thread pool, much like `tokio::fs` does under-the-hood.
///
/// If not called from a tokio context, this will perform IO on the current thread with
/// no additional complexity or overheads
pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
match self.payload {
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
GetResultPayload::File(file, path) => {
const CHUNK_SIZE: usize = 8 * 1024;
local::chunked_stream(file, path, self.range, CHUNK_SIZE)
}
GetResultPayload::Stream(s) => s,
}
collect_bytes(self.payload, Some(len)).await
}
}

Expand Down
13 changes: 3 additions & 10 deletions src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
//! An object store that limits the maximum concurrency of the wrapped implementation

use crate::{
BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
ObjectStore, Path, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, StreamExt,
UploadPart,
BoxStream, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, Path,
PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, StreamExt, UploadPart,
};
use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -200,13 +199,7 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
}

fn permit_get_result(r: GetResult, permit: OwnedSemaphorePermit) -> GetResult {
let payload = match r.payload {
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
v @ GetResultPayload::File(_, _) => v,
GetResultPayload::Stream(s) => {
GetResultPayload::Stream(PermitWrapper::new(s, permit).boxed())
}
};
let payload = PermitWrapper::new(r.payload, permit).boxed();
GetResult { payload, ..r }
}

Expand Down
9 changes: 6 additions & 3 deletions src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ use crate::{
maybe_spawn_blocking,
path::{absolute_path_to_url, Path},
util::InvalidGetRange,
Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart,
Attributes, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart,
};

/// A specialized `Error` for filesystem object store-related errors
Expand Down Expand Up @@ -414,8 +414,11 @@ impl ObjectStore for LocalFileSystem {
None => 0..meta.size,
};

const CHUNK_SIZE: usize = 8 * 1024;
let payload = chunked_stream(file, path, range.clone(), CHUNK_SIZE);

Ok(GetResult {
payload: GetResultPayload::File(file, path),
payload,
attributes: Attributes::default(),
range,
meta,
Expand Down
8 changes: 4 additions & 4 deletions src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ use parking_lot::RwLock;
use crate::multipart::{MultipartStore, PartId};
use crate::util::InvalidGetRange;
use crate::{
path::Path, Attributes, GetRange, GetResult, GetResultPayload, ListResult, MultipartId,
MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutResult,
Result, UpdateVersion, UploadPart,
path::Path, Attributes, GetRange, GetResult, ListResult, MultipartId, MultipartUpload,
ObjectMeta, ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutResult, Result,
UpdateVersion, UploadPart,
};
use crate::{GetOptions, PutPayload};

Expand Down Expand Up @@ -262,7 +262,7 @@ impl ObjectStore for InMemory {
let stream = futures::stream::once(futures::future::ready(Ok(data)));

Ok(GetResult {
payload: GetResultPayload::Stream(stream.boxed()),
payload: stream.boxed(),
attributes: entry.attributes,
meta,
range,
Expand Down
Loading