NAGASH 0.9.8
Next Generation Analysis System
Loading...
Searching...
No Matches
Analysis.h
Go to the documentation of this file.
1//***************************************************************************************
4//***************************************************************************************
5
222#pragma once
223
224#include "NAGASH/Global.h"
225#include "NAGASH/ThreadPool.h"
226#include "NAGASH/ConfigTool.h"
227#include "NAGASH/LoopEvent.h"
228#include "NAGASH/ResultGroup.h"
229#include "NAGASH/Job.h"
230
231namespace NAGASH
232{
233
// Body of the Analysis class: holds the shared ConfigTool / MSGTool / Timer / ThreadPool and
// declares the templates used to submit Job and LoopEvent work and to collect ResultGroup objects.
// NOTE(review): the line carrying the `class Analysis` header is not present in this listing.
235 {
236 public:
// Default constructor.
237 Analysis();
// An Analysis can be neither copied nor moved: all four operations are deleted.
238 Analysis(const Analysis &ana) = delete;
239 Analysis(Analysis &&ana) = delete;
240 Analysis &operator=(const Analysis &ana) = delete;
241 Analysis &operator=(Analysis &&ana) = delete;
242
// Construct from an existing ConfigTool, or from a C string
// (presumably the name of a configuration file — confirm against the implementation).
243 Analysis(const ConfigTool &c);
244 Analysis(const char *configfilename);
245 ~Analysis();
// Set the number of threads used by the thread pool.
246 void SetThreadNumber(int num);
// Returns the shared ConfigTool stored in `config`.
247 std::shared_ptr<ConfigTool> ConfigUser();
248
249 // deal with eventloop
250 // for multi thread
// ProcessEventLoop: calls SubmitEventLoop<LE> and then Join() before returning the result.
251 template <typename LE>
252 std::shared_ptr<ResultGroup> ProcessEventLoop(const char *filelist, bool dosave = true);
// SubmitEventLoop: reads `filelist` line by line and submits one job per existing file;
// it does not call Join() itself.
253 template <typename LE>
254 std::shared_ptr<ResultGroup> SubmitEventLoop(const char *filelist, bool dosave = true);
255 // for multi process
// ProcessEventLoopMainThread: calls LoopEvent::Process<LE> directly on a single file
// instead of going through the thread pool.
256 template <typename LE>
257 std::shared_ptr<ResultGroup> ProcessEventLoopMainThread(const char *filename, bool dosave = true);
258
259 // deal with user defined functions
// SubmitJob<J>: enqueues J::Process; `args` are forwarded after the config copy and result group.
260 template <typename J, typename... Args>
261 std::shared_ptr<ResultGroup> SubmitJob(bool doSave, Args &&...args);
// SubmitJob<J, JS>: same, but enqueues the templated J::Process<JS>.
262 template <typename J, typename JS, typename... Args>
263 std::shared_ptr<ResultGroup> SubmitJob(bool doSave, Args &&...args);
264
// Returns the shared MSGTool stored in `msg`.
265 std::shared_ptr<MSGTool> MSGUser();
// Join the threads of the thread pool.
266 void Join();
267
268 protected:
// Configuration shared by this Analysis and handed to every ResultGroup it creates.
269 std::shared_ptr<ConfigTool> config;
// Result groups registered by the submit functions when their save flag is true.
270 std::vector<std::shared_ptr<ResultGroup>> ResultsToSave;
// Returns the shared Timer stored in `timer`.
271 std::shared_ptr<Timer> TimerUser();
272
273 private:
// NOTE(review): purpose of SubmittedJobCount is not visible in this header. The templates below
// use a counter named `SubmittedJobs`, whose declaration does not appear in this listing — confirm.
275 std::vector<int> SubmittedJobCount;
277 std::shared_ptr<MSGTool> msg;
278 std::shared_ptr<Timer> timer;
// Created on demand with one thread by ThreadPoolEnqueue when still null.
279 std::shared_ptr<ThreadPool> pool;
// Futures returned by pool->enqueue, one per enqueued task.
280 std::vector<std::future<StatusCode>> vf;
// Configure the thread pool from the ConfigTool.
281 void ConfigThreadPool();
282
// Common entry point used by SubmitJob to push a callable and its arguments into the pool.
283 template <typename F, typename... Args>
284 void ThreadPoolEnqueue(F &&f, Args &&...args);
285 };
286
287 inline std::shared_ptr<ConfigTool> Analysis::ConfigUser() { return this->config; }
288 inline std::shared_ptr<Timer> Analysis::TimerUser() { return timer; }
289 inline std::shared_ptr<MSGTool> Analysis::MSGUser() { return msg; }
290
299 template <typename LE>
300 std::shared_ptr<ResultGroup> Analysis::ProcessEventLoop(const char *filelist, bool dosave)
301 {
302 auto result = SubmitEventLoop<LE>(filelist, dosave);
303 Join();
304 return result;
305 }
306
315 template <typename LE>
316 std::shared_ptr<ResultGroup> Analysis::SubmitEventLoop(const char *filelist, bool dosave)
317 {
318 // check LE type
319 static_assert(!std::is_pointer<LE>::value, "Analysis::SubmitEventLoop : type can not be a pointer");
320 static_assert(std::is_base_of<LoopEvent, LE>::value, "Analysis::SubmitEventLoop : Input type must be or be inherited from LoopEvent class");
321
322 std::ifstream infile;
323 infile.open(filelist, std::ios::in);
324 if (!infile.is_open())
325 {
326 MSGUser()->StartTitle("Analysis::SubmitEventLoop");
327 MSGUser()->MSG_WARNING("file list ", filelist, " does not exist, returning a null result");
328 MSGUser()->EndTitle();
329 return nullptr;
330 }
331
332 auto origin = msg->OutputLevel;
333 bool existfileselection = false;
334 bool existfileselectionpattern = false;
335 msg->OutputLevel = MSGLevel::SILENT;
336 std::vector<int> fileselection = config->GetPar<std::vector<int>>("FileSelection", &existfileselection);
337 std::string fileselectionpattern(config->GetPar<TString>("FileSelectionPattern", &existfileselectionpattern).Data());
339 msg->OutputLevel = origin;
340
341 auto result = std::make_shared<ResultGroup>(this->msg, this->config);
342 std::string line;
343 int count = 0;
344 while (std::getline(infile, line))
345 {
346 ++count;
347
348 if (existfileselection && std::find(fileselection.begin(), fileselection.end(), count) == fileselection.end())
349 continue;
350
351 if (existfileselectionpattern && !std::regex_search(line, fileselectionregex))
352 continue;
353
354 // remove comment and check if the file exists
355 auto commentstart = line.find("#");
356 if (commentstart != std::string::npos)
357 line.erase(commentstart, line.size());
358
359 // remove the carriage return
360 if (line.back() == '\r')
361 line.pop_back();
362
363 int isexist = access(line.c_str(), F_OK);
364 if (isexist == 0)
365 {
366 SubmitJob<LoopEvent, LE>(false, result, TString(line.c_str()));
367 }
368 else if (line.size() != 0)
369 {
370 MSGUser()->StartTitle("Analysis::SubmitEventLoop");
371 MSGUser()->MSG_WARNING("file ", line, " does not exist, will skip this file");
372 MSGUser()->EndTitle();
373 }
374 }
375
376 if (dosave)
377 ResultsToSave.push_back(result);
378
379 infile.close();
380 return result;
381 }
382
391 template <typename LE>
392 inline std::shared_ptr<ResultGroup> Analysis::ProcessEventLoopMainThread(const char *filename, bool dosave)
393 {
394 // check LE type
395 static_assert(!std::is_pointer<LE>::value, "Analysis::SubmitEventLoop : type can not be a pointer");
396 static_assert(std::is_base_of<LoopEvent, LE>::value, "Analysis::SubmitEventLoop : Input type must be or be inherited from LoopEvent class");
397
399 configCopy.BookPar<int>("JobNumber", -1);
400 auto r = std::make_shared<ResultGroup>(this->msg, this->config);
401 LoopEvent::Process<LE>(configCopy, nullptr, r, filename);
402 return r;
403 }
404
// Submit a user-defined job of type J to the thread pool.
//   J      : must be Job or derived from it and must not be a pointer type (checked at compile time).
//   doSave : when true, the returned ResultGroup is also appended to ResultsToSave.
//   args   : forwarded to J::Process after the per-job config copy and the result group.
// Returns the ResultGroup created for this job.
415 template <typename J, typename... Args>
416 inline std::shared_ptr<ResultGroup> Analysis::SubmitJob(bool doSave, Args &&...args)
417 {
418 static_assert(!std::is_pointer<J>::value, "Analysis::SubmitJob : type can not be a pointer");
419 static_assert(std::is_base_of<Job, J>::value, "Analysis::SubmitJob : Input type must be or be inherited from Job class");
420
// NOTE(review): the declaration of configCopy is not present in this listing; presumably it is
// a ConfigTool copied from *config — confirm. SubmittedJobs is likewise declared elsewhere.
// The copy is tagged with the current value of SubmittedJobs under the key "JobNumber".
422 configCopy.BookPar<int>("JobNumber", SubmittedJobs);
423 // copy msg
// The copy gets its own MSGTool instance; its settings are then copied field by field
// from the MSGTool of the Analysis' config.
424 configCopy.SetMSG(std::make_shared<MSGTool>());
425 configCopy.MSGUser()->OutputLevel = config->MSGUser()->OutputLevel;
426 configCopy.MSGUser()->DefinedLevel = config->MSGUser()->DefinedLevel;
427 configCopy.MSGUser()->FocusName = config->MSGUser()->FocusName;
428 configCopy.MSGUser()->isCreateLog = config->MSGUser()->isCreateLog;
429 configCopy.MSGUser()->isOpenLog = config->MSGUser()->isOpenLog;
430 configCopy.MSGUser()->LogFileName = config->MSGUser()->LogFileName;
431 configCopy.MSGUser()->FuncTitle = config->MSGUser()->FuncTitle;
432
// The result group is built from the Analysis' own msg and config, not from the copy.
433 auto r = std::make_shared<ResultGroup>(this->msg, this->config);
434 ThreadPoolEnqueue(J::Process, configCopy, r, std::forward<Args>(args)...);
435 if (doSave)
436 {
437 ResultsToSave.emplace_back(r);
438 }
439
440 return r;
441 }
442
// Submit a user-defined job whose Process function is itself a template.
//   J      : must be Job or derived from it and must not be a pointer type (checked at compile time).
//   JS     : template argument passed to J::Process<JS>.
//   doSave : when true, the returned ResultGroup is also appended to ResultsToSave.
//   args   : forwarded to J::Process<JS> after the per-job config copy and the result group.
// Returns the ResultGroup created for this job.
454 template <typename J, typename JS, typename... Args> // Job::Process with template
455 std::shared_ptr<ResultGroup> Analysis::SubmitJob(bool doSave, Args &&...args) // use config from Analysis
456 {
457 static_assert(!std::is_pointer<J>::value, "Analysis::SubmitJob : type can not be a pointer");
458 static_assert(std::is_base_of<Job, J>::value, "Analysis::SubmitJob : Input type must be or be inherited from Job class");
459
// NOTE(review): the declaration of configCopy is not present in this listing; presumably it is
// a ConfigTool copied from *config — confirm. SubmittedJobs is likewise declared elsewhere.
// The copy is tagged with the current value of SubmittedJobs under the key "JobNumber".
461 configCopy.BookPar<int>("JobNumber", SubmittedJobs);
462 // copy msg
// The copy gets its own MSGTool instance; its settings are then copied field by field
// from the MSGTool of the Analysis' config.
463 configCopy.SetMSG(std::make_shared<MSGTool>());
464 configCopy.MSGUser()->OutputLevel = config->MSGUser()->OutputLevel;
465 configCopy.MSGUser()->DefinedLevel = config->MSGUser()->DefinedLevel;
466 configCopy.MSGUser()->FocusName = config->MSGUser()->FocusName;
467 configCopy.MSGUser()->isCreateLog = config->MSGUser()->isCreateLog;
468 configCopy.MSGUser()->isOpenLog = config->MSGUser()->isOpenLog;
469 configCopy.MSGUser()->LogFileName = config->MSGUser()->LogFileName;
470 configCopy.MSGUser()->FuncTitle = config->MSGUser()->FuncTitle;
471
// The result group is built from the Analysis' own msg and config, not from the copy.
472 auto r = std::make_shared<ResultGroup>(this->msg, this->config);
473 ThreadPoolEnqueue(J::template Process<JS>, configCopy, r, std::forward<Args>(args)...);
474 if (doSave)
475 {
476 ResultsToSave.emplace_back(r);
477 }
478
479 return r;
480 }
481
// Enqueue the callable f with its arguments into the thread pool of this Analysis.
//   f    : callable handed to pool->enqueue.
//   args : forwarded to pool->enqueue after f.
// The future returned by the pool is stored in vf.
487 template <typename F, typename... Args>
488 void Analysis::ThreadPoolEnqueue(F &&f, Args &&...args)
489 {
// No pool yet: warn and fall back to a pool constructed with a single thread.
490 if (pool == nullptr)
491 {
492 msg->MSG(MSGLevel::WARNING, "Thread pool not defined, only one thread will be created");
493 pool = std::make_shared<ThreadPool>(1);
494 }
495
// NOTE(review): the declarations of `record` and `jobnum` are not present in this listing;
// presumably both are TString (Form() is called on them) — confirm.
// Record a timer entry labelled with the current job number, then print the current time.
497 record.Form("(Submit Job %d)", SubmittedJobs);
498 TimerUser()->Record(record);
500 jobnum.Form("Submit job %d at ", SubmittedJobs);
501 TimerUser()->PrintCurrent(jobnum);
502 vf.emplace_back(pool->enqueue(f, std::forward<Args>(args)...));
// NOTE(review): the listing jumps from 502 to 505 here; any statement that advances
// SubmittedJobs is not visible in this block — confirm against the original.
505 }
506
507} // namespace NAGASH
std::string line
Some global definitions.
Provides a multi-threaded interface for manipulating Jobs.
Definition Analysis.h:235
std::shared_ptr< ResultGroup > ProcessEventLoopMainThread(const char *filename, bool dosave=true)
Single-thread mode of processing the user defined event loop. The event loop will be run in the main ...
Definition Analysis.h:392
std::shared_ptr< Timer > TimerUser()
return the Timer
Definition Analysis.h:288
std::vector< std::shared_ptr< ResultGroup > > ResultsToSave
Definition Analysis.h:270
~Analysis()
destructor of the Analysis class.
Definition Analysis.cxx:73
Analysis()
Default constructor of the Analysis class.
Definition Analysis.cxx:39
void ConfigThreadPool()
Configure the thread pool using the information from ConfigTool.
Definition Analysis.cxx:49
std::vector< int > SubmittedJobCount
Definition Analysis.h:275
Analysis(Analysis &&ana)=delete
std::shared_ptr< ResultGroup > SubmitEventLoop(const char *filelist, bool dosave=true)
Submit the user-defined event loop. This function will not automatically call Join() after submitting the...
Definition Analysis.h:316
Analysis & operator=(const Analysis &ana)=delete
void Join()
Join the threads in the thread pool.
Definition Analysis.cxx:97
std::shared_ptr< ResultGroup > SubmitJob(bool doSave, Args &&...args)
Submit the user defined job. The Analysis will process the submitted jobs in multi-thread mode.
Definition Analysis.h:416
std::shared_ptr< ConfigTool > config
Definition Analysis.h:269
Analysis(const Analysis &ana)=delete
void SetThreadNumber(int num)
Set the thread number of the thread pool. Can only be set once.
Definition Analysis.cxx:83
Analysis & operator=(Analysis &&ana)=delete
std::shared_ptr< ConfigTool > ConfigUser()
return the ConfigTool
Definition Analysis.h:287
std::shared_ptr< MSGTool > MSGUser()
return the MSGTool
Definition Analysis.h:289
std::shared_ptr< ResultGroup > ProcessEventLoop(const char *filelist, bool dosave=true)
Process the user-defined event loop. This function will automatically call Join() after submitting the ev...
Definition Analysis.h:300
void ThreadPoolEnqueue(F &&f, Args &&...args)
General function to enqueue a function into the thread pool inside Analysis class.
Definition Analysis.h:488
std::vector< std::future< StatusCode > > vf
Definition Analysis.h:280
std::shared_ptr< Timer > timer
Definition Analysis.h:278
std::shared_ptr< ThreadPool > pool
Definition Analysis.h:279
std::shared_ptr< MSGTool > msg
Definition Analysis.h:277
Provides an interface to config objects in NAGASH.
Definition ConfigTool.h:14