Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ option(SPANTREE "whether to enable spanning tree collectives" OFF) #should turn
# Reconverse uses SMP always, but turn on this option for backwards compatibility (see pingpong_multipairs for example)
option(CMK_SMP "whether to enable SMP support" ON)
option(CMK_CPV_IS_SMP "whether to enable SMP support for cpvs" ON)
option(CMK_USE_SHMEM "whether to use POSIX shared memory for IPC" OFF)

option(RECONVERSE_ATOMIC_QUEUE "whether to use atomic queue" ON)

Expand Down
131 changes: 123 additions & 8 deletions include/converse.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
#include <cstdio>
#include <cstdlib>
#include <pthread.h>
#include <atomic>
#include <limits>
#include <utility>

using CmiInt1 = std::int8_t;
using CmiInt2 = std::int16_t;
Expand Down Expand Up @@ -199,6 +202,8 @@ void CthFree(CthThread t);

void CthResume(CthThread t);

int CthIsSuspendable(CthThread t);

void CthSuspend(void);

void CthAwaken(CthThread th);
Expand Down Expand Up @@ -324,6 +329,7 @@ void CmiSyncSendAndFree(int destPE, int messageSize, void *msg);
void CmiSyncListSend(int npes, const int *pes, int len, void *msg);
void CmiSyncListSendAndFree(int npes, const int *pes, int len, void *msg);
void CmiPushPE(int destPE, void *msg);
void CmiPushNode(void *msg);

void CmiSyncSendFn(int destPE, int messageSize, char *msg);
void CmiFreeSendFn(int destPE, int messageSize, char *msg);
Expand Down Expand Up @@ -399,15 +405,25 @@ int CmiError(const char *format, ...);
void CmiInitCPUTopology(char **argv);
void CmiInitCPUAffinity(char **argv);

void __CmiEnforceMsgHelper(const char *expr, const char *fileName, int lineNum,
const char *msg, ...);
#define __CMK_STRING(x) #x
#define __CMK_XSTRING(x) __CMK_STRING(x)

#define CmiEnforce(condition) \
do { \
if (!(condition)) { \
__CmiEnforceMsgHelper(#condition, __FILE__, __LINE__, ""); \
} \
} while (0)
void __CmiEnforceHelper(const char* expr, const char* fileName, const char* lineNum);
void __CmiEnforceMsgHelper(const char* expr, const char* fileName,
const char* lineNum, const char* msg, ...);

#define CmiEnforce(expr) \
((void)(CMI_LIKELY(expr) ? 0 \
: (__CmiEnforceHelper(__CMK_STRING(expr), __FILE__, \
__CMK_XSTRING(__LINE__)), \
0)))

#define CmiEnforceMsg(expr, ...) \
((void)(CMI_LIKELY(expr) \
? 0 \
: (__CmiEnforceMsgHelper(__CMK_STRING(expr), __FILE__, \
__CMK_XSTRING(__LINE__), __VA_ARGS__), \
0)))

double getCurrentTime(void);
double CmiWallTimer(void);
Expand Down Expand Up @@ -1045,4 +1061,103 @@ void CmiInterSyncNodeSendAndFreeFn(int destNode, int partition, int messageSize,

/* end of variables and functions for partition */

#define CMI_IPC_CUTOFF_ARG "ipccutoff"
#define CMI_IPC_CUTOFF_DESC "max message size for cmi-shmem (in bytes)"
#define CMI_IPC_POOL_SIZE_ARG "ipcpoolsize"
#define CMI_IPC_POOL_SIZE_DESC "size of cmi-shmem pool (in bytes)"

struct CmiIpcManager;

namespace cmi {
namespace ipc {
// recommended cutoff for block sizes
CpvExtern(std::size_t, kRecommendedCutoff);
// used to represent an empty linked list
constexpr auto nil = std::uintptr_t(0);
// used to represent the tail of a linked list
constexpr auto max = std::numeric_limits<std::uintptr_t>::max();
// used to indicate a message bound for a node
constexpr auto nodeDatagram = std::numeric_limits<CmiUInt2>::max();
// default number of attempts to alloc before timing out
constexpr auto defaultTimeout = 4;
} // namespace ipc
} // namespace cmi

// alignas is used for padding here, rather than for alignment of the
// CmiIpcBlock itself.
struct alignas(ALIGN_BYTES) CmiIpcBlock {
public:
// "home" rank of the block
int src;
int dst;
std::uintptr_t orig;
std::uintptr_t next;
std::size_t size;

CmiIpcBlock(std::size_t size_, std::uintptr_t orig_)
: orig(orig_), next(cmi::ipc::nil), size(size_) {}
};

enum CmiIpcAllocStatus {
CMI_IPC_OUT_OF_MEMORY,
CMI_IPC_REMOTE_DESTINATION,
CMI_IPC_SUCCESS,
CMI_IPC_TIMEOUT
};

// sets up ipc environment
void CmiIpcInit(char** argv);

// creates an ipc manager, waking the thread when it's done
// ( this must be called in the same order on all pes! )
CmiIpcManager* CmiMakeIpcManager(CthThread th);

// push/pop blocks from the manager's send/recv queue
bool CmiPushIpcBlock(CmiIpcManager*, CmiIpcBlock*);
CmiIpcBlock* CmiPopIpcBlock(CmiIpcManager*);

// tries to allocate a block, returning null if unsucessful
// (fails when other PEs are contending resources)
// second value of pair indicates failure cause
std::pair<CmiIpcBlock*, CmiIpcAllocStatus> CmiAllocIpcBlock(CmiIpcManager*, int node, std::size_t size);

// frees a block -- enabling it to be used again
void CmiFreeIpcBlock(CmiIpcManager*, CmiIpcBlock*);

// currently a no-op but may be eventually usable
// intended to "capture" blocks from remote pes
inline void CmiCacheIpcBlock(CmiIpcBlock*) { return; }

// identifies whether a void* is the payload of a block
// belonging to the given node
CmiIpcBlock* CmiIsIpcBlock(CmiIpcManager*, void*, int node);

// if (init) is true -- initializes the
// memory segment for use as a message
void* CmiIpcBlockToMsg(CmiIpcBlock*, bool init);

// equivalent to calling above with (init = false)
inline void* CmiIpcBlockToMsg(CmiIpcBlock* block) {
auto res = (char*)block + sizeof(CmiIpcBlock) + sizeof(CmiChunkHeader);
return (void*)res;
}

inline CmiIpcBlock* CmiMsgToIpcBlock(CmiIpcManager* manager, void* msg) {
return CmiIsIpcBlock(manager, (char*)msg - sizeof(CmiChunkHeader), CmiMyNode());
}

CmiIpcBlock* CmiMsgToIpcBlock(CmiIpcManager*, char* msg, std::size_t len, int node,
int rank = cmi::ipc::nodeDatagram,
int timeout = cmi::ipc::defaultTimeout);

// deliver a block as a message
void CmiDeliverIpcBlockMsg(CmiIpcBlock*);

inline const std::size_t& CmiRecommendedIpcBlockCutoff(void) {
using namespace cmi::ipc;
return CpvAccess(kRecommendedCutoff);
}

CsvExtern(CmiIpcManager*, coreIpcManager_);

#endif // CONVERSE_H
2 changes: 1 addition & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ target_include_directories(reconverse PRIVATE .)
target_sources(reconverse PRIVATE conv-conds.cpp convcore.cpp random.cpp
scheduler.cpp cpuaffinity.cpp collectives.cpp
comm_backend/comm_backend_internal.cpp threads.cpp cldb.rand.cpp cldb.cpp cmirdmautils.cpp
conv-rdma.cpp conv-topology.cpp msgmgr.cpp)
conv-rdma.cpp conv-topology.cpp msgmgr.cpp cmishmem.cpp)
target_include_directories(
reconverse PRIVATE $<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}>
$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)
Expand Down
2 changes: 1 addition & 1 deletion src/cldb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

typedef char *BitVector;

/*
CpvDeclare(int, CldHandlerIndex);
/*
CpvDeclare(int, CldNodeHandlerIndex);
CpvDeclare(BitVector, CldPEBitVector);
CpvDeclare(int, CldBalanceHandlerIndex);
Expand Down
2 changes: 1 addition & 1 deletion src/cldb.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

#define MAXMSGBFRSIZE 100000

extern thread_local int CldHandlerIndex;
CpvExtern(int, CldHandlerIndex);
extern thread_local int CldNodeHandlerIndex;
extern thread_local int CldBalanceHandlerIndex;
extern thread_local int CldRelocatedMessages;
Expand Down
11 changes: 6 additions & 5 deletions src/cldb.rand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn) {
pfn(&msg);
ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
}
CldSwitchHandler((char *)msg, CldHandlerIndex);
CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
CmiSetInfo(msg, infofn);

CmiSyncMulticastAndFree(grp, len, msg);
Expand All @@ -61,7 +61,7 @@ void CldEnqueueWithinNode(void *msg, int infofn) {
pfn(&msg);
ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
}
CldSwitchHandler((char *)msg, CldHandlerIndex);
CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
CmiSetInfo(msg, infofn);

CmiWithinNodeBroadcast(len, (char *)msg);
Expand All @@ -77,7 +77,7 @@ void CldEnqueueMulti(int npes, const int *pes, void *msg, int infofn) {
pfn(&msg);
ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
}
CldSwitchHandler((char *)msg, CldHandlerIndex);
CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
CmiSetInfo(msg, infofn);

CmiSyncListSendAndFree(npes, pes, len, msg);
Expand Down Expand Up @@ -112,7 +112,7 @@ void CldEnqueue(int pe, void *msg, int infofn) {
pfn(&msg);
ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
}
CldSwitchHandler((char *)msg, CldHandlerIndex);
CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
CmiSetInfo(msg, infofn);
if (pe == CLD_BROADCAST) {
CmiSyncBroadcastAndFree(len, msg);
Expand Down Expand Up @@ -157,7 +157,8 @@ void CldNodeEnqueue(int node, void *msg, int infofn) {
}

void CldModuleInit(char **argv) {
CldHandlerIndex = CmiRegisterHandler((CmiHandler)CldHandler);
CpvInitialize(int, CldHandlerIndex);
CpvAccess(CldHandlerIndex) = CmiRegisterHandler((CmiHandler)CldHandler);
CldNodeHandlerIndex = CmiRegisterHandler((CmiHandler)CldNodeHandler);
CldRelocatedMessages = 0;
CldLoadBalanceMessages = 0;
Expand Down
131 changes: 131 additions & 0 deletions src/cmi-shmem-common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#ifndef CMI_SHMEM_COMMON_HH
#define CMI_SHMEM_COMMON_HH

#include "converse_internal.h"

#include <array>
#include <limits>
#include <map>
#include <memory>
#include <vector>


namespace cmi {
namespace ipc {
CpvDeclare(std::size_t, kRecommendedCutoff);
}
} // namespace cmi

CpvStaticDeclare(std::size_t, kSegmentSize);
constexpr std::size_t kDefaultSegmentSize = 8 * 1024 * 1024;

constexpr std::size_t kNumCutOffPoints = 25;
const std::array<std::size_t, kNumCutOffPoints> kCutOffPoints = {
64, 128, 256, 512, 1024, 2048, 4096,
8192, 16384, 32768, 65536, 131072, 262144, 524288,
1048576, 2097152, 4194304, 8388608, 16777216, 33554432, 67108864,
134217728, 268435456, 536870912, 1073741824};

struct ipc_metadata_;
CsvStaticDeclare(CmiNodeLock, sleeper_lock);

using sleeper_map_t = std::vector<CthThread>;
CsvStaticDeclare(sleeper_map_t, sleepers);

// the data each pe shares with its peers
// contains pool of free blocks, heap, and receive queue
struct ipc_shared_ {
std::array<std::atomic<std::uintptr_t>, kNumCutOffPoints> free;
std::atomic<std::uintptr_t> queue;
std::atomic<std::uintptr_t> heap;
std::uintptr_t max;

ipc_shared_(std::uintptr_t begin, std::uintptr_t end)
: queue(cmi::ipc::max), heap(begin), max(end) {
for (auto& f : this->free) {
f.store(cmi::ipc::max);
}
}
};

// shared data for each pe
struct ipc_metadata_ {
// maps ranks to shared segments
std::map<int, ipc_shared_*> shared;
// physical node rank
int mine;
// key of this instance
std::size_t key;
// base constructor
ipc_metadata_(std::size_t key_) : mine(CmiMyNode()), key(key_) {}
// virtual destructor may be needed
virtual ~ipc_metadata_() {}
};

inline std::size_t whichBin_(std::size_t size);

inline static void initIpcShared_(ipc_shared_* shared) {
auto begin = (std::uintptr_t)(sizeof(ipc_shared_) +
(sizeof(ipc_shared_) % ALIGN_BYTES));
CmiAssert(begin != cmi::ipc::nil);
auto end = begin + CpvAccess(kSegmentSize);
new (shared) ipc_shared_(begin, end);
}

inline static ipc_shared_* makeIpcShared_(void) {
auto* shared = (ipc_shared_*)(::operator new(sizeof(ipc_shared_) +
CpvAccess(kSegmentSize)));
initIpcShared_(shared);
return shared;
}

inline void initSegmentSize_(char** argv) {
using namespace cmi::ipc;
CpvInitialize(std::size_t, kRecommendedCutoff);
CpvInitialize(std::size_t, kSegmentSize);

CmiInt8 value;
auto flag =
CmiGetArgLongDesc(argv, "++" CMI_IPC_POOL_SIZE_ARG, &value, CMI_IPC_POOL_SIZE_DESC);
CpvAccess(kSegmentSize) = flag ? (std::size_t)value : kDefaultSegmentSize;
CmiEnforceMsg(CpvAccess(kSegmentSize), "segment size must be non-zero!");
if (CmiGetArgLongDesc(argv, "++" CMI_IPC_CUTOFF_ARG, &value, CMI_IPC_CUTOFF_DESC)) {
auto bin = whichBin_((std::size_t)value);
CmiEnforceMsg(bin < kNumCutOffPoints, "ipc cutoff out of range!");
CpvAccess(kRecommendedCutoff) = kCutOffPoints[bin];
} else {
auto max = CpvAccess(kSegmentSize) / kNumCutOffPoints;
auto bin = (std::intptr_t)whichBin_(max) - 1;
CpvAccess(kRecommendedCutoff) = kCutOffPoints[(bin >= 0) ? bin : 0];
}
}

inline static void printIpcStartupMessage_(const char* implName) {
using namespace cmi::ipc;
CmiPrintf("Converse> %s pool init'd with %luB segment and %luB cutoff.\n",
implName, CpvAccess(kSegmentSize),
CpvAccess(kRecommendedCutoff));
}

inline static void initSleepers_(void) {
if (CmiMyRank() == 0) {
CsvInitialize(sleeper_map_t, sleepers);
CsvAccess(sleepers).resize(CmiMyNodeSize());
CsvInitialize(CmiNodeLock, sleeper_lock);
CsvAccess(sleeper_lock) = CmiCreateLock();
}
}

inline static void putSleeper_(CthThread th) {
CmiLock(CsvAccess(sleeper_lock));
(CsvAccess(sleepers))[CmiMyRank()] = th;
CmiUnlock(CsvAccess(sleeper_lock));
}

static void awakenSleepers_(void);

using ipc_manager_ptr_ = std::unique_ptr<CmiIpcManager>;
using ipc_manager_map_ = std::vector<ipc_manager_ptr_>;
CsvStaticDeclare(ipc_manager_map_, managers_);

#endif
Loading