Skip to content

Commit 8c2710c

Browse files
committed
refactor: introduce ObjectStoreExt trait
See #385.
1 parent 70cd178 commit 8c2710c

File tree

13 files changed

+49
-42
lines changed

13 files changed

+49
-42
lines changed

src/aws/dynamo.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@ pub(crate) use tests::integration_test;
528528
mod tests {
529529
use super::*;
530530
use crate::aws::AmazonS3;
531-
use crate::ObjectStore;
531+
use crate::{ObjectStore, ObjectStoreExt};
532532
use rand::distr::Alphanumeric;
533533
use rand::{rng, Rng};
534534

src/aws/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,7 @@ mod tests {
517517
use crate::integration::*;
518518
use crate::tests::*;
519519
use crate::ClientOptions;
520+
use crate::ObjectStoreExt;
520521
use base64::prelude::BASE64_STANDARD;
521522
use base64::Engine;
522523
use http::HeaderMap;

src/azure/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ mod tests {
309309
use super::*;
310310
use crate::integration::*;
311311
use crate::tests::*;
312+
use crate::ObjectStoreExt;
312313
use bytes::Bytes;
313314

314315
#[tokio::test]

src/buffered.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,12 +210,12 @@ impl AsyncBufRead for BufReader {
210210

211211
/// An async buffered writer compatible with the tokio IO traits
212212
///
213-
/// This writer adaptively uses [`ObjectStore::put`] or
213+
/// This writer adaptively uses [`ObjectStore::put_opts`] or
214214
/// [`ObjectStore::put_multipart`] depending on the amount of data that has
215215
/// been written.
216216
///
217217
/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
218-
/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be
218+
/// using [`ObjectStore::put_opts`]. If `capacity` is exceeded, data will instead be
219219
/// streamed using [`ObjectStore::put_multipart`]
220220
pub struct BufWriter {
221221
capacity: usize,
@@ -242,7 +242,7 @@ enum BufWriterState {
242242
Prepare(BoxFuture<'static, crate::Result<WriteMultipart>>),
243243
/// Write to a multipart upload
244244
Write(Option<WriteMultipart>),
245-
/// [`ObjectStore::put`]
245+
/// [`ObjectStore::put_opts`]
246246
Flush(BoxFuture<'static, crate::Result<()>>),
247247
}
248248

@@ -489,7 +489,7 @@ mod tests {
489489
use super::*;
490490
use crate::memory::InMemory;
491491
use crate::path::Path;
492-
use crate::{Attribute, GetOptions};
492+
use crate::{Attribute, GetOptions, ObjectStoreExt};
493493
use itertools::Itertools;
494494
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
495495

src/chunked.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ mod tests {
185185
use crate::local::LocalFileSystem;
186186
use crate::memory::InMemory;
187187
use crate::path::Path;
188+
use crate::ObjectStoreExt;
188189

189190
use super::*;
190191

src/gcp/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ mod test {
287287

288288
use crate::integration::*;
289289
use crate::tests::*;
290+
use crate::ObjectStoreExt;
290291

291292
use super::*;
292293

src/integration.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use crate::multipart::MultipartStore;
2929
use crate::path::Path;
3030
use crate::{
3131
Attribute, Attributes, DynObjectStore, Error, GetOptions, GetRange, MultipartUpload,
32-
ObjectStore, PutMode, PutPayload, UpdateVersion, WriteMultipart,
32+
ObjectStore, ObjectStoreExt, PutMode, PutPayload, UpdateVersion, WriteMultipart,
3333
};
3434
use bytes::Bytes;
3535
use futures::stream::FuturesUnordered;

src/lib.rs

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -252,11 +252,11 @@
252252
//!
253253
//! # Put Object
254254
//!
255-
//! Use the [`ObjectStore::put`] method to atomically write data.
255+
//! Use the [`ObjectStoreExt::put`] method to atomically write data.
256256
//!
257257
//! ```
258258
//! # use object_store::local::LocalFileSystem;
259-
//! # use object_store::{ObjectStore, PutPayload};
259+
//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
260260
//! # use std::sync::Arc;
261261
//! # use object_store::path::Path;
262262
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
@@ -338,7 +338,7 @@
338338
//!
339339
//! ```
340340
//! # use object_store::local::LocalFileSystem;
341-
//! # use object_store::{ObjectStore, PutPayloadMut};
341+
//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayloadMut};
342342
//! # use std::sync::Arc;
343343
//! # use bytes::Bytes;
344344
//! # use tokio::io::AsyncWriteExt;
@@ -575,6 +575,7 @@ use bytes::Bytes;
575575
use chrono::{DateTime, Utc};
576576
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
577577
use std::fmt::{Debug, Formatter};
578+
use std::future::Future;
578579
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
579580
use std::io::{Read, Seek, SeekFrom};
580581
use std::ops::Range;
@@ -587,19 +588,15 @@ pub type DynObjectStore = dyn ObjectStore;
587588
pub type MultipartId = String;
588589

589590
/// Universal API to multiple object store services.
591+
///
592+
/// For more convience methods, check [`ObjectStoreExt`].
590593
#[async_trait]
591594
pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
592-
/// Save the provided bytes to the specified location
595+
/// Save the provided `payload` to `location` with the given options
593596
///
594597
/// The operation is guaranteed to be atomic, it will either successfully
595598
/// write the entirety of `payload` to `location`, or fail. No clients
596599
/// should be able to observe a partially written object
597-
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
598-
self.put_opts(location, payload, PutOptions::default())
599-
.await
600-
}
601-
602-
/// Save the provided `payload` to `location` with the given options
603600
async fn put_opts(
604601
&self,
605602
location: &Path,
@@ -609,7 +606,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
609606

610607
/// Perform a multipart upload
611608
///
612-
/// Client should prefer [`ObjectStore::put`] for small payloads, as streaming uploads
609+
/// Client should prefer [`ObjectStoreExt::put`] for small payloads, as streaming uploads
613610
/// typically require multiple separate requests. See [`MultipartUpload`] for more information
614611
///
615612
/// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
@@ -620,7 +617,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
620617

621618
/// Perform a multipart upload with options
622619
///
623-
/// Client should prefer [`ObjectStore::put`] for small payloads, as streaming uploads
620+
/// Client should prefer [`ObjectStore::put_opts`] for small payloads, as streaming uploads
624621
/// typically require multiple separate requests. See [`MultipartUpload`] for more information
625622
///
626623
/// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
@@ -696,7 +693,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
696693
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
697694
/// # let root = tempfile::TempDir::new().unwrap();
698695
/// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
699-
/// # use object_store::{ObjectStore, ObjectMeta};
696+
/// # use object_store::{ObjectStore, ObjectStoreExt, ObjectMeta};
700697
/// # use object_store::path::Path;
701698
/// # use futures::{StreamExt, TryStreamExt};
702699
/// #
@@ -803,10 +800,6 @@ macro_rules! as_ref_impl {
803800
($type:ty) => {
804801
#[async_trait]
805802
impl ObjectStore for $type {
806-
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
807-
self.as_ref().put(location, payload).await
808-
}
809-
810803
async fn put_opts(
811804
&self,
812805
location: &Path,
@@ -901,6 +894,30 @@ macro_rules! as_ref_impl {
901894
as_ref_impl!(Arc<dyn ObjectStore>);
902895
as_ref_impl!(Box<dyn ObjectStore>);
903896

897+
/// Extension trait for [`ObjectStore`] with convinience functions.
898+
pub trait ObjectStoreExt {
899+
/// Save the provided bytes to the specified location
900+
///
901+
/// The operation is guaranteed to be atomic, it will either successfully
902+
/// write the entirety of `payload` to `location`, or fail. No clients
903+
/// should be able to observe a partially written object
904+
fn put(
905+
&self,
906+
location: &Path,
907+
payload: PutPayload,
908+
) -> impl Future<Output = Result<PutResult>> + Send;
909+
}
910+
911+
impl<T> ObjectStoreExt for T
912+
where
913+
T: ObjectStore + ?Sized,
914+
{
915+
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
916+
self.put_opts(location, payload, PutOptions::default())
917+
.await
918+
}
919+
}
920+
904921
/// Result of a list call that includes objects, prefixes (directories) and a
905922
/// token for the next set of results. Individual result sets may be limited to
906923
/// 1,000 objects based on the underlying object storage's limitations.

src/limit.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,6 @@ impl<T: ObjectStore> std::fmt::Display for LimitStore<T> {
7171

7272
#[async_trait]
7373
impl<T: ObjectStore> ObjectStore for LimitStore<T> {
74-
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
75-
let _permit = self.semaphore.acquire().await.unwrap();
76-
self.inner.put(location, payload).await
77-
}
78-
7974
async fn put_opts(
8075
&self,
8176
location: &Path,

src/local.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1089,7 +1089,7 @@ mod tests {
10891089
#[cfg(target_family = "unix")]
10901090
use tempfile::NamedTempFile;
10911091

1092-
use crate::integration::*;
1092+
use crate::{integration::*, ObjectStoreExt};
10931093

10941094
use super::*;
10951095

0 commit comments

Comments
 (0)