Skip to content

Commit acc8cc4

Browse files
committed
AWS: support copies >5GB using multipart copy
1 parent 12ef9bc commit acc8cc4

File tree

4 files changed

+172
-44
lines changed

4 files changed

+172
-44
lines changed

src/aws/builder.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ use url::Url;
4242
/// Default metadata endpoint
4343
static DEFAULT_METADATA_ENDPOINT: &str = "http://169.254.169.254";
4444

45+
/// AWS S3 does not support copy operations larger than 5 GiB in a single request.
46+
/// https://docs.aws.amazon.com/AmazonS3/latest/userguide/copy-object.html
47+
const MAX_SINGLE_REQUEST_COPY_SIZE: u64 = 5 * 1024 * 1024 * 1024;
48+
4549
/// A specialized `Error` for object store-related errors
4650
#[derive(Debug, thiserror::Error)]
4751
enum Error {
@@ -189,6 +193,10 @@ pub struct AmazonS3Builder {
189193
request_payer: ConfigValue<bool>,
190194
/// The [`HttpConnector`] to use
191195
http_connector: Option<Arc<dyn HttpConnector>>,
196+
/// Threshold (bytes) above which copy uses multipart copy. If not set, defaults to 5 GiB.
197+
multipart_copy_threshold: Option<ConfigValue<u64>>,
198+
/// Preferred multipart copy part size (bytes). If not set, defaults to 5 GiB.
199+
multipart_copy_part_size: Option<ConfigValue<u64>>,
192200
}
193201

194202
/// Configuration keys for [`AmazonS3Builder`]
@@ -423,6 +431,10 @@ pub enum AmazonS3ConfigKey {
423431

424432
/// Encryption options
425433
Encryption(S3EncryptionConfigKey),
434+
/// Threshold (bytes) to switch to multipart copy
435+
MultipartCopyThreshold,
436+
/// Preferred multipart copy part size (bytes)
437+
MultipartCopyPartSize,
426438
}
427439

428440
impl AsRef<str> for AmazonS3ConfigKey {
@@ -455,6 +467,8 @@ impl AsRef<str> for AmazonS3ConfigKey {
455467
Self::RequestPayer => "aws_request_payer",
456468
Self::Client(opt) => opt.as_ref(),
457469
Self::Encryption(opt) => opt.as_ref(),
470+
Self::MultipartCopyThreshold => "aws_multipart_copy_threshold",
471+
Self::MultipartCopyPartSize => "aws_multipart_copy_part_size",
458472
}
459473
}
460474
}
@@ -499,6 +513,12 @@ impl FromStr for AmazonS3ConfigKey {
499513
"aws_conditional_put" | "conditional_put" => Ok(Self::ConditionalPut),
500514
"aws_disable_tagging" | "disable_tagging" => Ok(Self::DisableTagging),
501515
"aws_request_payer" | "request_payer" => Ok(Self::RequestPayer),
516+
"aws_multipart_copy_threshold" | "multipart_copy_threshold" => {
517+
Ok(Self::MultipartCopyThreshold)
518+
}
519+
"aws_multipart_copy_part_size" | "multipart_copy_part_size" => {
520+
Ok(Self::MultipartCopyPartSize)
521+
}
502522
// Backwards compatibility
503523
"aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
504524
"aws_server_side_encryption" | "server_side_encryption" => Ok(Self::Encryption(
@@ -666,6 +686,12 @@ impl AmazonS3Builder {
666686
self.encryption_customer_key_base64 = Some(value.into())
667687
}
668688
},
689+
AmazonS3ConfigKey::MultipartCopyThreshold => {
690+
self.multipart_copy_threshold = Some(ConfigValue::Deferred(value.into()))
691+
}
692+
AmazonS3ConfigKey::MultipartCopyPartSize => {
693+
self.multipart_copy_part_size = Some(ConfigValue::Deferred(value.into()))
694+
}
669695
};
670696
self
671697
}
@@ -733,6 +759,14 @@ impl AmazonS3Builder {
733759
self.encryption_customer_key_base64.clone()
734760
}
735761
},
762+
AmazonS3ConfigKey::MultipartCopyThreshold => self
763+
.multipart_copy_threshold
764+
.as_ref()
765+
.map(|x| x.to_string()),
766+
AmazonS3ConfigKey::MultipartCopyPartSize => self
767+
.multipart_copy_part_size
768+
.as_ref()
769+
.map(|x| x.to_string()),
736770
}
737771
}
738772

@@ -1029,6 +1063,18 @@ impl AmazonS3Builder {
10291063
self
10301064
}
10311065

1066+
/// Set threshold (bytes) above which copy uses multipart copy
1067+
pub fn with_multipart_copy_threshold(mut self, threshold_bytes: u64) -> Self {
1068+
self.multipart_copy_threshold = Some(ConfigValue::Parsed(threshold_bytes));
1069+
self
1070+
}
1071+
1072+
/// Set preferred multipart copy part size (bytes)
1073+
pub fn with_multipart_copy_part_size(mut self, part_size_bytes: u64) -> Self {
1074+
self.multipart_copy_part_size = Some(ConfigValue::Parsed(part_size_bytes));
1075+
self
1076+
}
1077+
10321078
/// Create a [`AmazonS3`] instance from the provided values,
10331079
/// consuming `self`.
10341080
pub fn build(mut self) -> Result<AmazonS3> {
@@ -1185,6 +1231,17 @@ impl AmazonS3Builder {
11851231
S3EncryptionHeaders::default()
11861232
};
11871233

1234+
let multipart_copy_threshold = self
1235+
.multipart_copy_threshold
1236+
.map(|val| val.get())
1237+
.transpose()?
1238+
.unwrap_or(MAX_SINGLE_REQUEST_COPY_SIZE);
1239+
let multipart_copy_part_size = self
1240+
.multipart_copy_part_size
1241+
.map(|val| val.get())
1242+
.transpose()?
1243+
.unwrap_or(MAX_SINGLE_REQUEST_COPY_SIZE);
1244+
11881245
let config = S3Config {
11891246
region,
11901247
bucket,
@@ -1201,6 +1258,8 @@ impl AmazonS3Builder {
12011258
conditional_put: self.conditional_put.get()?,
12021259
encryption_headers,
12031260
request_payer: self.request_payer.get()?,
1261+
multipart_copy_threshold,
1262+
multipart_copy_part_size,
12041263
};
12051264

12061265
let http_client = http.connect(&config.client_options)?;

src/aws/client.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ impl From<Error> for crate::Error {
138138
pub(crate) enum PutPartPayload<'a> {
139139
Part(PutPayload),
140140
Copy(&'a Path),
141+
CopyRange(&'a Path, std::ops::Range<u64>),
141142
}
142143

143144
impl Default for PutPartPayload<'_> {
@@ -207,6 +208,10 @@ pub(crate) struct S3Config {
207208
pub conditional_put: S3ConditionalPut,
208209
pub request_payer: bool,
209210
pub(super) encryption_headers: S3EncryptionHeaders,
211+
/// Threshold in bytes above which copy will use multipart copy
212+
pub multipart_copy_threshold: u64,
213+
/// Preferred multipart copy part size in bytes (None => auto)
214+
pub multipart_copy_part_size: u64,
210215
}
211216

212217
impl S3Config {
@@ -676,7 +681,10 @@ impl S3Client {
676681
part_idx: usize,
677682
data: PutPartPayload<'_>,
678683
) -> Result<PartId> {
679-
let is_copy = matches!(data, PutPartPayload::Copy(_));
684+
let is_copy = matches!(
685+
data,
686+
PutPartPayload::Copy(_) | PutPartPayload::CopyRange(_, _)
687+
);
680688
let part = (part_idx + 1).to_string();
681689

682690
let mut request = self
@@ -690,6 +698,18 @@ impl S3Client {
690698
"x-amz-copy-source",
691699
&format!("{}/{}", self.config.bucket, encode_path(path)),
692700
),
701+
PutPartPayload::CopyRange(path, range) => {
702+
// AWS expects inclusive end for copy range header
703+
let start = range.start;
704+
let end_inclusive = range.end.saturating_sub(1);
705+
let range_value = format!("bytes={}-{}", start, end_inclusive);
706+
request
707+
.header(
708+
"x-amz-copy-source",
709+
&format!("{}/{}", self.config.bucket, encode_path(path)),
710+
)
711+
.header("x-amz-copy-source-range", &range_value)
712+
}
693713
};
694714

695715
if self
@@ -995,6 +1015,8 @@ mod tests {
9951015
conditional_put: Default::default(),
9961016
encryption_headers: Default::default(),
9971017
request_payer: false,
1018+
multipart_copy_threshold: 5 * 1024 * 1024 * 1024,
1019+
multipart_copy_part_size: 5 * 1024 * 1024 * 1024,
9981020
};
9991021

10001022
let client = S3Client::new(config, HttpClient::new(reqwest::Client::new()));

src/aws/mod.rs

Lines changed: 81 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,56 @@ impl AmazonS3 {
101101
fn path_url(&self, path: &Path) -> String {
102102
self.client.config.path_url(path)
103103
}
104+
105+
/// Perform a multipart copy operation
106+
async fn copy_multipart(
107+
&self,
108+
from: &Path,
109+
to: &Path,
110+
size: u64,
111+
mode: CompleteMultipartMode,
112+
) -> Result<()> {
113+
// Perform multipart copy using UploadPartCopy
114+
let upload_id = self
115+
.client
116+
.create_multipart(to, PutMultipartOptions::default())
117+
.await?;
118+
119+
// S3 requires minimum 5 MiB per part (except final) and max 10,000 parts
120+
let part_size = self.client.config.multipart_copy_part_size;
121+
122+
let mut parts = Vec::new();
123+
let mut offset: u64 = 0;
124+
let mut idx: usize = 0;
125+
let res = async {
126+
while offset < size {
127+
let end = std::cmp::min(offset + part_size, size);
128+
let payload = if offset == 0 && end == size {
129+
PutPartPayload::Copy(from)
130+
} else {
131+
PutPartPayload::CopyRange(from, offset..end)
132+
};
133+
let part = self.client.put_part(to, &upload_id, idx, payload).await?;
134+
parts.push(part);
135+
idx += 1;
136+
offset = end;
137+
}
138+
self.client
139+
.complete_multipart(to, &upload_id, parts, mode)
140+
.await
141+
.map(|_| ())
142+
}
143+
.await;
144+
145+
// If the multipart upload failed, make a best effort attempt to
146+
// clean it up. It's the caller's responsibility to add a
147+
// lifecycle rule if guaranteed cleanup is required, as we
148+
// cannot protect against an ill-timed process crash.
149+
if res.is_err() {
150+
let _ = self.client.abort_multipart(to, &upload_id).await;
151+
}
152+
res
153+
}
104154
}
105155

106156
#[async_trait]
@@ -310,14 +360,31 @@ impl ObjectStore for AmazonS3 {
310360
mode,
311361
extensions: _,
312362
} = options;
363+
// Determine source size to decide between single CopyObject and multipart copy
364+
let head_meta = self
365+
.client
366+
.get_opts(
367+
from,
368+
GetOptions {
369+
head: true,
370+
..Default::default()
371+
},
372+
)
373+
.await?
374+
.meta;
313375

314376
match mode {
315377
CopyMode::Overwrite => {
316-
self.client
317-
.copy_request(from, to)
318-
.idempotent(true)
319-
.send()
320-
.await?;
378+
if head_meta.size <= self.client.config.multipart_copy_threshold {
379+
self.client
380+
.copy_request(from, to)
381+
.idempotent(true)
382+
.send()
383+
.await?;
384+
} else {
385+
self.copy_multipart(from, to, head_meta.size, CompleteMultipartMode::Overwrite)
386+
.await?;
387+
}
321388
Ok(())
322389
}
323390
CopyMode::Create => {
@@ -327,45 +394,16 @@ impl ObjectStore for AmazonS3 {
327394
}
328395
Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v, *status),
329396
Some(S3CopyIfNotExists::Multipart) => {
330-
let upload_id = self
331-
.client
332-
.create_multipart(to, PutMultipartOptions::default())
333-
.await?;
334-
335-
let res = async {
336-
let part_id = self
337-
.client
338-
.put_part(to, &upload_id, 0, PutPartPayload::Copy(from))
339-
.await?;
340-
match self
341-
.client
342-
.complete_multipart(
343-
to,
344-
&upload_id,
345-
vec![part_id],
346-
CompleteMultipartMode::Create,
347-
)
348-
.await
349-
{
350-
Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists {
397+
return self
398+
.copy_multipart(from, to, head_meta.size, CompleteMultipartMode::Create)
399+
.await
400+
.map_err(|err| match err {
401+
Error::Precondition { .. } => Error::AlreadyExists {
351402
path: to.to_string(),
352-
source: Box::new(e),
353-
}),
354-
Ok(_) => Ok(()),
355-
Err(e) => Err(e),
356-
}
357-
}
358-
.await;
359-
360-
// If the multipart upload failed, make a best effort attempt to
361-
// clean it up. It's the caller's responsibility to add a
362-
// lifecycle rule if guaranteed cleanup is required, as we
363-
// cannot protect against an ill-timed process crash.
364-
if res.is_err() {
365-
let _ = self.client.abort_multipart(to, &upload_id).await;
366-
}
367-
368-
return res;
403+
source: Box::new(err),
404+
},
405+
other => other,
406+
});
369407
}
370408
None => {
371409
return Err(Error::NotSupported {

src/config.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,15 @@ impl Parse for u32 {
112112
}
113113
}
114114

115+
impl Parse for u64 {
116+
fn parse(v: &str) -> Result<Self> {
117+
Self::from_str(v).map_err(|_| Error::Generic {
118+
store: "Config",
119+
source: format!("failed to parse \"{v}\" as u64").into(),
120+
})
121+
}
122+
}
123+
115124
impl Parse for HeaderValue {
116125
fn parse(v: &str) -> Result<Self> {
117126
Self::from_str(v).map_err(|_| Error::Generic {

0 commit comments

Comments
 (0)