diff --git a/include/converse.h b/include/converse.h index 9f57b8b..93df008 100644 --- a/include/converse.h +++ b/include/converse.h @@ -5,6 +5,7 @@ #ifdef __cplusplus #include +#include #else #include #endif @@ -441,9 +442,46 @@ void CmiNodeAllBarrier(); // scheduler void CsdExitScheduler(); int CsdScheduler(int maxmsgs); -void CsdEnqueueGeneral(void *Message, int strategy, int priobits, int *prioptr); -void CsdNodeEnqueueGeneral(void *Message, int strategy, int priobits, - unsigned int *prioptr); + +#ifdef __cplusplus +extern "C" { +#endif + +// Message-priority pair for the queue +struct MessagePriorityPair { + void* message; + long long priority; + + MessagePriorityPair(void* msg, long long prio) : message(msg), priority(prio) {} +}; + +// Comparator for increasing order of priority values +struct MessagePriorityComparator { + bool operator()(const MessagePriorityPair& a, const MessagePriorityPair& b) const { + return a.priority > b.priority; // Note: inverted for min-heap behavior + } +}; + +typedef std::priority_queue, MessagePriorityComparator> *Queue; + +#define QueueInit() new std::priority_queue, MessagePriorityComparator>() + +CpvExtern(Queue, CsdSchedQueue); +CsvExtern(Queue, CsdNodeQueue); +CsvExtern(CmiNodeLock, CsdNodeQueueLock); +void CqsEnqueueGeneral(Queue q, void *Message, int strategy, int priobits, + unsigned int *prioptr); +#define CsdEnqueueGeneral(msg, strategy, priobits, prioptr) \ + (CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),(msg),(strategy),(priobits),(prioptr))) +#define CsdNodeEnqueueGeneral(msg, strategy, priobits, prioptr) do { \ + CmiLock(CsvAccess(CsdNodeQueueLock)); \ + CqsEnqueueGeneral((Queue)CsvAccess(CsdNodeQueue),(msg),(strategy),(priobits),(prioptr)); \ + CmiUnlock(CsvAccess(CsdNodeQueueLock)); \ + } while(0) + +#ifdef __cplusplus +} +#endif void CmiAssignOnce(int *variable, int value); diff --git a/src/cldb.rand.cpp b/src/cldb.rand.cpp index ccda7d4..6128df0 100644 --- a/src/cldb.rand.cpp +++ b/src/cldb.rand.cpp @@ -15,9 +15,7 @@ void CldHandler(char *msg) { CldRestoreHandler((char *)msg); ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg)); ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr); - // CsdEnqueueGeneral(msg, queueing, priobits, prioptr); - CmiPushPE(CmiMyPe(), len, - msg); // use priority queue when we add priority queue + CsdEnqueueGeneral(msg, queueing, priobits, prioptr); } void CldNodeHandler(char *msg) { @@ -103,9 +101,7 @@ void CldEnqueue(int pe, void *msg, int infofn) { /* CsdEnqueueGeneral is not thread or SIGIO safe */ // CmiPrintf(" myself processor %d ==> %d, length=%d Timer:%f , priori=%d // \n", CmiMyPe(), pe, len, CmiWallTimer(), *prioptr); - //CsdEnqueueGeneral(msg, queueing, priobits, prioptr); - CmiPushPE(CmiMyPe(), len, - msg); + CsdEnqueueGeneral(msg, queueing, priobits, prioptr); } else { ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr); if (pfn && CmiNodeOf(pe) != CmiMyNode()) { diff --git a/src/convcore.cpp b/src/convcore.cpp index c763633..f2685fc 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -29,6 +29,9 @@ int Cmi_numnodes; // represents the number of physical nodes/systems machine int Cmi_nodestart; std::vector **CmiHandlerTable; // array of handler vectors ConverseNodeQueue *CmiNodeQueue; +CpvDeclare(Queue, CsdSchedQueue); +CsvDeclare(Queue, CsdNodeQueue); +CsvDeclare(CmiNodeLock, CsdNodeQueueLock); double Cmi_startTime; CmiSpanningTreeInfo *_topoTree = NULL; int CharmLibInterOperate; @@ -285,6 +288,15 @@ void CmiInitState(int rank) { CpvAccess(interopExitFlag) = 0; CmiOnesidedDirectInit(); CcdModuleInit(); + CpvInitialize(Queue, CsdSchedQueue); + CpvAccess(CsdSchedQueue) = QueueInit(); + CsvInitialize(CmiLock, CsdNodeQueueLock); + CsvInitialize(Queue, CsdNodeQueue); + if (CmiMyRank() ==0) { + CsvAccess(CsdNodeQueueLock) = CmiCreateLock(); + CsvAccess(CsdNodeQueue) = QueueInit(); + } + CmiNodeBarrier(); } ConverseQueue *CmiGetQueue(int rank) { return Cmi_queues[rank]; } diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 8368424..e83d713 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -49,23 +49,94 @@ void CsdScheduler() { } } - // the processor is idle + // poll node prio queue else { - // if not already idle, set idle and raise condition - if (!CmiGetIdle()) { - CmiSetIdle(true); - CmiSetIdleTime(CmiWallTimer()); - CcdRaiseCondition(CcdPROCESSOR_BEGIN_IDLE); - } - // if already idle, call still idle and (maybe) long idle + // Try to acquire lock without blocking + if (CmiTryLock(CsvAccess(CsdNodeQueueLock)) == 0) { + if (!CsvAccess(CsdNodeQueue)->empty()) { + auto result = CsvAccess(CsdNodeQueue)->top(); + CsvAccess(CsdNodeQueue)->pop(); + CmiUnlock(CsvAccess(CsdNodeQueueLock)); + void *msg = result.message; + // process event + CmiHandleMessage(msg); + + // release idle if necessary + if (CmiGetIdle()) { + CmiSetIdle(false); + CcdRaiseCondition(CcdPROCESSOR_END_IDLE); + } + } + else { + CmiUnlock(CsvAccess(CsdNodeQueueLock)); + //empty queue so check thread prio queue + if (!CpvAccess(CsdSchedQueue)->empty()) { + auto result = CpvAccess(CsdSchedQueue)->top(); + CpvAccess(CsdSchedQueue)->pop(); + void *msg = result.message; + + // process event + CmiHandleMessage(msg); + + // release idle if necessary + if (CmiGetIdle()) { + CmiSetIdle(false); + CcdRaiseCondition(CcdPROCESSOR_END_IDLE); + } + } else { + // the processor is idle + // if not already idle, set idle and raise condition + if (!CmiGetIdle()) { + CmiSetIdle(true); + CmiSetIdleTime(CmiWallTimer()); + CcdRaiseCondition(CcdPROCESSOR_BEGIN_IDLE); + } + // if already idle, call still idle and (maybe) long idle + else { + CcdRaiseCondition(CcdPROCESSOR_STILL_IDLE); + if (CmiWallTimer() - CmiGetIdleTime() > 10.0) { + CcdRaiseCondition(CcdPROCESSOR_LONG_IDLE); + } + } + // poll the communication layer + comm_backend::progress(); + } + } + } else { - CcdRaiseCondition(CcdPROCESSOR_STILL_IDLE); - if (CmiWallTimer() - CmiGetIdleTime() > 10.0) { - CcdRaiseCondition(CcdPROCESSOR_LONG_IDLE); + // Could not acquire node queue lock, skip to thread prio queue + if (!CpvAccess(CsdSchedQueue)->empty()) { + auto result = CpvAccess(CsdSchedQueue)->top(); + CpvAccess(CsdSchedQueue)->pop(); + void *msg = result.message; + + // process event + CmiHandleMessage(msg); + + // release idle if necessary + if (CmiGetIdle()) { + CmiSetIdle(false); + CcdRaiseCondition(CcdPROCESSOR_END_IDLE); + } + } else { + // the processor is idle + // if not already idle, set idle and raise condition + if (!CmiGetIdle()) { + CmiSetIdle(true); + CmiSetIdleTime(CmiWallTimer()); + CcdRaiseCondition(CcdPROCESSOR_BEGIN_IDLE); + } + // if already idle, call still idle and (maybe) long idle + else { + CcdRaiseCondition(CcdPROCESSOR_STILL_IDLE); + if (CmiWallTimer() - CmiGetIdleTime() > 10.0) { + CcdRaiseCondition(CcdPROCESSOR_LONG_IDLE); + } + } + // poll the communication layer + comm_backend::progress(); } } - // poll the communication layer - comm_backend::progress(); } CcdCallBacks(); @@ -75,7 +146,7 @@ void CsdScheduler() { } /** - * Similar to CsdScheduker, but return when the queues + * Similar to CsdScheduler, but return when the queues * are empty, not when the scheduler is stopped. */ void CsdSchedulePoll() { @@ -122,13 +193,67 @@ void CsdSchedulePoll() { } } + // poll node prio queue else { - comm_backend::progress(); - break; //break when queues are empty - } + // Try to acquire lock without blocking + if (CmiTryLock(CsvAccess(CsdNodeQueueLock)) == 0) { + if (!CsvAccess(CsdNodeQueue)->empty()) { + auto result = CsvAccess(CsdNodeQueue)->top(); + CsvAccess(CsdNodeQueue)->pop(); + CmiUnlock(CsvAccess(CsdNodeQueueLock)); + void *msg = result.message; + // process event + CmiHandleMessage(msg); - } + // release idle if necessary + if (CmiGetIdle()) { + CmiSetIdle(false); + CcdRaiseCondition(CcdPROCESSOR_END_IDLE); + } + } + else { + CmiUnlock(CsvAccess(CsdNodeQueueLock)); + if (!CpvAccess(CsdSchedQueue)->empty()) { + auto result = CpvAccess(CsdSchedQueue)->top(); + CpvAccess(CsdSchedQueue)->pop(); + void *msg = result.message; + + // process event + CmiHandleMessage(msg); + + // release idle if necessary + if (CmiGetIdle()) { + CmiSetIdle(false); + CcdRaiseCondition(CcdPROCESSOR_END_IDLE); + } + } else { + comm_backend::progress(); + break; //break when queues are empty + } + } + } + else { + // Could not acquire node queue lock, skip to thread prio queue + if (!CpvAccess(CsdSchedQueue)->empty()) { + auto result = CpvAccess(CsdSchedQueue)->top(); + CpvAccess(CsdSchedQueue)->pop(); + void *msg = result.message; + // process event + CmiHandleMessage(msg); + + // release idle if necessary + if (CmiGetIdle()) { + CmiSetIdle(false); + CcdRaiseCondition(CcdPROCESSOR_END_IDLE); + } + } else { + comm_backend::progress(); + break; //break when queues are empty + } + } + } + } } int CsdScheduler(int maxmsgs){ @@ -140,12 +265,26 @@ int CsdScheduler(int maxmsgs){ } -void CsdEnqueueGeneral(void *Message, int strategy, int priobits, int *prioptr){ - CmiPushPE(CmiMyPe(), sizeof(Message), Message); +void CqsEnqueueGeneral(Queue q, void *Message, int strategy, int priobits, + unsigned int *prioptr){ + int iprio; + long long lprio; + switch (strategy){ //for now everything is FIFO + case CQS_QUEUEING_FIFO: + case CQS_QUEUEING_LIFO: + q->push(MessagePriorityPair((void*)Message, 0)); + break; + case CQS_QUEUEING_IFIFO: + case CQS_QUEUEING_ILIFO: + iprio=prioptr[0]+(1U<<(8*sizeof(unsigned int)-1)); + q->push(MessagePriorityPair((void*)Message, iprio)); + break; + case CQS_QUEUEING_LFIFO: + case CQS_QUEUEING_LLIFO: + lprio = ((long long*)prioptr)[0] + (1ULL<<(8*sizeof(long long)-1)); + q->push(MessagePriorityPair((void*)Message, lprio)); + break; + default: + CmiAbort("CqsEnqueueGeneral: invalid queueing strategy (bitvectors not supported yet)\n"); + } } - -void CsdNodeEnqueueGeneral(void *Message, int strategy, int priobits, unsigned int *prioptr){ - CmiGetNodeQueue()->push(Message); -} - -// TODO: implement CsdEnqueue/Dequeue (why are these necessary?)