diff --git a/CMakeLists.txt b/CMakeLists.txt index 6d69b25..3fb1530 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -52,6 +52,7 @@ option(SPANTREE "whether to enable spanning tree collectives" OFF) #should turn # Reconverse uses SMP always, but turn on this option for backwards compatibility (see pingpong_multipairs for example) option(CMK_SMP "whether to enable SMP support" ON) option(CMK_CPV_IS_SMP "whether to enable SMP support for cpvs" ON) +option(CMK_USE_SHMEM "whether to use POSIX shared memory for IPC" OFF) option(RECONVERSE_ATOMIC_QUEUE "whether to use atomic queue" ON) diff --git a/include/converse.h b/include/converse.h index 85bb459..1f15baf 100644 --- a/include/converse.h +++ b/include/converse.h @@ -8,6 +8,9 @@ #include #include #include +#include +#include +#include using CmiInt1 = std::int8_t; using CmiInt2 = std::int16_t; @@ -199,6 +202,8 @@ void CthFree(CthThread t); void CthResume(CthThread t); +int CthIsSuspendable(CthThread t); + void CthSuspend(void); void CthAwaken(CthThread th); @@ -324,6 +329,7 @@ void CmiSyncSendAndFree(int destPE, int messageSize, void *msg); void CmiSyncListSend(int npes, const int *pes, int len, void *msg); void CmiSyncListSendAndFree(int npes, const int *pes, int len, void *msg); void CmiPushPE(int destPE, void *msg); +void CmiPushNode(void *msg); void CmiSyncSendFn(int destPE, int messageSize, char *msg); void CmiFreeSendFn(int destPE, int messageSize, char *msg); @@ -399,15 +405,25 @@ int CmiError(const char *format, ...); void CmiInitCPUTopology(char **argv); void CmiInitCPUAffinity(char **argv); -void __CmiEnforceMsgHelper(const char *expr, const char *fileName, int lineNum, - const char *msg, ...); +#define __CMK_STRING(x) #x +#define __CMK_XSTRING(x) __CMK_STRING(x) -#define CmiEnforce(condition) \ - do { \ - if (!(condition)) { \ - __CmiEnforceMsgHelper(#condition, __FILE__, __LINE__, ""); \ - } \ - } while (0) +void __CmiEnforceHelper(const char* expr, const char* fileName, const char* lineNum); +void 
__CmiEnforceMsgHelper(const char* expr, const char* fileName, + const char* lineNum, const char* msg, ...); + +#define CmiEnforce(expr) \ + ((void)(CMI_LIKELY(expr) ? 0 \ + : (__CmiEnforceHelper(__CMK_STRING(expr), __FILE__, \ + __CMK_XSTRING(__LINE__)), \ + 0))) + +#define CmiEnforceMsg(expr, ...) \ + ((void)(CMI_LIKELY(expr) \ + ? 0 \ + : (__CmiEnforceMsgHelper(__CMK_STRING(expr), __FILE__, \ + __CMK_XSTRING(__LINE__), __VA_ARGS__), \ + 0))) double getCurrentTime(void); double CmiWallTimer(void); @@ -1045,4 +1061,103 @@ void CmiInterSyncNodeSendAndFreeFn(int destNode, int partition, int messageSize, /* end of variables and functions for partition */ +#define CMI_IPC_CUTOFF_ARG "ipccutoff" +#define CMI_IPC_CUTOFF_DESC "max message size for cmi-shmem (in bytes)" +#define CMI_IPC_POOL_SIZE_ARG "ipcpoolsize" +#define CMI_IPC_POOL_SIZE_DESC "size of cmi-shmem pool (in bytes)" + +struct CmiIpcManager; + +namespace cmi { +namespace ipc { +// recommended cutoff for block sizes +CpvExtern(std::size_t, kRecommendedCutoff); +// used to represent an empty linked list +constexpr auto nil = std::uintptr_t(0); +// used to represent the tail of a linked list +constexpr auto max = std::numeric_limits::max(); +// used to indicate a message bound for a node +constexpr auto nodeDatagram = std::numeric_limits::max(); +// default number of attempts to alloc before timing out +constexpr auto defaultTimeout = 4; +} // namespace ipc +} // namespace cmi + +// alignas is used for padding here, rather than for alignment of the +// CmiIpcBlock itself. 
+struct alignas(ALIGN_BYTES) CmiIpcBlock { +public: + // "home" rank of the block + int src; + int dst; + std::uintptr_t orig; + std::uintptr_t next; + std::size_t size; + + CmiIpcBlock(std::size_t size_, std::uintptr_t orig_) + : orig(orig_), next(cmi::ipc::nil), size(size_) {} +}; + +enum CmiIpcAllocStatus { + CMI_IPC_OUT_OF_MEMORY, + CMI_IPC_REMOTE_DESTINATION, + CMI_IPC_SUCCESS, + CMI_IPC_TIMEOUT +}; + +// sets up ipc environment +void CmiIpcInit(char** argv); + +// creates an ipc manager, waking the thread when it's done +// ( this must be called in the same order on all pes! ) +CmiIpcManager* CmiMakeIpcManager(CthThread th); + +// push/pop blocks from the manager's send/recv queue +bool CmiPushIpcBlock(CmiIpcManager*, CmiIpcBlock*); +CmiIpcBlock* CmiPopIpcBlock(CmiIpcManager*); + +// tries to allocate a block, returning null if unsucessful +// (fails when other PEs are contending resources) +// second value of pair indicates failure cause +std::pair CmiAllocIpcBlock(CmiIpcManager*, int node, std::size_t size); + +// frees a block -- enabling it to be used again +void CmiFreeIpcBlock(CmiIpcManager*, CmiIpcBlock*); + +// currently a no-op but may be eventually usable +// intended to "capture" blocks from remote pes +inline void CmiCacheIpcBlock(CmiIpcBlock*) { return; } + +// identifies whether a void* is the payload of a block +// belonging to the given node +CmiIpcBlock* CmiIsIpcBlock(CmiIpcManager*, void*, int node); + +// if (init) is true -- initializes the +// memory segment for use as a message +void* CmiIpcBlockToMsg(CmiIpcBlock*, bool init); + +// equivalent to calling above with (init = false) +inline void* CmiIpcBlockToMsg(CmiIpcBlock* block) { + auto res = (char*)block + sizeof(CmiIpcBlock) + sizeof(CmiChunkHeader); + return (void*)res; +} + +inline CmiIpcBlock* CmiMsgToIpcBlock(CmiIpcManager* manager, void* msg) { + return CmiIsIpcBlock(manager, (char*)msg - sizeof(CmiChunkHeader), CmiMyNode()); +} + +CmiIpcBlock* CmiMsgToIpcBlock(CmiIpcManager*, 
char* msg, std::size_t len, int node, + int rank = cmi::ipc::nodeDatagram, + int timeout = cmi::ipc::defaultTimeout); + +// deliver a block as a message +void CmiDeliverIpcBlockMsg(CmiIpcBlock*); + +inline const std::size_t& CmiRecommendedIpcBlockCutoff(void) { + using namespace cmi::ipc; + return CpvAccess(kRecommendedCutoff); +} + +CsvExtern(CmiIpcManager*, coreIpcManager_); + #endif // CONVERSE_H diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 811834a..447ed26 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -2,7 +2,7 @@ target_include_directories(reconverse PRIVATE .) target_sources(reconverse PRIVATE conv-conds.cpp convcore.cpp random.cpp scheduler.cpp cpuaffinity.cpp collectives.cpp comm_backend/comm_backend_internal.cpp threads.cpp cldb.rand.cpp cldb.cpp cmirdmautils.cpp - conv-rdma.cpp conv-topology.cpp msgmgr.cpp) + conv-rdma.cpp conv-topology.cpp msgmgr.cpp cmishmem.cpp) target_include_directories( reconverse PRIVATE $ $) diff --git a/src/cldb.cpp b/src/cldb.cpp index 78d7d33..b69561e 100644 --- a/src/cldb.cpp +++ b/src/cldb.cpp @@ -6,8 +6,8 @@ typedef char *BitVector; -/* CpvDeclare(int, CldHandlerIndex); +/* CpvDeclare(int, CldNodeHandlerIndex); CpvDeclare(BitVector, CldPEBitVector); CpvDeclare(int, CldBalanceHandlerIndex); diff --git a/src/cldb.h b/src/cldb.h index 2eb4ad4..f27fc05 100644 --- a/src/cldb.h +++ b/src/cldb.h @@ -5,7 +5,7 @@ #define MAXMSGBFRSIZE 100000 -extern thread_local int CldHandlerIndex; +CpvExtern(int, CldHandlerIndex); extern thread_local int CldNodeHandlerIndex; extern thread_local int CldBalanceHandlerIndex; extern thread_local int CldRelocatedMessages; diff --git a/src/cldb.rand.cpp b/src/cldb.rand.cpp index ccda7d4..3fcc483 100644 --- a/src/cldb.rand.cpp +++ b/src/cldb.rand.cpp @@ -42,7 +42,7 @@ void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn) { pfn(&msg); ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr); } - CldSwitchHandler((char *)msg, CldHandlerIndex); + CldSwitchHandler((char *)msg, 
CpvAccess(CldHandlerIndex)); CmiSetInfo(msg, infofn); CmiSyncMulticastAndFree(grp, len, msg); @@ -61,7 +61,7 @@ void CldEnqueueWithinNode(void *msg, int infofn) { pfn(&msg); ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr); } - CldSwitchHandler((char *)msg, CldHandlerIndex); + CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex)); CmiSetInfo(msg, infofn); CmiWithinNodeBroadcast(len, (char *)msg); @@ -77,7 +77,7 @@ void CldEnqueueMulti(int npes, const int *pes, void *msg, int infofn) { pfn(&msg); ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr); } - CldSwitchHandler((char *)msg, CldHandlerIndex); + CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex)); CmiSetInfo(msg, infofn); CmiSyncListSendAndFree(npes, pes, len, msg); @@ -112,7 +112,7 @@ void CldEnqueue(int pe, void *msg, int infofn) { pfn(&msg); ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr); } - CldSwitchHandler((char *)msg, CldHandlerIndex); + CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex)); CmiSetInfo(msg, infofn); if (pe == CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, msg); @@ -157,7 +157,8 @@ void CldNodeEnqueue(int node, void *msg, int infofn) { } void CldModuleInit(char **argv) { - CldHandlerIndex = CmiRegisterHandler((CmiHandler)CldHandler); + CpvInitialize(int, CldHandlerIndex); + CpvAccess(CldHandlerIndex) = CmiRegisterHandler((CmiHandler)CldHandler); CldNodeHandlerIndex = CmiRegisterHandler((CmiHandler)CldNodeHandler); CldRelocatedMessages = 0; CldLoadBalanceMessages = 0; diff --git a/src/cmi-shmem-common.h b/src/cmi-shmem-common.h new file mode 100644 index 0000000..9946605 --- /dev/null +++ b/src/cmi-shmem-common.h @@ -0,0 +1,131 @@ +#ifndef CMI_SHMEM_COMMON_HH +#define CMI_SHMEM_COMMON_HH + +#include "converse_internal.h" + +#include +#include +#include +#include +#include + + +namespace cmi { +namespace ipc { +CpvDeclare(std::size_t, kRecommendedCutoff); +} +} // namespace cmi + +CpvStaticDeclare(std::size_t, kSegmentSize); +constexpr std::size_t 
kDefaultSegmentSize = 8 * 1024 * 1024; + +constexpr std::size_t kNumCutOffPoints = 25; +const std::array kCutOffPoints = { + 64, 128, 256, 512, 1024, 2048, 4096, + 8192, 16384, 32768, 65536, 131072, 262144, 524288, + 1048576, 2097152, 4194304, 8388608, 16777216, 33554432, 67108864, + 134217728, 268435456, 536870912, 1073741824}; + +struct ipc_metadata_; +CsvStaticDeclare(CmiNodeLock, sleeper_lock); + +using sleeper_map_t = std::vector; +CsvStaticDeclare(sleeper_map_t, sleepers); + +// the data each pe shares with its peers +// contains pool of free blocks, heap, and receive queue +struct ipc_shared_ { + std::array, kNumCutOffPoints> free; + std::atomic queue; + std::atomic heap; + std::uintptr_t max; + + ipc_shared_(std::uintptr_t begin, std::uintptr_t end) + : queue(cmi::ipc::max), heap(begin), max(end) { + for (auto& f : this->free) { + f.store(cmi::ipc::max); + } + } +}; + +// shared data for each pe +struct ipc_metadata_ { + // maps ranks to shared segments + std::map shared; + // physical node rank + int mine; + // key of this instance + std::size_t key; + // base constructor + ipc_metadata_(std::size_t key_) : mine(CmiMyNode()), key(key_) {} + // virtual destructor may be needed + virtual ~ipc_metadata_() {} +}; + +inline std::size_t whichBin_(std::size_t size); + +inline static void initIpcShared_(ipc_shared_* shared) { + auto begin = (std::uintptr_t)(sizeof(ipc_shared_) + + (sizeof(ipc_shared_) % ALIGN_BYTES)); + CmiAssert(begin != cmi::ipc::nil); + auto end = begin + CpvAccess(kSegmentSize); + new (shared) ipc_shared_(begin, end); +} + +inline static ipc_shared_* makeIpcShared_(void) { + auto* shared = (ipc_shared_*)(::operator new(sizeof(ipc_shared_) + + CpvAccess(kSegmentSize))); + initIpcShared_(shared); + return shared; +} + +inline void initSegmentSize_(char** argv) { + using namespace cmi::ipc; + CpvInitialize(std::size_t, kRecommendedCutoff); + CpvInitialize(std::size_t, kSegmentSize); + + CmiInt8 value; + auto flag = + CmiGetArgLongDesc(argv, "++" 
CMI_IPC_POOL_SIZE_ARG, &value, CMI_IPC_POOL_SIZE_DESC); + CpvAccess(kSegmentSize) = flag ? (std::size_t)value : kDefaultSegmentSize; + CmiEnforceMsg(CpvAccess(kSegmentSize), "segment size must be non-zero!"); + if (CmiGetArgLongDesc(argv, "++" CMI_IPC_CUTOFF_ARG, &value, CMI_IPC_CUTOFF_DESC)) { + auto bin = whichBin_((std::size_t)value); + CmiEnforceMsg(bin < kNumCutOffPoints, "ipc cutoff out of range!"); + CpvAccess(kRecommendedCutoff) = kCutOffPoints[bin]; + } else { + auto max = CpvAccess(kSegmentSize) / kNumCutOffPoints; + auto bin = (std::intptr_t)whichBin_(max) - 1; + CpvAccess(kRecommendedCutoff) = kCutOffPoints[(bin >= 0) ? bin : 0]; + } +} + +inline static void printIpcStartupMessage_(const char* implName) { + using namespace cmi::ipc; + CmiPrintf("Converse> %s pool init'd with %luB segment and %luB cutoff.\n", + implName, CpvAccess(kSegmentSize), + CpvAccess(kRecommendedCutoff)); +} + +inline static void initSleepers_(void) { + if (CmiMyRank() == 0) { + CsvInitialize(sleeper_map_t, sleepers); + CsvAccess(sleepers).resize(CmiMyNodeSize()); + CsvInitialize(CmiNodeLock, sleeper_lock); + CsvAccess(sleeper_lock) = CmiCreateLock(); + } +} + +inline static void putSleeper_(CthThread th) { + CmiLock(CsvAccess(sleeper_lock)); + (CsvAccess(sleepers))[CmiMyRank()] = th; + CmiUnlock(CsvAccess(sleeper_lock)); +} + +static void awakenSleepers_(void); + +using ipc_manager_ptr_ = std::unique_ptr; +using ipc_manager_map_ = std::vector; +CsvStaticDeclare(ipc_manager_map_, managers_); + +#endif diff --git a/src/cmishm.cpp b/src/cmishm.cpp new file mode 100644 index 0000000..5562bfd --- /dev/null +++ b/src/cmishm.cpp @@ -0,0 +1,259 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cmi-shmem-common.h" +#include + +CpvStaticDeclare(int, num_cbs_recvd); +CpvStaticDeclare(int, num_cbs_exptd); +CpvStaticDeclare(int, handle_callback); +CpvStaticDeclare(int, handle_node_pid); +CsvStaticDeclare(pid_t, node_pid); 
+CpvExtern(int, CthResumeNormalThreadIdx); + +static int sendPid_(CmiIpcManager*); +static void openAllShared_(CmiIpcManager*); + +struct pid_message_ { + char core[CmiMsgHeaderSizeBytes]; + std::size_t key; + pid_t pid; +}; + +#define CMI_SHARED_FMT "cmi_pid%lu_node%d_shared_" + +// opens a shared memory segment for a given physical rank +static std::pair openShared_(int node) { + // determine the size of the shared segment + // (adding the size of the queues and what nots) + auto size = CpvAccess(kSegmentSize) + sizeof(ipc_shared_); + // generate a name for this pe + auto slen = snprintf(NULL, 0, CMI_SHARED_FMT, (std::size_t)CsvAccess(node_pid), node); + auto name = new char[slen]; + snprintf(name, slen, CMI_SHARED_FMT, (std::size_t)CsvAccess(node_pid), node); + DEBUGF(("%d> opening share %s\n", CmiMyPe(), name)); + // try opening the share exclusively + auto fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, 0666); + // if we succeed, we're the first accessor, so: + if (fd >= 0) { + // truncate it to the correct size + auto status = ftruncate(fd, size); + CmiAssert(status >= 0); + } else { + // otherwise just open it + fd = shm_open(name, O_RDWR, 0666); + CmiAssert(fd >= 0); + } + // then delete the name + delete[] name; + // map the segment to an address: + auto* res = (ipc_shared_*)mmap(nullptr, size, PROT_READ | PROT_WRITE, + MAP_SHARED, fd, 0); + CmiAssert(res != MAP_FAILED); + // return the file descriptor/shared + return std::make_pair(fd, res); +} + +struct CmiIpcManager : public ipc_metadata_ { + std::map fds; + + CmiIpcManager(std::size_t key) : ipc_metadata_(key) { + auto firstPe = CmiNodeFirst(CmiMyNode()); + auto thisRank = CmiPhysicalRank(firstPe); + if (thisRank == 0) { + if (sendPid_(this) == 1) { + openAllShared_(this); + awakenSleepers_(); + } + } + } + + virtual ~CmiIpcManager() { + auto& size = CpvAccess(kSegmentSize); + // for each rank/descriptor pair + for (auto& pair : this->fds) { + auto& proc = pair.first; + auto& fd = pair.second; + // 
unmap the memory segment + munmap(this->shared[proc], size); + // close the file + close(fd); + // unlinking the shm segment for our pe + if (proc == this->mine) { + auto slen = + snprintf(NULL, 0, CMI_SHARED_FMT, (std::size_t)CsvAccess(node_pid), proc); + auto name = new char[slen]; + snprintf(name, slen, CMI_SHARED_FMT, (std::size_t)CsvAccess(node_pid), proc); + shm_unlink(name); + delete[] name; + } + } + } +}; + +static void openAllShared_(CmiIpcManager* meta) { + int* pes; + int nPes; + int thisNode = CmiPhysicalNodeID(CmiMyPe()); + CmiGetPesOnPhysicalNode(thisNode, &pes, &nPes); + int nSize = CmiMyNodeSize(); + int nProcs = nPes / nSize; + // for each rank in this physical node: + for (auto rank = 0; rank < nProcs; rank++) { + // open its shared segment + auto pe = pes[rank * nSize]; + auto proc = CmiNodeOf(pe); + auto res = openShared_(proc); + // initializing it if it's ours + if (proc == meta->mine) initIpcShared_(res.second); + // store the retrieved data + meta->fds[proc] = res.first; + meta->shared[proc] = res.second; + } + DEBUGF(("%d> finished opening all shared\n", meta->mine)); +} + +inline std::size_t whichBin_(std::size_t size) { + const auto* begin = kCutOffPoints.data(); + const auto* end = kCutOffPoints.data() + kNumCutOffPoints; + const auto* it = std::lower_bound(begin, end, size); + return static_cast(it - begin); // returns kNumCutOffPoints if none +} + +static void awakenSleepers_(void) { + auto& current_sleepers = CsvAccess(sleepers); + for (auto i = 0; i < current_sleepers.size(); i++) { + auto& th = current_sleepers[i]; + if (i == CmiMyRank()) { + CthAwaken(th); + } else { + auto* token = CthGetToken(th); + CmiSetHandler(token, CpvAccess(CthResumeNormalThreadIdx)); + CmiPushPE(i, token); + } + } +} + +// returns number of processes in node +int procBroadcastAndFree_(char* msg, std::size_t size) { + int* pes; + int nPes; + int thisPe = CmiMyPe(); + int thisNode = CmiPhysicalNodeID(thisPe); + CmiGetPesOnPhysicalNode(thisNode, &pes, 
&nPes); + int nSize = CmiMyNodeSize(); + int nProcs = nPes / nSize; + CmiAssert(thisPe == pes[0]); + + CpvAccess(num_cbs_exptd) = nProcs - 1; + for (auto rank = 1; rank < nProcs; rank++) { + auto& pe = pes[rank * nSize]; + if (rank == (nProcs - 1)) { + CmiSyncSendAndFree(pe, size, msg); + } else { + CmiSyncSend(pe, size, msg); + } + } + + // free if we didn't send anything + if (nProcs == 1) { + CmiFree(msg); + } + + return nProcs; +} + +static int sendPid_(CmiIpcManager* manager) { + CsvInitialize(pid_t, node_pid); + CsvAccess(node_pid) = getpid(); + + auto* pmsg = (pid_message_*)CmiAlloc(sizeof(pid_message_)); + CmiSetHandler(pmsg, CpvAccess(handle_node_pid)); + pmsg->key = manager->key; + pmsg->pid = CsvAccess(node_pid); + + return procBroadcastAndFree_((char*)pmsg, sizeof(pid_message_)); +} + +static void callbackHandler_(void* msg) { + int mine = CmiMyPe(); + int node = CmiPhysicalNodeID(mine); + int first = CmiGetFirstPeOnPhysicalNode(node); + auto* pmsg = (pid_message_*)msg; + int key = pmsg->key; + + if (mine == first) { + // if we're still expecting messages: + if (++(CpvAccess(num_cbs_recvd)) < CpvAccess(num_cbs_exptd)) { + // free this one + CmiFree(msg); + // and move along + return; + } else { + // otherwise -- tell everyone we're ready! 
+ printIpcStartupMessage_("pxshm"); + procBroadcastAndFree_((char*)msg, sizeof(pid_message_)); + } + } else { + CmiFree(msg); + } + + auto& meta = (CsvAccess(managers_))[(key - 1)]; + openAllShared_(meta.get()); + awakenSleepers_(); +} + +static void nodePidHandler_(void* msg) { + auto* pmsg = (pid_message_*)msg; + CsvInitialize(pid_t, node_pid); + CsvAccess(node_pid) = pmsg->pid; + + int node = CmiPhysicalNodeID(CmiMyPe()); + int root = CmiGetFirstPeOnPhysicalNode(node); + CmiSetHandler(msg, CpvAccess(handle_callback)); + CmiSyncSendAndFree(root, sizeof(pid_message_), (char*)msg); +} + +void CmiIpcInit(char** argv) { + CsvInitialize(ipc_manager_map_, managers_); + + initSleepers_(); + initSegmentSize_(argv); + + CpvInitialize(int, num_cbs_recvd); + CpvInitialize(int, num_cbs_exptd); + CpvInitialize(int, handle_callback); + CpvAccess(handle_callback) = CmiRegisterHandler(callbackHandler_); + CpvInitialize(int, handle_node_pid); + CpvAccess(handle_node_pid) = CmiRegisterHandler(nodePidHandler_); +} + +CmiIpcManager* CmiMakeIpcManager(CthThread th) { + CpvAccess(num_cbs_recvd) = CpvAccess(num_cbs_exptd) = 0; + + putSleeper_(th); + + // ensure all sleepers are reg'd + CmiNodeAllBarrier(); + + if (CmiMyRank() == 0) { + auto key = CsvAccess(managers_).size() + 1; + auto* manager = new CmiIpcManager(key); + CsvAccess(managers_).emplace_back(manager); + // signal the metadata is ready + CmiNodeAllBarrier(); + return manager; + } else { + // pause until the metadata is ready + CmiNodeAllBarrier(); + return CsvAccess(managers_).back().get(); + } +} diff --git a/src/cmishmem.cpp b/src/cmishmem.cpp new file mode 100644 index 0000000..b5bcc3a --- /dev/null +++ b/src/cmishmem.cpp @@ -0,0 +1,211 @@ +#include "cmi-shmem-common.h" + +#if CMK_HAS_XPMEM +#include "cmixpmem.cpp" +#else +#include "cmishm.cpp" +#endif + +#define CMI_DEST_RANK(msg) ((CmiMsgHeaderBasic*)msg)->destPE +extern void CmiPushNode(void* msg); + +CpvExtern(int, CthResumeNormalThreadIdx); + +inline std::size_t 
whichBin_(std::size_t size); +inline static CmiIpcBlock* popBlock_(std::atomic& head, + void* base); +inline static bool pushBlock_(std::atomic& head, + std::uintptr_t value, void* base); +static std::uintptr_t allocBlock_(ipc_shared_* meta, std::size_t size); + +void* CmiIpcBlockToMsg(CmiIpcBlock* block, bool init) { + auto* msg = (char*)CmiIpcBlockToMsg(block); + if (init) { + // NOTE ( this is identical to code in CmiAlloc ) + CmiAssert(((uintptr_t)msg % ALIGN_BYTES) == 0); + CMI_ZC_MSGTYPE((void*) msg) = CMK_REG_NO_ZC_MSG; + CMI_MSG_NOKEEP((void*) msg) = 0; + SIZEFIELD(msg) = block->size; + REFFIELDSET(msg, 1); + } + return msg; +} + +CmiIpcBlock* CmiMsgToIpcBlock(CmiIpcManager* manager, char* src, std::size_t len, + int node, int rank, int timeout) { + char* dst; + CmiIpcBlock* block; + // check whether we miraculously got a usable block + if ((block = CmiIsIpcBlock(manager, BLKSTART(src), node)) && (node == block->src)) { + dst = src; + } else { + std::pair status; + // we only want to attempt again if we fail due to a timeout: + if (timeout > 0) { + do { + status = CmiAllocIpcBlock(manager, node, len + sizeof(CmiChunkHeader)); + } while ((--timeout) && (status.second == CMI_IPC_TIMEOUT)); + } else { + do { + status = CmiAllocIpcBlock(manager, node, len + sizeof(CmiChunkHeader)); + // never give up, never surrender! 
+ } while (status.second == CMI_IPC_TIMEOUT); + } + // grab the block from the rval + block = status.first; + if (block == nullptr) { + return nullptr; + } else { + CmiAssert((block->dst == manager->mine) && (manager->mine == CmiMyNode())); + dst = (char*)CmiIpcBlockToMsg(block, true); + memcpy(dst, src, len); + CmiFree(src); + } + } + CMI_DEST_RANK(dst) = rank; + return block; +} + +extern void CmiHandleImmediateMessage(void *msg); + +void CmiDeliverIpcBlockMsg(CmiIpcBlock* block) { + auto* msg = CmiIpcBlockToMsg(block); + auto& rank = CMI_DEST_RANK(msg); + if (rank == cmi::ipc::nodeDatagram) { + CmiPushNode(msg); + } else + CmiPushPE(rank, msg); +} + +inline static bool metadataReady_(CmiIpcManager* meta) { + return meta && meta->shared[meta->mine]; +} + +CmiIpcBlock* CmiPopIpcBlock(CmiIpcManager* meta) { + if (metadataReady_(meta)) { + auto& shared = meta->shared[meta->mine]; + return popBlock_(shared->queue, shared); + } else { + return nullptr; + } +} + +bool CmiPushIpcBlock(CmiIpcManager* meta, CmiIpcBlock* block) { + auto& shared = meta->shared[block->src]; + auto& queue = shared->queue; + CmiAssert(meta->mine == block->dst); + return pushBlock_(queue, block->orig, shared); +} + +std::pair CmiAllocIpcBlock(CmiIpcManager* meta, int dstProc, std::size_t size) { + auto dstNode = CmiPhysicalNodeID(CmiNodeFirst(dstProc)); + auto thisPe = CmiMyPe(); + auto thisProc = CmiMyNode(); + auto thisNode = CmiPhysicalNodeID(thisPe); + if ((thisProc == dstProc) || (thisNode != dstNode)) { + return std::make_pair((CmiIpcBlock*)nullptr, CMI_IPC_REMOTE_DESTINATION); + } + + auto& shared = meta->shared[dstProc]; + auto bin = whichBin_(size); + CmiAssert(bin < kNumCutOffPoints); + + auto* block = popBlock_(shared->free[bin], shared); + if (block == nullptr) { + auto totalSize = kCutOffPoints[bin]; + auto offset = allocBlock_(shared, totalSize); + switch (offset) { + case cmi::ipc::nil: + return std::make_pair((CmiIpcBlock*)nullptr, CMI_IPC_TIMEOUT); + case cmi::ipc::max: + 
return std::make_pair((CmiIpcBlock*)nullptr, CMI_IPC_OUT_OF_MEMORY); + default: + break; + } + // the block's address is relative to the share + block = (CmiIpcBlock*)((char*)shared + offset); + CmiAssert(((std::uintptr_t)block % alignof(CmiIpcBlock)) == 0); + // construct the block + new (block) CmiIpcBlock(totalSize, offset); + } + + block->src = dstProc; + block->dst = thisProc; + + return std::make_pair(block, CMI_IPC_SUCCESS); +} + +void CmiFreeIpcBlock(CmiIpcManager* meta, CmiIpcBlock* block) { + auto bin = whichBin_(block->size); + CmiAssert(bin < kNumCutOffPoints); + auto& shared = meta->shared[block->src]; + auto& free = shared->free[bin]; + while (!pushBlock_(free, block->orig, shared)) + ; +} + +CmiIpcBlock* CmiIsIpcBlock(CmiIpcManager* meta, void* addr, int node) { + auto* shared = meta ? meta->shared[node] : nullptr; + if (shared == nullptr) { + return nullptr; + } + auto* begin = (char*)shared; + auto* end = begin + shared->max; + if (begin < addr && addr < end) { + return (CmiIpcBlock*)((char*)addr - sizeof(CmiIpcBlock)); + } else { + return nullptr; + } +} + +static std::uintptr_t allocBlock_(ipc_shared_* meta, std::size_t size) { + auto res = meta->heap.exchange(cmi::ipc::nil, std::memory_order_acquire); + if (res == cmi::ipc::nil) { + return cmi::ipc::nil; + } else { + auto next = res + size + sizeof(CmiIpcBlock); + auto offset = size % alignof(CmiIpcBlock); + auto oom = next >= meta->max; + auto value = oom ? 
res : (next + offset); + auto status = meta->heap.exchange(value, std::memory_order_release); + CmiAssert(status == cmi::ipc::nil); + if (oom) { + return cmi::ipc::max; + } else { + return res; + } + } +} + +inline static CmiIpcBlock* popBlock_(std::atomic& head, + void* base) { + auto prev = head.exchange(cmi::ipc::nil, std::memory_order_acquire); + if (prev == cmi::ipc::nil) { + return nullptr; + } else if (prev == cmi::ipc::max) { + auto check = head.exchange(prev, std::memory_order_release); + CmiAssert(check == cmi::ipc::nil); + return nullptr; + } else { + // translate the "home" PE's address into a local one + CmiAssert(((std::uintptr_t)base % ALIGN_BYTES) == 0); + auto* xlatd = (CmiIpcBlock*)((char*)base + prev); + auto check = head.exchange(xlatd->next, std::memory_order_release); + CmiAssert(check == cmi::ipc::nil); + return xlatd; + } +} + +inline static bool pushBlock_(std::atomic& head, + std::uintptr_t value, void* base) { + CmiAssert(value != cmi::ipc::nil); + auto prev = head.exchange(cmi::ipc::nil, std::memory_order_acquire); + if (prev == cmi::ipc::nil) { + return false; + } + auto* block = (CmiIpcBlock*)((char*)base + value); + block->next = prev; + auto check = head.exchange(value, std::memory_order_release); + CmiAssert(check == cmi::ipc::nil); + return true; +} diff --git a/src/cmixpmem.cpp b/src/cmixpmem.cpp new file mode 100644 index 0000000..b895887 --- /dev/null +++ b/src/cmixpmem.cpp @@ -0,0 +1,196 @@ +#include "cmi-shmem-common.h" +#include +#include +#include + +extern "C" { +#include +} + +// "borrowed" from VADER +// (https://github.com/open-mpi/ompi/tree/386ba164557bb8115131921041757be94a989646/opal/mca/smsc/xpmem) +#define OPAL_DOWN_ALIGN(x, a, t) ((x) & ~(((t)(a)-1))) +#define OPAL_DOWN_ALIGN_PTR(x, a, t) \ + ((t)OPAL_DOWN_ALIGN((uintptr_t)x, a, uintptr_t)) +#define OPAL_ALIGN(x, a, t) (((x) + ((t)(a)-1)) & ~(((t)(a)-1))) +#define OPAL_ALIGN_PTR(x, a, t) ((t)OPAL_ALIGN((uintptr_t)x, a, uintptr_t)) +#define OPAL_ALIGN_PAD_AMOUNT(x, 
s) \ + ((~((uintptr_t)(x)) + 1) & ((uintptr_t)(s)-1)) + +CpvStaticDeclare(int, handle_init); + +struct init_msg_ { + char core[CmiMsgHeaderSizeBytes]; + std::size_t key; + int from; + xpmem_segid_t segid; + ipc_shared_* shared; +}; + +// NOTE ( we should eventually detach xpmem segments at close ) +// ( it's not urgently needed since xpmem does it for us ) +struct CmiIpcManager : public ipc_metadata_ { + // maps ranks to segments + std::map segments; + // maps segments to xpmem apids + std::map instances; + // number of physical peers + int nPeers; + // create our local shared data + CmiIpcManager(std::size_t key) : ipc_metadata_(key) { + this->shared[this->mine] = makeIpcShared_(); + } + + void put_segment(int proc, const xpmem_segid_t& segid) { + auto ins = this->segments.emplace(proc, segid); + CmiAssert(ins.second); + } + + xpmem_segid_t get_segment(int proc) { + auto search = this->segments.find(proc); + if (search == std::end(this->segments)) { + if (mine == proc) { + auto segid = + xpmem_make(0, XPMEM_MAXADDR_SIZE, XPMEM_PERMIT_MODE, (void*)0666); + this->put_segment(mine, segid); + return segid; + } else { + return -1; + } + } else { + return search->second; + } + } + + xpmem_apid_t get_instance(int proc) { + auto segid = this->get_segment(proc); + if (segid >= 0) { + auto search = this->instances.find(segid); + if (search == std::end(this->instances)) { + auto apid = xpmem_get(segid, XPMEM_RDWR, XPMEM_PERMIT_MODE, NULL); + CmiAssertMsg(apid >= 0, "invalid segid?"); + auto ins = this->instances.emplace(segid, apid); + CmiAssert(ins.second); + search = ins.first; + } + return search->second; + } else { + return -1; + } + } +}; + +void* translateAddr_(ipc_manager_ptr_& meta, int proc, void* remote_ptr, + const std::size_t& size) { + if (proc == meta->mine) { + return remote_ptr; + } else { + auto apid = meta->get_instance(proc); + CmiAssert(apid >= 0); + // this magic was borrowed from VADER + uintptr_t attach_align = 1 << 23; + auto base = 
OPAL_DOWN_ALIGN_PTR(remote_ptr, attach_align, uintptr_t); + auto bound = + OPAL_ALIGN_PTR(remote_ptr + size - 1, attach_align, uintptr_t) + 1; + + using offset_type = decltype(xpmem_addr::offset); + xpmem_addr addr{.apid = apid, .offset = (offset_type)base}; + auto* ctx = xpmem_attach(addr, bound - base, NULL); + CmiAssert(ctx != (void*)-1); + + return (void*)((uintptr_t)ctx + + (ptrdiff_t)((uintptr_t)remote_ptr - (uintptr_t)base)); + } +} + +static void handleInitialize_(void* msg) { + auto* imsg = (init_msg_*)msg; + auto& meta = (CsvAccess(managers_))[(imsg->key - 1)]; + // extract the segment id and shared region + // from the msg (registering it in our metadata) + meta->put_segment(imsg->from, imsg->segid); + meta->shared[imsg->from] = (ipc_shared_*)translateAddr_( + meta, imsg->from, imsg->shared, sizeof(ipc_shared_)); + // then free the message + CmiFree(imsg); + // if we received messages from all our peers: + if (meta->nPeers == meta->shared.size()) { + // resume the sleeping thread + if (CmiMyPe() == 0) { + printIpcStartupMessage_("xpmem"); + } + + awakenSleepers_(); + } +} + +void CmiIpcInit(char** argv) { + CsvInitialize(ipc_manager_map_, managers_); + + initSleepers_(); + initSegmentSize_(argv); + + CpvInitialize(int, handle_init); + CpvAccess(handle_init) = CmiRegisterHandler(handleInitialize_); +} + +CmiIpcManager* CmiMakeIpcManager(CthThread th) { + putSleeper_(th); + + // ensure all sleepers are reg'd + CmiNodeAllBarrier(); + + CmiIpcManager* meta; + if (CmiMyRank() == 0) { + auto key = CsvAccess(managers_).size() + 1; + meta = new CmiIpcManager(key); + CsvAccess(managers_).emplace_back(meta); + } else { + // pause until the metadata is ready + CmiNodeAllBarrier(); + return CsvAccess(managers_).back().get(); + } + + int* pes; + int nPes; + auto thisPe = CmiMyPe(); + auto thisNode = CmiPhysicalNodeID(CmiMyPe()); + CmiGetPesOnPhysicalNode(thisNode, &pes, &nPes); + auto nSize = CmiMyNodeSize(); + auto nProcs = nPes / nSize; + meta->nPeers = nProcs; + + 
if (nProcs > 1) { + auto* imsg = (init_msg_*)CmiAlloc(sizeof(init_msg_)); + CmiSetHandler(imsg, CpvAccess(handle_init)); + imsg->key = meta->key; + imsg->from = meta->mine; + imsg->segid = meta->get_segment(meta->mine); + imsg->shared = meta->shared[meta->mine]; + // send messages to all the pes on this node + for (auto i = 0; i < nProcs; i++) { + auto& pe = pes[i * nSize]; + auto last = i == (nProcs - 1); + if (pe == thisPe) { + if (last) { + CmiFree(imsg); + } + continue; + } else if (last) { + // free'ing with the last send + CmiSyncSendAndFree(pe, sizeof(init_msg_), (char*)imsg); + } else { + // then sending (without free) otherwise + CmiSyncSend(pe, sizeof(init_msg_), (char*)imsg); + } + } + } else { + // single process -- wake up sleeping thread(s) + awakenSleepers_(); + } + + // signal that the metadata is ready + CmiNodeAllBarrier(); + + return meta; +} diff --git a/src/conv-topology.cpp b/src/conv-topology.cpp index be577fa..986e324 100644 --- a/src/conv-topology.cpp +++ b/src/conv-topology.cpp @@ -245,20 +245,6 @@ using namespace CpuTopoDetails; // } static std::atomic cpuTopoSyncHandlerDone{}; -# if CMK_SMP && !CMK_SMP_NO_COMMTHD -extern void CommunicationServerThread(int sleepTime); -static std::atomic cpuTopoSyncCommThreadDone{}; -# endif - -# if CMK_SMP && !CMK_SMP_NO_COMMTHD -static void cpuTopoSyncWaitCommThread(std::atomic& done) -{ - do CommunicationServerThread(5); - while (!done.load()); - - CommunicationServerThread(5); -} -# endif static void cpuTopoSyncWait(std::atomic& done) { @@ -526,13 +512,6 @@ void LrtsInitCpuTopo(char** argv) return; } -# if CMK_SMP && !CMK_SMP_NO_COMMTHD - if (CmiInCommThread()) - { - cpuTopoSyncWaitCommThread(cpuTopoSyncCommThreadDone); - } - else -# endif { /* prepare a msg to send */ hostnameMsg* msg = (hostnameMsg*)CmiAlloc(sizeof(hostnameMsg) + sizeof(_procInfo)); @@ -560,9 +539,6 @@ void LrtsInitCpuTopo(char** argv) CsdSchedulePoll(); } -# if CMK_SMP && !CMK_SMP_NO_COMMTHD - cpuTopoSyncCommThreadDone = true; -# 
endif } } diff --git a/src/convcore.cpp b/src/convcore.cpp index a040708..b927fe1 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -44,6 +44,7 @@ int quietMode; int quietModeRequested; int userDrivenMode; int _replaySystem = 0; +CsvDeclare(CmiIpcManager*, coreIpcManager_); CmiNodeLock CmiMemLock_lock; CpvDeclare(int, isHelperOn); @@ -277,6 +278,10 @@ void CmiInitState(int rank) { newZCPupGets); // Check if this is necessary CpvInitialize(int, interopExitFlag); CpvAccess(interopExitFlag) = 0; + #ifdef CMK_USE_SHMEM + CsvInitialize(CmiIpcManager*, coreIpcManager_); + CsvAccess(coreIpcManager_) = nullptr; + #endif CmiOnesidedDirectInit(); CcdModuleInit(); } @@ -356,6 +361,10 @@ void CmiPushPE(int destPE, void *msg) { CmiPushPE(destPE, messageSize, msg); } +void CmiPushNode(void *msg) { + CmiNodeQueue->push(msg); +} + void *CmiAlloc(int size) { if (size <= 0) { CmiPrintf("CmiAlloc: size <= 0\n"); @@ -405,9 +414,23 @@ void CmiFree(void *msg) { // zero */ // CmiAbort("CmiFree reference count was zero-- is this a duplicate free?"); + #ifdef CMK_USE_SHMEM + // we should only free _our_ IPC blocks -- so calling CmiFree on + // an IPC block issued by another process will cause a bad free! + // (note -- this would only occur if you alloc an ipc block then + // decide not to send it; that should be avoided! ) + CmiIpcBlock* ipc; + auto* manager = CsvAccess(coreIpcManager_); + if (msg && (ipc = CmiIsIpcBlock(manager, BLKSTART(msg), CmiMyNode()))) { + CmiFreeIpcBlock(manager, ipc); + return; + } + #endif + if (refCount == 1) { free(BLKSTART(parentBlk)); } + } int CmiGetReference(void *blk) { return REFFIELD(CmiAllocFindEnclosing(blk)); } @@ -750,12 +773,34 @@ int CmiError(const char *format, ...) { return ret; } -void __CmiEnforceMsgHelper(const char *expr, const char *fileName, int lineNum, - const char *msg, ...) 
{ - CmiAbort("[%d] Assertion \"%s\" failed in file %s line %d.\n", CmiMyPe(), - expr, fileName, lineNum); +void __CmiEnforceHelper(const char* expr, const char* fileName, const char* lineNum)
+{
+ CmiAbort("[%d] Assertion \"%s\" failed in file %s line %s.\n", CmiMyPe(), expr,
+ fileName, lineNum);
+}
+
+void __CmiEnforceMsgHelper(const char* expr, const char* fileName, const char* lineNum,
+ const char* msg, ...)
+{
+ va_list args;
+ va_start(args, msg);
+
+ // Get length of formatted string
+ va_list argsCopy;
+ va_copy(argsCopy, args);
+ const auto size = 1 + vsnprintf(nullptr, 0, msg, argsCopy);
+ va_end(argsCopy);
+
+ // Allocate a buffer of right size and create formatted string in it
+ std::vector<char> formatted(size);
+ vsnprintf(formatted.data(), size, msg, args);
+ va_end(args);
+
+ CmiAbort("[%d] Assertion \"%s\" failed in file %s line %s.\n%s", CmiMyPe(), expr,
+ fileName, lineNum, formatted.data());
}
+
bool CmiGetIdle() { return idle_condition; }
void CmiSetIdle(bool idle) { idle_condition = idle; }
diff --git a/src/converse_config.h.in b/src/converse_config.h.in
index 975a2f6..c31483d 100644
--- a/src/converse_config.h.in
+++ b/src/converse_config.h.in
@@ -7,5 +7,6 @@
 #cmakedefine SPANTREE
 #cmakedefine CMK_SMP
 #cmakedefine CMK_CPV_IS_SMP
+#cmakedefine CMK_USE_SHMEM
 #endif
diff --git a/src/scheduler.cpp b/src/scheduler.cpp
index 8368424..37429e8 100644
--- a/src/scheduler.cpp
+++ b/src/scheduler.cpp
@@ -18,6 +18,13 @@ void CsdScheduler() {
   CcdRaiseCondition(CcdSCHEDLOOP);
+  #ifdef CMK_USE_SHMEM
+  CmiIpcBlock* block = CmiPopIpcBlock(CsvAccess(coreIpcManager_));
+  if (block != nullptr) {
+    CmiDeliverIpcBlockMsg(block);
+  }
+  #endif
+
   // poll node queue
   if (!nodeQueue->empty()) {
     auto result = nodeQueue->pop();
diff --git a/src/threads.cpp b/src/threads.cpp
index 44883e9..9e3792c 100644
--- a/src/threads.cpp
+++ b/src/threads.cpp
@@ -309,6 +309,8 @@ void CthResume(CthThread t) {
   */
 }
+int CthIsSuspendable(CthThread t) { return B(t)->suspendable; }
+
 /* 
Suspend: finds the next thread to execute, and resumes it */