Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 42 additions & 9 deletions include/converse.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <cinttypes>
#include <cstdio>
#include <cstdlib>
#include <queue>
#include <pthread.h>

using CmiInt1 = std::int8_t;
Expand Down Expand Up @@ -364,12 +365,50 @@ void CmiNodeBarrier();
void CmiNodeAllBarrier();
#define CmiBarrier() CmiNodeBarrier()

typedef pthread_mutex_t *CmiNodeLock;
typedef CmiNodeLock CmiImmediateLockType;
extern int _immediateLock;
extern int _immediateFlag;
extern CmiNodeLock _smp_mutex;

// scheduler
void CsdExitScheduler();
int CsdScheduler(int maxmsgs);
void CsdEnqueueGeneral(void *Message, int strategy, int priobits, int *prioptr);
void CsdNodeEnqueueGeneral(void *Message, int strategy, int priobits,
unsigned int *prioptr);
//void CsdEnqueueGeneral(void *Message, int strategy, int priobits, int *prioptr);
//void CsdNodeEnqueueGeneral(void *Message, int strategy, int priobits,
// unsigned int *prioptr);

// Message-priority pair for the queue
typedef struct MessagePriorityPair {
void* message;
long long priority;

MessagePriorityPair(void* msg, long long prio) : message(msg), priority(prio) {}
} MessagePriorityPair;

// Comparator for increasing order of priority values
struct MessagePriorityComparator {
bool operator()(const MessagePriorityPair& a, const MessagePriorityPair& b) const {
return a.priority > b.priority; // Note: inverted for min-heap behavior
}
};

typedef std::priority_queue<MessagePriorityPair, std::vector<MessagePriorityPair>, MessagePriorityComparator> *Queue;

#define QueueInit() new std::priority_queue<MessagePriorityPair, std::vector<MessagePriorityPair>, MessagePriorityComparator>()

CpvExtern(Queue, CsdSchedQueue);
CsvExtern(Queue, CsdNodeQueue);
CsvExtern(CmiNodeLock, CsdNodeQueueLock);
void CqsEnqueueGeneral(Queue q, void *Message, int strategy, int priobits,
unsigned int *prioptr);
#define CsdEnqueueGeneral(msg, strategy, priobits, prioptr) \
(CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),(msg),(strategy),(priobits),(prioptr)))
#define CsdNodeEnqueueGeneral(msg, strategy, priobits, prioptr) do { \
CmiLock(CsvAccess(CsdNodeQueueLock)); \
CqsEnqueueGeneral((Queue)CsvAccess(CsdNodeQueue),(msg),(strategy),(priobits),(prioptr)); \
CmiUnlock(CsvAccess(CsdNodeQueueLock)); \
} while(0)

void CmiAssignOnce(int *variable, int value);

Expand Down Expand Up @@ -565,12 +604,6 @@ int CmiArgGivingUsage(void);
void CmiDeprecateArgInt(char **argv, const char *arg, const char *desc,
const char *warning);

typedef pthread_mutex_t *CmiNodeLock;
typedef CmiNodeLock CmiImmediateLockType;
extern int _immediateLock;
extern int _immediateFlag;
extern CmiNodeLock _smp_mutex;

#define CmiCreateImmediateLock() (0)
#define CmiImmediateLock(ignored) \
{ _immediateLock++; }
Expand Down
8 changes: 2 additions & 6 deletions src/cldb.rand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ void CldHandler(char *msg) {
CldRestoreHandler((char *)msg);
ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
// CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
CmiPushPE(CmiMyPe(), len,
msg); // use priority queue when we add priority queue
CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
}

void CldNodeHandler(char *msg) {
Expand Down Expand Up @@ -103,9 +101,7 @@ void CldEnqueue(int pe, void *msg, int infofn) {
/* CsdEnqueueGeneral is not thread or SIGIO safe */
// CmiPrintf(" myself processor %d ==> %d, length=%d Timer:%f , priori=%d
// \n", CmiMyPe(), pe, len, CmiWallTimer(), *prioptr);
//CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
CmiPushPE(CmiMyPe(), len,
msg);
CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
} else {
ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
Expand Down
12 changes: 12 additions & 0 deletions src/convcore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ int Cmi_numnodes; // represents the number of physical nodes/systems machine
int Cmi_nodestart;
std::vector<CmiHandlerInfo> **CmiHandlerTable; // array of handler vectors
ConverseNodeQueue<void *> *CmiNodeQueue;
CpvDeclare(Queue, CsdSchedQueue);
CsvDeclare(Queue, CsdNodeQueue);
CsvDeclare(CmiNodeLock, CsdNodeQueueLock);
double Cmi_startTime;
CmiSpanningTreeInfo *_topoTree = NULL;
int CharmLibInterOperate;
Expand Down Expand Up @@ -279,6 +282,15 @@ void CmiInitState(int rank) {
CpvAccess(interopExitFlag) = 0;
CmiOnesidedDirectInit();
CcdModuleInit();
CpvInitialize(Queue, CsdSchedQueue);
CpvAccess(CsdSchedQueue) = QueueInit();
CsvInitialize(CmiLock, CsdNodeQueueLock);
CsvInitialize(Queue, CsdNodeQueue);
if (CmiMyRank() ==0) {
CsvAccess(CsdNodeQueueLock) = CmiCreateLock();
CsvAccess(CsdNodeQueue) = QueueInit();
}
CmiNodeBarrier();
}

ConverseQueue<void *> *CmiGetQueue(int rank) { return Cmi_queues[rank]; }
Expand Down
96 changes: 88 additions & 8 deletions src/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,39 @@ void CsdScheduler() {
// process event
CmiHandleMessage(msg);

// release idle if necessary
if (CmiGetIdle()) {
CmiSetIdle(false);
CcdRaiseCondition(CcdPROCESSOR_END_IDLE);
}
}

// poll node prio queue
else if (!CsvAccess(CsdNodeQueue)->empty()) {
CmiLock(CsvAccess(CsdNodeQueueLock));
auto result = CsvAccess(CsdNodeQueue)->top();
CsvAccess(CsdNodeQueue)->pop();
CmiUnlock(CsvAccess(CsdNodeQueueLock));
void *msg = result.message;
// process event
CmiHandleMessage(msg);

// release idle if necessary
if (CmiGetIdle()) {
CmiSetIdle(false);
CcdRaiseCondition(CcdPROCESSOR_END_IDLE);
}
}

//poll thread prio queue
else if (!CpvAccess(CsdSchedQueue)->empty()) {
auto result = CpvAccess(CsdSchedQueue)->top();
CpvAccess(CsdSchedQueue)->pop();
void *msg = result.message;

// process event
CmiHandleMessage(msg);

// release idle if necessary
if (CmiGetIdle()) {
CmiSetIdle(false);
Expand Down Expand Up @@ -122,6 +155,39 @@ void CsdSchedulePoll() {
}
}

// poll node prio queue
else if (!CsvAccess(CsdNodeQueue)->empty()) {
CmiLock(CsvAccess(CsdNodeQueueLock));
auto result = CsvAccess(CsdNodeQueue)->top();
CsvAccess(CsdNodeQueue)->pop();
CmiUnlock(CsvAccess(CsdNodeQueueLock));
void *msg = result.message;
// process event
CmiHandleMessage(msg);

// release idle if necessary
if (CmiGetIdle()) {
CmiSetIdle(false);
CcdRaiseCondition(CcdPROCESSOR_END_IDLE);
}
}

//poll thread prio queue
else if (!CpvAccess(CsdSchedQueue)->empty()) {
auto result = CpvAccess(CsdSchedQueue)->top();
CpvAccess(CsdSchedQueue)->pop();
void *msg = result.message;

// process event
CmiHandleMessage(msg);

// release idle if necessary
if (CmiGetIdle()) {
CmiSetIdle(false);
CcdRaiseCondition(CcdPROCESSOR_END_IDLE);
}
}

else {
comm_backend::progress();
break; //break when queues are empty
Expand All @@ -140,12 +206,26 @@ int CsdScheduler(int maxmsgs){

}

void CsdEnqueueGeneral(void *Message, int strategy, int priobits, int *prioptr){
CmiPushPE(CmiMyPe(), sizeof(Message), Message);
void CqsEnqueueGeneral(Queue q, void *Message, int strategy, int priobits,
unsigned int *prioptr){
int iprio;
long long lprio;
switch (strategy){ //for now everything is FIFO
case CQS_QUEUEING_FIFO:
case CQS_QUEUEING_LIFO:
q->push(MessagePriorityPair((void*)Message, 0));
break;
case CQS_QUEUEING_IFIFO:
case CQS_QUEUEING_ILIFO:
iprio=prioptr[0]+(1U<<(8*sizeof(unsigned int)-1));
q->push(MessagePriorityPair((void*)Message, iprio));
break;
case CQS_QUEUEING_LFIFO:
case CQS_QUEUEING_LLIFO:
lprio = ((long long*)prioptr)[0] + (1ULL<<(8*sizeof(long long)-1));
q->push(MessagePriorityPair((void*)Message, lprio));
break;
default:
CmiAbort("CqsEnqueueGeneral: invalid queueing strategy (bitvectors not supported yet)\n");
}
}

void CsdNodeEnqueueGeneral(void *Message, int strategy, int priobits, unsigned int *prioptr){
CmiGetNodeQueue()->push(Message);
}

// TODO: implement CsdEnqueue/Dequeue (why are these necessary?)