Skip to content

Commit fa0df07

Browse files
committed
Initial implementation of generic data (bulk) transfer from client to server
1 parent 0a1e146 commit fa0df07

File tree

7 files changed

+229
-6
lines changed

7 files changed

+229
-6
lines changed

src/api/include/pdc_client_connect.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,10 @@ struct _pdc_transfer_request_metadata_query2_args {
115115
int32_t ret;
116116
};
117117

118+
struct _generic_bulk_c2s_transfer_args {
119+
int32_t ret;
120+
};
121+
118122
struct _pdc_transfer_request_status_args {
119123
uint32_t status;
120124
int32_t ret;
@@ -1135,4 +1139,6 @@ void report_avg_server_profiling_rst();
11351139
perr_t PDC_Client_transfer_pthread_create();
11361140
perr_t PDC_Client_transfer_pthread_terminate();
11371141
perr_t PDC_Client_transfer_pthread_cnt_add(int n);
1142+
1143+
perr_t PDC_data_transfer_c2s(uint32_t server_id, void *buf, uint64_t buf_size);
11381144
#endif /* PDC_CLIENT_CONNECT_H */

src/api/pdc_client_connect.c

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ static hg_id_t transfer_request_status_register_id_g;
171171
static hg_id_t transfer_request_wait_register_id_g;
172172
static hg_id_t buf_map_register_id_g;
173173
static hg_id_t buf_unmap_register_id_g;
174+
static hg_id_t generic_bulk_c2s_transfer_register_id_g;
174175

175176
static hg_id_t cont_add_del_objs_rpc_register_id_g;
176177
static hg_id_t cont_add_tags_rpc_register_id_g;
@@ -627,7 +628,6 @@ client_send_transfer_request_metadata_query_rpc_cb(const struct hg_cb_info *call
627628
}
628629

629630
static hg_return_t
630-
631631
client_send_transfer_request_metadata_query2_rpc_cb(const struct hg_cb_info *callback_info)
632632
{
633633
hg_return_t ret_value = HG_SUCCESS;
@@ -1510,6 +1510,7 @@ PDC_Client_mercury_init(hg_class_t **hg_class, hg_context_t **hg_context, int po
15101510
transfer_request_wait_register_id_g = PDC_transfer_request_wait_register(*hg_class);
15111511
buf_map_register_id_g = PDC_buf_map_register(*hg_class);
15121512
buf_unmap_register_id_g = PDC_buf_unmap_register(*hg_class);
1513+
generic_bulk_c2s_transfer_register_id_g = PDC_generic_bulk_c2s_transfer_register(*hg_class);
15131514

15141515
// Analysis and Transforms
15151516
analysis_ftn_register_id_g = PDC_analysis_ftn_register(*hg_class);
@@ -9611,3 +9612,87 @@ PDC_Client_search_obj_ref_through_dart_mpi(dart_hash_algo_t hash_algo, char *que
96119612
#endif
96129613

96139614
/******************** Collective Object Selection Query Ends *******************************/
9615+
9616+
// Generic bulk data transfer from client to server
9617+
static hg_return_t
9618+
generic_bulk_c2s_rpc_cb(const struct hg_cb_info *callback_info)
9619+
{
9620+
hg_return_t ret_value = HG_SUCCESS;
9621+
hg_handle_t handle;
9622+
struct _generic_bulk_c2s_transfer_args *transfer_args;
9623+
generic_bulk_c2s_transfer_out_t output;
9624+
9625+
FUNC_ENTER(NULL);
9626+
9627+
fprintf(stderr, "entered %s\n", __func__);
9628+
9629+
transfer_args = (struct _generic_bulk_c2s_transfer_args*)callback_info->arg;
9630+
handle = callback_info->info.forward.handle;
9631+
9632+
ret_value = HG_Get_output(handle, &output);
9633+
if (ret_value != HG_SUCCESS) {
9634+
PGOTO_ERROR(FAIL, "==CLIENT[%d]: ERROR with HG_Get_output @ line %d",
9635+
pdc_client_mpi_rank_g, __LINE__);
9636+
transfer_args->ret = -1;
9637+
goto done;
9638+
}
9639+
9640+
transfer_args->ret = output.ret;
9641+
fprintf(stderr, "received return value %d\n", output.ret);
9642+
9643+
done:
9644+
fflush(stdout);
9645+
hg_atomic_decr32(&atomic_work_todo_g);
9646+
HG_Free_output(handle, &output);
9647+
9648+
FUNC_LEAVE(ret_value);
9649+
}
9650+
9651+
perr_t
9652+
PDC_data_transfer_c2s(uint32_t server_id, void *buf, uint64_t buf_size)
9653+
{
9654+
perr_t ret_value = SUCCEED;
9655+
hg_return_t hg_ret = HG_SUCCESS;
9656+
hg_class_t *hg_class;
9657+
hg_handle_t rpc_handle;
9658+
generic_bulk_c2s_transfer_in_t in;
9659+
struct _generic_bulk_c2s_transfer_args transfer_args;
9660+
9661+
FUNC_ENTER(NULL);
9662+
9663+
debug_server_id_count[server_id]++;
9664+
9665+
hg_class = HG_Context_get_class(send_context_g);
9666+
9667+
if (PDC_Client_try_lookup_server(server_id, 0) != SUCCEED)
9668+
PGOTO_ERROR(FAIL, "==CLIENT[%d]: ERROR with PDC_Client_try_lookup_server @ line %d",
9669+
pdc_client_mpi_rank_g, __LINE__);
9670+
9671+
hg_ret = HG_Create(send_context_g, pdc_server_info_g[server_id].addr,
9672+
generic_bulk_c2s_transfer_register_id_g, &rpc_handle);
9673+
if (hg_ret != HG_SUCCESS)
9674+
PGOTO_ERROR(FAIL, "%s: Could not create rpc handle @ line %d\n", __func__, __LINE__);
9675+
9676+
in.buf_size = (hg_size_t) buf_size;
9677+
9678+
hg_ret = HG_Bulk_create(hg_class, 1, &buf, &in.buf_size, HG_BULK_READ_ONLY, &in.local_bulk_handle);
9679+
if (hg_ret != HG_SUCCESS)
9680+
PGOTO_ERROR(FAIL, "%s: Could not create local bulk data handle @ line %d\n", __func__, __LINE__);
9681+
9682+
hg_atomic_set32(&atomic_work_todo_g, 1);
9683+
9684+
hg_ret = HG_Forward(rpc_handle, generic_bulk_c2s_rpc_cb, &transfer_args, &in);
9685+
if (hg_ret != HG_SUCCESS)
9686+
PGOTO_ERROR(FAIL, "%s: Could not start HG_Forward() @ line %d\n", __func__, __LINE__);
9687+
9688+
PDC_Client_check_response(&send_context_g);
9689+
9690+
if (transfer_args.ret != 1)
9691+
PGOTO_ERROR(FAIL, "%s return failed @ line %d\n", __func__, __LINE__);
9692+
9693+
HG_Destroy(rpc_handle);
9694+
9695+
done:
9696+
fflush(stdout);
9697+
FUNC_LEAVE(ret_value);
9698+
}

src/server/include/pdc_client_server_common.h

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -851,6 +851,17 @@ typedef struct transfer_request_metadata_query2_out_t {
851851
int32_t ret;
852852
} transfer_request_metadata_query2_out_t;
853853

854+
/* Define generic_bulk_c2s_transfer_in_t*/
855+
typedef struct generic_bulk_c2s_transfer_in_t {
856+
hg_bulk_t local_bulk_handle;
857+
hg_size_t buf_size;
858+
} generic_bulk_c2s_transfer_in_t;
859+
860+
/* Define generic_bulk_c2s_transfer_out_t*/
861+
typedef struct generic_bulk_c2s_transfer_out_t {
862+
int32_t ret;
863+
} generic_bulk_c2s_transfer_out_t;
864+
854865
/* Define buf_map_in_t */
855866
typedef struct {
856867
uint32_t meta_server_id;
@@ -2975,6 +2986,42 @@ hg_proc_transfer_request_metadata_query2_out_t(hg_proc_t proc, void *data)
29752986
return ret;
29762987
}
29772988

2989+
/* Define hg_proc_generic_bulk_c2s_transfer_in_t */
2990+
static HG_INLINE hg_return_t
2991+
hg_proc_generic_bulk_c2s_transfer_in_t(hg_proc_t proc, void *data)
2992+
{
2993+
hg_return_t ret;
2994+
generic_bulk_c2s_transfer_in_t *struct_data = (generic_bulk_c2s_transfer_in_t *)data;
2995+
2996+
ret = hg_proc_hg_bulk_t(proc, &struct_data->local_bulk_handle);
2997+
if (ret != HG_SUCCESS) {
2998+
fprintf(stderr, "Proc error @ %s:%d", __func__, __LINE__);
2999+
return ret;
3000+
}
3001+
3002+
ret = hg_proc_hg_size_t(proc, &struct_data->buf_size);
3003+
if (ret != HG_SUCCESS) {
3004+
fprintf(stderr, "Proc error @ %s:%d", __func__, __LINE__);
3005+
return ret;
3006+
}
3007+
3008+
return ret;
3009+
}
3010+
3011+
/* Define hg_proc_generic_bulk_c2s_transfer_out_t */
3012+
static HG_INLINE hg_return_t
3013+
hg_proc_generic_bulk_c2s_transfer_out_t(hg_proc_t proc, void *data)
3014+
{
3015+
hg_return_t ret;
3016+
generic_bulk_c2s_transfer_out_t *struct_data = (generic_bulk_c2s_transfer_out_t *)data;
3017+
3018+
ret = hg_proc_int32_t(proc, &struct_data->ret);
3019+
if (ret != HG_SUCCESS) {
3020+
fprintf(stderr, "Proc error @ %s:%d", __func__, __LINE__);
3021+
}
3022+
return ret;
3023+
}
3024+
29783025
/* Define hg_proc_transfer_request_wait_in_t */
29793026
static HG_INLINE hg_return_t
29803027
hg_proc_transfer_request_wait_in_t(hg_proc_t proc, void *data)
@@ -4041,7 +4088,7 @@ struct bulk_args_t {
40414088
uint64_t cont_id;
40424089
hg_handle_t handle;
40434090
hg_bulk_t bulk_handle;
4044-
size_t nbytes;
4091+
hg_size_t nbytes;
40454092
int origin;
40464093
size_t ret;
40474094
pdc_metadata_t ** meta_arr;
@@ -4326,6 +4373,7 @@ hg_id_t PDC_region_transform_release_register(hg_class_t *hg_class);
43264373
hg_id_t PDC_transform_region_release_register(hg_class_t *hg_class);
43274374
hg_id_t PDC_buf_map_server_register(hg_class_t *hg_class);
43284375
hg_id_t PDC_buf_unmap_server_register(hg_class_t *hg_class);
4376+
hg_id_t PDC_generic_bulk_c2s_transfer_register(hg_class_t *hg_class);
43294377

43304378
hg_id_t PDC_test_bulk_xfer_register(hg_class_t *hg_class);
43314379
hg_id_t PDC_server_lookup_remote_server_register(hg_class_t *hg_class);

src/server/pdc_client_server_common.c

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4703,12 +4703,12 @@ HG_TEST_RPC_CB(query_kvtag, handle)
47034703
FUNC_LEAVE(ret_value);
47044704
}
47054705

4706+
47064707
/*
47074708
* Data server related
47084709
*/
47094710

47104711
static hg_return_t
4711-
47124712
update_storage_meta_bulk_cb(const struct hg_cb_info *hg_cb_info)
47134713
{
47144714
hg_return_t ret_value = HG_SUCCESS;
@@ -6534,6 +6534,86 @@ HG_TEST_RPC_CB(dart_perform_one_server, handle)
65346534
FUNC_LEAVE(ret);
65356535
}
65366536

6537+
static hg_return_t
6538+
process_generic_c2s_bulk_transfer_cb(const struct hg_cb_info *hg_cb_info)
6539+
{
6540+
hg_return_t ret_value = HG_SUCCESS;
6541+
struct bulk_args_t *bulk_args = (struct bulk_args_t *)hg_cb_info->arg;
6542+
hg_bulk_t local_bulk_handle = hg_cb_info->info.bulk.local_handle;
6543+
void *buf;
6544+
generic_bulk_c2s_transfer_out_t out;
6545+
6546+
FUNC_ENTER(NULL);
6547+
6548+
ret_value = HG_Bulk_access(local_bulk_handle, 0, bulk_args->nbytes, HG_BULK_READWRITE, 1, &buf, NULL, NULL);
6549+
if (ret_value != HG_SUCCESS)
6550+
PGOTO_ERROR(ret_value, "Could not access bulk data @ %s:%d", __func__, __LINE__);
6551+
6552+
printf("==PDC_SERVER: process generic bulk data, size %lu\n", bulk_args->nbytes);
6553+
fprintf(stderr, "first 3 values: %d %d %d\n", ((int*)buf)[0], ((int*)buf)[1], ((int*)buf)[2]);
6554+
fprintf(stderr, "last values: %d %d %d\n", ((int*)buf)[bulk_args->nbytes/4-3],
6555+
((int*)buf)[bulk_args->nbytes/4-2], ((int*)buf)[bulk_args->nbytes/4-1]);
6556+
6557+
out.ret = 1;
6558+
HG_Respond(bulk_args->handle, NULL, NULL, &out);
6559+
6560+
done:
6561+
fflush(stdout);
6562+
HG_Bulk_free(local_bulk_handle);
6563+
HG_Destroy(bulk_args->handle);
6564+
free(bulk_args);
6565+
6566+
FUNC_LEAVE(ret_value);
6567+
}
6568+
6569+
/* static hg_return_t */
6570+
/* generic_bulk_c2s_transfer_cb(hg_handle_t handle) */
6571+
HG_TEST_RPC_CB(generic_bulk_c2s_transfer, handle)
6572+
{
6573+
hg_return_t ret_value = HG_SUCCESS;
6574+
hg_bulk_t origin_bulk_handle, local_bulk_handle;
6575+
const struct hg_info *hg_info = NULL;
6576+
struct bulk_args_t *bulk_args = NULL;
6577+
generic_bulk_c2s_transfer_in_t in;
6578+
generic_bulk_c2s_transfer_out_t out;
6579+
void *buf;
6580+
6581+
FUNC_ENTER(NULL);
6582+
6583+
/* Get input parameters and data */
6584+
ret_value = HG_Get_input(handle, &in);
6585+
if (ret_value != HG_SUCCESS)
6586+
PGOTO_ERROR(ret_value, "Could not get input @ %s:%d", __func__, __LINE__);
6587+
6588+
bulk_args = (struct bulk_args_t *)malloc(sizeof(struct bulk_args_t));
6589+
bulk_args->handle = handle;
6590+
origin_bulk_handle = in.local_bulk_handle;
6591+
bulk_args->nbytes = HG_Bulk_get_size(origin_bulk_handle);
6592+
6593+
buf = malloc(bulk_args->nbytes);
6594+
6595+
fprintf(stderr, "==PDC_SERVER: %s received nbytes %lu\n", __func__, bulk_args->nbytes);
6596+
6597+
hg_info = HG_Get_info(handle);
6598+
HG_Bulk_create(hg_info->hg_class, 1, &buf, &bulk_args->nbytes, HG_BULK_WRITE_ONLY, &local_bulk_handle);
6599+
6600+
// Data should be processed in process_generic_c2s_bulk_transfer_cb, after sending a reply to the client
6601+
// HG_Bulk_transfer is non-blocking
6602+
ret_value = HG_Bulk_transfer(hg_info->context, process_generic_c2s_bulk_transfer_cb, bulk_args,
6603+
HG_BULK_PULL, hg_info->addr, origin_bulk_handle, 0,
6604+
local_bulk_handle, 0, bulk_args->nbytes, HG_OP_ID_IGNORE);
6605+
if (ret_value != HG_SUCCESS)
6606+
PGOTO_ERROR(ret_value, "Could not read bulk data");
6607+
6608+
HG_Free_input(handle, &in);
6609+
6610+
done:
6611+
fflush(stdout);
6612+
/* out.ret = ret_value; */
6613+
/* HG_Respond(handle, NULL, NULL, &out); */
6614+
FUNC_LEAVE(ret_value);
6615+
}
6616+
65376617
HG_TEST_THREAD_CB(server_lookup_client)
65386618
HG_TEST_THREAD_CB(gen_obj_id)
65396619
HG_TEST_THREAD_CB(gen_cont_id)
@@ -6594,6 +6674,7 @@ HG_TEST_THREAD_CB(send_client_storage_meta_rpc)
65946674
HG_TEST_THREAD_CB(send_shm_bulk_rpc)
65956675
HG_TEST_THREAD_CB(send_data_query_rpc)
65966676
HG_TEST_THREAD_CB(send_rpc)
6677+
HG_TEST_THREAD_CB(generic_bulk_c2s_transfer)
65976678

65986679
HG_TEST_THREAD_CB(send_nhits)
65996680
HG_TEST_THREAD_CB(send_bulk_rpc)
@@ -6639,6 +6720,7 @@ PDC_FUNC_DECLARE_REGISTER_IN_OUT(buf_map_server, buf_map_in_t, buf_map_out_t)
66396720
PDC_FUNC_DECLARE_REGISTER_IN_OUT(buf_unmap_server, buf_unmap_in_t, buf_unmap_out_t)
66406721
PDC_FUNC_DECLARE_REGISTER(buf_unmap)
66416722
PDC_FUNC_DECLARE_REGISTER(region_lock)
6723+
PDC_FUNC_DECLARE_REGISTER(generic_bulk_c2s_transfer)
66426724

66436725
PDC_FUNC_DECLARE_REGISTER_IN_OUT(region_release, region_lock_in_t, region_lock_out_t)
66446726
PDC_FUNC_DECLARE_REGISTER_IN_OUT(transform_region_release, region_transform_and_lock_in_t, region_lock_out_t)

src/server/pdc_server.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2047,6 +2047,7 @@ PDC_Server_mercury_register()
20472047
PDC_transform_region_release_register(hg_class_g);
20482048
PDC_region_transform_release_register(hg_class_g);
20492049
PDC_region_analysis_release_register(hg_class_g);
2050+
PDC_generic_bulk_c2s_transfer_register(hg_class_g);
20502051

20512052
// DART Index
20522053
PDC_dart_get_server_info_register(hg_class_g);

src/server/pdc_server_region/pdc_server_region_request_handler.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ transfer_request_all_bulk_transfer_read_cb2(const struct hg_cb_info *info)
2222
free(local_bulk_args2->transfer_request_id);
2323
HG_Bulk_free(local_bulk_args2->bulk_handle);
2424
HG_Destroy(local_bulk_args2->handle);
25-
free(local_bulk_args2);
2625
// printf("finishing transfer_request_all_bulk_transfer_read_cb2\n");
2726

2827
#ifdef PDC_TIMING
@@ -32,6 +31,7 @@ transfer_request_all_bulk_transfer_read_cb2(const struct hg_cb_info *info)
3231
pdc_server_timings->PDCreg_transfer_request_inner_read_all_bulk_rpc += end - start;
3332
pdc_timestamp_register(pdc_transfer_request_inner_read_all_bulk_timestamps, start, end);
3433
#endif
34+
free(local_bulk_args2);
3535

3636
FUNC_LEAVE(ret);
3737
}
@@ -331,15 +331,15 @@ transfer_request_wait_all_bulk_transfer_cb(const struct hg_cb_info *info)
331331

332332
HG_Bulk_free(local_bulk_args->bulk_handle);
333333

334-
free(local_bulk_args);
335-
336334
#ifdef PDC_TIMING
337335
double end = MPI_Wtime();
338336

339337
pdc_server_timings->PDCreg_transfer_request_wait_all_rpc += end - local_bulk_args->start_time;
340338
pdc_timestamp_register(pdc_transfer_request_wait_all_timestamps, local_bulk_args->start_time, end);
341339
#endif
342340

341+
free(local_bulk_args);
342+
343343
FUNC_LEAVE(ret);
344344
}
345345

src/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ include_directories(${TEST_EXT_INCLUDE_DIRS})
4343

4444
set(PROGRAMS
4545
pdc_init
46+
pdc_generic_bulk
4647
# create_prop
4748
# create_region
4849
# create_obj_coll

0 commit comments

Comments
 (0)