From 767e38bda314613aeba0af070bb365950d7c7078 Mon Sep 17 00:00:00 2001 From: Jiakun Yan Date: Sat, 11 Oct 2025 12:50:11 -0500 Subject: [PATCH 1/2] ibv: reserve part of send queues for high-priority messages --- src/network/endpoint_inline.hpp | 44 +++++-- src/network/ibv/backend_ibv.cpp | 10 +- src/network/ibv/backend_ibv.hpp | 29 +++-- src/network/ibv/backend_ibv_inline.hpp | 160 ++++++++++++++++++++----- src/network/network.cpp | 4 +- src/network/network.hpp | 26 ++-- src/network/ofi/backend_ofi.hpp | 23 ++-- src/network/ofi/backend_ofi_inline.hpp | 22 ++-- 8 files changed, 231 insertions(+), 87 deletions(-) diff --git a/src/network/endpoint_inline.hpp b/src/network/endpoint_inline.hpp index a68871dc..d9d30396 100644 --- a/src/network/endpoint_inline.hpp +++ b/src/network/endpoint_inline.hpp @@ -11,11 +11,21 @@ inline error_t endpoint_impl_t::post_sends(int rank, void* buffer, size_t size, void* user_context, bool allow_retry, bool force_post) { + // allow_retry is used by upper layer to decide whether to push the + // operation to backlog queue if the operation fails + // if allow_retry is false, the operation will be pushed to backlog queue + // if it fails + // force_post is used by backlog queue to force post the operations + // in the backlog queue + // We consider the operation high priority if it is either allow_retry is false + // or force_post is true, to minimize the chance of being pushed back to + // backlog queue. error_t error; if (!force_post && !backlog_queue.is_empty(rank)) { error = errorcode_t::retry_backlog; } else { - error = post_sends_impl(rank, buffer, size, imm_data, user_context); + bool high_priority = !allow_retry || force_post; + error = post_sends_impl(rank, buffer, size, imm_data, user_context, high_priority); } if (error.is_retry()) { if (error.errorcode == errorcode_t::retry_lock) { @@ -50,7 +60,9 @@ inline error_t endpoint_impl_t::post_send(int rank, void* buffer, size_t size, if (!force_post && !backlog_queue.is_empty(rank)) { error = errorcode_t::retry_backlog; } else { - error = post_send_impl(rank, buffer, size, mr, imm_data, user_context); + bool high_priority = !allow_retry || force_post; + error = post_send_impl(rank, buffer, size, mr, imm_data, user_context, + high_priority); } if (error.is_retry()) { if (error.errorcode == errorcode_t::retry_lock) { @@ -85,7 +97,9 @@ inline error_t endpoint_impl_t::post_puts(int rank, void* buffer, size_t size, if (!force_post && !backlog_queue.is_empty(rank)) { error = errorcode_t::retry_backlog; } else { - error = post_puts_impl(rank, buffer, size, offset, rmr, user_context); + bool high_priority = !allow_retry || force_post; + error = post_puts_impl(rank, buffer, size, offset, rmr, user_context, + high_priority); } if (error.is_retry()) { LCI_PCOUNTER_ADD(net_write_post_retry, 1); @@ -115,7 +129,9 @@ inline error_t endpoint_impl_t::post_put(int rank, void* buffer, size_t size, if (!force_post && !backlog_queue.is_empty(rank)) { error = errorcode_t::retry_backlog; } else { - error = post_put_impl(rank, buffer, size, mr, offset, rmr, user_context); + bool high_priority = !allow_retry || force_post; + error = post_put_impl(rank, buffer, size, mr, offset, rmr, user_context, + high_priority); } if (error.is_retry()) { LCI_PCOUNTER_ADD(net_write_post_retry, 1); @@ -137,14 +153,15 @@ inline error_t endpoint_impl_t::post_put(int rank, void* buffer, size_t size, inline error_t endpoint_impl_t::post_putImms_fallback( int rank, void* buffer, size_t size, uint64_t offset, rmr_t rmr, - net_imm_data_t imm_data, void* 
user_context) + net_imm_data_t imm_data, void* user_context, bool high_priority) { // fallback to post_put LCI_DBG_Log( LOG_TRACE, "network", "fallback to post_puts imm_data %x user_context %p (ignored for sends)\n", imm_data, user_context); - error_t error = post_puts_impl(rank, buffer, size, offset, rmr, user_context); + error_t error = post_puts_impl(rank, buffer, size, offset, rmr, user_context, + high_priority); if (!error.is_retry()) { LCI_Assert(error.is_done(), "Unexpected error %d\n", error); // we do not allow retry for post_sends @@ -167,12 +184,13 @@ inline error_t endpoint_impl_t::post_putImms(int rank, void* buffer, if (!force_post && !backlog_queue.is_empty(rank)) { error = errorcode_t::retry_backlog; } else { + bool high_priority = !allow_retry || force_post; if (net_context_attr.support_putimm) { error = post_putImms_impl(rank, buffer, size, offset, rmr, imm_data, - user_context); + user_context, high_priority); } else { error = post_putImms_fallback(rank, buffer, size, offset, rmr, imm_data, - user_context); + user_context, high_priority); } } if (error.is_retry()) { @@ -210,7 +228,8 @@ inline error_t endpoint_impl_t::post_putImm_fallback(int rank, void* buffer, ectx->imm_data = imm_data; ectx->signal_count = 1; ectx->internal_ctx = static_cast(user_context); - error_t error = post_put_impl(rank, buffer, size, mr, offset, rmr, ectx); + error_t error = post_put_impl(rank, buffer, size, mr, offset, rmr, ectx, + true /* high_priority */); if (error.is_retry()) { delete ectx; } @@ -227,9 +246,10 @@ inline error_t endpoint_impl_t::post_putImm(int rank, void* buffer, size_t size, if (!force_post && !backlog_queue.is_empty(rank)) { error = errorcode_t::retry_backlog; } else { + bool high_priority = !allow_retry || force_post; if (net_context_attr.support_putimm) { error = post_putImm_impl(rank, buffer, size, mr, offset, rmr, imm_data, - user_context); + user_context, high_priority); } else { error = post_putImm_fallback(rank, buffer, size, mr, offset, rmr, imm_data, user_context); } @@ -263,7 +283,9 @@ inline error_t endpoint_impl_t::post_get(int rank, void* buffer, size_t size, if (!force_post && !backlog_queue.is_empty(rank)) { error = errorcode_t::retry_backlog; } else { - error = post_get_impl(rank, buffer, size, mr, offset, rmr, user_context); + bool high_priority = !allow_retry || force_post; + error = post_get_impl(rank, buffer, size, mr, offset, rmr, user_context, + high_priority); } if (error.is_retry()) { LCI_PCOUNTER_ADD(net_read_post_retry, 1); diff --git a/src/network/ibv/backend_ibv.cpp b/src/network/ibv/backend_ibv.cpp index 22c8002e..4be56f9e 100644 --- a/src/network/ibv/backend_ibv.cpp +++ b/src/network/ibv/backend_ibv.cpp @@ -151,7 +151,6 @@ ibv_device_impl_t::ibv_device_impl_t(net_context_t net_context_, device_t::attr_t attr_) : device_impl_t(net_context_, attr_) { - net_context_attr = net_context.get_attr(); ibv_net_context_impl_t* p_net_context = static_cast<ibv_net_context_impl_t*>(net_context.p_impl); @@ -271,6 +270,11 @@ ibv_device_impl_t::ibv_device_impl_t(net_context_t net_context_, } ib_qps.resize(get_rank_n()); + qp_remaining_slots = std::vector<padded_atomic_t<int>>(get_rank_n()); + for (auto &slot : qp_remaining_slots) { + slot.val.store(static_cast<int>(attr.net_max_sends), + std::memory_order_relaxed); + } struct bootstrap_data_t { int source_rank; int target_rank; @@ -597,11 +601,11 @@ ibv_endpoint_impl_t::ibv_endpoint_impl_t(device_t device_, attr_t attr_) p_ibv_device(reinterpret_cast<ibv_device_impl_t*>(device.p_impl)), ib_qps(p_ibv_device->ib_qps), ib_qp_extras(&p_ibv_device->ib_qp_extras), -
net_context_attr(p_ibv_device->net_context_attr), + qp_remaining_slots(&p_ibv_device->qp_remaining_slots), qps_lock(&p_ibv_device->qps_lock) { } ibv_endpoint_impl_t::~ibv_endpoint_impl_t() {} -} // namespace lci \ No newline at end of file +} // namespace lci diff --git a/src/network/ibv/backend_ibv.hpp b/src/network/ibv/backend_ibv.hpp index ebd3264b..100103c1 100644 --- a/src/network/ibv/backend_ibv.hpp +++ b/src/network/ibv/backend_ibv.hpp @@ -100,8 +100,8 @@ class ibv_device_impl_t : public lci::device_impl_t qp2rank_map_t qp2rank_map; std::vector ib_qps; std::vector ib_qp_extras; + std::vector<padded_atomic_t<int>> qp_remaining_slots; - net_context_attr_t net_context_attr; ibv_mr_impl_t odp_mr; LCIU_CACHE_PADDING(0); spinlock_t srq_lock; @@ -118,35 +118,40 @@ class ibv_endpoint_impl_t : public lci::endpoint_impl_t ibv_endpoint_impl_t(device_t device_, attr_t attr_); ~ibv_endpoint_impl_t() override; error_t post_sends_impl(int rank, void* buffer, size_t size, - net_imm_data_t imm_data, void* user_context) override; + net_imm_data_t imm_data, void* user_context, + bool high_priority) override; error_t post_send_impl(int rank, void* buffer, size_t size, mr_t mr, - net_imm_data_t imm_data, void* user_context) override; + net_imm_data_t imm_data, void* user_context, + bool high_priority) override; error_t post_puts_impl(int rank, void* buffer, size_t size, uint64_t offset, - rmr_t rmr, void* user_context) override; + rmr_t rmr, void* user_context, + bool high_priority) override; error_t post_put_impl(int rank, void* buffer, size_t size, mr_t mr, - uint64_t offset, rmr_t rmr, - void* user_context) override; + uint64_t offset, rmr_t rmr, void* user_context, + bool high_priority) override; error_t post_putImms_impl(int rank, void* buffer, size_t size, uint64_t offset, rmr_t rmr, net_imm_data_t imm_data, - void* user_context) override; + void* user_context, bool high_priority) override; error_t post_putImm_impl(int rank, void* buffer, size_t size, mr_t mr, uint64_t offset, rmr_t rmr, net_imm_data_t imm_data, - void* user_context) override; + void* user_context, bool high_priority) override; error_t post_get_impl(int rank, void* buffer, size_t size, mr_t mr, - uint64_t offset, rmr_t rmr, - void* user_context) override; + uint64_t offset, rmr_t rmr, void* user_context, + bool high_priority) override; ibv_device_impl_t* p_ibv_device; std::vector ib_qps; std::vector* ib_qp_extras; - net_context_attr_t net_context_attr; + std::vector<padded_atomic_t<int>>* qp_remaining_slots; spinlock_t* qps_lock; private: bool try_lock_qp(int rank); void unlock_qp(int rank); + bool try_acquire_slot(int rank, bool high_priority); + void release_slot(int rank); }; } // namespace lci -#endif // LCI_BACKEND_IBV_BACKEND_IBV_HPP \ No newline at end of file +#endif // LCI_BACKEND_IBV_BACKEND_IBV_HPP diff --git a/src/network/ibv/backend_ibv_inline.hpp b/src/network/ibv/backend_ibv_inline.hpp index 87dfe4bc..cc0481ac 100644 --- a/src/network/ibv/backend_ibv_inline.hpp +++ b/src/network/ibv/backend_ibv_inline.hpp @@ -71,6 +71,18 @@ inline size_t ibv_device_impl_t::poll_comp_impl(net_status_t* p_statuses, "Failed status %s (%d) for wr_id %p\n", ibv_wc_status_str(wcs[i].status), wcs[i].status, (void*)wcs[i].wr_id); + // Increment SQ slots on any send-side completion + if (wcs[i].opcode != IBV_WC_RECV && + wcs[i].opcode != IBV_WC_RECV_RDMA_WITH_IMM) { + int rank = qp2rank_map.get_rank_me(wcs[i].qp_num); + LCI_DBG_Assert(rank >= 0 && rank < get_rank_n(), + "Invalid rank %d from qp_num %u\n", rank, wcs[i].qp_num); + int prev = qp_remaining_slots[rank].val.fetch_add(
1, std::memory_order_relaxed); + LCI_DBG_Assert(prev < static_cast<int>(attr.net_max_sends), + "Too many slots on QP for rank %d (prev %d)\n", rank, + prev); + } if (!p_statuses) continue; net_status_t& status = p_statuses[i]; memset(&status, 0, sizeof(status)); @@ -175,6 +187,33 @@ inline size_t ibv_device_impl_t::post_recvs_impl(void* buffers[], size_t size, } } +inline bool ibv_endpoint_impl_t::try_acquire_slot(int rank, bool high_priority) +{ + auto& counter = (*qp_remaining_slots)[rank].val; + int threshold = static_cast<int>(device_attr.net_max_sends / 4); + if (!high_priority && counter.load(std::memory_order_relaxed) <= threshold) { + return false; + } + int prev = counter.fetch_sub(1, std::memory_order_relaxed); + if (prev <= 0) { + counter.fetch_add(1, std::memory_order_relaxed); + return false; + } + return true; +} + +// inline void ibv_endpoint_impl_t::acquire_slot(int rank) +// { +// [[maybe_unused]] int prev = (*qp_remaining_slots)[rank].val.fetch_sub(1, +// std::memory_order_relaxed); LCI_DBG_Assert(prev > 0, "Too many slots on QP +// for rank %d (prev %d)\n", rank, prev); +// } + +inline void ibv_endpoint_impl_t::release_slot(int rank) +{ + (*qp_remaining_slots)[rank].val.fetch_add(1, std::memory_order_relaxed); +} + inline bool ibv_endpoint_impl_t::try_lock_qp(int rank) { bool ret; @@ -198,7 +237,7 @@ inline void ibv_endpoint_impl_t::unlock_qp(int rank) inline error_t ibv_endpoint_impl_t::post_sends_impl(int rank, void* buffer, size_t size, net_imm_data_t imm_data, - void* user_context) + void* user_context, bool high_priority) { LCI_Assert(size <= net_context_attr.max_inject_size, "%lu exceed the inline message size\n" @@ -232,15 +271,23 @@ inline error_t ibv_endpoint_impl_t::post_sends_impl(int rank, void* buffer, // ninline = 0; // } + if (!try_acquire_slot(rank, high_priority)) { + return errorcode_t::retry_nomem; + } struct ibv_send_wr* bad_wr; - if (!try_lock_qp(rank)) return errorcode_t::retry_lock; + if (!try_lock_qp(rank)) { + release_slot(rank); + return errorcode_t::retry_lock; + } int ret = ibv_post_send(ib_qps[rank], &wr, &bad_wr); unlock_qp(rank); - if (ret == 0) + if (ret == 0) { return errorcode_t::done; - else if (ret == ENOMEM) + } else if (ret == ENOMEM) { + release_slot(rank); return errorcode_t::retry_nomem; // exceed send queue capacity - else { + } else { + release_slot(rank); IBV_SAFECALL_RET(ret); } } @@ -248,7 +295,8 @@ inline error_t ibv_endpoint_impl_t::post_sends_impl(int rank, void* buffer, inline error_t ibv_endpoint_impl_t::post_send_impl(int rank, void* buffer, size_t size, mr_t mr, net_imm_data_t imm_data, - void* user_context) + void* user_context, + bool high_priority) { struct ibv_sge list; struct ibv_send_wr wr; @@ -271,15 +319,23 @@ inline error_t ibv_endpoint_impl_t::post_send_impl(int rank, void* buffer, wr.send_flags = IBV_SEND_SIGNALED; wr.imm_data = imm_data; + if (!try_acquire_slot(rank, high_priority)) { + return errorcode_t::retry_nomem; + } struct ibv_send_wr* bad_wr; - if (!try_lock_qp(rank)) return errorcode_t::retry_lock; + if (!try_lock_qp(rank)) { + release_slot(rank); + return errorcode_t::retry_lock; + } int ret = ibv_post_send(ib_qps[rank], &wr, &bad_wr); unlock_qp(rank); if (ret == 0) return errorcode_t::posted; - else if (ret == ENOMEM) + else if (ret == ENOMEM) { + release_slot(rank); return errorcode_t::retry_nomem; // exceed send queue capacity - else { + } else { + release_slot(rank); IBV_SAFECALL_RET(ret); } } @@ -287,7 +343,8 @@ inline error_t ibv_endpoint_impl_t::post_send_impl(int rank, void* buffer, inline error_t
ibv_endpoint_impl_t::post_puts_impl(int rank, void* buffer, size_t size, uint64_t offset, rmr_t rmr, - void* user_context) + void* user_context, + bool high_priority) { LCI_Assert(size <= net_context_attr.max_inject_size, "%lu exceed the inline message size\n" @@ -316,15 +373,23 @@ inline error_t ibv_endpoint_impl_t::post_puts_impl(int rank, void* buffer, wr.wr.rdma.remote_addr = (uintptr_t)(rmr.base + offset); wr.wr.rdma.rkey = rmr.opaque_rkey; + if (!try_acquire_slot(rank, high_priority)) { + return errorcode_t::retry_nomem; + } struct ibv_send_wr* bad_wr; - if (!try_lock_qp(rank)) return errorcode_t::retry_lock; + if (!try_lock_qp(rank)) { + release_slot(rank); + return errorcode_t::retry_lock; + } int ret = ibv_post_send(ib_qps[rank], &wr, &bad_wr); unlock_qp(rank); if (ret == 0) return errorcode_t::done; - else if (ret == ENOMEM) + else if (ret == ENOMEM) { + release_slot(rank); return errorcode_t::retry_nomem; // exceed send queue capacity - else { + } else { + release_slot(rank); IBV_SAFECALL_RET(ret); } } @@ -333,7 +398,8 @@ inline error_t ibv_endpoint_impl_t::post_put_impl(int rank, void* buffer, size_t size, mr_t mr, uint64_t offset, rmr_t rmr, - void* user_context) + void* user_context, + bool high_priority) { struct ibv_sge list; struct ibv_send_wr wr; @@ -357,22 +423,30 @@ inline error_t ibv_endpoint_impl_t::post_put_impl(int rank, void* buffer, wr.wr.rdma.remote_addr = (uintptr_t)(rmr.base + offset); wr.wr.rdma.rkey = rmr.opaque_rkey; + if (!try_acquire_slot(rank, high_priority)) { + return errorcode_t::retry_nomem; + } struct ibv_send_wr* bad_wr; - if (!try_lock_qp(rank)) return errorcode_t::retry_lock; + if (!try_lock_qp(rank)) { + release_slot(rank); + return errorcode_t::retry_lock; + } int ret = ibv_post_send(ib_qps[rank], &wr, &bad_wr); unlock_qp(rank); if (ret == 0) return errorcode_t::posted; - else if (ret == ENOMEM) + else if (ret == ENOMEM) { + release_slot(rank); return errorcode_t::retry_nomem; // exceed send queue capacity - else { + } else { + release_slot(rank); IBV_SAFECALL_RET(ret); } } inline error_t ibv_endpoint_impl_t::post_putImms_impl( int rank, void* buffer, size_t size, uint64_t offset, rmr_t rmr, - net_imm_data_t imm_data, void* user_context) + net_imm_data_t imm_data, void* user_context, bool high_priority) { LCI_Assert(size <= net_context_attr.max_inject_size, "%lu exceed the inline message size\n" @@ -401,15 +475,23 @@ inline error_t ibv_endpoint_impl_t::post_putImms_impl( wr.wr.rdma.rkey = rmr.opaque_rkey; wr.imm_data = imm_data; + if (!try_acquire_slot(rank, high_priority)) { + return errorcode_t::retry_nomem; + } struct ibv_send_wr* bad_wr; - if (!try_lock_qp(rank)) return errorcode_t::retry_lock; + if (!try_lock_qp(rank)) { + release_slot(rank); + return errorcode_t::retry_lock; + } int ret = ibv_post_send(ib_qps[rank], &wr, &bad_wr); unlock_qp(rank); if (ret == 0) return errorcode_t::done; - else if (ret == ENOMEM) + else if (ret == ENOMEM) { + release_slot(rank); return errorcode_t::retry_nomem; // exceed send queue capacity - else { + } else { + release_slot(rank); IBV_SAFECALL_RET(ret); } } @@ -418,7 +500,8 @@ inline error_t ibv_endpoint_impl_t::post_putImm_impl(int rank, void* buffer, size_t size, mr_t mr, uint64_t offset, rmr_t rmr, net_imm_data_t imm_data, - void* user_context) + void* user_context, + bool high_priority) { struct ibv_sge list; struct ibv_send_wr wr; @@ -443,15 +526,23 @@ inline error_t ibv_endpoint_impl_t::post_putImm_impl(int rank, void* buffer, wr.wr.rdma.remote_addr = (uintptr_t)(rmr.base + offset); wr.wr.rdma.rkey = 
rmr.opaque_rkey; + if (!try_acquire_slot(rank, high_priority)) { + return errorcode_t::retry_nomem; + } struct ibv_send_wr* bad_wr; - if (!try_lock_qp(rank)) return errorcode_t::retry_lock; + if (!try_lock_qp(rank)) { + release_slot(rank); + return errorcode_t::retry_lock; + } int ret = ibv_post_send(ib_qps[rank], &wr, &bad_wr); unlock_qp(rank); if (ret == 0) return errorcode_t::posted; - else if (ret == ENOMEM) + else if (ret == ENOMEM) { + release_slot(rank); return errorcode_t::retry_nomem; // exceed send queue capacity - else { + } else { + release_slot(rank); IBV_SAFECALL_RET(ret); } } @@ -460,7 +551,8 @@ inline error_t ibv_endpoint_impl_t::post_get_impl(int rank, void* buffer, size_t size, mr_t mr, uint64_t offset, rmr_t rmr, - void* user_context) + void* user_context, + bool high_priority) { struct ibv_sge list; struct ibv_send_wr wr; @@ -484,19 +576,27 @@ inline error_t ibv_endpoint_impl_t::post_get_impl(int rank, void* buffer, wr.wr.rdma.remote_addr = (uintptr_t)(rmr.base + offset); wr.wr.rdma.rkey = rmr.opaque_rkey; + if (!try_acquire_slot(rank, high_priority)) { + return errorcode_t::retry_nomem; + } struct ibv_send_wr* bad_wr; - if (!try_lock_qp(rank)) return errorcode_t::retry_lock; + if (!try_lock_qp(rank)) { + release_slot(rank); + return errorcode_t::retry_lock; + } int ret = ibv_post_send(ib_qps[rank], &wr, &bad_wr); unlock_qp(rank); if (ret == 0) return errorcode_t::posted; - else if (ret == ENOMEM) + else if (ret == ENOMEM) { + release_slot(rank); return errorcode_t::retry_nomem; // exceed send queue capacity - else { + } else { + release_slot(rank); IBV_SAFECALL_RET(ret); } } } // namespace lci -#endif // LCI_BACKEND_IBV_INLINE_HPP \ No newline at end of file +#endif // LCI_BACKEND_IBV_INLINE_HPP diff --git a/src/network/network.cpp b/src/network/network.cpp index c860fe11..17be0537 100644 --- a/src/network/network.cpp +++ b/src/network/network.cpp @@ -17,6 +17,7 @@ net_context_impl_t::net_context_impl_t(runtime_t runtime_, attr_t attr_) device_impl_t::device_impl_t(net_context_t context_, attr_t attr_) : attr(attr_), net_context(context_), + net_context_attr(context_.get_attr()), endpoints(64), next_endpoint_idx(0), nrecvs_posted(0) @@ -47,11 +48,12 @@ endpoint_impl_t::endpoint_impl_t(device_t device_, attr_t attr_) : runtime(device_.p_impl->runtime), device(device_), attr(attr_), + net_context_attr(device.get_impl()->net_context_attr), + device_attr(device.get_attr()), pending_ops(0) { attr.uid = g_nendpoints++; endpoint.p_impl = this; - net_context_attr = device.p_impl->net_context.get_attr(); } /************************************************************************************* diff --git a/src/network/network.hpp b/src/network/network.hpp index 6de35658..fe2b7ef5 100644 --- a/src/network/network.hpp +++ b/src/network/network.hpp @@ -66,6 +66,7 @@ class device_impl_t device_t device; runtime_t runtime; net_context_t net_context; + net_context_attr_t net_context_attr; mpmc_array_t endpoints; std::atomic next_endpoint_idx; packet_pool_t packet_pool; @@ -109,27 +110,27 @@ class endpoint_impl_t virtual ~endpoint_impl_t() = default; virtual error_t post_sends_impl(int rank, void* buffer, size_t size, net_imm_data_t imm_data, - void* user_context) = 0; + void* user_context, bool high_priority) = 0; virtual error_t post_send_impl(int rank, void* buffer, size_t size, mr_t mr, net_imm_data_t imm_data, - void* user_context) = 0; + void* user_context, bool high_priority) = 0; virtual error_t post_puts_impl(int rank, void* buffer, size_t size, - uint64_t offset, rmr_t rmr, - 
void* user_context) = 0; + uint64_t offset, rmr_t rmr, void* user_context, + bool high_priority) = 0; virtual error_t post_put_impl(int rank, void* buffer, size_t size, mr_t mr, - uint64_t offset, rmr_t rmr, - void* user_context) = 0; + uint64_t offset, rmr_t rmr, void* user_context, + bool high_priority) = 0; virtual error_t post_putImms_impl(int rank, void* buffer, size_t size, uint64_t offset, rmr_t rmr, net_imm_data_t imm_data, - void* user_context) = 0; + void* user_context, bool high_priority) = 0; virtual error_t post_putImm_impl(int rank, void* buffer, size_t size, mr_t mr, uint64_t offset, rmr_t rmr, - net_imm_data_t imm_data, - void* user_context) = 0; + net_imm_data_t imm_data, void* user_context, + bool high_priority) = 0; virtual error_t post_get_impl(int rank, void* buffer, size_t size, mr_t mr, - uint64_t offset, rmr_t rmr, - void* user_context) = 0; + uint64_t offset, rmr_t rmr, void* user_context, + bool high_priority) = 0; // wrapper functions inline error_t post_sends(int rank, void* buffer, size_t size, @@ -158,7 +159,7 @@ class endpoint_impl_t inline error_t post_putImms_fallback(int rank, void* buffer, size_t size, uint64_t offset, rmr_t rmr, net_imm_data_t imm_data, - void* user_context); + void* user_context, bool high_priority); inline error_t post_putImm_fallback(int rank, void* buffer, size_t size, mr_t mr, uint64_t offset, rmr_t rmr, net_imm_data_t imm_data, @@ -198,6 +199,7 @@ class endpoint_impl_t endpoint_t endpoint; attr_t attr; net_context_attr_t net_context_attr; + device_attr_t device_attr; int idx_in_device; private: diff --git a/src/network/ofi/backend_ofi.hpp b/src/network/ofi/backend_ofi.hpp index c7706665..1f462957 100644 --- a/src/network/ofi/backend_ofi.hpp +++ b/src/network/ofi/backend_ofi.hpp @@ -89,23 +89,26 @@ class ofi_endpoint_impl_t : public lci::endpoint_impl_t ofi_endpoint_impl_t(device_t device_, attr_t attr_); ~ofi_endpoint_impl_t() override; error_t post_sends_impl(int rank, void* buffer, size_t size, - net_imm_data_t imm_data, void* user_context) override; + net_imm_data_t imm_data, void* user_context, + bool high_priority) override; error_t post_send_impl(int rank, void* buffer, size_t size, mr_t mr, - net_imm_data_t imm_data, void* user_context) override; + net_imm_data_t imm_data, void* user_context, + bool high_priority) override; error_t post_puts_impl(int rank, void* buffer, size_t size, uint64_t offset, - rmr_t rmr, void* user_context) override; + rmr_t rmr, void* user_context, + bool high_priority) override; error_t post_put_impl(int rank, void* buffer, size_t size, mr_t mr, - uint64_t offset, rmr_t rmr, - void* user_context) override; + uint64_t offset, rmr_t rmr, void* user_context, + bool high_priority) override; error_t post_putImms_impl(int rank, void* buffer, size_t size, uint64_t offset, rmr_t rmr, net_imm_data_t imm_data, - void* user_context) override; + void* user_context, bool high_priority) override; error_t post_putImm_impl(int rank, void* buffer, size_t size, mr_t mr, uint64_t offset, rmr_t rmr, net_imm_data_t imm_data, - void* user_context) override; + void* user_context, bool high_priority) override; error_t post_get_impl(int rank, void* buffer, size_t size, mr_t mr, - uint64_t offset, rmr_t rmr, - void* user_context) override; + uint64_t offset, rmr_t rmr, void* user_context, + bool high_priority) override; ofi_device_impl_t* p_ofi_device; int my_rank; @@ -124,4 +127,4 @@ class ofi_mr_impl_t : public lci::mr_impl_t } // namespace lci -#endif // LCI_BACKEND_OFI_BACKEND_OFI_HPP \ No newline at end of file +#endif 
// LCI_BACKEND_OFI_BACKEND_OFI_HPP diff --git a/src/network/ofi/backend_ofi_inline.hpp b/src/network/ofi/backend_ofi_inline.hpp index 0d82d689..e0030af1 100644 --- a/src/network/ofi/backend_ofi_inline.hpp +++ b/src/network/ofi/backend_ofi_inline.hpp @@ -122,7 +122,8 @@ inline size_t ofi_device_impl_t::post_recvs_impl(void* buffers[], size_t size, inline error_t ofi_endpoint_impl_t::post_sends_impl(int rank, void* buffer, size_t size, net_imm_data_t imm_data, - void* user_context) + void* user_context, + bool /*high_priority*/) { struct iovec iov; iov.iov_base = buffer; @@ -154,7 +155,8 @@ inline error_t ofi_endpoint_impl_t::post_sends_impl(int rank, void* buffer, inline error_t ofi_endpoint_impl_t::post_send_impl(int rank, void* buffer, size_t size, mr_t mr, net_imm_data_t imm_data, - void* user_context) + void* user_context, + bool /*high_priority*/) { LCI_OFI_CS_TRY_ENTER(LCI_NET_TRYLOCK_SEND, errorcode_t::retry_lock); ssize_t ret = fi_senddata(ofi_ep, buffer, size, ofi_detail::get_mr_desc(mr), @@ -173,7 +175,8 @@ inline error_t ofi_endpoint_impl_t::post_send_impl(int rank, void* buffer, inline error_t ofi_endpoint_impl_t::post_puts_impl(int rank, void* buffer, size_t size, uint64_t offset, rmr_t rmr, - void* user_context) + void* user_context, + bool /*high_priority*/) { uintptr_t addr; if (ofi_domain_attr->mr_mode & FI_MR_VIRT_ADDR) { @@ -213,7 +216,8 @@ inline error_t ofi_endpoint_impl_t::post_puts_impl(int rank, void* buffer, inline error_t ofi_endpoint_impl_t::post_put_impl(int rank, void* buffer, size_t size, mr_t mr, uint64_t offset, rmr_t rmr, - void* user_context) + void* user_context, + bool /*high_priority*/) { uintptr_t addr; if (ofi_domain_attr->mr_mode & FI_MR_VIRT_ADDR) { @@ -252,7 +256,7 @@ inline error_t ofi_endpoint_impl_t::post_put_impl(int rank, void* buffer, inline error_t ofi_endpoint_impl_t::post_putImms_impl( int rank, void* buffer, size_t size, uint64_t offset, rmr_t rmr, - net_imm_data_t imm_data, void* user_context) + net_imm_data_t imm_data, void* user_context, bool /*high_priority*/) { uintptr_t addr; if (ofi_domain_attr->mr_mode & FI_MR_VIRT_ADDR) { @@ -296,7 +300,8 @@ inline error_t ofi_endpoint_impl_t::post_putImm_impl(int rank, void* buffer, size_t size, mr_t mr, uint64_t offset, rmr_t rmr, net_imm_data_t imm_data, - void* user_context) + void* user_context, + bool /*high_priority*/) { uintptr_t addr; if (ofi_domain_attr->mr_mode & FI_MR_VIRT_ADDR) { @@ -340,7 +345,8 @@ inline error_t ofi_endpoint_impl_t::post_get_impl(int rank, void* buffer, size_t size, mr_t mr, uint64_t offset, rmr_t rmr, - void* user_context) + void* user_context, + bool /*high_priority*/) { uintptr_t addr; if (ofi_domain_attr->mr_mode & FI_MR_VIRT_ADDR) { @@ -380,4 +386,4 @@ inline error_t ofi_endpoint_impl_t::post_get_impl(int rank, void* buffer, } } // namespace lci -#endif // LCI_BACKEND_OFI_BACKEND_OFI_INLINE_HPP \ No newline at end of file +#endif // LCI_BACKEND_OFI_BACKEND_OFI_INLINE_HPP From 0bed17bce99432ea80caa7a5165ca5fd77c9b44f Mon Sep 17 00:00:00 2001 From: Jiakun Yan Date: Sun, 19 Oct 2025 16:57:23 -0500 Subject: [PATCH 2/2] make hp slots configurable --- src/binding/generate_binding.py | 4 +++- src/binding/input/network.py | 1 + src/network/endpoint_inline.hpp | 7 ++++--- src/network/ibv/backend_ibv.cpp | 2 +- src/network/ibv/backend_ibv_inline.hpp | 16 ++++++++-------- src/network/network.cpp | 20 ++++++++++++-------- src/network/network.hpp | 12 ++++++------ src/network/ofi/backend_ofi_inline.hpp | 9 +++------ src/util/misc.hpp | 21 ++++++++++++++++----- 9 files 
changed, 54 insertions(+), 38 deletions(-) diff --git a/src/binding/generate_binding.py b/src/binding/generate_binding.py index 111ced6a..25a6fe76 100755 --- a/src/binding/generate_binding.py +++ b/src/binding/generate_binding.py @@ -188,6 +188,8 @@ def generate_global_attr_impl(input): text += f" g_default_attr.{attr_name} = get_env_or(\"{env_var}\", {attr['default_value']});\n" if type_name == "std::string": text += f" LCI_Log(LOG_INFO, \"env\", \"set {attr_name} to be %s\\n\", g_default_attr.{attr_name}.c_str());\n" + elif type_name in ["double", "float"]: + text += f" LCI_Log(LOG_INFO, \"env\", \"set {attr_name} to be %f\\n\", static_cast<double>(g_default_attr.{attr_name}));\n" else: text += f" LCI_Log(LOG_INFO, \"env\", \"set {attr_name} to be %d\\n\", static_cast<int>(g_default_attr.{attr_name}));\n" @@ -495,4 +497,4 @@ def generate_binding(input): if len(input) == 0: print("No input found. Exiting.") exit(1) - generate_binding(input) \ No newline at end of file + generate_binding(input) diff --git a/src/binding/input/network.py b/src/binding/input/network.py index 74bc343d..18fc4042 100644 --- a/src/binding/input/network.py +++ b/src/binding/input/network.py @@ -39,6 +39,7 @@ attr("size_t", "net_max_sends", default_value="LCI_BACKEND_MAX_SENDS_DEFAULT", comment="The maximum number of sends that can be posted to the underlying network queue at the same time."), attr("size_t", "net_max_recvs", default_value="LCI_BACKEND_MAX_RECVS_DEFAULT", comment="The maximum number of receives that can be posted to the underlying network queue at the same time."), attr("size_t", "net_max_cqes", default_value="LCI_BACKEND_MAX_CQES_DEFAULT", comment="The maximum number of CQEs that can reside in the underlying network queue at the same time."), + attr("double", "net_send_reserved_pct", default_value=0.25, comment="Fraction of send queue slots reserved for high-priority operations (0.0-1.0)."), attr("uint64_t", "ofi_lock_mode", comment="For the OFI backend: the lock mode for the device."), attr("bool", "alloc_default_endpoint", default_value=1, comment="Whether to allocate the default endpoint."), attr("int", "uid", default_value=-1, inout_trait="out", comment="A unique device id across the entire process."), diff --git a/src/network/endpoint_inline.hpp b/src/network/endpoint_inline.hpp index d9d30396..9d56ce78 100644 --- a/src/network/endpoint_inline.hpp +++ b/src/network/endpoint_inline.hpp @@ -17,15 +17,16 @@ inline error_t endpoint_impl_t::post_sends(int rank, void* buffer, size_t size, // if it fails // force_post is used by backlog queue to force post the operations // in the backlog queue - // We consider the operation high priority if it is either allow_retry is false - // or force_post is true, to minimize the chance of being pushed back to + // We consider the operation high priority if either allow_retry is false + // or force_post is true, to minimize the chance of being pushed back to the // backlog queue.
error_t error; if (!force_post && !backlog_queue.is_empty(rank)) { error = errorcode_t::retry_backlog; } else { bool high_priority = !allow_retry || force_post; - error = post_sends_impl(rank, buffer, size, imm_data, user_context, high_priority); + error = post_sends_impl(rank, buffer, size, imm_data, user_context, + high_priority); } if (error.is_retry()) { if (error.errorcode == errorcode_t::retry_lock) { diff --git a/src/network/ibv/backend_ibv.cpp b/src/network/ibv/backend_ibv.cpp index 4be56f9e..72d218ca 100644 --- a/src/network/ibv/backend_ibv.cpp +++ b/src/network/ibv/backend_ibv.cpp @@ -271,7 +271,7 @@ ibv_device_impl_t::ibv_device_impl_t(net_context_t net_context_, ib_qps.resize(get_rank_n()); qp_remaining_slots = std::vector<padded_atomic_t<int>>(get_rank_n()); - for (auto &slot : qp_remaining_slots) { + for (auto& slot : qp_remaining_slots) { slot.val.store(static_cast<int>(attr.net_max_sends), std::memory_order_relaxed); } diff --git a/src/network/ibv/backend_ibv_inline.hpp b/src/network/ibv/backend_ibv_inline.hpp index cc0481ac..0712b86a 100644 --- a/src/network/ibv/backend_ibv_inline.hpp +++ b/src/network/ibv/backend_ibv_inline.hpp @@ -190,7 +190,9 @@ inline size_t ibv_device_impl_t::post_recvs_impl(void* buffers[], size_t size, inline bool ibv_endpoint_impl_t::try_acquire_slot(int rank, bool high_priority) { auto& counter = (*qp_remaining_slots)[rank].val; - int threshold = static_cast<int>(device_attr.net_max_sends / 4); + double reserved_frac = device_attr.net_send_reserved_pct; + int threshold = static_cast<int>( + static_cast<double>(device_attr.net_max_sends) * reserved_frac); if (!high_priority && counter.load(std::memory_order_relaxed) <= threshold) { return false; } int prev = counter.fetch_sub(1, std::memory_order_relaxed); @@ -237,7 +239,8 @@ inline void ibv_endpoint_impl_t::unlock_qp(int rank) inline error_t ibv_endpoint_impl_t::post_sends_impl(int rank, void* buffer, size_t size, net_imm_data_t imm_data, - void* user_context, bool high_priority) + void* user_context, + bool high_priority) { LCI_Assert(size <= net_context_attr.max_inject_size, "%lu exceed the inline message size\n" @@ -496,12 +499,9 @@ inline error_t ibv_endpoint_impl_t::post_putImms_impl( } } -inline error_t ibv_endpoint_impl_t::post_putImm_impl(int rank, void* buffer, - size_t size, mr_t mr, - uint64_t offset, rmr_t rmr, - net_imm_data_t imm_data, - void* user_context, - bool high_priority) +inline error_t ibv_endpoint_impl_t::post_putImm_impl( + int rank, void* buffer, size_t size, mr_t mr, uint64_t offset, rmr_t rmr, + net_imm_data_t imm_data, void* user_context, bool high_priority) { struct ibv_sge list; struct ibv_send_wr wr; diff --git a/src/network/network.cpp b/src/network/network.cpp index 17be0537..1b71e67a 100644 --- a/src/network/network.cpp +++ b/src/network/network.cpp @@ -117,18 +117,22 @@ void free_net_context_x::call_impl(net_context_t* net_context, runtime_t) const net_context->p_impl = nullptr; } -device_t alloc_device_x::call_impl(size_t net_max_sends, size_t net_max_recvs, - size_t net_max_cqes, uint64_t ofi_lock_mode, - bool alloc_default_endpoint, - attr_ibv_td_strategy_t ibv_td_strategy, - const char* name, void* user_context, - runtime_t runtime, net_context_t net_context, - packet_pool_t packet_pool) const +device_t alloc_device_x::call_impl( + size_t net_max_sends, size_t net_max_recvs, size_t net_max_cqes, + double net_send_reserved_pct, uint64_t ofi_lock_mode, + bool alloc_default_endpoint, attr_ibv_td_strategy_t ibv_td_strategy, + const char* name, void* user_context, runtime_t runtime, + net_context_t net_context, packet_pool_t packet_pool) const { + if
(net_send_reserved_pct < 0.0 || net_send_reserved_pct >= 1.0) { + LCI_Assert(false, "net_send_reserved_pct %.2f is out of range [0.0, 1.0)", + net_send_reserved_pct); + } device_t::attr_t attr; attr.net_max_sends = net_max_sends; attr.net_max_recvs = net_max_recvs; attr.net_max_cqes = net_max_cqes; + attr.net_send_reserved_pct = net_send_reserved_pct; attr.ofi_lock_mode = ofi_lock_mode; attr.alloc_default_endpoint = alloc_default_endpoint; attr.ibv_td_strategy = ibv_td_strategy; @@ -192,4 +196,4 @@ void free_endpoint_x::call_impl(endpoint_t* endpoint, runtime_t) const endpoint->p_impl = nullptr; } -} // namespace lci \ No newline at end of file +} // namespace lci diff --git a/src/network/network.hpp b/src/network/network.hpp index fe2b7ef5..0f0350b5 100644 --- a/src/network/network.hpp +++ b/src/network/network.hpp @@ -109,11 +109,11 @@ class endpoint_impl_t endpoint_impl_t(device_t device_, attr_t attr_); virtual ~endpoint_impl_t() = default; virtual error_t post_sends_impl(int rank, void* buffer, size_t size, - net_imm_data_t imm_data, - void* user_context, bool high_priority) = 0; + net_imm_data_t imm_data, void* user_context, + bool high_priority) = 0; virtual error_t post_send_impl(int rank, void* buffer, size_t size, mr_t mr, - net_imm_data_t imm_data, - void* user_context, bool high_priority) = 0; + net_imm_data_t imm_data, void* user_context, + bool high_priority) = 0; virtual error_t post_puts_impl(int rank, void* buffer, size_t size, uint64_t offset, rmr_t rmr, void* user_context, bool high_priority) = 0; @@ -122,8 +122,8 @@ class endpoint_impl_t bool high_priority) = 0; virtual error_t post_putImms_impl(int rank, void* buffer, size_t size, uint64_t offset, rmr_t rmr, - net_imm_data_t imm_data, - void* user_context, bool high_priority) = 0; + net_imm_data_t imm_data, void* user_context, + bool high_priority) = 0; virtual error_t post_putImm_impl(int rank, void* buffer, size_t size, mr_t mr, uint64_t offset, rmr_t rmr, net_imm_data_t imm_data, void* user_context, bool high_priority) = 0; diff --git a/src/network/ofi/backend_ofi_inline.hpp b/src/network/ofi/backend_ofi_inline.hpp index e0030af1..8084b368 100644 --- a/src/network/ofi/backend_ofi_inline.hpp +++ b/src/network/ofi/backend_ofi_inline.hpp @@ -296,12 +296,9 @@ inline error_t ofi_endpoint_impl_t::post_putImms_impl( } } -inline error_t ofi_endpoint_impl_t::post_putImm_impl(int rank, void* buffer, - size_t size, mr_t mr, - uint64_t offset, rmr_t rmr, - net_imm_data_t imm_data, - void* user_context, - bool /*high_priority*/) +inline error_t ofi_endpoint_impl_t::post_putImm_impl( + int rank, void* buffer, size_t size, mr_t mr, uint64_t offset, rmr_t rmr, + net_imm_data_t imm_data, void* user_context, bool /*high_priority*/) { uintptr_t addr; if (ofi_domain_attr->mr_mode & FI_MR_VIRT_ADDR) { diff --git a/src/util/misc.hpp b/src/util/misc.hpp index db1385b8..4a74de80 100644 --- a/src/util/misc.hpp +++ b/src/util/misc.hpp @@ -13,19 +13,30 @@ namespace lci { -template <typename T> +template <typename T, + typename std::enable_if<std::is_integral<T>::value, int>::type = 0> +T get_env_or(const char* env, T default_val) +{ + const char* s = getenv(env); + if (s) { + return static_cast<T>(atoi(s)); + } else { + return default_val; + } +} + +template <typename T, typename std::enable_if<std::is_floating_point<T>::value, + int>::type = 0> T get_env_or(const char* env, T default_val) { - static_assert(std::is_integral<T>::value, "T must be an integral type"); const char* s = getenv(env); if (s) { - return atoi(s); + return static_cast<T>(atof(s)); } else { return default_val; } } -template <> inline const char* get_env_or(const char* env, const char* default_val) { const char* s = getenv(env); @@
-118,4 +129,4 @@ struct padded_atomic_t { } // namespace lci -#endif // LCI_UTIL_MISC_HPP \ No newline at end of file +#endif // LCI_UTIL_MISC_HPP
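
Reviewer note (not part of the patch): the core of this series is a per-peer slot counter layered on top of the IBV send queue. Every post decrements an atomic counter for the target rank, every send-side completion in poll_comp_impl increments it back, and low-priority posts refuse to dip below a reserved fraction of the queue (net_send_reserved_pct, default 0.25), so high-priority posts (backlog-queue reposts and operations that are not allowed to retry) rarely see retry_nomem. The standalone sketch below mirrors that logic; slot_reserve_t and its members are made-up illustrative names, not LCI types.

#include <atomic>
#include <cstdio>

// Per-peer send-queue slot reservation, mirroring try_acquire_slot /
// release_slot from backend_ibv_inline.hpp.
struct slot_reserve_t {
  std::atomic<int> remaining;  // free send-queue slots for one peer
  int threshold;               // slots kept in reserve for high priority

  slot_reserve_t(int max_sends, double reserved_pct)
      : remaining(max_sends),
        threshold(static_cast<int>(max_sends * reserved_pct))
  {
  }

  bool try_acquire(bool high_priority)
  {
    // Low-priority posts back off once only the reserve is left.
    if (!high_priority &&
        remaining.load(std::memory_order_relaxed) <= threshold)
      return false;
    // Optimistically take a slot; roll back if the queue is actually full.
    int prev = remaining.fetch_sub(1, std::memory_order_relaxed);
    if (prev <= 0) {
      remaining.fetch_add(1, std::memory_order_relaxed);
      return false;
    }
    return true;
  }

  // Called on send-side completion (or when a post fails after acquiring).
  void release() { remaining.fetch_add(1, std::memory_order_relaxed); }
};

int main()
{
  slot_reserve_t sq(64, 0.25);  // 64 slots, 16 of them reserved
  int low = 0, high = 0;
  while (sq.try_acquire(false)) low++;  // stops at the threshold
  while (sq.try_acquire(true)) high++;  // may drain the reserve
  std::printf("low %d high %d\n", low, high);  // prints "low 48 high 16"
  return 0;
}

One design point worth noting: the acquire path is optimistic. It decrements first and rolls back on underflow instead of looping on compare-and-swap, which keeps the common case to a single relaxed RMW at the cost of letting the counter transiently undershoot under contention.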
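Configuration note: the reserved fraction is exposed as the device attribute net_send_reserved_pct (default 0.25 in src/binding/input/network.py) and validated in alloc_device_x::call_impl to lie in [0.0, 1.0). Given the get_env_or pattern in generate_binding.py, it should also be settable through the corresponding LCI_* environment variable (presumably LCI_NET_SEND_RESERVED_PCT, though the exact name depends on the generated binding).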