From 5e1b7097a5e063fc5de8f502574ef98d2cd25fb1 Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Fri, 19 Sep 2025 15:27:49 -0500 Subject: [PATCH 01/13] add shared memory functions --- include/converse.h | 4 + src/cmi-shmem-common.h | 131 +++++++++++++++++++++++ src/cmishm.cpp | 236 +++++++++++++++++++++++++++++++++++++++++ src/cmishmem.cpp | 236 +++++++++++++++++++++++++++++++++++++++++ src/cmishmem.h | 109 +++++++++++++++++++ src/cmixpmem.cpp | 196 ++++++++++++++++++++++++++++++++++ src/convcore.cpp | 23 +++- 7 files changed, 934 insertions(+), 1 deletion(-) create mode 100644 src/cmi-shmem-common.h create mode 100644 src/cmishm.cpp create mode 100644 src/cmishmem.cpp create mode 100644 src/cmishmem.h create mode 100644 src/cmixpmem.cpp diff --git a/include/converse.h b/include/converse.h index f33020a..def94ba 100644 --- a/include/converse.h +++ b/include/converse.h @@ -314,6 +314,7 @@ void CmiSyncSendAndFree(int destPE, int messageSize, void *msg); void CmiSyncListSend(int npes, const int *pes, int len, void *msg); void CmiSyncListSendAndFree(int npes, const int *pes, int len, void *msg); void CmiPushPE(int destPE, void *msg); +void CmiPushNode(void *msg); void CmiSyncSendFn(int destPE, int messageSize, char *msg); void CmiFreeSendFn(int destPE, int messageSize, char *msg); @@ -979,4 +980,7 @@ void CmiInterSyncNodeSendAndFreeFn(int destNode, int partition, int messageSize, /* end of variables and functions for partition */ +#include "cmishmem.h" +CsvExtern(CmiIpcManager*, coreIpcManager_); + #endif // CONVERSE_H diff --git a/src/cmi-shmem-common.h b/src/cmi-shmem-common.h new file mode 100644 index 0000000..140b3e3 --- /dev/null +++ b/src/cmi-shmem-common.h @@ -0,0 +1,131 @@ +#ifndef CMI_SHMEM_COMMON_HH +#define CMI_SHMEM_COMMON_HH + +#include "cmishmem.h" + +#include +#include +#include +#include +#include + + +namespace cmi { +namespace ipc { +CpvDeclare(std::size_t, kRecommendedCutoff); +} +} // namespace cmi + +CpvStaticDeclare(std::size_t, kSegmentSize); +constexpr std::size_t kDefaultSegmentSize = 8 * 1024 * 1024; + +constexpr std::size_t kNumCutOffPoints = 25; +const std::array kCutOffPoints = { + 64, 128, 256, 512, 1024, 2048, 4096, + 8192, 16384, 32768, 65536, 131072, 262144, 524288, + 1048576, 2097152, 4194304, 8388608, 16777216, 33554432, 67108864, + 134217728, 268435456, 536870912, 1073741824}; + +struct ipc_metadata_; +CsvStaticDeclare(CmiNodeLock, sleeper_lock); + +using sleeper_map_t = std::vector; +CsvStaticDeclare(sleeper_map_t, sleepers); + +// the data each pe shares with its peers +// contains pool of free blocks, heap, and receive queue +struct ipc_shared_ { + std::array, kNumCutOffPoints> free; + std::atomic queue; + std::atomic heap; + std::uintptr_t max; + + ipc_shared_(std::uintptr_t begin, std::uintptr_t end) + : queue(cmi::ipc::max), heap(begin), max(end) { + for (auto& f : this->free) { + f.store(cmi::ipc::max); + } + } +}; + +// shared data for each pe +struct ipc_metadata_ { + // maps ranks to shared segments + std::map shared; + // physical node rank + int mine; + // key of this instance + std::size_t key; + // base constructor + ipc_metadata_(std::size_t key_) : mine(CmiMyNode()), key(key_) {} + // virtual destructor may be needed + virtual ~ipc_metadata_() {} +}; + +inline std::size_t whichBin_(std::size_t size); + +inline static void initIpcShared_(ipc_shared_* shared) { + auto begin = (std::uintptr_t)(sizeof(ipc_shared_) + + (sizeof(ipc_shared_) % ALIGN_BYTES)); + CmiAssert(begin != cmi::ipc::nil); + auto end = begin + CpvAccess(kSegmentSize); + new (shared) ipc_shared_(begin, end); +} + +inline static ipc_shared_* makeIpcShared_(void) { + auto* shared = (ipc_shared_*)(::operator new(sizeof(ipc_shared_) + + CpvAccess(kSegmentSize))); + initIpcShared_(shared); + return shared; +} + +inline void initSegmentSize_(char** argv) { + using namespace cmi::ipc; + CpvInitialize(std::size_t, kRecommendedCutoff); + CpvInitialize(std::size_t, kSegmentSize); + + CmiInt8 value; + auto flag = + CmiGetArgLongDesc(argv, "++" CMI_IPC_POOL_SIZE_ARG, &value, CMI_IPC_POOL_SIZE_DESC); + CpvAccess(kSegmentSize) = flag ? (std::size_t)value : kDefaultSegmentSize; + CmiEnforceMsg(CpvAccess(kSegmentSize), "segment size must be non-zero!"); + if (CmiGetArgLongDesc(argv, "++" CMI_IPC_CUTOFF_ARG, &value, CMI_IPC_CUTOFF_DESC)) { + auto bin = whichBin_((std::size_t)value); + CmiEnforceMsg(bin < kNumCutOffPoints, "ipc cutoff out of range!"); + CpvAccess(kRecommendedCutoff) = kCutOffPoints[bin]; + } else { + auto max = CpvAccess(kSegmentSize) / kNumCutOffPoints; + auto bin = (std::intptr_t)whichBin_(max) - 1; + CpvAccess(kRecommendedCutoff) = kCutOffPoints[(bin >= 0) ? bin : 0]; + } +} + +inline static void printIpcStartupMessage_(const char* implName) { + using namespace cmi::ipc; + CmiPrintf("Converse> %s pool init'd with %luB segment and %luB cutoff.\n", + implName, CpvAccess(kSegmentSize), + CpvAccess(kRecommendedCutoff)); +} + +inline static void initSleepers_(void) { + if (CmiMyRank() == 0) { + CsvInitialize(sleeper_map_t, sleepers); + CsvAccess(sleepers).resize(CmiMyNodeSize()); + CsvInitialize(CmiNodeLock, sleeper_lock); + CsvAccess(sleeper_lock) = CmiCreateLock(); + } +} + +inline static void putSleeper_(CthThread th) { + CmiLock(CsvAccess(sleeper_lock)); + (CsvAccess(sleepers))[CmiMyRank()] = th; + CmiUnlock(CsvAccess(sleeper_lock)); +} + +static void awakenSleepers_(void); + +using ipc_manager_ptr_ = std::unique_ptr; +using ipc_manager_map_ = std::vector; +CsvStaticDeclare(ipc_manager_map_, managers_); + +#endif \ No newline at end of file diff --git a/src/cmishm.cpp b/src/cmishm.cpp new file mode 100644 index 0000000..02bf021 --- /dev/null +++ b/src/cmishm.cpp @@ -0,0 +1,236 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cmi-shmem-common.h" +#include + +CpvStaticDeclare(int, num_cbs_recvd); +CpvStaticDeclare(int, num_cbs_exptd); +CpvStaticDeclare(int, handle_callback); +CpvStaticDeclare(int, handle_node_pid); +CsvStaticDeclare(pid_t, node_pid); + +static int sendPid_(CmiIpcManager*); +static void openAllShared_(CmiIpcManager*); + +struct pid_message_ { + char core[CmiMsgHeaderSizeBytes]; + std::size_t key; + pid_t pid; +}; + +#define CMI_SHARED_FMT "cmi_pid%lu_node%d_shared_" + +// opens a shared memory segment for a given physical rank +static std::pair openShared_(int node) { + // determine the size of the shared segment + // (adding the size of the queues and what nots) + auto size = CpvAccess(kSegmentSize) + sizeof(ipc_shared_); + // generate a name for this pe + auto slen = snprintf(NULL, 0, CMI_SHARED_FMT, (std::size_t)CsvAccess(node_pid), node); + auto name = new char[slen]; + snprintf(name, slen, CMI_SHARED_FMT, (std::size_t)CsvAccess(node_pid), node); + DEBUGF(("%d> opening share %s\n", CmiMyPe(), name)); + // try opening the share exclusively + auto fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, 0666); + // if we succeed, we're the first accessor, so: + if (fd >= 0) { + // truncate it to the correct size + auto status = ftruncate(fd, size); + CmiAssert(status >= 0); + } else { + // otherwise just open it + fd = shm_open(name, O_RDWR, 0666); + CmiAssert(fd >= 0); + } + // then delete the name + delete[] name; + // map the segment to an address: + auto* res = (ipc_shared_*)mmap(nullptr, size, PROT_READ | PROT_WRITE, + MAP_SHARED, fd, 0); + CmiAssert(res != MAP_FAILED); + // return the file descriptor/shared + return std::make_pair(fd, res); +} + +struct CmiIpcManager : public ipc_metadata_ { + std::map fds; + + CmiIpcManager(std::size_t key) : ipc_metadata_(key) { + auto firstPe = CmiNodeFirst(CmiMyNode()); + auto thisRank = CmiPhysicalRank(firstPe); + if (thisRank == 0) { + if (sendPid_(this) == 1) { + openAllShared_(this); + awakenSleepers_(); + } + } + } + + virtual ~CmiIpcManager() { + auto& size = CpvAccess(kSegmentSize); + // for each rank/descriptor pair + for (auto& pair : this->fds) { + auto& proc = pair.first; + auto& fd = pair.second; + // unmap the memory segment + munmap(this->shared[proc], size); + // close the file + close(fd); + // unlinking the shm segment for our pe + if (proc == this->mine) { + auto slen = + snprintf(NULL, 0, CMI_SHARED_FMT, (std::size_t)CsvAccess(node_pid), proc); + auto name = new char[slen]; + snprintf(name, slen, CMI_SHARED_FMT, (std::size_t)CsvAccess(node_pid), proc); + shm_unlink(name); + delete[] name; + } + } + } +}; + +static void openAllShared_(CmiIpcManager* meta) { + int* pes; + int nPes; + int thisNode = CmiPhysicalNodeID(CmiMyPe()); + CmiGetPesOnPhysicalNode(thisNode, &pes, &nPes); + int nSize = CmiMyNodeSize(); + int nProcs = nPes / nSize; + // for each rank in this physical node: + for (auto rank = 0; rank < nProcs; rank++) { + // open its shared segment + auto pe = pes[rank * nSize]; + auto proc = CmiNodeOf(pe); + auto res = openShared_(proc); + // initializing it if it's ours + if (proc == meta->mine) initIpcShared_(res.second); + // store the retrieved data + meta->fds[proc] = res.first; + meta->shared[proc] = res.second; + } + DEBUGF(("%d> finished opening all shared\n", meta->mine)); +} + +// returns number of processes in node +int procBroadcastAndFree_(char* msg, std::size_t size) { + int* pes; + int nPes; + int thisPe = CmiMyPe(); + int thisNode = CmiPhysicalNodeID(thisPe); + CmiGetPesOnPhysicalNode(thisNode, &pes, &nPes); + int nSize = CmiMyNodeSize(); + int nProcs = nPes / nSize; + CmiAssert(thisPe == pes[0]); + + CpvAccess(num_cbs_exptd) = nProcs - 1; + for (auto rank = 1; rank < nProcs; rank++) { + auto& pe = pes[rank * nSize]; + if (rank == (nProcs - 1)) { + CmiSyncSendAndFree(pe, size, msg); + } else { + CmiSyncSend(pe, size, msg); + } + } + + // free if we didn't send anything + if (nProcs == 1) { + CmiFree(msg); + } + + return nProcs; +} + +static int sendPid_(CmiIpcManager* manager) { + CsvInitialize(pid_t, node_pid); + CsvAccess(node_pid) = getpid(); + + auto* pmsg = (pid_message_*)CmiAlloc(sizeof(pid_message_)); + CmiSetHandler(pmsg, CpvAccess(handle_node_pid)); + pmsg->key = manager->key; + pmsg->pid = CsvAccess(node_pid); + + return procBroadcastAndFree_((char*)pmsg, sizeof(pid_message_)); +} + +static void callbackHandler_(void* msg) { + int mine = CmiMyPe(); + int node = CmiPhysicalNodeID(mine); + int first = CmiGetFirstPeOnPhysicalNode(node); + auto* pmsg = (pid_message_*)msg; + + if (mine == first) { + // if we're still expecting messages: + if (++(CpvAccess(num_cbs_recvd)) < CpvAccess(num_cbs_exptd)) { + // free this one + CmiFree(msg); + // and move along + return; + } else { + // otherwise -- tell everyone we're ready! + printIpcStartupMessage_("pxshm"); + procBroadcastAndFree_((char*)msg, sizeof(pid_message_)); + } + } else { + CmiFree(msg); + } + + auto& meta = (CsvAccess(managers_))[(pmsg->key - 1)]; + openAllShared_(meta.get()); + awakenSleepers_(); +} + +static void nodePidHandler_(void* msg) { + auto* pmsg = (pid_message_*)msg; + CsvInitialize(pid_t, node_pid); + CsvAccess(node_pid) = pmsg->pid; + + int node = CmiPhysicalNodeID(CmiMyPe()); + int root = CmiGetFirstPeOnPhysicalNode(node); + CmiSetHandler(msg, CpvAccess(handle_callback)); + CmiSyncSendAndFree(root, sizeof(pid_message_), (char*)msg); +} + +void CmiIpcInit(char** argv) { + CsvInitialize(ipc_manager_map_, managers_); + + initSleepers_(); + initSegmentSize_(argv); + + CpvInitialize(int, num_cbs_recvd); + CpvInitialize(int, num_cbs_exptd); + CpvInitialize(int, handle_callback); + CpvAccess(handle_callback) = CmiRegisterHandler(callbackHandler_); + CpvInitialize(int, handle_node_pid); + CpvAccess(handle_node_pid) = CmiRegisterHandler(nodePidHandler_); +} + +CmiIpcManager* CmiMakeIpcManager(CthThread th) { + CpvAccess(num_cbs_recvd) = CpvAccess(num_cbs_exptd) = 0; + + putSleeper_(th); + + // ensure all sleepers are reg'd + CmiNodeAllBarrier(); + + if (CmiMyRank() == 0) { + auto key = CsvAccess(managers_).size() + 1; + auto* manager = new CmiIpcManager(key); + CsvAccess(managers_).emplace_back(manager); + // signal the metadata is ready + CmiNodeAllBarrier(); + return manager; + } else { + // pause until the metadata is ready + CmiNodeAllBarrier(); + return CsvAccess(managers_).back().get(); + } +} \ No newline at end of file diff --git a/src/cmishmem.cpp b/src/cmishmem.cpp new file mode 100644 index 0000000..4d284fd --- /dev/null +++ b/src/cmishmem.cpp @@ -0,0 +1,236 @@ +#include "cmi-shmem-common.h" + +#if CMK_HAS_XPMEM +#include "cmixpmem.cpp" +#else +#include "cmishm.cpp" +#endif + +#define CMI_DEST_RANK(msg) ((CmiMsgHeaderBasic*)msg)->rank +extern void CmiPushNode(void* msg); + +CpvExtern(int, CthResumeNormalThreadIdx); + +inline std::size_t whichBin_(std::size_t size); +inline static CmiIpcBlock* popBlock_(std::atomic& head, + void* base); +inline static bool pushBlock_(std::atomic& head, + std::uintptr_t value, void* base); +static std::uintptr_t allocBlock_(ipc_shared_* meta, std::size_t size); + +void* CmiIpcBlockToMsg(CmiIpcBlock* block, bool init) { + auto* msg = (char*)CmiIpcBlockToMsg(block); + if (init) { + // NOTE ( this is identical to code in CmiAlloc ) + CmiAssert(((uintptr_t)msg % ALIGN_BYTES) == 0); + CMI_ZC_MSGTYPE((void*) ptr) = CMK_REG_NO_ZC_MSG; + CMI_MSG_NOKEEP((void*) ptr) = 0; + SIZEFIELD(msg) = block->size; + REFFIELDSET(msg, 1); + } + return msg; +} + +CmiIpcBlock* CmiMsgToIpcBlock(CmiIpcManager* manager, char* src, std::size_t len, + int node, int rank, int timeout) { + char* dst; + CmiIpcBlock* block; + // check whether we miraculously got a usable block + if ((block = CmiIsIpcBlock(manager, BLKSTART(src), node)) && (node == block->src)) { + dst = src; + } else { + std::pair status; + // we only want to attempt again if we fail due to a timeout: + if (timeout > 0) { + do { + status = CmiAllocIpcBlock(manager, node, len + sizeof(CmiChunkHeader)); + } while ((--timeout) && (status.second == CMI_IPC_TIMEOUT)); + } else { + do { + status = CmiAllocIpcBlock(manager, node, len + sizeof(CmiChunkHeader)); + // never give up, never surrender! + } while (status.second == CMI_IPC_TIMEOUT); + } + // grab the block from the rval + block = status.first; + if (block == nullptr) { + return nullptr; + } else { + CmiAssertMsg((block->dst == manager->mine) && (manager->mine == CmiMyNode())); + dst = (char*)CmiIpcBlockToMsg(block, true); + memcpy(dst, src, len); + CmiFree(src); + } + } + CMI_DEST_RANK(dst) = rank; + return block; +} + +extern void CmiHandleImmediateMessage(void *msg); + +void CmiDeliverIpcBlockMsg(CmiIpcBlock* block) { + auto* msg = CmiIpcBlockToMsg(block); + auto& rank = CMI_DEST_RANK(msg); + if (rank == cmi::ipc::nodeDatagram) { + CmiPushNode(msg); + } else + CmiPushPE(rank, msg); +} + +inline static bool metadataReady_(CmiIpcManager* meta) { + return meta && meta->shared[meta->mine]; +} + +CmiIpcBlock* CmiPopIpcBlock(CmiIpcManager* meta) { + if (metadataReady_(meta)) { + auto& shared = meta->shared[meta->mine]; + return popBlock_(shared->queue, shared); + } else { + return nullptr; + } +} + +bool CmiPushIpcBlock(CmiIpcManager* meta, CmiIpcBlock* block) { + auto& shared = meta->shared[block->src]; + auto& queue = shared->queue; + CmiAssert(meta->mine == block->dst); + return pushBlock_(queue, block->orig, shared); +} + +std::pair CmiAllocIpcBlock(CmiIpcManager* meta, int dstProc, std::size_t size) { + auto dstNode = CmiPhysicalNodeID(CmiNodeFirst(dstProc)); + auto thisPe = CmiInCommThread() ? CmiNodeFirst(CmiMyNode()) : CmiMyPe(); + auto thisProc = CmiMyNode(); + auto thisNode = CmiPhysicalNodeID(thisPe); + if ((thisProc == dstProc) || (thisNode != dstNode)) { + return std::make_pair((CmiIpcBlock*)nullptr, CMI_IPC_REMOTE_DESTINATION); + } + + auto& shared = meta->shared[dstProc]; + auto bin = whichBin_(size); + CmiAssert(bin < kNumCutOffPoints); + + auto* block = popBlock_(shared->free[bin], shared); + if (block == nullptr) { + auto totalSize = kCutOffPoints[bin]; + auto offset = allocBlock_(shared, totalSize); + switch (offset) { + case cmi::ipc::nil: + return std::make_pair((CmiIpcBlock*)nullptr, CMI_IPC_TIMEOUT); + case cmi::ipc::max: + return std::make_pair((CmiIpcBlock*)nullptr, CMI_IPC_OUT_OF_MEMORY); + default: + break; + } + // the block's address is relative to the share + block = (CmiIpcBlock*)((char*)shared + offset); + CmiAssert(((std::uintptr_t)block % alignof(CmiIpcBlock)) == 0); + // construct the block + new (block) CmiIpcBlock(totalSize, offset); + } + + block->src = dstProc; + block->dst = thisProc; + + return std::make_pair(block, CMI_IPC_SUCCESS); +} + +void CmiFreeIpcBlock(CmiIpcManager* meta, CmiIpcBlock* block) { + auto bin = whichBin_(block->size); + CmiAssertMsg(bin < kNumCutOffPoints); + auto& shared = meta->shared[block->src]; + auto& free = shared->free[bin]; + while (!pushBlock_(free, block->orig, shared)) + ; +} + +CmiIpcBlock* CmiIsIpcBlock(CmiIpcManager* meta, void* addr, int node) { + auto* shared = meta ? meta->shared[node] : nullptr; + if (shared == nullptr) { + return nullptr; + } + auto* begin = (char*)shared; + auto* end = begin + shared->max; + if (begin < addr && addr < end) { + return (CmiIpcBlock*)((char*)addr - sizeof(CmiIpcBlock)); + } else { + return nullptr; + } +} + +static std::uintptr_t allocBlock_(ipc_shared_* meta, std::size_t size) { + auto res = meta->heap.exchange(cmi::ipc::nil, std::memory_order_acquire); + if (res == cmi::ipc::nil) { + return cmi::ipc::nil; + } else { + auto next = res + size + sizeof(CmiIpcBlock); + auto offset = size % alignof(CmiIpcBlock); + auto oom = next >= meta->max; + auto value = oom ? res : (next + offset); + auto status = meta->heap.exchange(value, std::memory_order_release); + CmiAssert(status == cmi::ipc::nil); + if (oom) { + return cmi::ipc::max; + } else { + return res; + } + } +} + +// NOTE ( there may be a faster way to do this? ) +inline std::size_t whichBin_(std::size_t size) { + std::size_t bin; + for (bin = 0; bin < kNumCutOffPoints; bin++) { + if (size <= kCutOffPoints[bin]) { + break; + } + } + return bin; +} + +inline static CmiIpcBlock* popBlock_(std::atomic& head, + void* base) { + auto prev = head.exchange(cmi::ipc::nil, std::memory_order_acquire); + if (prev == cmi::ipc::nil) { + return nullptr; + } else if (prev == cmi::ipc::max) { + auto check = head.exchange(prev, std::memory_order_release); + CmiAssert(check == cmi::ipc::nil); + return nullptr; + } else { + // translate the "home" PE's address into a local one + CmiAssert(((std::uintptr_t)base % ALIGN_BYTES) == 0); + auto* xlatd = (CmiIpcBlock*)((char*)base + prev); + auto check = head.exchange(xlatd->next, std::memory_order_release); + CmiAssert(check == cmi::ipc::nil); + return xlatd; + } +} + +inline static bool pushBlock_(std::atomic& head, + std::uintptr_t value, void* base) { + CmiAssert(value != cmi::ipc::nil); + auto prev = head.exchange(cmi::ipc::nil, std::memory_order_acquire); + if (prev == cmi::ipc::nil) { + return false; + } + auto* block = (CmiIpcBlock*)((char*)base + value); + block->next = prev; + auto check = head.exchange(value, std::memory_order_release); + CmiAssert(check == cmi::ipc::nil); + return true; +} + +static void awakenSleepers_(void) { + auto& sleepers = CsvAccess(sleepers); + for (auto i = 0; i < sleepers.size(); i++) { + auto& th = sleepers[i]; + if (i == CmiMyRank()) { + CthAwaken(th); + } else { + auto* token = CthGetToken(th); + CmiSetHandler(token, CpvAccess(CthResumeNormalThreadIdx)); + CmiPushPE(i, token); + } + } +} \ No newline at end of file diff --git a/src/cmishmem.h b/src/cmishmem.h new file mode 100644 index 0000000..1793d52 --- /dev/null +++ b/src/cmishmem.h @@ -0,0 +1,109 @@ +#ifndef CMI_SHMEM_HH +#define CMI_SHMEM_HH + +static_assert(CMK_USE_SHMEM, "enable shmem to use this header"); + +#include "converse_internal.h" +#include +#include +#include +#include + +#define CMI_IPC_CUTOFF_ARG "ipccutoff" +#define CMI_IPC_CUTOFF_DESC "max message size for cmi-shmem (in bytes)" +#define CMI_IPC_POOL_SIZE_ARG "ipcpoolsize" +#define CMI_IPC_POOL_SIZE_DESC "size of cmi-shmem pool (in bytes)" + +namespace cmi { +namespace ipc { +// recommended cutoff for block sizes +CpvExtern(std::size_t, kRecommendedCutoff); +// used to represent an empty linked list +constexpr auto nil = std::uintptr_t(0); +// used to represent the tail of a linked list +constexpr auto max = std::numeric_limits::max(); +// used to indicate a message bound for a node +constexpr auto nodeDatagram = std::numeric_limits::max(); +// default number of attempts to alloc before timing out +constexpr auto defaultTimeout = 4; +} // namespace ipc +} // namespace cmi + +// alignas is used for padding here, rather than for alignment of the +// CmiIpcBlock itself. +struct alignas(ALIGN_BYTES) CmiIpcBlock { +public: + // "home" rank of the block + int src; + int dst; + std::uintptr_t orig; + std::uintptr_t next; + std::size_t size; + + CmiIpcBlock(std::size_t size_, std::uintptr_t orig_) + : orig(orig_), next(cmi::ipc::nil), size(size_) {} +}; + +struct CmiIpcManager; + +enum CmiIpcAllocStatus { + CMI_IPC_OUT_OF_MEMORY, + CMI_IPC_REMOTE_DESTINATION, + CMI_IPC_SUCCESS, + CMI_IPC_TIMEOUT +}; + +// sets up ipc environment +void CmiIpcInit(char** argv); + +// creates an ipc manager, waking the thread when it's done +// ( this must be called in the same order on all pes! ) +CmiIpcManager* CmiMakeIpcManager(CthThread th); + +// push/pop blocks from the manager's send/recv queue +bool CmiPushIpcBlock(CmiIpcManager*, CmiIpcBlock*); +CmiIpcBlock* CmiPopIpcBlock(CmiIpcManager*); + +// tries to allocate a block, returning null if unsucessful +// (fails when other PEs are contending resources) +// second value of pair indicates failure cause +std::pair CmiAllocIpcBlock(CmiIpcManager*, int node, std::size_t size); + +// frees a block -- enabling it to be used again +void CmiFreeIpcBlock(CmiIpcManager*, CmiIpcBlock*); + +// currently a no-op but may be eventually usable +// intended to "capture" blocks from remote pes +inline void CmiCacheIpcBlock(CmiIpcBlock*) { return; } + +// identifies whether a void* is the payload of a block +// belonging to the given node +CmiIpcBlock* CmiIsIpcBlock(CmiIpcManager*, void*, int node); + +// if (init) is true -- initializes the +// memory segment for use as a message +void* CmiIpcBlockToMsg(CmiIpcBlock*, bool init); + +// equivalent to calling above with (init = false) +inline void* CmiIpcBlockToMsg(CmiIpcBlock* block) { + auto res = (char*)block + sizeof(CmiIpcBlock) + sizeof(CmiChunkHeader); + return (void*)res; +} + +inline CmiIpcBlock* CmiMsgToIpcBlock(CmiIpcManager* manager, void* msg) { + return CmiIsIpcBlock(manager, (char*)msg - sizeof(CmiChunkHeader), CmiMyNode()); +} + +CmiIpcBlock* CmiMsgToIpcBlock(CmiIpcManager*, char* msg, std::size_t len, int node, + int rank = cmi::ipc::nodeDatagram, + int timeout = cmi::ipc::defaultTimeout); + +// deliver a block as a message +void CmiDeliverIpcBlockMsg(CmiIpcBlock*); + +inline const std::size_t& CmiRecommendedIpcBlockCutoff(void) { + using namespace cmi::ipc; + return CpvAccess(kRecommendedCutoff); +} + +#endif \ No newline at end of file diff --git a/src/cmixpmem.cpp b/src/cmixpmem.cpp new file mode 100644 index 0000000..8e53a01 --- /dev/null +++ b/src/cmixpmem.cpp @@ -0,0 +1,196 @@ +#include "cmi-shmem-common.h" +#include +#include +#include + +extern "C" { +#include +} + +// "borrowed" from VADER +// (https://github.com/open-mpi/ompi/tree/386ba164557bb8115131921041757be94a989646/opal/mca/smsc/xpmem) +#define OPAL_DOWN_ALIGN(x, a, t) ((x) & ~(((t)(a)-1))) +#define OPAL_DOWN_ALIGN_PTR(x, a, t) \ + ((t)OPAL_DOWN_ALIGN((uintptr_t)x, a, uintptr_t)) +#define OPAL_ALIGN(x, a, t) (((x) + ((t)(a)-1)) & ~(((t)(a)-1))) +#define OPAL_ALIGN_PTR(x, a, t) ((t)OPAL_ALIGN((uintptr_t)x, a, uintptr_t)) +#define OPAL_ALIGN_PAD_AMOUNT(x, s) \ + ((~((uintptr_t)(x)) + 1) & ((uintptr_t)(s)-1)) + +CpvStaticDeclare(int, handle_init); + +struct init_msg_ { + char core[CmiMsgHeaderSizeBytes]; + std::size_t key; + int from; + xpmem_segid_t segid; + ipc_shared_* shared; +}; + +// NOTE ( we should eventually detach xpmem segments at close ) +// ( it's not urgently needed since xpmem does it for us ) +struct CmiIpcManager : public ipc_metadata_ { + // maps ranks to segments + std::map segments; + // maps segments to xpmem apids + std::map instances; + // number of physical peers + int nPeers; + // create our local shared data + CmiIpcManager(std::size_t key) : ipc_metadata_(key) { + this->shared[this->mine] = makeIpcShared_(); + } + + void put_segment(int proc, const xpmem_segid_t& segid) { + auto ins = this->segments.emplace(proc, segid); + CmiAssert(ins.second); + } + + xpmem_segid_t get_segment(int proc) { + auto search = this->segments.find(proc); + if (search == std::end(this->segments)) { + if (mine == proc) { + auto segid = + xpmem_make(0, XPMEM_MAXADDR_SIZE, XPMEM_PERMIT_MODE, (void*)0666); + this->put_segment(mine, segid); + return segid; + } else { + return -1; + } + } else { + return search->second; + } + } + + xpmem_apid_t get_instance(int proc) { + auto segid = this->get_segment(proc); + if (segid >= 0) { + auto search = this->instances.find(segid); + if (search == std::end(this->instances)) { + auto apid = xpmem_get(segid, XPMEM_RDWR, XPMEM_PERMIT_MODE, NULL); + CmiAssertMsg(apid >= 0, "invalid segid?"); + auto ins = this->instances.emplace(segid, apid); + CmiAssert(ins.second); + search = ins.first; + } + return search->second; + } else { + return -1; + } + } +}; + +void* translateAddr_(ipc_manager_ptr_& meta, int proc, void* remote_ptr, + const std::size_t& size) { + if (proc == meta->mine) { + return remote_ptr; + } else { + auto apid = meta->get_instance(proc); + CmiAssert(apid >= 0); + // this magic was borrowed from VADER + uintptr_t attach_align = 1 << 23; + auto base = OPAL_DOWN_ALIGN_PTR(remote_ptr, attach_align, uintptr_t); + auto bound = + OPAL_ALIGN_PTR(remote_ptr + size - 1, attach_align, uintptr_t) + 1; + + using offset_type = decltype(xpmem_addr::offset); + xpmem_addr addr{.apid = apid, .offset = (offset_type)base}; + auto* ctx = xpmem_attach(addr, bound - base, NULL); + CmiAssert(ctx != (void*)-1); + + return (void*)((uintptr_t)ctx + + (ptrdiff_t)((uintptr_t)remote_ptr - (uintptr_t)base)); + } +} + +static void handleInitialize_(void* msg) { + auto* imsg = (init_msg_*)msg; + auto& meta = (CsvAccess(managers_))[(imsg->key - 1)]; + // extract the segment id and shared region + // from the msg (registering it in our metadata) + meta->put_segment(imsg->from, imsg->segid); + meta->shared[imsg->from] = (ipc_shared_*)translateAddr_( + meta, imsg->from, imsg->shared, sizeof(ipc_shared_)); + // then free the message + CmiFree(imsg); + // if we received messages from all our peers: + if (meta->nPeers == meta->shared.size()) { + // resume the sleeping thread + if (CmiMyPe() == 0) { + printIpcStartupMessage_("xpmem"); + } + + awakenSleepers_(); + } +} + +void CmiIpcInit(char** argv) { + CsvInitialize(ipc_manager_map_, managers_); + + initSleepers_(); + initSegmentSize_(argv); + + CpvInitialize(int, handle_init); + CpvAccess(handle_init) = CmiRegisterHandler(handleInitialize_); +} + +CmiIpcManager* CmiMakeIpcManager(CthThread th) { + putSleeper_(th); + + // ensure all sleepers are reg'd + CmiNodeAllBarrier(); + + CmiIpcManager* meta; + if (CmiMyRank() == 0) { + auto key = CsvAccess(managers_).size() + 1; + meta = new CmiIpcManager(key); + CsvAccess(managers_).emplace_back(meta); + } else { + // pause until the metadata is ready + CmiNodeAllBarrier(); + return CsvAccess(managers_).back().get(); + } + + int* pes; + int nPes; + auto thisPe = CmiMyPe(); + auto thisNode = CmiPhysicalNodeID(CmiMyPe()); + CmiGetPesOnPhysicalNode(thisNode, &pes, &nPes); + auto nSize = CmiMyNodeSize(); + auto nProcs = nPes / nSize; + meta->nPeers = nProcs; + + if (nProcs > 1) { + auto* imsg = (init_msg_*)CmiAlloc(sizeof(init_msg_)); + CmiSetHandler(imsg, CpvAccess(handle_init)); + imsg->key = meta->key; + imsg->from = meta->mine; + imsg->segid = meta->get_segment(meta->mine); + imsg->shared = meta->shared[meta->mine]; + // send messages to all the pes on this node + for (auto i = 0; i < nProcs; i++) { + auto& pe = pes[i * nSize]; + auto last = i == (nProcs - 1); + if (pe == thisPe) { + if (last) { + CmiFree(imsg); + } + continue; + } else if (last) { + // free'ing with the last send + CmiSyncSendAndFree(pe, sizeof(init_msg_), (char*)imsg); + } else { + // then sending (without free) otherwise + CmiSyncSend(pe, sizeof(init_msg_), (char*)imsg); + } + } + } else { + // single process -- wake up sleeping thread(s) + awakenSleepers_(); + } + + // signal that the metadata is ready + CmiNodeAllBarrier(); + + return meta; +} \ No newline at end of file diff --git a/src/convcore.cpp b/src/convcore.cpp index c22cc0a..274eed1 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -43,6 +43,7 @@ int quietMode; int quietModeRequested; int userDrivenMode; int _replaySystem = 0; +CsvDeclare(CmiIpcManager*, coreIpcManager_); //partition PartitionInfo _partitionInfo; @@ -257,6 +258,10 @@ void CmiInitState(int rank) { newZCPupGets); // Check if this is necessary CpvInitialize(int, interopExitFlag); CpvAccess(interopExitFlag) = 0; + #if CMK_USE_SHMEM + CsvInitialize(CmiIpcManager*, coreIpcManager_); + CsvAccess(coreIpcManager_) = nullptr; + #endif CmiOnesidedDirectInit(); CcdModuleInit(); } @@ -336,6 +341,10 @@ void CmiPushPE(int destPE, void *msg) { CmiPushPE(destPE, messageSize, msg); } +void CmiPushNode(void *msg) { + CmiNodeQueue->push(msg); +} + void *CmiAlloc(int size) { if (size <= 0) { CmiPrintf("CmiAlloc: size <= 0\n"); @@ -388,8 +397,20 @@ void CmiFree(void *msg) { if (refCount == 1) { free(BLKSTART(parentBlk)); } - + #if CMK_USE_SHMEM + // we should only free _our_ IPC blocks -- so calling CmiFree on + // an IPC block issued by another process will cause a bad free! + // (note -- this would only occur if you alloc an ipc block then + // decide not to send it; that should be avoided! ) + CmiIpcBlock* ipc; + auto* manager = CsvAccess(coreIpcManager_); + if (blk && (ipc = CmiIsIpcBlock(manager, BLKSTART(blk), CmiMyNode()))) { + CmiFreeIpcBlock(manager, ipc); + return; + } +#endif + } int CmiGetReference(void *blk) { return REFFIELD(CmiAllocFindEnclosing(blk)); } From 515ea50bc323a421e3d2cac0d81d7d6b040ca74f Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Fri, 19 Sep 2025 16:02:15 -0500 Subject: [PATCH 02/13] fix build for reconverse --- {src => include}/cmishmem.h | 8 +++----- include/converse.h | 28 +++++++++++++++++++--------- src/CMakeLists.txt | 2 +- src/cmi-shmem-common.h | 1 + src/cmishm.cpp | 26 ++++++++++++++++++++++++++ src/cmishmem.cpp | 37 ++++++------------------------------- src/conv-topology.cpp | 24 ------------------------ 7 files changed, 56 insertions(+), 70 deletions(-) rename {src => include}/cmishmem.h (97%) diff --git a/src/cmishmem.h b/include/cmishmem.h similarity index 97% rename from src/cmishmem.h rename to include/cmishmem.h index 1793d52..b51bd72 100644 --- a/src/cmishmem.h +++ b/include/cmishmem.h @@ -1,9 +1,6 @@ #ifndef CMI_SHMEM_HH #define CMI_SHMEM_HH -static_assert(CMK_USE_SHMEM, "enable shmem to use this header"); - -#include "converse_internal.h" #include #include #include @@ -14,6 +11,9 @@ static_assert(CMK_USE_SHMEM, "enable shmem to use this header"); #define CMI_IPC_POOL_SIZE_ARG "ipcpoolsize" #define CMI_IPC_POOL_SIZE_DESC "size of cmi-shmem pool (in bytes)" +struct CmiIpcManager; +#include "converse.h" + namespace cmi { namespace ipc { // recommended cutoff for block sizes @@ -44,8 +44,6 @@ struct alignas(ALIGN_BYTES) CmiIpcBlock { : orig(orig_), next(cmi::ipc::nil), size(size_) {} }; -struct CmiIpcManager; - enum CmiIpcAllocStatus { CMI_IPC_OUT_OF_MEMORY, CMI_IPC_REMOTE_DESTINATION, diff --git a/include/converse.h b/include/converse.h index def94ba..14b7230 100644 --- a/include/converse.h +++ b/include/converse.h @@ -389,15 +389,25 @@ int CmiError(const char *format, ...); void CmiInitCPUTopology(char **argv); void CmiInitCPUAffinity(char **argv); -void __CmiEnforceMsgHelper(const char *expr, const char *fileName, int lineNum, - const char *msg, ...); - -#define CmiEnforce(condition) \ - do { \ - if (!(condition)) { \ - __CmiEnforceMsgHelper(#condition, __FILE__, __LINE__, ""); \ - } \ - } while (0) +#define __CMK_STRING(x) #x +#define __CMK_XSTRING(x) __CMK_STRING(x) + +void __CmiEnforceHelper(const char* expr, const char* fileName, const char* lineNum); +void __CmiEnforceMsgHelper(const char* expr, const char* fileName, + const char* lineNum, const char* msg, ...); + +#define CmiEnforce(expr) \ + ((void)(CMI_LIKELY(expr) ? 0 \ + : (__CmiEnforceHelper(__CMK_STRING(expr), __FILE__, \ + __CMK_XSTRING(__LINE__)), \ + 0))) + +#define CmiEnforceMsg(expr, ...) \ + ((void)(CMI_LIKELY(expr) \ + ? 0 \ + : (__CmiEnforceMsgHelper(__CMK_STRING(expr), __FILE__, \ + __CMK_XSTRING(__LINE__), __VA_ARGS__), \ + 0))) double getCurrentTime(void); double CmiWallTimer(void); diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 00b6df2..3002702 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -2,7 +2,7 @@ target_include_directories(reconverse PRIVATE .) target_sources(reconverse PRIVATE conv-conds.cpp convcore.cpp random.cpp scheduler.cpp cpuaffinity.cpp collectives.cpp comm_backend/comm_backend_internal.cpp threads.cpp cldb.rand.cpp cldb.cpp cmirdmautils.cpp - conv-rdma.cpp conv-topology.cpp) + conv-rdma.cpp conv-topology.cpp cmishm.cpp cmishmem.cpp) target_include_directories( reconverse PRIVATE $ $) diff --git a/src/cmi-shmem-common.h b/src/cmi-shmem-common.h index 140b3e3..4c63972 100644 --- a/src/cmi-shmem-common.h +++ b/src/cmi-shmem-common.h @@ -1,6 +1,7 @@ #ifndef CMI_SHMEM_COMMON_HH #define CMI_SHMEM_COMMON_HH +#include "converse_internal.h" #include "cmishmem.h" #include diff --git a/src/cmishm.cpp b/src/cmishm.cpp index 02bf021..c4acaa1 100644 --- a/src/cmishm.cpp +++ b/src/cmishm.cpp @@ -17,6 +17,7 @@ CpvStaticDeclare(int, num_cbs_exptd); CpvStaticDeclare(int, handle_callback); CpvStaticDeclare(int, handle_node_pid); CsvStaticDeclare(pid_t, node_pid); +CpvExtern(int, CthResumeNormalThreadIdx); static int sendPid_(CmiIpcManager*); static void openAllShared_(CmiIpcManager*); @@ -120,6 +121,31 @@ static void openAllShared_(CmiIpcManager* meta) { DEBUGF(("%d> finished opening all shared\n", meta->mine)); } +// NOTE ( there may be a faster way to do this? ) +inline std::size_t whichBin_(std::size_t size) { + std::size_t bin; + for (bin = 0; bin < kNumCutOffPoints; bin++) { + if (size <= kCutOffPoints[bin]) { + break; + } + } + return bin; +} + +static void awakenSleepers_(void) { + auto& current_sleepers = CsvAccess(sleepers); + for (auto i = 0; i < current_sleepers.size(); i++) { + auto& th = current_sleepers[i]; + if (i == CmiMyRank()) { + CthAwaken(th); + } else { + auto* token = CthGetToken(th); + CmiSetHandler(token, CpvAccess(CthResumeNormalThreadIdx)); + CmiPushPE(i, token); + } + } +} + // returns number of processes in node int procBroadcastAndFree_(char* msg, std::size_t size) { int* pes; diff --git a/src/cmishmem.cpp b/src/cmishmem.cpp index 4d284fd..840573f 100644 --- a/src/cmishmem.cpp +++ b/src/cmishmem.cpp @@ -6,7 +6,7 @@ #include "cmishm.cpp" #endif -#define CMI_DEST_RANK(msg) ((CmiMsgHeaderBasic*)msg)->rank +#define CMI_DEST_RANK(msg) ((CmiMsgHeaderBasic*)msg)->destPE extern void CmiPushNode(void* msg); CpvExtern(int, CthResumeNormalThreadIdx); @@ -23,8 +23,8 @@ void* CmiIpcBlockToMsg(CmiIpcBlock* block, bool init) { if (init) { // NOTE ( this is identical to code in CmiAlloc ) CmiAssert(((uintptr_t)msg % ALIGN_BYTES) == 0); - CMI_ZC_MSGTYPE((void*) ptr) = CMK_REG_NO_ZC_MSG; - CMI_MSG_NOKEEP((void*) ptr) = 0; + CMI_ZC_MSGTYPE((void*) msg) = CMK_REG_NO_ZC_MSG; + CMI_MSG_NOKEEP((void*) msg) = 0; SIZEFIELD(msg) = block->size; REFFIELDSET(msg, 1); } @@ -56,7 +56,7 @@ CmiIpcBlock* CmiMsgToIpcBlock(CmiIpcManager* manager, char* src, std::size_t len if (block == nullptr) { return nullptr; } else { - CmiAssertMsg((block->dst == manager->mine) && (manager->mine == CmiMyNode())); + CmiAssert((block->dst == manager->mine) && (manager->mine == CmiMyNode())); dst = (char*)CmiIpcBlockToMsg(block, true); memcpy(dst, src, len); CmiFree(src); @@ -99,7 +99,7 @@ bool CmiPushIpcBlock(CmiIpcManager* meta, CmiIpcBlock* block) { std::pair CmiAllocIpcBlock(CmiIpcManager* meta, int dstProc, std::size_t size) { auto dstNode = CmiPhysicalNodeID(CmiNodeFirst(dstProc)); - auto thisPe = CmiInCommThread() ? CmiNodeFirst(CmiMyNode()) : CmiMyPe(); + auto thisPe = CmiMyPe(); auto thisProc = CmiMyNode(); auto thisNode = CmiPhysicalNodeID(thisPe); if ((thisProc == dstProc) || (thisNode != dstNode)) { @@ -137,7 +137,7 @@ std::pair CmiAllocIpcBlock(CmiIpcManager* meta, void CmiFreeIpcBlock(CmiIpcManager* meta, CmiIpcBlock* block) { auto bin = whichBin_(block->size); - CmiAssertMsg(bin < kNumCutOffPoints); + CmiAssert(bin < kNumCutOffPoints); auto& shared = meta->shared[block->src]; auto& free = shared->free[bin]; while (!pushBlock_(free, block->orig, shared)) @@ -177,17 +177,6 @@ static std::uintptr_t allocBlock_(ipc_shared_* meta, std::size_t size) { } } -// NOTE ( there may be a faster way to do this? ) -inline std::size_t whichBin_(std::size_t size) { - std::size_t bin; - for (bin = 0; bin < kNumCutOffPoints; bin++) { - if (size <= kCutOffPoints[bin]) { - break; - } - } - return bin; -} - inline static CmiIpcBlock* popBlock_(std::atomic& head, void* base) { auto prev = head.exchange(cmi::ipc::nil, std::memory_order_acquire); @@ -219,18 +208,4 @@ inline static bool pushBlock_(std::atomic& head, auto check = head.exchange(value, std::memory_order_release); CmiAssert(check == cmi::ipc::nil); return true; -} - -static void awakenSleepers_(void) { - auto& sleepers = CsvAccess(sleepers); - for (auto i = 0; i < sleepers.size(); i++) { - auto& th = sleepers[i]; - if (i == CmiMyRank()) { - CthAwaken(th); - } else { - auto* token = CthGetToken(th); - CmiSetHandler(token, CpvAccess(CthResumeNormalThreadIdx)); - CmiPushPE(i, token); - } - } } \ No newline at end of file diff --git a/src/conv-topology.cpp b/src/conv-topology.cpp index 683144b..4387aaa 100644 --- a/src/conv-topology.cpp +++ b/src/conv-topology.cpp @@ -251,20 +251,6 @@ using namespace CpuTopoDetails; // } static std::atomic cpuTopoSyncHandlerDone{}; -# if CMK_SMP && !CMK_SMP_NO_COMMTHD -extern void CommunicationServerThread(int sleepTime); -static std::atomic cpuTopoSyncCommThreadDone{}; -# endif - -# if CMK_SMP && !CMK_SMP_NO_COMMTHD -static void cpuTopoSyncWaitCommThread(std::atomic& done) -{ - do CommunicationServerThread(5); - while (!done.load()); - - CommunicationServerThread(5); -} -# endif static void cpuTopoSyncWait(std::atomic& done) { @@ -532,13 +518,6 @@ void LrtsInitCpuTopo(char** argv) return; } -# if CMK_SMP && !CMK_SMP_NO_COMMTHD - if (CmiInCommThread()) - { - cpuTopoSyncWaitCommThread(cpuTopoSyncCommThreadDone); - } - else -# endif { /* prepare a msg to send */ hostnameMsg* msg = (hostnameMsg*)CmiAlloc(sizeof(hostnameMsg) + sizeof(_procInfo)); @@ -566,9 +545,6 @@ void LrtsInitCpuTopo(char** argv) CsdSchedulePoll(); } -# if CMK_SMP && !CMK_SMP_NO_COMMTHD - cpuTopoSyncCommThreadDone = true; -# endif } } From 373d330d6b8acd6f40e8f4f3da1a7f09563ca1dd Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Fri, 19 Sep 2025 16:19:12 -0500 Subject: [PATCH 03/13] fix build --- CMakeLists.txt | 1 + src/convcore.cpp | 36 +++++++++++++++++++++++++++++------- src/converse_config.h.in | 1 + 3 files changed, 31 insertions(+), 7 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8984f58..3ba3da4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -46,6 +46,7 @@ option(SPANTREE "whether to enable spanning tree collectives" OFF) #should turn # Reconverse uses SMP always, but turn on this option for backwards compatibility (see pingpong_multipairs for example) option(CMK_SMP "whether to enable SMP support" ON) option(CMK_CPV_IS_SMP "whether to enable SMP support for cpvs" ON) +option(CMK_USE_SHMEM "whether to use POSIX shared memory for IPC" OFF) option(RECONVERSE_ATOMIC_QUEUE "whether to use atomic queue" ON) diff --git a/src/convcore.cpp b/src/convcore.cpp index 274eed1..393db74 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -258,7 +258,7 @@ void CmiInitState(int rank) { newZCPupGets); // Check if this is necessary CpvInitialize(int, interopExitFlag); CpvAccess(interopExitFlag) = 0; - #if CMK_USE_SHMEM + #ifdef CMK_USE_SHMEM CsvInitialize(CmiIpcManager*, coreIpcManager_); CsvAccess(coreIpcManager_) = nullptr; #endif @@ -398,14 +398,14 @@ void CmiFree(void *msg) { free(BLKSTART(parentBlk)); } - #if CMK_USE_SHMEM + #ifdef CMK_USE_SHMEM // we should only free _our_ IPC blocks -- so calling CmiFree on // an IPC block issued by another process will cause a bad free! // (note -- this would only occur if you alloc an ipc block then // decide not to send it; that should be avoided! ) CmiIpcBlock* ipc; auto* manager = CsvAccess(coreIpcManager_); - if (blk && (ipc = CmiIsIpcBlock(manager, BLKSTART(blk), CmiMyNode()))) { + if (msg && (ipc = CmiIsIpcBlock(manager, BLKSTART(msg), CmiMyNode()))) { CmiFreeIpcBlock(manager, ipc); return; } @@ -747,12 +747,34 @@ int CmiError(const char *format, ...) { return ret; } -void __CmiEnforceMsgHelper(const char *expr, const char *fileName, int lineNum, - const char *msg, ...) { - CmiAbort("[%d] Assertion \"%s\" failed in file %s line %d.\n", CmiMyPe(), - expr, fileName, lineNum); +void __CmiEnforceHelper(const char* expr, const char* fileName, const char* lineNum) +{ + CmiAbort("[%d] Assertion \"%s\" failed in file %s line %s.\n", CmiMyPe(), expr, + fileName, lineNum); +} + +void __CmiEnforceMsgHelper(const char* expr, const char* fileName, const char* lineNum, + const char* msg, ...) +{ + va_list args; + va_start(args, msg); + + // Get length of formatted string + va_list argsCopy; + va_copy(argsCopy, args); + const auto size = 1 + vsnprintf(nullptr, 0, msg, argsCopy); + va_end(argsCopy); + + // Allocate a buffer of right size and create formatted string in it + std::vector formatted(size); + vsnprintf(formatted.data(), size, msg, args); + va_end(args); + + CmiAbort("[%d] Assertion \"%s\" failed in file %s line %s.\n%s", CmiMyPe(), expr, + fileName, lineNum, formatted.data()); } + bool CmiGetIdle() { return idle_condition; } void CmiSetIdle(bool idle) { idle_condition = idle; } diff --git a/src/converse_config.h.in b/src/converse_config.h.in index 4b97829..037e4d5 100644 --- a/src/converse_config.h.in +++ b/src/converse_config.h.in @@ -6,5 +6,6 @@ #cmakedefine SPANTREE #cmakedefine CMK_SMP #cmakedefine CMK_CPV_IS_SMP +#cmakedefine CMK_USE_SHMEM #endif From 83abfa5d2a00d23c2b4dab3ed0740050c2e8c907 Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Fri, 19 Sep 2025 16:34:53 -0500 Subject: [PATCH 04/13] newlines --- include/cmishmem.h | 2 +- src/cmi-shmem-common.h | 2 +- src/cmishm.cpp | 2 +- src/cmishmem.cpp | 2 +- src/cmixpmem.cpp | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/include/cmishmem.h b/include/cmishmem.h index b51bd72..c7b284e 100644 --- a/include/cmishmem.h +++ b/include/cmishmem.h @@ -104,4 +104,4 @@ inline const std::size_t& CmiRecommendedIpcBlockCutoff(void) { return CpvAccess(kRecommendedCutoff); } -#endif \ No newline at end of file +#endif diff --git a/src/cmi-shmem-common.h b/src/cmi-shmem-common.h index 4c63972..8921aee 100644 --- a/src/cmi-shmem-common.h +++ b/src/cmi-shmem-common.h @@ -129,4 +129,4 @@ using ipc_manager_ptr_ = std::unique_ptr; using ipc_manager_map_ = std::vector; CsvStaticDeclare(ipc_manager_map_, managers_); -#endif \ No newline at end of file +#endif diff --git a/src/cmishm.cpp b/src/cmishm.cpp index c4acaa1..832be38 100644 --- a/src/cmishm.cpp +++ b/src/cmishm.cpp @@ -259,4 +259,4 @@ CmiIpcManager* CmiMakeIpcManager(CthThread th) { CmiNodeAllBarrier(); return CsvAccess(managers_).back().get(); } -} \ No newline at end of file +} diff --git a/src/cmishmem.cpp b/src/cmishmem.cpp index 840573f..b5bcc3a 100644 --- a/src/cmishmem.cpp +++ b/src/cmishmem.cpp @@ -208,4 +208,4 @@ inline static bool pushBlock_(std::atomic& head, auto check = head.exchange(value, std::memory_order_release); CmiAssert(check == cmi::ipc::nil); return true; -} \ No newline at end of file +} diff --git a/src/cmixpmem.cpp b/src/cmixpmem.cpp index 8e53a01..b895887 100644 --- a/src/cmixpmem.cpp +++ b/src/cmixpmem.cpp @@ -193,4 +193,4 @@ CmiIpcManager* CmiMakeIpcManager(CthThread th) { CmiNodeAllBarrier(); return meta; -} \ No newline at end of file +} From c91cb16788e3fbf5cc5fb32648e851ec2acde10f Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Sat, 20 Sep 2025 08:16:55 -0500 Subject: [PATCH 05/13] remove import --- include/cmishmem.h | 1 - 1 file changed, 1 deletion(-) diff --git a/include/cmishmem.h b/include/cmishmem.h index c7b284e..ac23205 100644 --- a/include/cmishmem.h +++ b/include/cmishmem.h @@ -12,7 +12,6 @@ #define CMI_IPC_POOL_SIZE_DESC "size of cmi-shmem pool (in bytes)" struct CmiIpcManager; -#include "converse.h" namespace cmi { namespace ipc { From ac3c9583f185effdfc443f5731179636047a412b Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Sat, 20 Sep 2025 08:32:05 -0500 Subject: [PATCH 06/13] remove cmishmem.h --- include/cmishmem.h | 106 ----------------------------------------- include/converse.h | 100 +++++++++++++++++++++++++++++++++++++- src/cmi-shmem-common.h | 1 - 3 files changed, 99 insertions(+), 108 deletions(-) delete mode 100644 include/cmishmem.h diff --git a/include/cmishmem.h b/include/cmishmem.h deleted file mode 100644 index ac23205..0000000 --- a/include/cmishmem.h +++ /dev/null @@ -1,106 +0,0 @@ -#ifndef CMI_SHMEM_HH -#define CMI_SHMEM_HH - -#include -#include -#include -#include - -#define CMI_IPC_CUTOFF_ARG "ipccutoff" -#define CMI_IPC_CUTOFF_DESC "max message size for cmi-shmem (in bytes)" -#define CMI_IPC_POOL_SIZE_ARG "ipcpoolsize" -#define CMI_IPC_POOL_SIZE_DESC "size of cmi-shmem pool (in bytes)" - -struct CmiIpcManager; - -namespace cmi { -namespace ipc { -// recommended cutoff for block sizes -CpvExtern(std::size_t, kRecommendedCutoff); -// used to represent an empty linked list -constexpr auto nil = std::uintptr_t(0); -// used to represent the tail of a linked list -constexpr auto max = std::numeric_limits::max(); -// used to indicate a message bound for a node -constexpr auto nodeDatagram = std::numeric_limits::max(); -// default number of attempts to alloc before timing out -constexpr auto defaultTimeout = 4; -} // namespace ipc -} // namespace cmi - -// alignas is used for padding here, rather than for alignment of the -// CmiIpcBlock itself. -struct alignas(ALIGN_BYTES) CmiIpcBlock { -public: - // "home" rank of the block - int src; - int dst; - std::uintptr_t orig; - std::uintptr_t next; - std::size_t size; - - CmiIpcBlock(std::size_t size_, std::uintptr_t orig_) - : orig(orig_), next(cmi::ipc::nil), size(size_) {} -}; - -enum CmiIpcAllocStatus { - CMI_IPC_OUT_OF_MEMORY, - CMI_IPC_REMOTE_DESTINATION, - CMI_IPC_SUCCESS, - CMI_IPC_TIMEOUT -}; - -// sets up ipc environment -void CmiIpcInit(char** argv); - -// creates an ipc manager, waking the thread when it's done -// ( this must be called in the same order on all pes! ) -CmiIpcManager* CmiMakeIpcManager(CthThread th); - -// push/pop blocks from the manager's send/recv queue -bool CmiPushIpcBlock(CmiIpcManager*, CmiIpcBlock*); -CmiIpcBlock* CmiPopIpcBlock(CmiIpcManager*); - -// tries to allocate a block, returning null if unsucessful -// (fails when other PEs are contending resources) -// second value of pair indicates failure cause -std::pair CmiAllocIpcBlock(CmiIpcManager*, int node, std::size_t size); - -// frees a block -- enabling it to be used again -void CmiFreeIpcBlock(CmiIpcManager*, CmiIpcBlock*); - -// currently a no-op but may be eventually usable -// intended to "capture" blocks from remote pes -inline void CmiCacheIpcBlock(CmiIpcBlock*) { return; } - -// identifies whether a void* is the payload of a block -// belonging to the given node -CmiIpcBlock* CmiIsIpcBlock(CmiIpcManager*, void*, int node); - -// if (init) is true -- initializes the -// memory segment for use as a message -void* CmiIpcBlockToMsg(CmiIpcBlock*, bool init); - -// equivalent to calling above with (init = false) -inline void* CmiIpcBlockToMsg(CmiIpcBlock* block) { - auto res = (char*)block + sizeof(CmiIpcBlock) + sizeof(CmiChunkHeader); - return (void*)res; -} - -inline CmiIpcBlock* CmiMsgToIpcBlock(CmiIpcManager* manager, void* msg) { - return CmiIsIpcBlock(manager, (char*)msg - sizeof(CmiChunkHeader), CmiMyNode()); -} - -CmiIpcBlock* CmiMsgToIpcBlock(CmiIpcManager*, char* msg, std::size_t len, int node, - int rank = cmi::ipc::nodeDatagram, - int timeout = cmi::ipc::defaultTimeout); - -// deliver a block as a message -void CmiDeliverIpcBlockMsg(CmiIpcBlock*); - -inline const std::size_t& CmiRecommendedIpcBlockCutoff(void) { - using namespace cmi::ipc; - return CpvAccess(kRecommendedCutoff); -} - -#endif diff --git a/include/converse.h b/include/converse.h index 14b7230..85f6f60 100644 --- a/include/converse.h +++ b/include/converse.h @@ -8,6 +8,8 @@ #include #include #include +#include +#include using CmiInt1 = std::int8_t; using CmiInt2 = std::int16_t; @@ -990,7 +992,103 @@ void CmiInterSyncNodeSendAndFreeFn(int destNode, int partition, int messageSize, /* end of variables and functions for partition */ -#include "cmishmem.h" +#define CMI_IPC_CUTOFF_ARG "ipccutoff" +#define CMI_IPC_CUTOFF_DESC "max message size for cmi-shmem (in bytes)" +#define CMI_IPC_POOL_SIZE_ARG "ipcpoolsize" +#define CMI_IPC_POOL_SIZE_DESC "size of cmi-shmem pool (in bytes)" + +struct CmiIpcManager; + +namespace cmi { +namespace ipc { +// recommended cutoff for block sizes +CpvExtern(std::size_t, kRecommendedCutoff); +// used to represent an empty linked list +constexpr auto nil = std::uintptr_t(0); +// used to represent the tail of a linked list +constexpr auto max = std::numeric_limits::max(); +// used to indicate a message bound for a node +constexpr auto nodeDatagram = std::numeric_limits::max(); +// default number of attempts to alloc before timing out +constexpr auto defaultTimeout = 4; +} // namespace ipc +} // namespace cmi + +// alignas is used for padding here, rather than for alignment of the +// CmiIpcBlock itself. +struct alignas(ALIGN_BYTES) CmiIpcBlock { +public: + // "home" rank of the block + int src; + int dst; + std::uintptr_t orig; + std::uintptr_t next; + std::size_t size; + + CmiIpcBlock(std::size_t size_, std::uintptr_t orig_) + : orig(orig_), next(cmi::ipc::nil), size(size_) {} +}; + +enum CmiIpcAllocStatus { + CMI_IPC_OUT_OF_MEMORY, + CMI_IPC_REMOTE_DESTINATION, + CMI_IPC_SUCCESS, + CMI_IPC_TIMEOUT +}; + +// sets up ipc environment +void CmiIpcInit(char** argv); + +// creates an ipc manager, waking the thread when it's done +// ( this must be called in the same order on all pes! ) +CmiIpcManager* CmiMakeIpcManager(CthThread th); + +// push/pop blocks from the manager's send/recv queue +bool CmiPushIpcBlock(CmiIpcManager*, CmiIpcBlock*); +CmiIpcBlock* CmiPopIpcBlock(CmiIpcManager*); + +// tries to allocate a block, returning null if unsucessful +// (fails when other PEs are contending resources) +// second value of pair indicates failure cause +std::pair CmiAllocIpcBlock(CmiIpcManager*, int node, std::size_t size); + +// frees a block -- enabling it to be used again +void CmiFreeIpcBlock(CmiIpcManager*, CmiIpcBlock*); + +// currently a no-op but may be eventually usable +// intended to "capture" blocks from remote pes +inline void CmiCacheIpcBlock(CmiIpcBlock*) { return; } + +// identifies whether a void* is the payload of a block +// belonging to the given node +CmiIpcBlock* CmiIsIpcBlock(CmiIpcManager*, void*, int node); + +// if (init) is true -- initializes the +// memory segment for use as a message +void* CmiIpcBlockToMsg(CmiIpcBlock*, bool init); + +// equivalent to calling above with (init = false) +inline void* CmiIpcBlockToMsg(CmiIpcBlock* block) { + auto res = (char*)block + sizeof(CmiIpcBlock) + sizeof(CmiChunkHeader); + return (void*)res; +} + +inline CmiIpcBlock* CmiMsgToIpcBlock(CmiIpcManager* manager, void* msg) { + return CmiIsIpcBlock(manager, (char*)msg - sizeof(CmiChunkHeader), CmiMyNode()); +} + +CmiIpcBlock* CmiMsgToIpcBlock(CmiIpcManager*, char* msg, std::size_t len, int node, + int rank = cmi::ipc::nodeDatagram, + int timeout = cmi::ipc::defaultTimeout); + +// deliver a block as a message +void CmiDeliverIpcBlockMsg(CmiIpcBlock*); + +inline const std::size_t& CmiRecommendedIpcBlockCutoff(void) { + using namespace cmi::ipc; + return CpvAccess(kRecommendedCutoff); +} + CsvExtern(CmiIpcManager*, coreIpcManager_); #endif // CONVERSE_H diff --git a/src/cmi-shmem-common.h b/src/cmi-shmem-common.h index 8921aee..9946605 100644 --- a/src/cmi-shmem-common.h +++ b/src/cmi-shmem-common.h @@ -2,7 +2,6 @@ #define CMI_SHMEM_COMMON_HH #include "converse_internal.h" -#include "cmishmem.h" #include #include From 283de73531a00d38103db39209336b79c9c3d9f9 Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Sat, 20 Sep 2025 08:49:11 -0500 Subject: [PATCH 07/13] CldHandlerIndex is Cpv variable --- src/cldb.cpp | 2 +- src/cldb.h | 2 +- src/cldb.rand.cpp | 11 ++++++----- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/cldb.cpp b/src/cldb.cpp index 78d7d33..b69561e 100644 --- a/src/cldb.cpp +++ b/src/cldb.cpp @@ -6,8 +6,8 @@ typedef char *BitVector; -/* CpvDeclare(int, CldHandlerIndex); +/* CpvDeclare(int, CldNodeHandlerIndex); CpvDeclare(BitVector, CldPEBitVector); CpvDeclare(int, CldBalanceHandlerIndex); diff --git a/src/cldb.h b/src/cldb.h index 2eb4ad4..f27fc05 100644 --- a/src/cldb.h +++ b/src/cldb.h @@ -5,7 +5,7 @@ #define MAXMSGBFRSIZE 100000 -extern thread_local int CldHandlerIndex; +CpvExtern(int, CldHandlerIndex); extern thread_local int CldNodeHandlerIndex; extern thread_local int CldBalanceHandlerIndex; extern thread_local int CldRelocatedMessages; diff --git a/src/cldb.rand.cpp b/src/cldb.rand.cpp index ccda7d4..3fcc483 100644 --- a/src/cldb.rand.cpp +++ b/src/cldb.rand.cpp @@ -42,7 +42,7 @@ void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn) { pfn(&msg); ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr); } - CldSwitchHandler((char *)msg, CldHandlerIndex); + CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex)); CmiSetInfo(msg, infofn); CmiSyncMulticastAndFree(grp, len, msg); @@ -61,7 +61,7 @@ void CldEnqueueWithinNode(void *msg, int infofn) { pfn(&msg); ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr); } - CldSwitchHandler((char *)msg, CldHandlerIndex); + CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex)); CmiSetInfo(msg, infofn); CmiWithinNodeBroadcast(len, (char *)msg); @@ -77,7 +77,7 @@ void CldEnqueueMulti(int npes, const int *pes, void *msg, int infofn) { pfn(&msg); ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr); } - CldSwitchHandler((char *)msg, CldHandlerIndex); + CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex)); CmiSetInfo(msg, infofn); CmiSyncListSendAndFree(npes, pes, len, msg); @@ -112,7 +112,7 @@ void CldEnqueue(int pe, void *msg, int infofn) { pfn(&msg); ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr); } - CldSwitchHandler((char *)msg, CldHandlerIndex); + CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex)); CmiSetInfo(msg, infofn); if (pe == CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, msg); @@ -157,7 +157,8 @@ void CldNodeEnqueue(int node, void *msg, int infofn) { } void CldModuleInit(char **argv) { - CldHandlerIndex = CmiRegisterHandler((CmiHandler)CldHandler); + CpvInitialize(int, CldHandlerIndex); + CpvAccess(CldHandlerIndex) = CmiRegisterHandler((CmiHandler)CldHandler); CldNodeHandlerIndex = CmiRegisterHandler((CmiHandler)CldNodeHandler); CldRelocatedMessages = 0; CldLoadBalanceMessages = 0; From a6c6e28ea1129506fde3306d7861b81437666b7f Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Sat, 20 Sep 2025 08:56:54 -0500 Subject: [PATCH 08/13] cthissuspendable --- include/converse.h | 2 ++ src/threads.cpp | 2 ++ 2 files changed, 4 insertions(+) diff --git a/include/converse.h b/include/converse.h index 85f6f60..64f3bce 100644 --- a/include/converse.h +++ b/include/converse.h @@ -200,6 +200,8 @@ static void CthThreadFree(CthThread t); void CthResume(CthThread t); +int CthIsSuspendable(CthThread t); + void CthSuspend(void); void CthAwaken(CthThread th); diff --git a/src/threads.cpp b/src/threads.cpp index 51668b0..34e39ab 100644 --- a/src/threads.cpp +++ b/src/threads.cpp @@ -309,6 +309,8 @@ void CthResume(CthThread t) { */ } +int CthIsSuspendable(CthThread t) { return B(t)->suspendable; } + /* Suspend: finds the next thread to execute, and resumes it */ From 76b80fef5a42b0855277ae03db0df790a1a7671b Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Sat, 20 Sep 2025 09:19:42 -0500 Subject: [PATCH 09/13] cmake change --- src/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3002702..c87a3eb 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -2,7 +2,7 @@ target_include_directories(reconverse PRIVATE .) target_sources(reconverse PRIVATE conv-conds.cpp convcore.cpp random.cpp scheduler.cpp cpuaffinity.cpp collectives.cpp comm_backend/comm_backend_internal.cpp threads.cpp cldb.rand.cpp cldb.cpp cmirdmautils.cpp - conv-rdma.cpp conv-topology.cpp cmishm.cpp cmishmem.cpp) + conv-rdma.cpp conv-topology.cpp cmishmem.cpp) target_include_directories( reconverse PRIVATE $ $) From d1c607e76900d141045a949158ea867a5586802a Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Sun, 21 Sep 2025 12:52:52 -0500 Subject: [PATCH 10/13] small fix --- src/cmishm.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/cmishm.cpp b/src/cmishm.cpp index 832be38..57c05e7 100644 --- a/src/cmishm.cpp +++ b/src/cmishm.cpp @@ -192,6 +192,7 @@ static void callbackHandler_(void* msg) { int node = CmiPhysicalNodeID(mine); int first = CmiGetFirstPeOnPhysicalNode(node); auto* pmsg = (pid_message_*)msg; + int key = pmsg->key; if (mine == first) { // if we're still expecting messages: @@ -209,7 +210,7 @@ static void callbackHandler_(void* msg) { CmiFree(msg); } - auto& meta = (CsvAccess(managers_))[(pmsg->key - 1)]; + auto& meta = (CsvAccess(managers_))[(key - 1)]; openAllShared_(meta.get()); awakenSleepers_(); } From 054bdc639aa1a6190f907144ff93d0d2dd0c66d1 Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Mon, 22 Sep 2025 12:47:07 -0500 Subject: [PATCH 11/13] fix scheduling and freeing --- src/convcore.cpp | 10 +++++----- src/scheduler.cpp | 7 +++++++ 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/convcore.cpp b/src/convcore.cpp index 393db74..8352a43 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -394,10 +394,6 @@ void CmiFree(void *msg) { // if(refCount==0) /* Logic error: reference count shouldn't already have been zero */ // CmiAbort("CmiFree reference count was zero-- is this a duplicate free?"); - if (refCount == 1) { - free(BLKSTART(parentBlk)); - } - #ifdef CMK_USE_SHMEM // we should only free _our_ IPC blocks -- so calling CmiFree on // an IPC block issued by another process will cause a bad free! @@ -409,7 +405,11 @@ void CmiFree(void *msg) { CmiFreeIpcBlock(manager, ipc); return; } -#endif + #endif + + if (refCount == 1) { + free(BLKSTART(parentBlk)); + } } diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 8368424..37429e8 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -18,6 +18,13 @@ void CsdScheduler() { CcdRaiseCondition(CcdSCHEDLOOP); + #ifdef CMK_USE_SHMEM + CmiIpcBlock* block = CmiPopIpcBlock(CsvAccess(coreIpcManager_)); + if (block != nullptr) { + CmiDeliverIpcBlockMsg(block); + } + #endif + // poll node queue if (!nodeQueue->empty()) { auto result = nodeQueue->pop(); From 0f6ce9564b34c81502861f0290980ea0bbebb9c2 Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Wed, 1 Oct 2025 08:03:31 -0500 Subject: [PATCH 12/13] Update src/cmishm.cpp Co-authored-by: Justin Szaday --- src/cmishm.cpp | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/cmishm.cpp b/src/cmishm.cpp index 57c05e7..11c3b63 100644 --- a/src/cmishm.cpp +++ b/src/cmishm.cpp @@ -121,15 +121,11 @@ static void openAllShared_(CmiIpcManager* meta) { DEBUGF(("%d> finished opening all shared\n", meta->mine)); } -// NOTE ( there may be a faster way to do this? ) inline std::size_t whichBin_(std::size_t size) { - std::size_t bin; - for (bin = 0; bin < kNumCutOffPoints; bin++) { - if (size <= kCutOffPoints[bin]) { - break; - } - } - return bin; + const auto* begin = kCutOffPoints; + const auto* end = kCutOffPoints + kNumCutOffPoints; + const auto* it = std::lower_bound(begin, end, size); + return static_cast(it - begin); // returns kNumCutOffPoints if none } static void awakenSleepers_(void) { From d65339c5fa34aa2b40bfd3e8ffe5fbe8d527559c Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Wed, 1 Oct 2025 08:12:00 -0500 Subject: [PATCH 13/13] fix pointers in whichBin --- src/cmishm.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cmishm.cpp b/src/cmishm.cpp index 11c3b63..5562bfd 100644 --- a/src/cmishm.cpp +++ b/src/cmishm.cpp @@ -122,8 +122,8 @@ static void openAllShared_(CmiIpcManager* meta) { } inline std::size_t whichBin_(std::size_t size) { - const auto* begin = kCutOffPoints; - const auto* end = kCutOffPoints + kNumCutOffPoints; + const auto* begin = kCutOffPoints.data(); + const auto* end = kCutOffPoints.data() + kNumCutOffPoints; const auto* it = std::lower_bound(begin, end, size); return static_cast(it - begin); // returns kNumCutOffPoints if none }