SourceXtractorPlusPlus  0.15
Please provide a description of the project.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Prefetcher.cpp
Go to the documentation of this file.
1 
18 #include <ElementsKernel/Logging.h>
21 
23 
24 
25 namespace SourceXtractor {
26 
30 template<typename Lock>
31 struct ReverseLock {
32  ReverseLock(Lock& lock) : m_lock(lock) {
33  m_lock.unlock();
34  }
35 
37  m_lock.lock();
38  }
39 
40 private:
41  Lock& m_lock;
42 };
43 
45  : m_thread_pool(thread_pool), m_stop(false) {
46  m_output_thread = Euclid::make_unique<std::thread>(&Prefetcher::outputLoop, this);
47 }
48 
50  if (m_output_thread->joinable())
51  wait();
52 }
53 
55  intptr_t source_addr = reinterpret_cast<intptr_t>(message.get());
56  {
58  m_received.emplace_back(EventType::SOURCE, source_addr);
59  }
60 
61  // Pre-fetch in separate threads
62  m_thread_pool->submit([this, source_addr, message]() {
63  for (auto& prop : m_prefetch_set) {
64  message->getProperty(prop);
65  }
66  {
68  m_finished_sources.emplace(source_addr, message);
69  }
71  });
72 }
73 
74 void Prefetcher::requestProperty(const PropertyId& property_id) {
75  m_prefetch_set.emplace(property_id);
76  logger.debug() << "Requesting prefetch of " << property_id.getString();
77 }
78 
80  logger.debug() << "Starting prefetcher output loop";
81 
82  while (m_thread_pool->activeThreads() > 0) {
84 
85  // Wait for something new
87 
88  // Process the output queue
89  // This is, release sources when the front of the received has been processed
90  while (!m_received.empty()) {
91  auto next = m_received.front();
92  // If the front is a ProcessSourceEvent, everything received before is done,
93  // so pass downstream
94  if (next.m_event_type == EventType::PROCESS_SOURCE) {
95  auto event = m_event_queue.front();
96  m_event_queue.pop_front();
97  logger.debug() << "ProcessSourceEvent released";
98  {
99  ReverseLock<decltype(output_lock)> release_lock(output_lock);
101  }
102  m_received.pop_front();
103  continue;
104  }
105  // Find if the matching source is done
106  auto processed = m_finished_sources.find(next.m_source_addr);
107  // If not, we can't keep going, so exit here
108  if (processed == m_finished_sources.end()) {
109  logger.debug() << "Next source " << next.m_source_addr << " not done yet";
110  break;
111  }
112  // If it is, send it downstream
113  logger.debug() << "Source " << next.m_source_addr << " sent downstream";
114  {
115  ReverseLock<decltype(output_lock)> release_lock(output_lock);
117  }
118  m_finished_sources.erase(processed);
119  m_received.pop_front();
120  }
121 
122  if (m_stop && m_received.empty()) {
123  break;
124  }
125  }
126  logger.debug() << "Stopping prefetcher output loop";
127 }
128 
130  {
133  m_event_queue.emplace_back(message);
134  }
136  logger.debug() << "ProcessSourceEvent received";
137 }
138 
140  m_stop = true;
141  m_output_thread->join();
142 }
143 
144 } // end of namespace SourceXtractor
std::unique_ptr< std::thread > m_output_thread
Orchestration thread.
Definition: Prefetcher.h:109
std::deque< EventType > m_received
Queue of type of received events. Used to pass downstream events respecting the received order...
Definition: Prefetcher.h:117
void notifyObservers(const T &message) const
Definition: Observable.h:71
std::set< PropertyId > m_prefetch_set
Properties to prefetch.
Definition: Prefetcher.h:107
std::atomic_bool m_stop
Termination condition for the output loop.
Definition: Prefetcher.h:122
Event received by SourceGrouping to request the processing of some of the Sources stored...
void requestProperty(const PropertyId &property_id)
Definition: Prefetcher.cpp:74
static Elements::Logging logger
Definition: Prefetcher.cpp:22
std::deque< ProcessSourcesEvent > m_event_queue
Queue of received ProcessSourceEvent, order preserved.
Definition: Prefetcher.h:115
void debug(const std::string &logMessage)
std::condition_variable m_new_output
Notifies there is a new source done processing.
Definition: Prefetcher.h:111
Implements the Observer pattern. Notifications will be made using a message of type T...
Definition: Observable.h:51
Prefetcher(const std::shared_ptr< Euclid::ThreadPool > &thread_pool)
Definition: Prefetcher.cpp:44
T next(T...args)
std::map< intptr_t, std::shared_ptr< SourceInterface > > m_finished_sources
Finished sources.
Definition: Prefetcher.h:113
T lock(T...args)
T get(T...args)
Identifier used to set and retrieve properties.
Definition: PropertyId.h:40
void handleMessage(const std::shared_ptr< SourceInterface > &message) override
Definition: Prefetcher.cpp:54
std::string getString() const
Definition: PropertyId.cpp:36
static Logging getLogger(const std::string &name="")
std::shared_ptr< Euclid::ThreadPool > m_thread_pool
Pointer to the pool of worker threads.
Definition: Prefetcher.h:105