SourceXtractorPlusPlus  0.13
Please provide a description of the project.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
MultithreadedMeasurement.cpp
Go to the documentation of this file.
1 
17 /*
18  * MultiThreadedMeasurement.cpp
19  *
20  * Created on: May 23, 2018
21  * Author: mschefer
22  */
23 
24 #include <chrono>
25 #include <ElementsKernel/Logging.h>
26 #include <csignal>
27 
30 
31 using namespace SourceXtractor;
32 
34 
36 
38  m_output_thread = Euclid::make_unique<std::thread>(outputThreadStatic, this);
39 }
40 
42  m_input_done = true;
43  m_thread_pool->block();
44  m_output_thread->join();
45  logger.debug() << "All worker threads done!";
46 }
47 
48 void
50  // Force computation of SourceID here, where the order is still deterministic
51  for (auto& source : *source_group) {
52  source.getProperty<SourceID>();
53  }
54 
55  // Put the new SourceGroup into the input queue
56  auto order_number = m_group_counter;
57  m_thread_pool->submit([this, order_number, source_group]() {
58  // Trigger measurements
59  for (auto& source : *source_group) {
60  m_source_to_row(source);
61  }
62  // Pass to the output thread
63  {
65  m_output_queue.emplace_back(order_number, source_group);
66  }
68  });
70 }
71 
73  logger.debug() << "Starting output thread";
74  try {
75  measurement->outputThreadLoop();
76  }
77  catch (const Elements::Exception& e) {
78  logger.fatal() << "Output thread got an exception!";
79  logger.fatal() << e.what();
80  if (!measurement->m_abort_raised.exchange(true)) {
81  logger.fatal() << "Aborting the execution";
82  ::raise(SIGTERM);
83  }
84  }
85  logger.debug() << "Stopping output thread";
86 }
87 
89  while (true) {
91 
92  // Wait for something in the output queue
93  if (m_output_queue.empty()) {
95  }
96 
97  // Process the output queue
98  while (!m_output_queue.empty()) {
99  notifyObservers(m_output_queue.front().second);
100  m_output_queue.pop_front();
101  }
102 
103  if (m_input_done && m_thread_pool->running() + m_thread_pool->queued() == 0 &&
104  m_output_queue.empty()) {
105  break;
106  }
107  }
108 }
static auto logger
Definition: WCS.cpp:46
void notifyObservers(const std::shared_ptr< SourceGroupInterface > &message) const
Definition: Observable.h:71
constexpr double e
std::shared_ptr< Euclid::ThreadPool > m_thread_pool
std::unique_ptr< std::thread > m_output_thread
std::list< std::pair< int, std::shared_ptr< SourceGroupInterface > > > m_output_queue
void handleMessage(const std::shared_ptr< SourceGroupInterface > &source_group) override
const char * what() const noexceptoverride
static Logging getLogger(const std::string &name="")
static void outputThreadStatic(MultithreadedMeasurement *measurement)