Skip to content

Commit 67a6eaf

Browse files
authored
Update the binding to use features from aws-c-s3 (#509)
1 parent 203ba9e commit 67a6eaf

File tree

2 files changed

+88
-240
lines changed

2 files changed

+88
-240
lines changed

source/s3_meta_request.c

Lines changed: 21 additions & 187 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ struct s3_meta_request_binding {
3030
**/
3131
FILE *recv_file;
3232

33-
struct aws_http_message *copied_message;
34-
3533
/* Batch up the transferred size in one sec. */
3634
uint64_t size_transferred;
3735
/* The time stamp when the progress reported */
@@ -47,9 +45,6 @@ static void s_destroy(struct s3_meta_request_binding *meta_request) {
4745
if (meta_request->recv_file) {
4846
fclose(meta_request->recv_file);
4947
}
50-
if (meta_request->copied_message) {
51-
aws_http_message_release(meta_request->copied_message);
52-
}
5348
Py_XDECREF(meta_request->py_core);
5449
aws_mem_release(aws_py_get_allocator(), meta_request);
5550
}
@@ -122,6 +117,7 @@ static int s_s3_request_on_headers(
122117
}
123118
}
124119

120+
/* To avoid reporting progress to python too often. We cache it up and only report to python after at least 1 sec. */
125121
static int s_record_progress(struct s3_meta_request_binding *request_binding, uint64_t length, bool *report_progress) {
126122
if (aws_add_u64_checked(request_binding->size_transferred, length, &request_binding->size_transferred)) {
127123
/* Wow */
@@ -151,10 +147,6 @@ static int s_s3_request_on_body(
151147
(void)meta_request;
152148
struct s3_meta_request_binding *request_binding = user_data;
153149

154-
bool report_progress;
155-
if (s_record_progress(request_binding, (uint64_t)body->len, &report_progress)) {
156-
return AWS_OP_ERR;
157-
}
158150
if (request_binding->recv_file) {
159151
/* The callback will be invoked with the right order, so we don't need to seek first. */
160152
if (fwrite((void *)body->ptr, body->len, 1, request_binding->recv_file) < 1) {
@@ -168,9 +160,7 @@ static int s_s3_request_on_body(
168160
aws_error_name(aws_last_error()));
169161
return AWS_OP_ERR;
170162
}
171-
if (!report_progress) {
172-
return AWS_OP_SUCCESS;
173-
}
163+
return AWS_OP_SUCCESS;
174164
}
175165
bool error = true;
176166
/*************** GIL ACQUIRE ***************/
@@ -179,32 +169,15 @@ static int s_s3_request_on_body(
179169
if (aws_py_gilstate_ensure(&state)) {
180170
return AWS_OP_ERR; /* Python has shut down. Nothing matters anymore, but don't crash */
181171
}
182-
if (!request_binding->recv_file) {
183-
result = PyObject_CallMethod(
184-
request_binding->py_core,
185-
"_on_body",
186-
"(y#K)",
187-
(const char *)(body->ptr),
188-
(Py_ssize_t)body->len,
189-
range_start);
190172

191-
if (!result) {
192-
PyErr_WriteUnraisable(request_binding->py_core);
193-
goto done;
194-
}
195-
Py_DECREF(result);
196-
}
197-
if (report_progress) {
198-
/* Hold the GIL before enterring here */
199-
result =
200-
PyObject_CallMethod(request_binding->py_core, "_on_progress", "(K)", request_binding->size_transferred);
201-
if (!result) {
202-
PyErr_WriteUnraisable(request_binding->py_core);
203-
} else {
204-
Py_DECREF(result);
205-
}
206-
request_binding->size_transferred = 0;
173+
result = PyObject_CallMethod(
174+
request_binding->py_core, "_on_body", "(y#K)", (const char *)(body->ptr), (Py_ssize_t)body->len, range_start);
175+
176+
if (!result) {
177+
PyErr_WriteUnraisable(request_binding->py_core);
178+
goto done;
207179
}
180+
Py_DECREF(result);
208181
error = false;
209182
done:
210183
PyGILState_Release(state);
@@ -252,8 +225,6 @@ static void s_s3_request_on_finish(
252225
PyObject *header_list = NULL;
253226
PyObject *result = NULL;
254227

255-
request_binding->copied_message = aws_http_message_release(request_binding->copied_message);
256-
257228
if (request_binding->size_transferred && (error_code == 0)) {
258229
/* report the remaining progress */
259230
result =
@@ -343,39 +314,21 @@ static void s_s3_request_on_shutdown(void *user_data) {
343314
/*************** GIL RELEASE ***************/
344315
}
345316

346-
/*
347-
* file-based python input stream for reporting the progress
348-
*/
349-
struct aws_input_py_stream_file_impl {
350-
struct aws_input_stream base;
351-
struct aws_input_stream *actual_stream;
352-
struct s3_meta_request_binding *binding;
353-
};
354-
355-
static int s_aws_input_stream_file_read(struct aws_input_stream *stream, struct aws_byte_buf *dest) {
356-
struct aws_input_py_stream_file_impl *impl = AWS_CONTAINER_OF(stream, struct aws_input_py_stream_file_impl, base);
357-
size_t pre_len = dest->len;
358-
359-
if (aws_input_stream_read(impl->actual_stream, dest)) {
360-
return AWS_OP_ERR;
361-
}
317+
static void s_s3_request_on_progress(
318+
struct aws_s3_meta_request *meta_request,
319+
const struct aws_s3_meta_request_progress *progress,
320+
void *user_data) {
362321

363-
size_t actually_read = 0;
364-
if (aws_sub_size_checked(dest->len, pre_len, &actually_read)) {
365-
return AWS_OP_ERR;
366-
}
322+
struct s3_meta_request_binding *request_binding = user_data;
367323

368-
bool report_progress;
369-
struct s3_meta_request_binding *request_binding = impl->binding;
370-
if (s_record_progress(request_binding, (uint64_t)actually_read, &report_progress)) {
371-
return AWS_OP_ERR;
372-
}
324+
bool report_progress = false;
325+
s_record_progress(request_binding, progress->bytes_transferred, &report_progress);
373326

374327
if (report_progress) {
375328
/*************** GIL ACQUIRE ***************/
376329
PyGILState_STATE state;
377330
if (aws_py_gilstate_ensure(&state)) {
378-
return AWS_OP_ERR; /* Python has shut down. Nothing matters anymore, but don't crash */
331+
return; /* Python has shut down. Nothing matters anymore, but don't crash */
379332
}
380333
PyObject *result =
381334
PyObject_CallMethod(request_binding->py_core, "_on_progress", "(K)", request_binding->size_transferred);
@@ -385,113 +338,7 @@ static int s_aws_input_stream_file_read(struct aws_input_stream *stream, struct
385338
request_binding->size_transferred = 0;
386339
PyGILState_Release(state);
387340
/*************** GIL RELEASE ***************/
388-
if (!result) {
389-
return aws_py_raise_error();
390-
}
391-
}
392-
return AWS_OP_SUCCESS;
393-
}
394-
static int s_aws_input_stream_file_seek(
395-
struct aws_input_stream *stream,
396-
int64_t offset,
397-
enum aws_stream_seek_basis basis) {
398-
struct aws_input_py_stream_file_impl *impl = AWS_CONTAINER_OF(stream, struct aws_input_py_stream_file_impl, base);
399-
return aws_input_stream_seek(impl->actual_stream, offset, basis);
400-
}
401-
402-
static int s_aws_input_stream_file_get_status(struct aws_input_stream *stream, struct aws_stream_status *status) {
403-
struct aws_input_py_stream_file_impl *impl = AWS_CONTAINER_OF(stream, struct aws_input_py_stream_file_impl, base);
404-
return aws_input_stream_get_status(impl->actual_stream, status);
405-
}
406-
407-
static int s_aws_input_stream_file_get_length(struct aws_input_stream *stream, int64_t *length) {
408-
struct aws_input_py_stream_file_impl *impl = AWS_CONTAINER_OF(stream, struct aws_input_py_stream_file_impl, base);
409-
return aws_input_stream_get_length(impl->actual_stream, length);
410-
}
411-
412-
static void s_aws_input_stream_file_destroy(struct aws_input_py_stream_file_impl *impl) {
413-
struct aws_allocator *allocator = aws_py_get_allocator();
414-
aws_input_stream_release(impl->actual_stream);
415-
aws_mem_release(allocator, impl);
416-
}
417-
418-
static struct aws_input_stream_vtable s_aws_input_stream_file_vtable = {
419-
.seek = s_aws_input_stream_file_seek,
420-
.read = s_aws_input_stream_file_read,
421-
.get_status = s_aws_input_stream_file_get_status,
422-
.get_length = s_aws_input_stream_file_get_length,
423-
};
424-
425-
static struct aws_input_stream *s_input_stream_new_from_file(
426-
struct aws_allocator *allocator,
427-
const char *file_name,
428-
struct s3_meta_request_binding *request_binding) {
429-
struct aws_input_py_stream_file_impl *impl =
430-
aws_mem_calloc(allocator, 1, sizeof(struct aws_input_py_stream_file_impl));
431-
432-
impl->base.vtable = &s_aws_input_stream_file_vtable;
433-
aws_ref_count_init(&impl->base.ref_count, impl, (aws_simple_completion_callback *)s_aws_input_stream_file_destroy);
434-
435-
impl->actual_stream = aws_input_stream_new_from_file(allocator, file_name);
436-
if (!impl->actual_stream) {
437-
aws_mem_release(allocator, impl);
438-
return NULL;
439-
}
440-
impl->binding = request_binding;
441-
442-
return &impl->base;
443-
}
444-
445-
/* Copy an existing HTTP message without body. */
446-
struct aws_http_message *s_copy_http_message(struct aws_allocator *allocator, struct aws_http_message *base_message) {
447-
AWS_PRECONDITION(allocator);
448-
AWS_PRECONDITION(base_message);
449-
450-
struct aws_http_message *message = aws_http_message_new_request(allocator);
451-
452-
if (message == NULL) {
453-
return NULL;
454341
}
455-
456-
struct aws_byte_cursor request_method;
457-
if (aws_http_message_get_request_method(base_message, &request_method)) {
458-
goto error_clean_up;
459-
}
460-
461-
if (aws_http_message_set_request_method(message, request_method)) {
462-
goto error_clean_up;
463-
}
464-
465-
struct aws_byte_cursor request_path;
466-
if (aws_http_message_get_request_path(base_message, &request_path)) {
467-
goto error_clean_up;
468-
}
469-
470-
if (aws_http_message_set_request_path(message, request_path)) {
471-
goto error_clean_up;
472-
}
473-
474-
size_t num_headers = aws_http_message_get_header_count(base_message);
475-
for (size_t header_index = 0; header_index < num_headers; ++header_index) {
476-
struct aws_http_header header;
477-
if (aws_http_message_get_header(base_message, &header, header_index)) {
478-
goto error_clean_up;
479-
}
480-
if (aws_http_message_add_header(message, header)) {
481-
goto error_clean_up;
482-
}
483-
}
484-
485-
return message;
486-
487-
error_clean_up:
488-
489-
if (message != NULL) {
490-
aws_http_message_release(message);
491-
message = NULL;
492-
}
493-
494-
return NULL;
495342
}
496343

497344
PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
@@ -579,37 +426,24 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
579426
Py_INCREF(meta_request->py_core);
580427

581428
if (recv_filepath) {
582-
meta_request->recv_file = aws_fopen(recv_filepath, "wb+");
429+
meta_request->recv_file = aws_fopen(recv_filepath, "wb");
583430
if (!meta_request->recv_file) {
584431
aws_translate_and_raise_io_error(errno);
585432
PyErr_SetAwsLastError();
586433
goto error;
587434
}
588435
}
589-
if (send_filepath) {
590-
if (type == AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) {
591-
/* Copy the http request from python object and replace the old pointer with new pointer */
592-
meta_request->copied_message = s_copy_http_message(allocator, http_request);
593-
struct aws_input_stream *input_body = s_input_stream_new_from_file(allocator, send_filepath, meta_request);
594-
if (!input_body) {
595-
PyErr_SetAwsLastError();
596-
goto error;
597-
}
598-
/* rewrite the input stream of the original request */
599-
aws_http_message_set_body_stream(meta_request->copied_message, input_body);
600-
/* Input body is owned by copied message */
601-
aws_input_stream_release(input_body);
602-
}
603-
}
604436

605437
struct aws_s3_meta_request_options s3_meta_request_opt = {
606438
.type = type,
607-
.message = meta_request->copied_message ? meta_request->copied_message : http_request,
439+
.message = http_request,
608440
.signing_config = signing_config,
441+
.send_filepath = aws_byte_cursor_from_c_str(send_filepath),
609442
.headers_callback = s_s3_request_on_headers,
610443
.body_callback = s_s3_request_on_body,
611444
.finish_callback = s_s3_request_on_finish,
612445
.shutdown_callback = s_s3_request_on_shutdown,
446+
.progress_callback = s_s3_request_on_progress,
613447
.user_data = meta_request,
614448
};
615449

0 commit comments

Comments
 (0)