Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 80 additions & 1 deletion awscrt/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,43 @@ class S3ChecksumConfig:
"""Whether to retrieve and validate response checksums."""


@dataclass
class S3FileIoOptions:
"""
Controls how client performance file I/O operations.
Only applies to the file based workload.
Notes: only applies when `send_filepath` is set from the request.
"""

should_stream: bool = False
"""
Skip buffering the part in memory before sending the request.
If set, set the `disk_throughput` to be reasonable align with the available disk throughput.
Otherwise, the transfer may fail with connection starvation.
"""

disk_throughput_gbps: (Optional[float]) = 0.0
"""
The estimated disk throughput. Only be applied when `streaming_upload` is true.
in gigabits per second (Gbps).
When doing upload with streaming, it's important to set the disk throughput to prevent the connection starvation.

Notes: There are possibilities that cannot reach the all available disk throughput:
1. Disk is busy with other applications
2. OS Cache may cap the throughput, use `direct_io` to get around this.
"""

direct_io: bool = False
"""
Enable direct IO to bypass the OS cache. Helpful when the disk I/O outperforms the kernel cache.
Notes:
- Only supported on linux for now.
- Only supports upload for now.
- Check NOTES for O_DIRECT for additional info https://man7.org/linux/man-pages/man2/openat.2.html
In summary, O_DIRECT is a potentially powerful tool that should be used with caution.
"""


class S3Client(NativeResource):
"""S3 client

Expand Down Expand Up @@ -217,6 +254,11 @@ class S3Client(NativeResource):
This option is only supported on Linux, MacOS, and platforms that have either SO_BINDTODEVICE or IP_BOUND_IF. It
is not supported on Windows. `AWS_ERROR_PLATFORM_NOT_SUPPORTED` will be raised on unsupported platforms. On
Linux, SO_BINDTODEVICE is used and requires kernel version >= 5.7 or root privileges.

fio_options: (Optional[S3FileIoOptions])
If set, this controls how the client interact with file I/O.
If not set, a default options will be created to avoid memory issue based on the size of file.
Note: Only applies when the request created with `send_filepath`
"""

__slots__ = ('shutdown_event', '_region')
Expand All @@ -235,7 +277,8 @@ def __init__(
throughput_target_gbps=None,
enable_s3express=False,
memory_limit=None,
network_interface_names: Optional[Sequence[str]] = None):
network_interface_names: Optional[Sequence[str]] = None,
fio_options: Optional['S3FileIoOptions'] = None):
assert isinstance(bootstrap, ClientBootstrap) or bootstrap is None
assert isinstance(region, str)
assert isinstance(signing_config, AwsSigningConfig) or signing_config is None
Expand All @@ -249,6 +292,7 @@ def __init__(
float) or throughput_target_gbps is None
assert isinstance(enable_s3express, bool) or enable_s3express is None
assert isinstance(network_interface_names, Sequence) or network_interface_names is None
assert isinstance(fio_options, S3FileIoOptions) or fio_options is None

if credential_provider and signing_config:
raise ValueError("'credential_provider' has been deprecated in favor of 'signing_config'. "
Expand Down Expand Up @@ -288,6 +332,15 @@ def on_shutdown():
# ensure this is a list, so it's simpler to process in C
if not isinstance(network_interface_names, list):
network_interface_names = list(network_interface_names)
fio_options_set = False
should_stream = False
disk_throughput_gbps = 0.0
direct_io = False
if fio_options is not None:
fio_options_set = True
should_stream = fio_options.should_stream
disk_throughput_gbps = fio_options.disk_throughput_gbps
direct_io = fio_options.direct_io

self._binding = _awscrt.s3_client_new(
bootstrap,
Expand All @@ -303,6 +356,10 @@ def on_shutdown():
enable_s3express,
memory_limit,
network_interface_names,
fio_options_set,
should_stream,
disk_throughput_gbps,
direct_io,
s3_client_core)

def make_request(
Expand All @@ -318,6 +375,7 @@ def make_request(
checksum_config=None,
part_size=None,
multipart_upload_threshold=None,
fio_options=None,
on_headers=None,
on_body=None,
on_done=None,
Expand Down Expand Up @@ -398,6 +456,11 @@ def make_request(
If both `part_size` and `multipart_upload_threshold` are not set,
the values from `aws_s3_client_config` are used.

fio_options: (Optional[S3FileIoOptions])
If set, this overrides the client fio_options to control how this request interact with file I/O.
If not set, a default options will be created to avoid memory issue based on the size of file.
Note: Only applies when the request created with `send_filepath`

on_headers: Optional callback invoked as the response received, and even the API request
has been split into multiple parts, this callback will only be invoked once as
it's just making one API request to S3.
Expand Down Expand Up @@ -483,6 +546,7 @@ def make_request(
checksum_config=checksum_config,
part_size=part_size,
multipart_upload_threshold=multipart_upload_threshold,
fio_options=fio_options,
on_headers=on_headers,
on_body=on_body,
on_done=on_done,
Expand Down Expand Up @@ -520,6 +584,7 @@ def __init__(
checksum_config=None,
part_size=None,
multipart_upload_threshold=None,
fio_options=None,
on_headers=None,
on_body=None,
on_done=None,
Expand All @@ -532,6 +597,7 @@ def __init__(
assert callable(on_done) or on_done is None
assert isinstance(part_size, int) or part_size is None
assert isinstance(multipart_upload_threshold, int) or multipart_upload_threshold is None
assert isinstance(fio_options, S3FileIoOptions) or fio_options is None

if type == S3RequestType.DEFAULT and not operation_name:
raise ValueError("'operation_name' must be set when using S3RequestType.DEFAULT")
Expand All @@ -555,6 +621,15 @@ def __init__(
if checksum_config.location is not None:
checksum_location = checksum_config.location.value
validate_response_checksum = checksum_config.validate_response
fio_options_set = False
should_stream = False
disk_throughput_gbps = 0.0
direct_io = False
if fio_options is not None:
fio_options_set = True
should_stream = fio_options.should_stream
disk_throughput_gbps = fio_options.disk_throughput_gbps
direct_io = fio_options.direct_io

s3_request_core = _S3RequestCore(
request,
Expand Down Expand Up @@ -583,6 +658,10 @@ def __init__(
validate_response_checksum,
part_size,
multipart_upload_threshold,
fio_options_set,
should_stream,
disk_throughput_gbps,
direct_io,
s3_request_core)

@property
Expand Down
17 changes: 16 additions & 1 deletion source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -258,11 +258,15 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
int enable_s3express; /* p */
uint64_t mem_limit; /* K */
PyObject *network_interface_names_py; /* O */
int fio_options_set; /* p - boolean predicate */
int should_stream; /* p - boolean predicate */
double disk_throughput_gbps; /* d */
int direct_io; /* p - boolean predicate */
PyObject *py_core; /* O */

if (!PyArg_ParseTuple(
args,
"OOOOOs#iKKdpKOO",
"OOOOOs#iKKdpKOppdpO",
&bootstrap_py,
&signing_config_py,
&credential_provider_py,
Expand All @@ -277,6 +281,10 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
&enable_s3express,
&mem_limit,
&network_interface_names_py,
&fio_options_set,
&should_stream,
&disk_throughput_gbps,
&direct_io,
&py_core)) {
return NULL;
}
Expand Down Expand Up @@ -366,6 +374,11 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
}
}
}
struct aws_s3_file_io_options fio_opts = {
.should_stream = should_stream,
.disk_throughput_gbps = disk_throughput_gbps,
.direct_io = direct_io,
};

struct aws_s3_client_config s3_config = {
.region = region,
Expand All @@ -382,6 +395,8 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
.enable_s3express = enable_s3express,
.network_interface_names_array = network_interface_names,
.num_network_interface_names = num_network_interface_names,
/* If fio options not set, let native code to decide the default instead */
.fio_opts = fio_options_set ? &fio_opts : NULL,
};

s3_client->native = aws_s3_client_new(allocator, &s3_config);
Expand Down
17 changes: 16 additions & 1 deletion source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,14 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
int validate_response_checksum; /* p - boolean predicate */
uint64_t part_size; /* K */
uint64_t multipart_upload_threshold; /* K */
int fio_options_set; /* p - boolean predicate */
int should_stream; /* p - boolean predicate */
double disk_throughput_gbps; /* d */
int direct_io; /* p - boolean predicate */
PyObject *py_core; /* O */
if (!PyArg_ParseTuple(
args,
"OOOizOOzzs#iipKKO",
"OOOizOOzzs#iipKKppdpO",
&py_s3_request,
&s3_client_py,
&http_request_py,
Expand All @@ -352,6 +356,10 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
&validate_response_checksum,
&part_size,
&multipart_upload_threshold,
&fio_options_set,
&should_stream,
&disk_throughput_gbps,
&direct_io,
&py_core)) {
return NULL;
}
Expand Down Expand Up @@ -407,6 +415,11 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
aws_mem_release(allocator, meta_request);
return NULL;
}
struct aws_s3_file_io_options fio_opts = {
.should_stream = should_stream,
.disk_throughput_gbps = disk_throughput_gbps,
.direct_io = direct_io,
};

meta_request->py_core = py_core;
Py_INCREF(meta_request->py_core);
Expand All @@ -426,6 +439,8 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
.progress_callback = s_s3_request_on_progress,
.part_size = part_size,
.multipart_upload_threshold = multipart_upload_threshold,
/* If fio options not set, let native code to decide the default instead */
.fio_opts = fio_options_set ? &fio_opts : NULL,
.user_data = meta_request,
};

Expand Down
11 changes: 11 additions & 0 deletions test/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
S3RequestType,
S3ResponseError,
CrossProcessLock,
S3FileIoOptions,
create_default_s3_signing_config,
get_optimized_platforms,
)
Expand Down Expand Up @@ -590,6 +591,16 @@ def test_put_object_filepath(self):
request = self._put_object_request(None, content_length)
self._test_s3_put_get_object(request, S3RequestType.PUT_OBJECT, send_filepath=self.temp_put_obj_file_path)

def test_put_object_filepath_with_fio_options(self):
content_length = os.stat(self.temp_put_obj_file_path).st_size
request = self._put_object_request(None, content_length)
fio_options = S3FileIoOptions(should_stream=True, disk_throughput_gbps=10.0, direct_io=True)
self._test_s3_put_get_object(
request,
S3RequestType.PUT_OBJECT,
send_filepath=self.temp_put_obj_file_path,
fio_options=fio_options)

def test_put_object_filepath_unknown_content_length(self):
content_length = os.stat(self.temp_put_obj_file_path).st_size
request = self._put_object_request(None, content_length, unknown_content_length=True)
Expand Down