Skip to content

Commit 4ec3ba4

Browse files
authored
Fio options binding (#680)
1 parent 74c09ab commit 4ec3ba4

File tree

4 files changed

+123
-3
lines changed

4 files changed

+123
-3
lines changed

awscrt/s3.py

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,43 @@ class S3ChecksumConfig:
142142
"""Whether to retrieve and validate response checksums."""
143143

144144

145+
@dataclass
146+
class S3FileIoOptions:
147+
"""
148+
Controls how client performance file I/O operations.
149+
Only applies to the file based workload.
150+
Notes: only applies when `send_filepath` is set from the request.
151+
"""
152+
153+
should_stream: bool = False
154+
"""
155+
Skip buffering the part in memory before sending the request.
156+
If set, set the `disk_throughput` to be reasonable align with the available disk throughput.
157+
Otherwise, the transfer may fail with connection starvation.
158+
"""
159+
160+
disk_throughput_gbps: (Optional[float]) = 0.0
161+
"""
162+
The estimated disk throughput. Only be applied when `streaming_upload` is true.
163+
in gigabits per second (Gbps).
164+
When doing upload with streaming, it's important to set the disk throughput to prevent the connection starvation.
165+
166+
Notes: There are possibilities that cannot reach the all available disk throughput:
167+
1. Disk is busy with other applications
168+
2. OS Cache may cap the throughput, use `direct_io` to get around this.
169+
"""
170+
171+
direct_io: bool = False
172+
"""
173+
Enable direct IO to bypass the OS cache. Helpful when the disk I/O outperforms the kernel cache.
174+
Notes:
175+
- Only supported on linux for now.
176+
- Only supports upload for now.
177+
- Check NOTES for O_DIRECT for additional info https://man7.org/linux/man-pages/man2/openat.2.html
178+
In summary, O_DIRECT is a potentially powerful tool that should be used with caution.
179+
"""
180+
181+
145182
class S3Client(NativeResource):
146183
"""S3 client
147184
@@ -217,6 +254,11 @@ class S3Client(NativeResource):
217254
This option is only supported on Linux, MacOS, and platforms that have either SO_BINDTODEVICE or IP_BOUND_IF. It
218255
is not supported on Windows. `AWS_ERROR_PLATFORM_NOT_SUPPORTED` will be raised on unsupported platforms. On
219256
Linux, SO_BINDTODEVICE is used and requires kernel version >= 5.7 or root privileges.
257+
258+
fio_options: (Optional[S3FileIoOptions])
259+
If set, this controls how the client interact with file I/O.
260+
If not set, a default options will be created to avoid memory issue based on the size of file.
261+
Note: Only applies when the request created with `send_filepath`
220262
"""
221263

222264
__slots__ = ('shutdown_event', '_region')
@@ -235,7 +277,8 @@ def __init__(
235277
throughput_target_gbps=None,
236278
enable_s3express=False,
237279
memory_limit=None,
238-
network_interface_names: Optional[Sequence[str]] = None):
280+
network_interface_names: Optional[Sequence[str]] = None,
281+
fio_options: Optional['S3FileIoOptions'] = None):
239282
assert isinstance(bootstrap, ClientBootstrap) or bootstrap is None
240283
assert isinstance(region, str)
241284
assert isinstance(signing_config, AwsSigningConfig) or signing_config is None
@@ -249,6 +292,7 @@ def __init__(
249292
float) or throughput_target_gbps is None
250293
assert isinstance(enable_s3express, bool) or enable_s3express is None
251294
assert isinstance(network_interface_names, Sequence) or network_interface_names is None
295+
assert isinstance(fio_options, S3FileIoOptions) or fio_options is None
252296

253297
if credential_provider and signing_config:
254298
raise ValueError("'credential_provider' has been deprecated in favor of 'signing_config'. "
@@ -288,6 +332,15 @@ def on_shutdown():
288332
# ensure this is a list, so it's simpler to process in C
289333
if not isinstance(network_interface_names, list):
290334
network_interface_names = list(network_interface_names)
335+
fio_options_set = False
336+
should_stream = False
337+
disk_throughput_gbps = 0.0
338+
direct_io = False
339+
if fio_options is not None:
340+
fio_options_set = True
341+
should_stream = fio_options.should_stream
342+
disk_throughput_gbps = fio_options.disk_throughput_gbps
343+
direct_io = fio_options.direct_io
291344

292345
self._binding = _awscrt.s3_client_new(
293346
bootstrap,
@@ -303,6 +356,10 @@ def on_shutdown():
303356
enable_s3express,
304357
memory_limit,
305358
network_interface_names,
359+
fio_options_set,
360+
should_stream,
361+
disk_throughput_gbps,
362+
direct_io,
306363
s3_client_core)
307364

308365
def make_request(
@@ -318,6 +375,7 @@ def make_request(
318375
checksum_config=None,
319376
part_size=None,
320377
multipart_upload_threshold=None,
378+
fio_options=None,
321379
on_headers=None,
322380
on_body=None,
323381
on_done=None,
@@ -398,6 +456,11 @@ def make_request(
398456
If both `part_size` and `multipart_upload_threshold` are not set,
399457
the values from `aws_s3_client_config` are used.
400458
459+
fio_options: (Optional[S3FileIoOptions])
460+
If set, this overrides the client fio_options to control how this request interact with file I/O.
461+
If not set, a default options will be created to avoid memory issue based on the size of file.
462+
Note: Only applies when the request created with `send_filepath`
463+
401464
on_headers: Optional callback invoked as the response received, and even the API request
402465
has been split into multiple parts, this callback will only be invoked once as
403466
it's just making one API request to S3.
@@ -483,6 +546,7 @@ def make_request(
483546
checksum_config=checksum_config,
484547
part_size=part_size,
485548
multipart_upload_threshold=multipart_upload_threshold,
549+
fio_options=fio_options,
486550
on_headers=on_headers,
487551
on_body=on_body,
488552
on_done=on_done,
@@ -520,6 +584,7 @@ def __init__(
520584
checksum_config=None,
521585
part_size=None,
522586
multipart_upload_threshold=None,
587+
fio_options=None,
523588
on_headers=None,
524589
on_body=None,
525590
on_done=None,
@@ -532,6 +597,7 @@ def __init__(
532597
assert callable(on_done) or on_done is None
533598
assert isinstance(part_size, int) or part_size is None
534599
assert isinstance(multipart_upload_threshold, int) or multipart_upload_threshold is None
600+
assert isinstance(fio_options, S3FileIoOptions) or fio_options is None
535601

536602
if type == S3RequestType.DEFAULT and not operation_name:
537603
raise ValueError("'operation_name' must be set when using S3RequestType.DEFAULT")
@@ -555,6 +621,15 @@ def __init__(
555621
if checksum_config.location is not None:
556622
checksum_location = checksum_config.location.value
557623
validate_response_checksum = checksum_config.validate_response
624+
fio_options_set = False
625+
should_stream = False
626+
disk_throughput_gbps = 0.0
627+
direct_io = False
628+
if fio_options is not None:
629+
fio_options_set = True
630+
should_stream = fio_options.should_stream
631+
disk_throughput_gbps = fio_options.disk_throughput_gbps
632+
direct_io = fio_options.direct_io
558633

559634
s3_request_core = _S3RequestCore(
560635
request,
@@ -583,6 +658,10 @@ def __init__(
583658
validate_response_checksum,
584659
part_size,
585660
multipart_upload_threshold,
661+
fio_options_set,
662+
should_stream,
663+
disk_throughput_gbps,
664+
direct_io,
586665
s3_request_core)
587666

588667
@property

source/s3_client.c

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,11 +258,15 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
258258
int enable_s3express; /* p */
259259
uint64_t mem_limit; /* K */
260260
PyObject *network_interface_names_py; /* O */
261+
int fio_options_set; /* p - boolean predicate */
262+
int should_stream; /* p - boolean predicate */
263+
double disk_throughput_gbps; /* d */
264+
int direct_io; /* p - boolean predicate */
261265
PyObject *py_core; /* O */
262266

263267
if (!PyArg_ParseTuple(
264268
args,
265-
"OOOOOs#iKKdpKOO",
269+
"OOOOOs#iKKdpKOppdpO",
266270
&bootstrap_py,
267271
&signing_config_py,
268272
&credential_provider_py,
@@ -277,6 +281,10 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
277281
&enable_s3express,
278282
&mem_limit,
279283
&network_interface_names_py,
284+
&fio_options_set,
285+
&should_stream,
286+
&disk_throughput_gbps,
287+
&direct_io,
280288
&py_core)) {
281289
return NULL;
282290
}
@@ -366,6 +374,11 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
366374
}
367375
}
368376
}
377+
struct aws_s3_file_io_options fio_opts = {
378+
.should_stream = should_stream,
379+
.disk_throughput_gbps = disk_throughput_gbps,
380+
.direct_io = direct_io,
381+
};
369382

370383
struct aws_s3_client_config s3_config = {
371384
.region = region,
@@ -382,6 +395,8 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
382395
.enable_s3express = enable_s3express,
383396
.network_interface_names_array = network_interface_names,
384397
.num_network_interface_names = num_network_interface_names,
398+
/* If fio options not set, let native code to decide the default instead */
399+
.fio_opts = fio_options_set ? &fio_opts : NULL,
385400
};
386401

387402
s3_client->native = aws_s3_client_new(allocator, &s3_config);

source/s3_meta_request.c

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,10 +332,14 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
332332
int validate_response_checksum; /* p - boolean predicate */
333333
uint64_t part_size; /* K */
334334
uint64_t multipart_upload_threshold; /* K */
335+
int fio_options_set; /* p - boolean predicate */
336+
int should_stream; /* p - boolean predicate */
337+
double disk_throughput_gbps; /* d */
338+
int direct_io; /* p - boolean predicate */
335339
PyObject *py_core; /* O */
336340
if (!PyArg_ParseTuple(
337341
args,
338-
"OOOizOOzzs#iipKKO",
342+
"OOOizOOzzs#iipKKppdpO",
339343
&py_s3_request,
340344
&s3_client_py,
341345
&http_request_py,
@@ -352,6 +356,10 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
352356
&validate_response_checksum,
353357
&part_size,
354358
&multipart_upload_threshold,
359+
&fio_options_set,
360+
&should_stream,
361+
&disk_throughput_gbps,
362+
&direct_io,
355363
&py_core)) {
356364
return NULL;
357365
}
@@ -407,6 +415,11 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
407415
aws_mem_release(allocator, meta_request);
408416
return NULL;
409417
}
418+
struct aws_s3_file_io_options fio_opts = {
419+
.should_stream = should_stream,
420+
.disk_throughput_gbps = disk_throughput_gbps,
421+
.direct_io = direct_io,
422+
};
410423

411424
meta_request->py_core = py_core;
412425
Py_INCREF(meta_request->py_core);
@@ -426,6 +439,8 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
426439
.progress_callback = s_s3_request_on_progress,
427440
.part_size = part_size,
428441
.multipart_upload_threshold = multipart_upload_threshold,
442+
/* If fio options not set, let native code to decide the default instead */
443+
.fio_opts = fio_options_set ? &fio_opts : NULL,
429444
.user_data = meta_request,
430445
};
431446

test/test_s3.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
S3RequestType,
2626
S3ResponseError,
2727
CrossProcessLock,
28+
S3FileIoOptions,
2829
create_default_s3_signing_config,
2930
get_optimized_platforms,
3031
)
@@ -590,6 +591,16 @@ def test_put_object_filepath(self):
590591
request = self._put_object_request(None, content_length)
591592
self._test_s3_put_get_object(request, S3RequestType.PUT_OBJECT, send_filepath=self.temp_put_obj_file_path)
592593

594+
def test_put_object_filepath_with_fio_options(self):
595+
content_length = os.stat(self.temp_put_obj_file_path).st_size
596+
request = self._put_object_request(None, content_length)
597+
fio_options = S3FileIoOptions(should_stream=True, disk_throughput_gbps=10.0, direct_io=True)
598+
self._test_s3_put_get_object(
599+
request,
600+
S3RequestType.PUT_OBJECT,
601+
send_filepath=self.temp_put_obj_file_path,
602+
fio_options=fio_options)
603+
593604
def test_put_object_filepath_unknown_content_length(self):
594605
content_length = os.stat(self.temp_put_obj_file_path).st_size
595606
request = self._put_object_request(None, content_length, unknown_content_length=True)

0 commit comments

Comments
 (0)