33
44#include " FWCore/ParameterSet/interface/ParameterSet.h"
55#include " FWCore/ServiceRegistry/interface/ActivityRegistry.h"
6+ #include " FWCore/ServiceRegistry/interface/StreamContext.h"
67#include " DataFormats/Provenance/interface/EventID.h"
78#include " DataFormats/Provenance/interface/LuminosityBlockID.h"
89#include " DataFormats/Provenance/interface/Timestamp.h"
2021#include < queue>
2122#include < sstream>
2223#include < unordered_map>
24+ #include " oneapi/tbb/task_arena.h"
25+ #include " oneapi/tbb/task_scheduler_observer.h"
2326
2427/* Description
2528 this is an evolution of the MicroStateService intended to be run in standalone multi-threaded cmsRun jobs
2831 moduledesc pointer to key into the map instead and no string or string pointers are used for the microstates.
2932 Only a pointer value is stored using relaxed ordering at the time of module execution which is fast.
3033 At snapshot time only (every few seconds) we do the map lookup to produce snapshot.
31- Path names use a similar logic. However path names are not accessible in the same way as later so they need to be
32- when starting to run associated to the memory location of path name strings as accessible when path is executed.
33- Path intermediate info will be called "ministate" :D
3434 The general counters and status variables (event number, number of processed events, number of passed and stored
3535 events, luminosity section etc.) are also monitored here.
3636
@@ -47,7 +47,10 @@ namespace edm {
4747
4848namespace evf {
4949
50+ template <typename T>
51+ struct ContainableAtomic ;
5052 class FastMonitoringThread ;
53+ class ConcurrencyTracker ;
5154
5255 namespace FastMonState {
5356
@@ -62,7 +65,11 @@ namespace evf {
6265 mBoL ,
6366 mEoL ,
6467 mGlobEoL ,
65- mCOUNT
68+ mFwk ,
69+ mIdleSource ,
70+ mEvent ,
71+ mIgnore ,
72+ mCOUNT ,
6673 };
6774
6875 enum Macrostate {
@@ -153,19 +160,21 @@ namespace evf {
153160 };
154161 } // namespace FastMonState
155162
163+ constexpr int nSpecialModules = FastMonState::mCOUNT ;
164+ // reserve output module space
165+ constexpr int nReservedModules = 128 ;
166+
156167 class FastMonitoringService : public MicroStateService {
157168 public:
158169 // the names of the states - some of them are never reached in an online app
159- static const edm::ModuleDescription reservedMicroStateNames [FastMonState::mCOUNT ];
170+ static const edm::ModuleDescription specialMicroStateNames [FastMonState::mCOUNT ];
160171 static const std::string macroStateNames[FastMonState::MCOUNT];
161172 static const std::string inputStateNames[FastMonState::inCOUNT];
162173 // Reserved names for microstates
163- static const std::string nopath_;
164174 FastMonitoringService (const edm::ParameterSet&, edm::ActivityRegistry&);
165175 ~FastMonitoringService () override ;
166176 static void fillDescriptions (edm::ConfigurationDescriptions& descriptions);
167177
168- std::string makePathLegendaJson ();
169178 std::string makeModuleLegendaJson ();
170179 std::string makeInputLegendaJson ();
171180
@@ -200,10 +209,6 @@ namespace evf {
200209 void preSourceEarlyTermination (edm::TerminationOrigin);
201210 void setExceptionDetected (unsigned int ls);
202211
203- // this is still needed for use in special functions like DQM which are in turn framework services
204- void setMicroState (FastMonState::Microstate);
205- void setMicroState (edm::StreamID, FastMonState::Microstate);
206-
207212 void accumulateFileSize (unsigned int lumi, unsigned long fileSize);
208213 void startedLookingForFile ();
209214 void stoppedLookingForFile (unsigned int lumi);
@@ -223,29 +228,40 @@ namespace evf {
223228 void setInputSource (DAQSource* inputSource) { daqInputSource_ = inputSource; }
224229 void setInState (FastMonState::InputState inputState) { inputState_ = inputState; }
225230 void setInStateSup (FastMonState::InputState inputState) { inputSupervisorState_ = inputState; }
231+ // available for other modules
232+ void setTMicrostate (FastMonState::Microstate m);
233+
234+ static unsigned int getTID () { return tbb::this_task_arena::current_thread_index (); }
226235
227236 private:
228237 void doSnapshot (const unsigned int ls, const bool isGlobalEOL);
229238
230239 void snapshotRunner ();
231240
241+ static unsigned int getSID (edm::StreamContext const & sc) { return sc.streamID ().value (); }
242+
243+ static unsigned int getSID (edm::StreamID const & sid) { return sid.value (); }
244+
232245 // the actual monitoring thread is held by a separate class object for ease of maintenance
233- std::shared_ptr<FastMonitoringThread> fmt_;
246+ std::unique_ptr<FastMonitoringThread> fmt_;
247+ std::unique_ptr<ConcurrencyTracker> ct_;
234248 // Encoding encModule_;
235249 // std::vector<Encoding> encPath_;
236250 FedRawDataInputSource* inputSource_ = nullptr ;
237251 DAQSource* daqInputSource_ = nullptr ;
238252 std::atomic<FastMonState::InputState> inputState_{FastMonState::InputState::inInit};
239253 std::atomic<FastMonState::InputState> inputSupervisorState_{FastMonState::InputState::inInit};
240254
241- unsigned int nStreams_;
242- unsigned int nThreads_;
255+ unsigned int nStreams_ = 0 ;
256+ unsigned int nMonThreads_ = 0 ;
257+ unsigned int nThreads_ = 0 ;
258+ bool tbbMonitoringMode_;
259+ bool tbbConcurrencyTracker_;
243260 int sleepTime_;
244261 unsigned int fastMonIntervals_;
245262 unsigned int snapCounter_ = 0 ;
246263 std::string microstateDefPath_, fastMicrostateDefPath_;
247- std::string fastName_, fastPath_, slowName_;
248- bool filePerFwkStream_;
264+ std::string fastName_, fastPath_;
249265
250266 // variables that are used by/monitored by FastMonitoringThread / FastMonitor
251267
@@ -272,9 +288,6 @@ namespace evf {
272288 // to disable this behavior, set #ATOMIC_LEVEL 0 or 1 in DataPoint.h
273289 std::vector<std::atomic<bool >*> streamCounterUpdating_;
274290
275- std::vector<std::atomic<bool >*> collectedPathList_;
276- std::vector<bool > pathNamesReady_;
277-
278291 std::filesystem::path workingDirectory_, runDirectory_;
279292
280293 bool threadIDAvailable_ = false ;
@@ -283,8 +296,6 @@ namespace evf {
283296
284297 std::string moduleLegendFile_;
285298 std::string moduleLegendFileJson_;
286- std::string pathLegendFile_;
287- std::string pathLegendFileJson_;
288299 std::string inputLegendFileJson_;
289300 unsigned int nOutputModules_ = 0 ;
290301
@@ -293,7 +304,13 @@ namespace evf {
293304 std::atomic<bool > has_source_exception_ = false ;
294305 std::atomic<bool > has_data_exception_ = false ;
295306 std::vector<unsigned int > exceptionInLS_;
296- std::vector<std::string> fastPathList_;
307+
308+ // per stream
309+ std::vector<ContainableAtomic<const void *>> microstate_;
310+ std::vector<ContainableAtomic<unsigned char >> microstateAcqFlag_;
311+ // per thread
312+ std::vector<ContainableAtomic<const void *>> tmicrostate_;
313+ std::vector<ContainableAtomic<unsigned char >> tmicrostateAcqFlag_;
297314
298315 bool verbose_ = false ;
299316 };
0 commit comments