SourceXtractorPlusPlus
0.12
Please provide a description of the project.
Main Page
Related Pages
Namespaces
Classes
Files
File List
File Members
All
Classes
Namespaces
Files
Functions
Variables
Typedefs
Enumerations
Enumerator
Friends
Macros
Groups
Pages
SEImplementation
SEImplementation
Measurement
MultithreadedMeasurement.h
Go to the documentation of this file.
1
17
/*
18
* Multithreadedmeasurement->h
19
*
20
* Created on: May 17, 2018
21
* Author: mschefer
22
*/
23
24
#ifndef _SEIMPLEMENTATION_OUTPUT_MULTITHREADEDMEASUREMENT_H_
25
#define _SEIMPLEMENTATION_OUTPUT_MULTITHREADEDMEASUREMENT_H_
26
27
#include <atomic>
28
#include <thread>
29
#include <mutex>
30
#include <condition_variable>
31
#include <atomic>
32
33
#include "
SEFramework/Pipeline/Measurement.h
"
34
35
namespace
SourceXtractor {
36
37
class
MultithreadedMeasurement
:
public
Measurement
{
38
public
:
39
40
using
SourceToRowConverter
=
std::function<Euclid::Table::Row(const SourceInterface&)>
;
41
MultithreadedMeasurement
(
SourceToRowConverter
source_to_row,
int
worker_threads_nb)
42
:
m_source_to_row
(source_to_row),
43
m_worker_threads_nb
(worker_threads_nb),
44
m_active_threads
(0),
45
m_group_counter
(0),
46
m_input_done
(false),
m_abort_raised
(false) {}
47
48
void
handleMessage
(
const
std::shared_ptr<SourceGroupInterface>
& source_group)
override
;
49
50
void
startThreads
()
override
;
51
void
waitForThreads
()
override
;
52
53
public
:
54
static
std::recursive_mutex
g_global_mutex
;
55
56
private
:
57
static
void
workerThreadStatic
(
MultithreadedMeasurement
* measurement,
int
id
);
58
static
void
outputThreadStatic
(
MultithreadedMeasurement
* measurement,
int
id
);
59
void
workerThreadLoop
();
60
void
outputThreadLoop
();
61
62
SourceToRowConverter
m_source_to_row
;
63
64
std::shared_ptr<std::thread>
m_output_thread
;
65
66
int
m_worker_threads_nb
;
67
std::vector<std::shared_ptr<std::thread>
>
m_worker_threads
;
68
69
int
m_active_threads
;
70
std::mutex
m_active_threads_mutex
;
71
72
int
m_group_counter
;
73
std::atomic_bool
m_input_done
,
m_abort_raised
;
74
std::condition_variable
m_new_input
;
75
std::list<std::pair<int, std::shared_ptr<SourceGroupInterface>
>>
m_input_queue
;
76
std::mutex
m_input_queue_mutex
;
77
78
std::condition_variable
m_new_output
;
79
std::list<std::pair<int, std::shared_ptr<SourceGroupInterface>
>>
m_output_queue
;
80
std::mutex
m_output_queue_mutex
;
81
};
82
83
}
84
85
#endif
/* _SEIMPLEMENTATION_OUTPUT_MULTITHREADEDMEASUREMENT_H_ */
SourceXtractor::MultithreadedMeasurement::m_active_threads
int m_active_threads
Definition:
MultithreadedMeasurement.h:69
SourceXtractor::MultithreadedMeasurement::m_source_to_row
SourceToRowConverter m_source_to_row
Definition:
MultithreadedMeasurement.h:62
std::shared_ptr< SourceGroupInterface >
SourceXtractor::MultithreadedMeasurement::m_input_queue_mutex
std::mutex m_input_queue_mutex
Definition:
MultithreadedMeasurement.h:76
SourceXtractor::MultithreadedMeasurement::m_abort_raised
std::atomic_bool m_abort_raised
Definition:
MultithreadedMeasurement.h:73
std::function< Euclid::Table::Row(const SourceInterface &)>
SourceXtractor::MultithreadedMeasurement::m_input_queue
std::list< std::pair< int, std::shared_ptr< SourceGroupInterface > > > m_input_queue
Definition:
MultithreadedMeasurement.h:75
SourceXtractor::MultithreadedMeasurement::m_output_thread
std::shared_ptr< std::thread > m_output_thread
Definition:
MultithreadedMeasurement.h:64
SourceXtractor::MultithreadedMeasurement::m_new_input
std::condition_variable m_new_input
Definition:
MultithreadedMeasurement.h:74
SourceXtractor::MultithreadedMeasurement::outputThreadStatic
static void outputThreadStatic(MultithreadedMeasurement *measurement, int id)
Definition:
MultithreadedMeasurement.cpp:100
std::condition_variable
SourceXtractor::MultithreadedMeasurement::MultithreadedMeasurement
MultithreadedMeasurement(SourceToRowConverter source_to_row, int worker_threads_nb)
Definition:
MultithreadedMeasurement.h:41
SourceXtractor::MultithreadedMeasurement::waitForThreads
void waitForThreads() override
Definition:
MultithreadedMeasurement.cpp:50
SourceXtractor::MultithreadedMeasurement::m_worker_threads
std::vector< std::shared_ptr< std::thread > > m_worker_threads
Definition:
MultithreadedMeasurement.h:67
std::mutex
SourceXtractor::MultithreadedMeasurement::m_new_output
std::condition_variable m_new_output
Definition:
MultithreadedMeasurement.h:78
Measurement.h
std::recursive_mutex
SourceXtractor::Measurement
Definition:
Measurement.h:34
std::list
STL class.
SourceXtractor::MultithreadedMeasurement::m_output_queue
std::list< std::pair< int, std::shared_ptr< SourceGroupInterface > > > m_output_queue
Definition:
MultithreadedMeasurement.h:79
std::vector
STL class.
SourceXtractor::MultithreadedMeasurement::m_group_counter
int m_group_counter
Definition:
MultithreadedMeasurement.h:72
SourceXtractor::MultithreadedMeasurement::m_output_queue_mutex
std::mutex m_output_queue_mutex
Definition:
MultithreadedMeasurement.h:80
SourceXtractor::MultithreadedMeasurement::handleMessage
void handleMessage(const std::shared_ptr< SourceGroupInterface > &source_group) override
Definition:
MultithreadedMeasurement.cpp:69
SourceXtractor::MultithreadedMeasurement
Definition:
MultithreadedMeasurement.h:37
SourceXtractor::MultithreadedMeasurement::workerThreadStatic
static void workerThreadStatic(MultithreadedMeasurement *measurement, int id)
Definition:
MultithreadedMeasurement.cpp:84
SourceXtractor::MultithreadedMeasurement::m_active_threads_mutex
std::mutex m_active_threads_mutex
Definition:
MultithreadedMeasurement.h:70
SourceXtractor::MultithreadedMeasurement::g_global_mutex
static std::recursive_mutex g_global_mutex
Definition:
MultithreadedMeasurement.h:54
SourceXtractor::MultithreadedMeasurement::workerThreadLoop
void workerThreadLoop()
Definition:
MultithreadedMeasurement.cpp:116
SourceXtractor::MultithreadedMeasurement::m_input_done
std::atomic_bool m_input_done
Definition:
MultithreadedMeasurement.h:73
SourceXtractor::MultithreadedMeasurement::startThreads
void startThreads() override
Definition:
MultithreadedMeasurement.cpp:39
SourceXtractor::MultithreadedMeasurement::outputThreadLoop
void outputThreadLoop()
Definition:
MultithreadedMeasurement.cpp:159
SourceXtractor::MultithreadedMeasurement::m_worker_threads_nb
int m_worker_threads_nb
Definition:
MultithreadedMeasurement.h:66
Generated by
1.8.5