diff --git a/.gitignore b/.gitignore index 89730e41e..0ccf616c8 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,9 @@ src/install # ignore vscode files .vscode +# ignore intellij files +.idea + # ignore macos files .DS_Store @@ -24,4 +27,4 @@ docs/build **/*venv # ignore .zed -.zed \ No newline at end of file +.zed diff --git a/src/api/include/pdc_client_connect.h b/src/api/include/pdc_client_connect.h index 64b7460e9..e8999ab2f 100644 --- a/src/api/include/pdc_client_connect.h +++ b/src/api/include/pdc_client_connect.h @@ -208,7 +208,7 @@ uint32_t PDC_get_client_data_server(); perr_t PDC_Client_read_server_addr_from_file(); /** - * Client request of an obj id by sending object name + * Client request of an obj id by sending object name. * * \param obj_name [IN] Name of the object * \param cont_id[IN] Container ID (obtained from metadata server) @@ -220,32 +220,37 @@ perr_t PDC_Client_read_server_addr_from_file(); perr_t PDC_Client_send_name_recv_id(const char *obj_name, uint64_t cont_id, pdcid_t obj_create_prop, pdcid_t *meta_id, uint32_t *data_server_id, uint32_t *metadata_server_id); -perr_t PDC_Client_transfer_request(void *buf, pdcid_t obj_id, uint32_t data_server_id, int obj_ndim, - uint64_t *obj_dims, int remote_ndim, uint64_t *remote_offset, +/** + * The bulk_handle pointer is set to the bulk handle created. 
+ * The caller is responsible for calling HG_Bulk_free + */ +perr_t PDC_Client_transfer_request(hg_bulk_t *bulk_handle, void *buf, pdcid_t obj_id, uint32_t data_server_id, + int obj_ndim, uint64_t *obj_dims, int remote_ndim, uint64_t *remote_offset, uint64_t *remote_size, size_t unit, pdc_access_t access_type, pdcid_t *metadata_id); int PDC_Client_get_var_type_size(pdc_var_type_t dtype); -perr_t PDC_Client_transfer_request_all(int n_objs, pdc_access_t access_type, uint32_t data_server_id, - char *bulk_buf, hg_size_t bulk_size, uint64_t *metadata_id, +perr_t PDC_Client_transfer_request_all(hg_bulk_t *bulk_handle, int n_objs, pdc_access_t access_type, + uint32_t data_server_id, char *bulk_buf, hg_size_t bulk_size, + uint64_t *metadata_id, #ifdef ENABLE_MPI MPI_Comm comm); #else int comm); #endif -perr_t PDC_Client_transfer_request_metadata_query(char *buf, uint64_t total_buf_size, int n_objs, - uint32_t metadata_server_id, uint8_t is_write, +perr_t PDC_Client_transfer_request_metadata_query(hg_bulk_t *bulk_handle, char *buf, uint64_t total_buf_size, + int n_objs, uint32_t metadata_server_id, uint8_t is_write, uint64_t *output_buf_size, uint64_t *query_id); -perr_t PDC_Client_transfer_request_metadata_query2(char *buf, uint64_t total_buf_size, uint64_t query_id, - uint32_t metadata_server_id); +perr_t PDC_Client_transfer_request_metadata_query2(hg_bulk_t *bulk_handle, char *buf, uint64_t total_buf_size, + uint64_t query_id, uint32_t metadata_server_id); perr_t PDC_Client_transfer_request_status(pdcid_t transfer_request_id, uint32_t data_server_id, pdc_transfer_status_t *completed); -perr_t PDC_Client_transfer_request_wait_all(int n_objs, pdcid_t *transfer_request_id, +perr_t PDC_Client_transfer_request_wait_all(hg_bulk_t *bulk_handle, int n_objs, pdcid_t *transfer_request_id, uint32_t data_server_id); perr_t PDC_Client_transfer_request_wait(pdcid_t transfer_request_id, uint32_t data_server_id, diff --git a/src/api/pdc.c b/src/api/pdc.c index 6f3a462fb..642f19fc2 
100644 --- a/src/api/pdc.c +++ b/src/api/pdc.c @@ -213,8 +213,8 @@ PDCclose(pdcid_t pdcid) pdc_id_list_g = (struct pdc_id_list *)(intptr_t)PDC_free(pdc_id_list_g); - // Finalize METADATA - PDC_Client_finalize(); + if (PDC_Client_finalize() != SUCCEED) + PGOTO_ERROR(FAIL, "Error with PDC_Client_finalize"); done: FUNC_LEAVE(ret_value); diff --git a/src/api/pdc_client_connect.c b/src/api/pdc_client_connect.c index ef503178e..049114d35 100644 --- a/src/api/pdc_client_connect.c +++ b/src/api/pdc_client_connect.c @@ -1626,13 +1626,11 @@ PDC_Client_finalize() LOG_INFO("T_memcpy: %.2f\n", memcpy_time_g); #endif - hg_ret = HG_Context_destroy(send_context_g); - if (hg_ret != HG_SUCCESS) + if (HG_Context_destroy(send_context_g) != HG_SUCCESS) PGOTO_ERROR(FAIL, "Error with HG_Context_destroy"); - hg_ret = HG_Finalize(send_class_g); - if (hg_ret != HG_SUCCESS) - LOG_WARNING("Error with HG_Finalize\n"); + if (HG_Finalize(send_class_g) != HG_SUCCESS) + PGOTO_ERROR(FAIL, "Error with HG_Finalize"); done: FUNC_LEAVE(ret_value); @@ -2974,8 +2972,9 @@ PDC_Client_flush_obj_all() } perr_t -PDC_Client_transfer_request_all(int n_objs, pdc_access_t access_type, uint32_t data_server_id, char *bulk_buf, - hg_size_t bulk_size, uint64_t *metadata_id, +PDC_Client_transfer_request_all(hg_bulk_t *bulk_handle, int n_objs, pdc_access_t access_type, + uint32_t data_server_id, char *bulk_buf, hg_size_t bulk_size, + uint64_t *metadata_id, #ifdef ENABLE_MPI MPI_Comm comm) #else @@ -3015,8 +3014,9 @@ PDC_Client_transfer_request_all(int n_objs, pdc_access_t access_type, uint32_t d transfer_request_all_register_id_g, &client_send_transfer_request_all_handle); // Create bulk handles - hg_ret = HG_Bulk_create(hg_class, 1, (void **)&bulk_buf, &bulk_size, HG_BULK_READWRITE, + hg_ret = HG_Bulk_create(hg_class, 1, (void **)&bulk_buf, &bulk_size, HG_BULK_READWRITE, &(in.local_bulk_handle)); + *bulk_handle = in.local_bulk_handle; if (hg_ret != HG_SUCCESS) PGOTO_ERROR(FAIL, "Could not create local bulk data 
handle"); @@ -3076,8 +3076,8 @@ PDC_Client_transfer_request_all(int n_objs, pdc_access_t access_type, uint32_t d } perr_t -PDC_Client_transfer_request_metadata_query2(char *buf, uint64_t total_buf_size, uint64_t query_id, - uint32_t metadata_server_id) +PDC_Client_transfer_request_metadata_query2(hg_bulk_t *bulk_handle, char *buf, uint64_t total_buf_size, + uint64_t query_id, uint32_t metadata_server_id) { FUNC_ENTER(NULL); @@ -3110,6 +3110,7 @@ PDC_Client_transfer_request_metadata_query2(char *buf, uint64_t total_buf_size, // For sending metadata hg_ret = HG_Bulk_create(hg_class, 1, (void **)&buf, (hg_size_t *)&(in.total_buf_size), HG_BULK_READWRITE, &(in.local_bulk_handle)); + *bulk_handle = in.local_bulk_handle; if (hg_ret != HG_SUCCESS) PGOTO_ERROR(FAIL, "Could not create local bulk data handle"); @@ -3137,8 +3138,8 @@ PDC_Client_transfer_request_metadata_query2(char *buf, uint64_t total_buf_size, } perr_t -PDC_Client_transfer_request_metadata_query(char *buf, uint64_t total_buf_size, int n_objs, - uint32_t metadata_server_id, uint8_t is_write, +PDC_Client_transfer_request_metadata_query(hg_bulk_t *bulk_handle, char *buf, uint64_t total_buf_size, + int n_objs, uint32_t metadata_server_id, uint8_t is_write, uint64_t *output_buf_size, uint64_t *query_id) { FUNC_ENTER(NULL); @@ -3174,6 +3175,7 @@ PDC_Client_transfer_request_metadata_query(char *buf, uint64_t total_buf_size, i // For sending metadata hg_ret = HG_Bulk_create(hg_class, 1, (void **)&buf, (hg_size_t *)&(in.total_buf_size), HG_BULK_READWRITE, &(in.local_bulk_handle)); + *bulk_handle = in.local_bulk_handle; if (hg_ret != HG_SUCCESS) PGOTO_ERROR(FAIL, "Could not create local bulk data handle"); @@ -3203,7 +3205,8 @@ PDC_Client_transfer_request_metadata_query(char *buf, uint64_t total_buf_size, i } perr_t -PDC_Client_transfer_request_wait_all(int n_objs, pdcid_t *transfer_request_id, uint32_t data_server_id) +PDC_Client_transfer_request_wait_all(hg_bulk_t *bulk_handle, int n_objs, pdcid_t 
*transfer_request_id, + uint32_t data_server_id) { FUNC_ENTER(NULL); @@ -3238,6 +3241,7 @@ PDC_Client_transfer_request_wait_all(int n_objs, pdcid_t *transfer_request_id, u // For sending metadata hg_ret = HG_Bulk_create(hg_class, 1, (void **)&transfer_request_id, (hg_size_t *)&(in.total_buf_size), HG_BULK_READWRITE, &(in.local_bulk_handle)); + *bulk_handle = in.local_bulk_handle; if (hg_ret != HG_SUCCESS) PGOTO_ERROR(FAIL, "Could not create local bulk data handle"); @@ -3269,8 +3273,8 @@ PDC_Client_transfer_request_wait_all(int n_objs, pdcid_t *transfer_request_id, u } perr_t -PDC_Client_transfer_request(void *buf, pdcid_t obj_id, uint32_t data_server_id, int obj_ndim, - uint64_t *obj_dims, int remote_ndim, uint64_t *remote_offset, +PDC_Client_transfer_request(hg_bulk_t *bulk_handle, void *buf, pdcid_t obj_id, uint32_t data_server_id, + int obj_ndim, uint64_t *obj_dims, int remote_ndim, uint64_t *remote_offset, uint64_t *remote_size, size_t unit, pdc_access_t access_type, pdcid_t *metadata_id) { @@ -3328,6 +3332,7 @@ PDC_Client_transfer_request(void *buf, pdcid_t obj_id, uint32_t data_server_id, // Create bulk handle hg_ret = HG_Bulk_create(hg_class, 1, (void **)&buf, (hg_size_t *)&total_data_size, HG_BULK_READWRITE, &(in.local_bulk_handle)); + *bulk_handle = in.local_bulk_handle; if (hg_ret != HG_SUCCESS) PGOTO_ERROR(FAIL, "Could not create local bulk data handle"); diff --git a/src/api/pdc_obj/pdc_cont.c b/src/api/pdc_obj/pdc_cont.c index f325da333..589c037ba 100644 --- a/src/api/pdc_obj/pdc_cont.c +++ b/src/api/pdc_obj/pdc_cont.c @@ -69,7 +69,9 @@ PDCcont_create(const char *cont_name, pdcid_t cont_prop_id) PGOTO_ERROR(0, "PDC pub container memory allocation failed"); p->cont_info_pub->name = strdup(cont_name); - id_info = PDC_find_id(cont_prop_id); + id_info = PDC_find_id(cont_prop_id); + if (id_info == NULL) + PGOTO_ERROR(0, "Failed to find cont prop using pdcid"); cont_prop = (struct _pdc_cont_prop *)(id_info->obj_ptr); p->cont_pt = (struct _pdc_cont_prop 
*)PDC_calloc(1, sizeof(struct _pdc_cont_prop)); diff --git a/src/api/pdc_obj/pdc_obj.c b/src/api/pdc_obj/pdc_obj.c index e2712ed0e..8623d805c 100644 --- a/src/api/pdc_obj/pdc_obj.c +++ b/src/api/pdc_obj/pdc_obj.c @@ -169,7 +169,9 @@ PDC_obj_create(pdcid_t cont_id, const char *obj_name, pdcid_t obj_prop_id, _pdc_ meta_id = p->cont->cont_info_pub->meta_id; } - id_info = PDC_find_id(obj_prop_id); + id_info = PDC_find_id(obj_prop_id); + if (id_info == NULL) + PGOTO_ERROR(0, "Failed to find obj prop using pdcid"); obj_prop = (struct _pdc_obj_prop *)(id_info->obj_ptr); /* struct _pdc_obj_prop field */ diff --git a/src/api/pdc_obj/pdc_prop.c b/src/api/pdc_obj/pdc_prop.c index 76db3c8b9..0ebdb5d44 100644 --- a/src/api/pdc_obj/pdc_prop.c +++ b/src/api/pdc_obj/pdc_prop.c @@ -74,8 +74,10 @@ PDCprop_create(pdc_prop_type_t type, pdcid_t pdcid) new_id_c = PDC_id_register(PDC_CONT_PROP, p); p->cont_prop_id = new_id_c; id_info = PDC_find_id(pdcid); - pdc_class = (struct _pdc_class *)(id_info->obj_ptr); - p->pdc = (struct _pdc_class *)PDC_calloc(1, sizeof(struct _pdc_class)); + if (id_info == NULL) + PGOTO_ERROR(0, "Failed to find prop using pdcid"); + pdc_class = (struct _pdc_class *)(id_info->obj_ptr); + p->pdc = (struct _pdc_class *)PDC_calloc(1, sizeof(struct _pdc_class)); if (p->pdc == NULL) PGOTO_ERROR(0, "PDC class allocation failed"); if (pdc_class->name) @@ -105,8 +107,10 @@ PDCprop_create(pdc_prop_type_t type, pdcid_t pdcid) new_id_o = PDC_id_register(PDC_OBJ_PROP, q); q->obj_prop_pub->obj_prop_id = new_id_o; id_info = PDC_find_id(pdcid); - pdc_class = (struct _pdc_class *)(id_info->obj_ptr); - q->pdc = (struct _pdc_class *)PDC_calloc(1, sizeof(struct _pdc_class)); + if (id_info == NULL) + PGOTO_ERROR(0, "Failed to find pdc obj using pdcid"); + pdc_class = (struct _pdc_class *)(id_info->obj_ptr); + q->pdc = (struct _pdc_class *)PDC_calloc(1, sizeof(struct _pdc_class)); if (q->pdc == NULL) PGOTO_ERROR(0, "PDC class allocation failed"); if (pdc_class->name) diff --git 
a/src/api/pdc_region/pdc_region_transfer.c b/src/api/pdc_region/pdc_region_transfer.c index 2a773a299..98c4aa98b 100644 --- a/src/api/pdc_region/pdc_region_transfer.c +++ b/src/api/pdc_region/pdc_region_transfer.c @@ -47,8 +47,10 @@ #define PDC_MERGE_TRANSFER_MIN_COUNT 50 -// pdc region transfer class. Contains essential information for performing non-blocking PDC client I/O -// perations. +/** + * PDC region transfer class. Contains essential information for performing non-blocking PDC client I/O + * operations + */ typedef struct pdc_transfer_request { pdcid_t obj_id; pdcid_t local_obj_id; @@ -109,6 +111,13 @@ typedef struct pdc_transfer_request { // Tang: for merging transfer requests with transfer start_all/wait_all pdcid_t merged_request_id; int is_done; + + // list of bulk handles used for region request + hg_bulk_t *bulk_handles; + // current number of bulk handles stored in array + uint32_t num_bulk_handles; + // current length of the array + uint32_t bulk_handles_capacity; } pdc_transfer_request; // We pack all arguments for a start_all call to the same data server in a single structure, so we do not need @@ -145,6 +154,86 @@ typedef struct pdc_transfer_request_wait_all_pkg { struct pdc_transfer_request_wait_all_pkg *next; } pdc_transfer_request_wait_all_pkg; +#define REGION_TRANSFER_INIT_BULK_HANDLES 2 + +/** + * @brief Initialize a pdc_transfer_request's bulk handle array. + * + * This function allocates memory for an initial number of bulk handles in a transfer request. + * Each entry is initialized to HG_BULK_NULL. The initial capacity is set to + * REGION_TRANSFER_INIT_BULK_HANDLES. + * + * \param tr Pointer to a pdc_transfer_request structure to initialize. + * + * \return SUCCEED on success, FAIL on failure (e.g., if `tr` is NULL or memory allocation fails). 
+ */
+perr_t
+PDCregion_transfer_init_bulk_handles(pdc_transfer_request *tr)
+{
+    FUNC_ENTER(NULL);
+
+    perr_t ret_value = SUCCEED;
+
+    if (tr == NULL)
+        PGOTO_ERROR(FAIL, "tr was NULL");
+
+    // Start from an empty, self-consistent state so that a failed allocation
+    // below never leaves a non-zero capacity paired with a NULL array.
+    tr->bulk_handles          = NULL;
+    tr->num_bulk_handles      = 0;
+    tr->bulk_handles_capacity = 0;
+
+    tr->bulk_handles = (hg_bulk_t *)PDC_malloc(sizeof(hg_bulk_t) * REGION_TRANSFER_INIT_BULK_HANDLES);
+    // The allocation must be checked before the loop below writes through it.
+    if (tr->bulk_handles == NULL)
+        PGOTO_ERROR(FAIL, "Failed to allocate bulk handle array");
+    tr->bulk_handles_capacity = REGION_TRANSFER_INIT_BULK_HANDLES;
+
+    // Index type matches the uint32_t capacity field (no signed/unsigned comparison).
+    for (uint32_t i = 0; i < tr->bulk_handles_capacity; i++)
+        tr->bulk_handles[i] = HG_BULK_NULL;
+
+done:
+    FUNC_LEAVE(ret_value);
+}
+
+/**
+ * @brief Add a new bulk handle to a transfer request.
+ *
+ * If the internal array is full, or has not been allocated yet, it is (re)allocated with
+ * increased capacity (doubling it each time, starting at REGION_TRANSFER_INIT_BULK_HANDLES).
+ * New elements are initialized to HG_BULK_NULL. The handle is stored as-is; this function
+ * does not validate it, so HG_BULK_NULL is accepted and stored like any other value.
+ *
+ * \param tr Pointer to a pdc_transfer_request structure where the bulk handle should be added.
+ * \param bulk_handle The bulk handle to add to the transfer request.
+ *
+ * \return SUCCEED on success, FAIL on error (e.g., invalid input, capacity overflow or
+ *         reallocation failure). On FAIL the transfer request is left unchanged.
+ */
+perr_t
+PDCregion_transfer_add_bulk_handle(pdc_transfer_request *tr, hg_bulk_t bulk_handle)
+{
+    FUNC_ENTER(NULL);
+
+    perr_t ret_value = SUCCEED;
+
+    if (tr == NULL)
+        PGOTO_ERROR(FAIL, "Invalid pdc transfer request");
+    if (tr->bulk_handles == NULL && tr->num_bulk_handles > 0)
+        PGOTO_ERROR(FAIL, "PDC transfer request has bulk handles but bulk_handles is NULL");
+
+    // A NULL array holds no slots regardless of what the capacity field says;
+    // treating it as capacity 0 forces an allocation before the store below.
+    uint32_t old_capacity = (tr->bulk_handles != NULL) ? tr->bulk_handles_capacity : 0;
+
+    // Grow array if needed
+    if (tr->num_bulk_handles >= old_capacity) {
+        uint32_t new_capacity = old_capacity > 0 ? old_capacity * 2 : REGION_TRANSFER_INIT_BULK_HANDLES;
+
+        // The capacity field is 32-bit: unsigned doubling wraps instead of
+        // growing once it no longer fits, so reject that case explicitly.
+        if (new_capacity <= old_capacity)
+            PGOTO_ERROR(FAIL, "Bulk handle array capacity overflow");
+
+        // NOTE(review): assumes PDC_realloc(NULL, n) behaves like an allocation,
+        // as the zero-capacity path already relied on - confirm against PDC_realloc.
+        hg_bulk_t *new_array =
+            (hg_bulk_t *)PDC_realloc(tr->bulk_handles, (size_t)new_capacity * sizeof(hg_bulk_t));
+        if (new_array == NULL)
+            PGOTO_ERROR(FAIL, "Failed to reallocate bulk handle array");
+
+        // Initialize the new slots to HG_BULK_NULL
+        for (uint32_t i = old_capacity; i < new_capacity; i++)
+            new_array[i] = HG_BULK_NULL;
+
+        tr->bulk_handles          = new_array;
+        tr->bulk_handles_capacity = new_capacity;
+    }
+
+    tr->bulk_handles[tr->num_bulk_handles] = bulk_handle;
+    tr->num_bulk_handles++;
+
+done:
+    FUNC_LEAVE(ret_value);
+}
+
 static int
 sort_by_data_server_start_all(const void *elem1, const void *elem2)
 {
@@ -247,7 +336,8 @@ PDCregion_transfer_create(void *buf, pdc_access_t access_type, pdcid_t obj_id, p
     p->consistency = obj2->obj_pt->obj_prop_pub->consistency;
     p->merged_request_id = 0;
     p->is_done = 0;
-    unit = p->unit;
+    PDCregion_transfer_init_bulk_handles(p);
+    unit = p->unit;
 
     p->local_region_ndim = reg1->ndim;
     p->local_region_offset = (uint64_t *)PDC_malloc(
@@ -296,6 +386,18 @@ PDCregion_transfer_close(pdcid_t transfer_request_id)
         PGOTO_DONE(ret_value);
     transfer_request = (pdc_transfer_request *)(transferinfo->obj_ptr);
 
+    for (int i = 0; i < transfer_request->num_bulk_handles; i++) {
+        if (transfer_request->bulk_handles[i] == HG_BULK_NULL)
+            LOG_WARNING("Bulk handle added to transfer request was NULL\n");
+        else {
+            if (HG_Bulk_free(transfer_request->bulk_handles[i]) != HG_SUCCESS)
+                LOG_WARNING("Failed to free bulk handle added to transfer request\n");
+
+            transfer_request->bulk_handles[i] = HG_BULK_NULL;
+        }
+    }
+    if (transfer_request->bulk_handles)
+        transfer_request->bulk_handles = PDC_free(transfer_request->bulk_handles);
     if (transfer_request->local_region_offset)
         transfer_request->local_region_offset = (uint64_t *)PDC_free(transfer_request->local_region_offset);
     if (transfer_request->metadata_id)
@@ -742,8 +844,9 @@ register_metadata(pdc_transfer_request_start_all_pkg
**transfer_request_input, i pdc_transfer_request_start_all_pkg **transfer_requests; pdc_transfer_request_start_all_pkg * transfer_request_head, *transfer_request_front_head, *transfer_request_end, **transfer_request_output, *previous = NULL; - uint64_t total_buf_size, output_buf_size, query_id; - char * buf, *output_buf; + uint64_t total_buf_size, output_buf_size, query_id; + char * buf, *output_buf; + hg_bulk_t bulk_handle; transfer_request_output = NULL; transfer_request_front_head = NULL; @@ -769,14 +872,17 @@ register_metadata(pdc_transfer_request_start_all_pkg **transfer_request_input, i n_objs = i - index; pack_region_metadata_query(transfer_requests + index, n_objs, &buf, &total_buf_size); PDC_Client_transfer_request_metadata_query( - buf, total_buf_size, n_objs, transfer_requests[index]->transfer_request->metadata_server_id, - is_write, &output_buf_size, &query_id); + &bulk_handle, buf, total_buf_size, n_objs, + transfer_requests[index]->transfer_request->metadata_server_id, is_write, &output_buf_size, + &query_id); + PDCregion_transfer_add_bulk_handle(transfer_requests[index]->transfer_request, bulk_handle); buf = (char *)PDC_free(buf); if (query_id) { output_buf = (char *)PDC_malloc(output_buf_size); PDC_Client_transfer_request_metadata_query2( - output_buf, output_buf_size, query_id, + &bulk_handle, output_buf, output_buf_size, query_id, transfer_requests[index]->transfer_request->metadata_server_id); + PDCregion_transfer_add_bulk_handle(transfer_requests[index]->transfer_request, bulk_handle); unpack_region_metadata_query(output_buf, transfer_requests + index, &transfer_request_head, &transfer_request_end, &output_size); output_buf = (char *)PDC_free(output_buf); @@ -796,15 +902,18 @@ register_metadata(pdc_transfer_request_start_all_pkg **transfer_request_input, i n_objs = size - index; pack_region_metadata_query(transfer_requests + index, n_objs, &buf, &total_buf_size); PDC_Client_transfer_request_metadata_query( - buf, total_buf_size, n_objs, 
transfer_requests[index]->transfer_request->metadata_server_id, - is_write, &output_buf_size, &query_id); + &bulk_handle, buf, total_buf_size, n_objs, + transfer_requests[index]->transfer_request->metadata_server_id, is_write, &output_buf_size, + &query_id); + PDCregion_transfer_add_bulk_handle(transfer_requests[index]->transfer_request, bulk_handle); buf = (char *)PDC_free(buf); // If it is a valid query ID, then it means regions are overlapping. if (query_id) { output_buf = (char *)PDC_malloc(output_buf_size); PDC_Client_transfer_request_metadata_query2( - output_buf, output_buf_size, query_id, + &bulk_handle, output_buf, output_buf_size, query_id, transfer_requests[index]->transfer_request->metadata_server_id); + PDCregion_transfer_add_bulk_handle(transfer_requests[index]->transfer_request, bulk_handle); unpack_region_metadata_query(output_buf, transfer_requests + index, &transfer_request_head, &transfer_request_end, &output_size); output_buf = (char *)PDC_free(output_buf); @@ -1192,6 +1301,7 @@ PDC_Client_start_all_requests(pdc_transfer_request_start_all_pkg **transfer_requ char * bulk_buf; size_t bulk_buf_size; int * bulk_buf_ref; + hg_bulk_t bulk_handle; if (size == 0) PGOTO_DONE(ret_value); @@ -1208,9 +1318,10 @@ PDC_Client_start_all_requests(pdc_transfer_request_start_all_pkg **transfer_requ &bulk_buf_size, read_bulk_buf + index); bulk_buf_ref = (int *)PDC_malloc(sizeof(int)); bulk_buf_ref[0] = n_objs; - PDC_Client_transfer_request_all(n_objs, transfer_requests[index]->transfer_request->access_type, - transfer_requests[index]->data_server_id, bulk_buf, bulk_buf_size, - metadata_id + index, comm); + PDC_Client_transfer_request_all( + &bulk_handle, n_objs, transfer_requests[index]->transfer_request->access_type, + transfer_requests[index]->data_server_id, bulk_buf, bulk_buf_size, metadata_id + index, comm); + PDCregion_transfer_add_bulk_handle(transfer_requests[index]->transfer_request, bulk_handle); for (j = index; j < i; ++j) { // All requests share the 
same bulk buffer, reference counter is also shared among all // requests. @@ -1235,9 +1346,10 @@ PDC_Client_start_all_requests(pdc_transfer_request_start_all_pkg **transfer_requ &bulk_buf_size, read_bulk_buf + index); bulk_buf_ref = (int *)PDC_malloc(sizeof(int)); bulk_buf_ref[0] = n_objs; - PDC_Client_transfer_request_all(n_objs, transfer_requests[index]->transfer_request->access_type, - transfer_requests[index]->data_server_id, bulk_buf, bulk_buf_size, - metadata_id + index, comm); + PDC_Client_transfer_request_all( + &bulk_handle, n_objs, transfer_requests[index]->transfer_request->access_type, + transfer_requests[index]->data_server_id, bulk_buf, bulk_buf_size, metadata_id + index, comm); + PDCregion_transfer_add_bulk_handle(transfer_requests[index]->transfer_request, bulk_handle); for (j = index; j < size; ++j) { // All requests share the same bulk buffer, reference counter is also shared among all @@ -1463,6 +1575,7 @@ PDCregion_transfer_start_common(pdcid_t transfer_request_id, pdc_transfer_request *transfer_request; size_t unit; int i; + hg_bulk_t bulk_handle; transferinfo = PDC_find_id(transfer_request_id); if (NULL == transferinfo) @@ -1524,10 +1637,12 @@ PDCregion_transfer_start_common(pdcid_t transfer_request_id, transfer_request->read_bulk_buf[i] = transfer_request->output_buf[i]; } ret_value = PDC_Client_transfer_request( - transfer_request->output_buf[i], transfer_request->obj_id, transfer_request->obj_servers[i], - transfer_request->obj_ndim, transfer_request->obj_dims, transfer_request->remote_region_ndim, - transfer_request->output_offsets[i], transfer_request->output_sizes[i], unit, - transfer_request->access_type, transfer_request->metadata_id + i); + &bulk_handle, transfer_request->output_buf[i], transfer_request->obj_id, + transfer_request->obj_servers[i], transfer_request->obj_ndim, transfer_request->obj_dims, + transfer_request->remote_region_ndim, transfer_request->output_offsets[i], + transfer_request->output_sizes[i], unit, 
transfer_request->access_type, + transfer_request->metadata_id + i); + PDCregion_transfer_add_bulk_handle(transfer_request, bulk_handle); } } else if (transfer_request->region_partition == PDC_OBJ_STATIC) { @@ -1543,10 +1658,12 @@ PDCregion_transfer_start_common(pdcid_t transfer_request_id, // Submit transfer request to server by designating data server ID, remote region info, and contiguous // memory buffer for copy. ret_value = PDC_Client_transfer_request( - transfer_request->new_buf, transfer_request->obj_id, transfer_request->data_server_id, - transfer_request->obj_ndim, transfer_request->obj_dims, transfer_request->remote_region_ndim, - transfer_request->remote_region_offset, transfer_request->remote_region_size, unit, - transfer_request->access_type, transfer_request->metadata_id); + &bulk_handle, transfer_request->new_buf, transfer_request->obj_id, + transfer_request->data_server_id, transfer_request->obj_ndim, transfer_request->obj_dims, + transfer_request->remote_region_ndim, transfer_request->remote_region_offset, + transfer_request->remote_region_size, unit, transfer_request->access_type, + transfer_request->metadata_id); + PDCregion_transfer_add_bulk_handle(transfer_request, bulk_handle); } // For POSIX consistency, we block here until the data is received by the server @@ -1723,6 +1840,7 @@ PDCregion_transfer_wait_all(pdcid_t *transfer_request_id, int size) uint64_t * metadata_ids, merge_off = 0, cur_off = 0; pdc_transfer_request_wait_all_pkg **transfer_requests, *transfer_request_head, *transfer_request_end, *temp; + hg_bulk_t bulk_handle; struct _pdc_id_info **transferinfo; pdc_transfer_request *transfer_request, *merged_request; @@ -1812,8 +1930,9 @@ PDCregion_transfer_wait_all(pdcid_t *transfer_request_id, int size) metadata_ids[j] = transfer_requests[j]->metadata_id; } - PDC_Client_transfer_request_wait_all(n_objs, metadata_ids + index, + PDC_Client_transfer_request_wait_all(&bulk_handle, n_objs, metadata_ids + index, 
transfer_requests[index]->data_server_id); + PDCregion_transfer_add_bulk_handle(transfer_requests[index]->transfer_request, bulk_handle); for (j = index; j < i; ++j) { if (transfer_requests[j]->transfer_request->region_partition == PDC_REGION_STATIC || transfer_requests[j]->transfer_request->region_partition == PDC_REGION_DYNAMIC || @@ -1854,8 +1973,9 @@ PDCregion_transfer_wait_all(pdcid_t *transfer_request_id, int size) for (j = index; j < total_requests; ++j) { metadata_ids[j] = transfer_requests[j]->metadata_id; } - PDC_Client_transfer_request_wait_all(n_objs, metadata_ids + index, + PDC_Client_transfer_request_wait_all(&bulk_handle, n_objs, metadata_ids + index, transfer_requests[index]->data_server_id); + PDCregion_transfer_add_bulk_handle(transfer_requests[index]->transfer_request, bulk_handle); for (j = index; j < total_requests; ++j) { if (transfer_requests[j]->transfer_request->region_partition == PDC_REGION_STATIC || transfer_requests[j]->transfer_request->region_partition == PDC_REGION_DYNAMIC || diff --git a/src/tests/region/region_transfer_all_append_3D.c b/src/tests/region/region_transfer_all_append_3D.c index e50bcd362..019a86fd4 100644 --- a/src/tests/region/region_transfer_all_append_3D.c +++ b/src/tests/region/region_transfer_all_append_3D.c @@ -86,11 +86,6 @@ main(int argc, char **argv) dims[1] = DIM1; dims[2] = DIM2; - // create a pdc - pdc = PDCinit("pdc"); - LOG_INFO("create a new pdc\n"); - - // create a container property // create a pdc TASSERT((pdc = PDCinit("pdc")) != 0, "Call to PDCinit succeeded", "Call to PDCinit failed"); // create a container property diff --git a/src/tests/region/region_transfer_all_split_wait.c b/src/tests/region/region_transfer_all_split_wait.c index b583a20c2..894976329 100644 --- a/src/tests/region/region_transfer_all_split_wait.c +++ b/src/tests/region/region_transfer_all_split_wait.c @@ -70,10 +70,6 @@ main(int argc, char **argv) dims[0] = BUF_LEN; - // create a pdc - pdc = PDCinit("pdc"); - LOG_INFO("create a 
new pdc\n"); - // create a container property TASSERT((pdc = PDCinit("pdc")) != 0, "Call to PDCinit succeeded", "Call to PDCinit failed"); // create a container property