4 changes: 3 additions & 1 deletion src/binding/generate_binding.py
@@ -188,6 +188,8 @@ def generate_global_attr_impl(input):
text += f" g_default_attr.{attr_name} = get_env_or(\"{env_var}\", {attr['default_value']});\n"
if type_name == "std::string":
text += f" LCI_Log(LOG_INFO, \"env\", \"set {attr_name} to be %s\\n\", g_default_attr.{attr_name}.c_str());\n"
elif type_name in ["double", "float"]:
text += f" LCI_Log(LOG_INFO, \"env\", \"set {attr_name} to be %f\\n\", static_cast<double>(g_default_attr.{attr_name}));\n"
else:
text += f" LCI_Log(LOG_INFO, \"env\", \"set {attr_name} to be %d\\n\", static_cast<int>(g_default_attr.{attr_name}));\n"

@@ -495,4 +497,4 @@ def generate_binding(input):
if len(input) == 0:
print("No input found. Exiting.")
exit(1)
generate_binding(input)
generate_binding(input)
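For reference, the template above would now emit roughly the following C++ for the new double-typed attribute. This is an illustrative fragment only; the environment-variable name is assumed, not taken from the generated source.

g_default_attr.net_send_reserved_pct =
    get_env_or("LCI_NET_SEND_RESERVED_PCT", 0.25);  // env var name assumed
LCI_Log(LOG_INFO, "env", "set net_send_reserved_pct to be %f\n",
        static_cast<double>(g_default_attr.net_send_reserved_pct));

Without the new elif branch, a floating-point attribute would have fallen through to the %d branch and been logged as a truncated integer.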
1 change: 1 addition & 0 deletions src/binding/input/network.py
@@ -39,6 +39,7 @@
attr("size_t", "net_max_sends", default_value="LCI_BACKEND_MAX_SENDS_DEFAULT", comment="The maximum number of sends that can be posted to the underlying network queue at the same time."),
attr("size_t", "net_max_recvs", default_value="LCI_BACKEND_MAX_RECVS_DEFAULT", comment="The maximum number of receives that can be posted to the underlying network queue at the same time."),
attr("size_t", "net_max_cqes", default_value="LCI_BACKEND_MAX_CQES_DEFAULT", comment="The maximum number of CQEs that can reside in the underlying network queue at the same time."),
attr("double", "net_send_reserved_pct", default_value=0.25, comment="Fraction of send queue slots reserved for high-priority operations (0.0-1.0)."),
attr("uint64_t", "ofi_lock_mode", comment="For the OFI backend: the lock mode for the device."),
attr("bool", "alloc_default_endpoint", default_value=1, comment="Whether to allocate the default endpoint."),
attr("int", "uid", default_value=-1, inout_trait="out", comment="A unique device id across the entire process."),
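To make the new knob concrete, here is a small illustrative calculation; the value of net_max_sends is assumed for the example and is not the actual LCI_BACKEND_MAX_SENDS_DEFAULT.

size_t net_max_sends = 64;            // assumed value, for illustration only
double net_send_reserved_pct = 0.25;  // default from the attribute above
// 64 * 0.25 = 16 slots stay reserved for high-priority operations;
// low-priority posts are refused once only those 16 slots remain.
size_t reserved_slots =
    static_cast<size_t>(net_max_sends * net_send_reserved_pct);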
45 changes: 34 additions & 11 deletions src/network/endpoint_inline.hpp
@@ -11,11 +11,22 @@ inline error_t endpoint_impl_t::post_sends(int rank, void* buffer, size_t size,
void* user_context, bool allow_retry,
bool force_post)
{
// allow_retry is used by the upper layer to decide whether to push the
// operation to the backlog queue when it fails: if allow_retry is false, a
// failed operation is pushed to the backlog queue.
// force_post is used by the backlog queue to force-post the operations it
// holds.
// We consider an operation high priority when allow_retry is false or
// force_post is true, to minimize the chance of it being pushed back to the
// backlog queue.
error_t error;
if (!force_post && !backlog_queue.is_empty(rank)) {
error = errorcode_t::retry_backlog;
} else {
error = post_sends_impl(rank, buffer, size, imm_data, user_context);
bool high_priority = !allow_retry || force_post;
error = post_sends_impl(rank, buffer, size, imm_data, user_context,
high_priority);
}
if (error.is_retry()) {
if (error.errorcode == errorcode_t::retry_lock) {
@@ -50,7 +61,9 @@ inline error_t endpoint_impl_t::post_send(int rank, void* buffer, size_t size,
if (!force_post && !backlog_queue.is_empty(rank)) {
error = errorcode_t::retry_backlog;
} else {
error = post_send_impl(rank, buffer, size, mr, imm_data, user_context);
bool high_priority = !allow_retry || force_post;
error = post_send_impl(rank, buffer, size, mr, imm_data, user_context,
high_priority);
}
if (error.is_retry()) {
if (error.errorcode == errorcode_t::retry_lock) {
@@ -85,7 +98,9 @@ inline error_t endpoint_impl_t::post_puts(int rank, void* buffer, size_t size,
if (!force_post && !backlog_queue.is_empty(rank)) {
error = errorcode_t::retry_backlog;
} else {
error = post_puts_impl(rank, buffer, size, offset, rmr, user_context);
bool high_priority = !allow_retry || force_post;
error = post_puts_impl(rank, buffer, size, offset, rmr, user_context,
high_priority);
}
if (error.is_retry()) {
LCI_PCOUNTER_ADD(net_write_post_retry, 1);
@@ -115,7 +130,9 @@ inline error_t endpoint_impl_t::post_put(int rank, void* buffer, size_t size,
if (!force_post && !backlog_queue.is_empty(rank)) {
error = errorcode_t::retry_backlog;
} else {
error = post_put_impl(rank, buffer, size, mr, offset, rmr, user_context);
bool high_priority = !allow_retry || force_post;
error = post_put_impl(rank, buffer, size, mr, offset, rmr, user_context,
high_priority);
}
if (error.is_retry()) {
LCI_PCOUNTER_ADD(net_write_post_retry, 1);
@@ -137,14 +154,15 @@ inline error_t endpoint_impl_t::post_put(int rank, void* buffer, size_t size,

inline error_t endpoint_impl_t::post_putImms_fallback(
int rank, void* buffer, size_t size, uint64_t offset, rmr_t rmr,
net_imm_data_t imm_data, void* user_context)
net_imm_data_t imm_data, void* user_context, bool high_priority)
{
// fallback to post_puts
LCI_DBG_Log(
LOG_TRACE, "network",
"fallback to post_puts imm_data %x user_context %p (ignored for sends)\n",
imm_data, user_context);
error_t error = post_puts_impl(rank, buffer, size, offset, rmr, user_context);
error_t error = post_puts_impl(rank, buffer, size, offset, rmr, user_context,
high_priority);
if (!error.is_retry()) {
LCI_Assert(error.is_done(), "Unexpected error %d\n", error);
// we do not allow retry for post_sends
@@ -167,12 +185,13 @@ inline error_t endpoint_impl_t::post_putImms(int rank, void* buffer,
if (!force_post && !backlog_queue.is_empty(rank)) {
error = errorcode_t::retry_backlog;
} else {
bool high_priority = !allow_retry || force_post;
if (net_context_attr.support_putimm) {
error = post_putImms_impl(rank, buffer, size, offset, rmr, imm_data,
user_context);
user_context, high_priority);
} else {
error = post_putImms_fallback(rank, buffer, size, offset, rmr, imm_data,
user_context);
user_context, high_priority);
}
}
if (error.is_retry()) {
@@ -210,7 +229,8 @@ inline error_t endpoint_impl_t::post_putImm_fallback(int rank, void* buffer,
ectx->imm_data = imm_data;
ectx->signal_count = 1;
ectx->internal_ctx = static_cast<internal_context_t*>(user_context);
error_t error = post_put_impl(rank, buffer, size, mr, offset, rmr, ectx);
error_t error = post_put_impl(rank, buffer, size, mr, offset, rmr, ectx,
true /* high_priority */);
if (error.is_retry()) {
delete ectx;
}
@@ -227,9 +247,10 @@ inline error_t endpoint_impl_t::post_putImm(int rank, void* buffer, size_t size,
if (!force_post && !backlog_queue.is_empty(rank)) {
error = errorcode_t::retry_backlog;
} else {
bool high_priority = !allow_retry || force_post;
if (net_context_attr.support_putimm) {
error = post_putImm_impl(rank, buffer, size, mr, offset, rmr, imm_data,
user_context);
user_context, high_priority);
} else {
error = post_putImm_fallback(rank, buffer, size, mr, offset, rmr,
imm_data, user_context);
@@ -263,7 +284,9 @@ inline error_t endpoint_impl_t::post_get(int rank, void* buffer, size_t size,
if (!force_post && !backlog_queue.is_empty(rank)) {
error = errorcode_t::retry_backlog;
} else {
error = post_get_impl(rank, buffer, size, mr, offset, rmr, user_context);
bool high_priority = !allow_retry || force_post;
error = post_get_impl(rank, buffer, size, mr, offset, rmr, user_context,
high_priority);
}
if (error.is_retry()) {
LCI_PCOUNTER_ADD(net_read_post_retry, 1);
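The comment added to post_sends describes the priority rule used throughout this file; as a standalone sketch (not the LCI implementation itself) it reduces to:

// Minimal sketch of the rule documented in endpoint_inline.hpp: an operation
// is high priority if the caller cannot retry it (allow_retry == false) or
// if the backlog queue is force-posting it (force_post == true).
inline bool is_high_priority(bool allow_retry, bool force_post)
{
  return !allow_retry || force_post;
}

Each post_* entry point now computes this flag and forwards it to the corresponding backend *_impl call.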
10 changes: 7 additions & 3 deletions src/network/ibv/backend_ibv.cpp
@@ -151,7 +151,6 @@ ibv_device_impl_t::ibv_device_impl_t(net_context_t net_context_,
device_t::attr_t attr_)
: device_impl_t(net_context_, attr_)
{
net_context_attr = net_context.get_attr();
ibv_net_context_impl_t* p_net_context =
static_cast<ibv_net_context_impl_t*>(net_context.p_impl);

@@ -271,6 +270,11 @@ ibv_device_impl_t::ibv_device_impl_t(net_context_t net_context_,
}

ib_qps.resize(get_rank_n());
qp_remaining_slots = std::vector<padded_atomic_t<int>>(get_rank_n());
for (auto& slot : qp_remaining_slots) {
slot.val.store(static_cast<int>(attr.net_max_sends),
std::memory_order_relaxed);
}
struct bootstrap_data_t {
int source_rank;
int target_rank;
@@ -597,11 +601,11 @@ ibv_endpoint_impl_t::ibv_endpoint_impl_t(device_t device_, attr_t attr_)
p_ibv_device(reinterpret_cast<ibv_device_impl_t*>(device.p_impl)),
ib_qps(p_ibv_device->ib_qps),
ib_qp_extras(&p_ibv_device->ib_qp_extras),
net_context_attr(p_ibv_device->net_context_attr),
qp_remaining_slots(&p_ibv_device->qp_remaining_slots),
qps_lock(&p_ibv_device->qps_lock)
{
}

ibv_endpoint_impl_t::~ibv_endpoint_impl_t() {}

} // namespace lci
} // namespace lci
29 changes: 17 additions & 12 deletions src/network/ibv/backend_ibv.hpp
@@ -100,8 +100,8 @@ class ibv_device_impl_t : public lci::device_impl_t
qp2rank_map_t qp2rank_map;
std::vector<struct ibv_qp*> ib_qps;
std::vector<LCISI_ibv_qp_extra_t> ib_qp_extras;
std::vector<padded_atomic_t<int>> qp_remaining_slots;

net_context_attr_t net_context_attr;
ibv_mr_impl_t odp_mr;
LCIU_CACHE_PADDING(0);
spinlock_t srq_lock;
@@ -118,35 +118,40 @@ class ibv_endpoint_impl_t : public lci::endpoint_impl_t
ibv_endpoint_impl_t(device_t device_, attr_t attr_);
~ibv_endpoint_impl_t() override;
error_t post_sends_impl(int rank, void* buffer, size_t size,
net_imm_data_t imm_data, void* user_context) override;
net_imm_data_t imm_data, void* user_context,
bool high_priority) override;
error_t post_send_impl(int rank, void* buffer, size_t size, mr_t mr,
net_imm_data_t imm_data, void* user_context) override;
net_imm_data_t imm_data, void* user_context,
bool high_priority) override;
error_t post_puts_impl(int rank, void* buffer, size_t size, uint64_t offset,
rmr_t rmr, void* user_context) override;
rmr_t rmr, void* user_context,
bool high_priority) override;
error_t post_put_impl(int rank, void* buffer, size_t size, mr_t mr,
uint64_t offset, rmr_t rmr,
void* user_context) override;
uint64_t offset, rmr_t rmr, void* user_context,
bool high_priority) override;
error_t post_putImms_impl(int rank, void* buffer, size_t size,
uint64_t offset, rmr_t rmr, net_imm_data_t imm_data,
void* user_context) override;
void* user_context, bool high_priority) override;
error_t post_putImm_impl(int rank, void* buffer, size_t size, mr_t mr,
uint64_t offset, rmr_t rmr, net_imm_data_t imm_data,
void* user_context) override;
void* user_context, bool high_priority) override;
error_t post_get_impl(int rank, void* buffer, size_t size, mr_t mr,
uint64_t offset, rmr_t rmr,
void* user_context) override;
uint64_t offset, rmr_t rmr, void* user_context,
bool high_priority) override;

ibv_device_impl_t* p_ibv_device;
std::vector<struct ibv_qp*> ib_qps;
std::vector<LCISI_ibv_qp_extra_t>* ib_qp_extras;
net_context_attr_t net_context_attr;
std::vector<padded_atomic_t<int>>* qp_remaining_slots;
spinlock_t* qps_lock;

private:
bool try_lock_qp(int rank);
void unlock_qp(int rank);
bool try_acquire_slot(int rank, bool high_priority);
void release_slot(int rank);
};

} // namespace lci

#endif // LCI_BACKEND_IBV_BACKEND_IBV_HPP
#endif // LCI_BACKEND_IBV_BACKEND_IBV_HPP
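try_acquire_slot and release_slot are declared here but their definitions are not part of this section. A hypothetical sketch of how they could combine qp_remaining_slots with net_send_reserved_pct follows; the member-access paths and the location of the attribute are assumptions, and the real implementation in backend_ibv.cpp may differ.

bool ibv_endpoint_impl_t::try_acquire_slot(int rank, bool high_priority)
{
  auto& remaining = (*qp_remaining_slots)[rank].val;
  // Low-priority posts may not consume the reserved fraction of the queue.
  int reserved =
      high_priority ? 0
                    : static_cast<int>(p_ibv_device->attr.net_max_sends *
                                       p_ibv_device->attr.net_send_reserved_pct);
  int old = remaining.load(std::memory_order_relaxed);
  while (old > reserved) {
    if (remaining.compare_exchange_weak(old, old - 1,
                                        std::memory_order_acquire,
                                        std::memory_order_relaxed))
      return true;  // one send-queue slot acquired
  }
  return false;  // only reserved slots (or none) remain
}

void ibv_endpoint_impl_t::release_slot(int rank)
{
  // Presumably invoked once the corresponding send completion is reaped.
  (*qp_remaining_slots)[rank].val.fetch_add(1, std::memory_order_release);
}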