Skip to content

Commit f4a52c8

Browse files
TingDaoKgraebm
andauthored
Support request level part size configuration (#539)
- Add support for request level override for part size and upload threshold - Add support for on_done callback with the did_validate checksum and the checksum algorithm. - Submodules update for several S3 related improvements: - Fix Get-Object with partNumber - Skip HeaderRequest for ranged get - Cancel/Pause s3 request now faster (will cancel the ongoing HTTP requests now) Co-authored-by: Michael Graeb <graebm@amazon.com>
1 parent 4e02ae7 commit f4a52c8

File tree

9 files changed

+130
-18
lines changed

9 files changed

+130
-18
lines changed

awscrt/s3.py

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -182,13 +182,14 @@ class S3Client(NativeResource):
182182
for each connection, unless `tls_mode` is :attr:`S3RequestTlsMode.DISABLED`
183183
184184
part_size (Optional[int]): Size, in bytes, of parts that files will be downloaded or uploaded in.
185-
Note: for :attr:`S3RequestType.PUT_OBJECT` request, S3 requires the part size greater than 5 MiB.
186-
(8*1024*1024 by default)
185+
Note: for :attr:`S3RequestType.PUT_OBJECT` request, client will adjust the part size to meet the service limits.
186+
(max number of parts per upload is 10,000, minimum upload part size is 5 MiB)
187187
188188
multipart_upload_threshold (Optional[int]): The size threshold in bytes, for when to use multipart uploads.
189+
This only affects :attr:`S3RequestType.PUT_OBJECT` request.
189190
Uploads over this size will use the multipart upload strategy.
190191
Uploads this size or less will use a single request.
191-
If not set, `part_size` is used as the threshold.
192+
If not set, maximal of `part_size` and 5 MiB will be used.
192193
193194
throughput_target_gbps (Optional[float]): Throughput target in
194195
Gigabits per second (Gbps) that we are trying to reach.
@@ -296,6 +297,8 @@ def make_request(
296297
signing_config=None,
297298
credential_provider=None,
298299
checksum_config=None,
300+
part_size=None,
301+
multipart_upload_threshold=None,
299302
on_headers=None,
300303
on_body=None,
301304
on_done=None,
@@ -347,6 +350,20 @@ def make_request(
347350
348351
checksum_config (Optional[S3ChecksumConfig]): Optional checksum settings.
349352
353+
part_size (Optional[int]): Size, in bytes, of parts that files will be downloaded or uploaded in.
354+
If not set, the part size configured for the client will be used.
355+
Note: for :attr:`S3RequestType.PUT_OBJECT` request, client will adjust the part size to meet the service limits.
356+
(max number of parts per upload is 10,000, minimum upload part size is 5 MiB)
357+
358+
multipart_upload_threshold (Optional[int]): The size threshold in bytes, for when to use multipart uploads.
359+
This only affects :attr:`S3RequestType.PUT_OBJECT` request.
360+
Uploads over this size will use the multipart upload strategy.
361+
Uploads this size or less will use a single request.
362+
If set, this should be at least `part_size`.
363+
If not set, `part_size` adjusted by client will be used as the threshold.
364+
If both `part_size` and `multipart_upload_threshold` are not set,
365+
the values from `aws_s3_client_config` are used.
366+
350367
on_headers: Optional callback invoked as the response received, and even the API request
351368
has been split into multiple parts, this callback will only be invoked once as
352369
it's just making one API request to S3.
@@ -401,6 +418,13 @@ def make_request(
401418
this is the final response's status code. If the operation
402419
failed for another reason, None is returned.
403420
421+
* `did_validate_checksum` (bool):
422+
Was the server side checksum compared against a calculated checksum of the response body.
423+
This may be false even if :attr:`S3ChecksumConfig.validate_response` was set because
424+
the object was uploaded without a checksum, or downloaded differently from how it's uploaded.
425+
426+
* `checksum_validation_algorithm` (Optional[S3ChecksumAlgorithm]): The checksum algorithm used to validate the response.
427+
404428
* `**kwargs` (dict): Forward-compatibility kwargs.
405429
406430
on_progress: Optional callback invoked when part of the transfer is done to report the progress.
@@ -423,6 +447,8 @@ def make_request(
423447
signing_config=signing_config,
424448
credential_provider=credential_provider,
425449
checksum_config=checksum_config,
450+
part_size=part_size,
451+
multipart_upload_threshold=multipart_upload_threshold,
426452
on_headers=on_headers,
427453
on_body=on_body,
428454
on_done=on_done,
@@ -458,6 +484,8 @@ def __init__(
458484
signing_config=None,
459485
credential_provider=None,
460486
checksum_config=None,
487+
part_size=None,
488+
multipart_upload_threshold=None,
461489
on_headers=None,
462490
on_body=None,
463491
on_done=None,
@@ -468,14 +496,21 @@ def __init__(
468496
assert callable(on_headers) or on_headers is None
469497
assert callable(on_body) or on_body is None
470498
assert callable(on_done) or on_done is None
499+
assert isinstance(part_size, int) or part_size is None
500+
assert isinstance(multipart_upload_threshold, int) or multipart_upload_threshold is None
471501

472502
super().__init__()
473503

474504
self._finished_future = Future()
475505
self.shutdown_event = threading.Event()
476506

477-
checksum_algorithm = 0 # 0 means NONE in C
478-
checksum_location = 0 # 0 means NONE in C
507+
# C layer uses 0 to indicate defaults
508+
if part_size is None:
509+
part_size = 0
510+
if multipart_upload_threshold is None:
511+
multipart_upload_threshold = 0
512+
checksum_algorithm = 0
513+
checksum_location = 0
479514
validate_response_checksum = False
480515
if checksum_config is not None:
481516
if checksum_config.algorithm is not None:
@@ -509,6 +544,8 @@ def __init__(
509544
checksum_algorithm,
510545
checksum_location,
511546
validate_response_checksum,
547+
part_size,
548+
multipart_upload_threshold,
512549
s3_request_core)
513550

514551
@property
@@ -623,15 +660,22 @@ def _on_body(self, chunk, offset):
623660
def _on_shutdown(self):
624661
self._shutdown_event.set()
625662

626-
def _on_finish(self, error_code, status_code, error_headers, error_body, error_operation_name):
663+
def _on_finish(
664+
self,
665+
error_code,
666+
status_code,
667+
error_headers,
668+
error_body,
669+
error_operation_name,
670+
did_validate_checksum,
671+
checksum_validation_algorithm):
627672
# If C layer gives status_code 0, that means "unknown"
628673
if status_code == 0:
629674
status_code = None
630675

631676
error = None
632677
if error_code:
633678
error = awscrt.exceptions.from_code(error_code)
634-
635679
if isinstance(error, awscrt.exceptions.AwsCrtError):
636680
if (error.name == "AWS_ERROR_CRT_CALLBACK_EXCEPTION"
637681
and self._python_callback_exception is not None):
@@ -651,13 +695,21 @@ def _on_finish(self, error_code, status_code, error_headers, error_body, error_o
651695
self._finished_future.set_exception(error)
652696
else:
653697
self._finished_future.set_result(None)
698+
699+
if checksum_validation_algorithm:
700+
checksum_validation_algorithm = S3ChecksumAlgorithm(checksum_validation_algorithm)
701+
else:
702+
checksum_validation_algorithm = None
703+
654704
if self._on_done_cb:
655705
self._on_done_cb(
656706
error=error,
657707
error_headers=error_headers,
658708
error_body=error_body,
659709
error_operation_name=error_operation_name,
660-
status_code=status_code)
710+
status_code=status_code,
711+
did_validate_checksum=did_validate_checksum,
712+
checksum_validation_algorithm=checksum_validation_algorithm)
661713

662714
def _on_progress(self, progress):
663715
if self._on_progress_cb:

crt/aws-c-common

crt/aws-lc

crt/s2n

Submodule s2n updated from 95753f0 to a9a07a2

source/s3_meta_request.c

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -263,13 +263,15 @@ static void s_s3_request_on_finish(
263263
result = PyObject_CallMethod(
264264
request_binding->py_core,
265265
"_on_finish",
266-
"(iiOy#s)",
266+
"(iiOy#sOi)",
267267
error_code,
268268
meta_request_result->response_status,
269269
header_list ? header_list : Py_None,
270270
(const char *)(error_body.buffer),
271271
(Py_ssize_t)error_body.len,
272-
operation_name);
272+
operation_name,
273+
meta_request_result->did_validate ? Py_True : Py_False,
274+
(int)meta_request_result->validation_algorithm);
273275

274276
if (result) {
275277
Py_DECREF(result);
@@ -372,10 +374,12 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
372374
enum aws_s3_checksum_algorithm checksum_algorithm; /* i */
373375
enum aws_s3_checksum_location checksum_location; /* i */
374376
int validate_response_checksum; /* p - boolean predicate */
377+
uint64_t part_size; /* K */
378+
uint64_t multipart_upload_threshold; /* K */
375379
PyObject *py_core; /* O */
376380
if (!PyArg_ParseTuple(
377381
args,
378-
"OOOizOOzzs#iipO",
382+
"OOOizOOzzs#iipKKO",
379383
&py_s3_request,
380384
&s3_client_py,
381385
&http_request_py,
@@ -390,6 +394,8 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
390394
&checksum_algorithm,
391395
&checksum_location,
392396
&validate_response_checksum,
397+
&part_size,
398+
&multipart_upload_threshold,
393399
&py_core)) {
394400
return NULL;
395401
}
@@ -470,6 +476,8 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
470476
.finish_callback = s_s3_request_on_finish,
471477
.shutdown_callback = s_s3_request_on_shutdown,
472478
.progress_callback = s_s3_request_on_progress,
479+
.part_size = part_size,
480+
.multipart_upload_threshold = multipart_upload_threshold,
473481
.user_data = meta_request,
474482
};
475483

test/test_s3.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,8 @@ def setUp(self):
260260
self.done_error_headers = None
261261
self.done_error_body = None
262262
self.done_error_operation_name = None
263+
self.done_did_validate_checksum = None
264+
self.done_checksum_validation_algorithm = None
263265

264266
self.files = FileCreator()
265267
self.temp_put_obj_file_path = self.files.create_file_with_size("temp_put_obj_10mb", 10 * MB)
@@ -305,12 +307,23 @@ def _on_request_headers(self, status_code, headers, **kargs):
305307
def _on_request_body(self, chunk, offset, **kargs):
306308
self.received_body_len = self.received_body_len + len(chunk)
307309

308-
def _on_request_done(self, error, error_headers, error_body, error_operation_name, status_code, **kwargs):
310+
def _on_request_done(
311+
self,
312+
error,
313+
error_headers,
314+
error_body,
315+
error_operation_name,
316+
status_code,
317+
did_validate_checksum,
318+
checksum_validation_algorithm,
319+
**kwargs):
309320
self.done_error = error
310321
self.done_error_headers = error_headers
311322
self.done_error_body = error_body
312323
self.done_error_operation_name = error_operation_name
313324
self.done_status_code = status_code
325+
self.done_did_validate_checksum = did_validate_checksum
326+
self.done_checksum_validation_algorithm = checksum_validation_algorithm
314327

315328
def _on_progress(self, progress):
316329
self.transferred_len += progress
@@ -323,6 +336,11 @@ def _validate_successful_response(self, is_put_object):
323336
self.assertIsNone(self.done_error_headers)
324337
self.assertIsNone(self.done_error_body)
325338
self.assertIsNone(self.done_error_operation_name)
339+
self.assertIsInstance(self.done_did_validate_checksum, bool)
340+
if self.done_did_validate_checksum:
341+
self.assertIsInstance(self.done_checksum_validation_algorithm, S3ChecksumAlgorithm)
342+
else:
343+
self.assertIsNone(self.done_checksum_validation_algorithm)
326344
headers = HttpHeaders(self.response_headers)
327345
self.assertIsNone(headers.get("Content-Range"))
328346
body_length = headers.get("Content-Length")
@@ -452,6 +470,38 @@ def test_put_object_multiple_times(self):
452470
del s3_client
453471
self.assertTrue(client_shutdown_event.wait(self.timeout))
454472

473+
def test_put_object_request_override_part_size(self):
474+
s3_client = s3_client_new(False, self.region, 5 * MB)
475+
476+
tempfile = self.files.create_file_with_size("temp_file_override", 10 * MB)
477+
path = "/put_object_test_py_10MB_override.txt"
478+
content_length = os.stat(tempfile).st_size
479+
request = self._put_object_request(None, content_length, path=path)
480+
# Override the threshold to 10 MB, which will result in a single part upload
481+
s3_request = s3_client.make_request(
482+
request=request,
483+
type=S3RequestType.PUT_OBJECT,
484+
send_filepath=tempfile,
485+
on_headers=self._on_request_headers,
486+
on_body=self._on_request_body,
487+
on_done=self._on_request_done,
488+
multipart_upload_threshold=10 * MB)
489+
try:
490+
s3_request.finished_future.result(self.timeout)
491+
except Exception as e:
492+
# failed
493+
self.assertTrue(False)
494+
495+
# Etag headers for a MPU will be formatted with `-[part number]`
496+
etag = HttpHeaders(self.response_headers).get("Etag")
497+
# make sure we uploaded as single part as we override the threshold
498+
self.assertFalse("-" in etag)
499+
500+
del s3_request
501+
client_shutdown_event = s3_client.shutdown_event
502+
del s3_client
503+
self.assertTrue(client_shutdown_event.wait(self.timeout))
504+
455505
def test_get_object_filepath(self):
456506
request = self._get_object_request(self.get_test_object_path)
457507
request_type = S3RequestType.GET_OBJECT
@@ -559,6 +609,8 @@ def test_put_get_with_checksum(self):
559609
download_checksum_config = S3ChecksumConfig(validate_response=True)
560610
self._test_s3_put_get_object(download_request, S3RequestType.GET_OBJECT,
561611
checksum_config=download_checksum_config)
612+
self.assertTrue(self.done_did_validate_checksum)
613+
self.assertEqual(self.done_checksum_validation_algorithm, S3ChecksumAlgorithm.CRC32)
562614
self.assertEqual(HttpHeaders(self.response_headers).get('x-amz-checksum-crc32'),
563615
crc32_base64_str)
564616

0 commit comments

Comments
 (0)