12 #include <condition_variable> 15 #include "containers.h" 19 #include "concurrency.h" 27 template <
class TMemInterface >
class base_task;
28 template <
class TMemInterface >
class base_task_graph;
29 template <
class TMemInterface >
class base_thread_pool;
34 template <
class TMemInterface >
struct base_thread :
public TMemInterface
36 typedef base_task_graph< TMemInterface > task_graph_type;
37 typedef typename base_task< TMemInterface >::task_type task_type;
38 typedef typename task_graph_type::task_queue_type task_queue_type;
39 typedef base_thread< TMemInterface > thread_type;
40 typedef base_thread_pool< TMemInterface > thread_pool;
41 typedef typename task_graph_type::task_memory_allocator_type task_memory_allocator_type;
42 typedef thread_index_t< TMemInterface > thread_index_type;
43 typedef typename task_graph_type::task_vector task_vector;
50 base_thread(thread_num_t _thread_index, thread_pool *_pool);
67 void sleep(
bool (thread_type::*_wake_up)());
77 task_queue_type *task_queue[task_type::num_priority];
95 friend class std::thread;
115 bool is_task_available();
120 task_type *get_task();
133 std::condition_variable radio;
139 task_vector temporary_task_list;
142 template <
class TMemInterface >
147 for (
auto &queue : task_queue)
160 for (
auto &queue : task_queue)
167 template <
class TMemInterface >
170 static thread_num_t next_thread_num = 0;
171 return new thread_type(next_thread_num++, _pool);
178 tostringstream stringStream;
201 return ((this->*_wake_up)() || pool.
setup.
request_exit == thread_pool::request_stop);
215 for (
auto queue : task_queue)
229 profile::time scheduling(0ms), sleeping(0ms), working(0ms);
233 while (pool.setup.request_exit != thread_pool::request_stop)
236 task_type *run_task = profile::instrument< task_type *, thread_type, task_type *(thread_type::*)() >(
237 scheduling,
this, &thread_type::get_task);
242 profile::time task_time(0ms);
246 profile::instrument<
bool, task_type,
bool (task_type::*)() >(task_time, run_task, &task_type::operator()))
252 << chrono::duration_cast< chrono::milliseconds >(task_time).count() <<
"ms)");
253 working += task_time;
258 profile::instrument< void, task_type, void (task_type::*)() >(scheduling, run_task,
262 else if (!is_task_available())
265 profile::instrument< void, thread_type, void (thread_type::*)(bool (thread_type::*)()) >(
270 assert(!is_task_available());
272 auto scheduling_ms = std::chrono::duration_cast< std::chrono::milliseconds >(scheduling);
273 auto sleeping_ms = std::chrono::duration_cast< std::chrono::milliseconds >(sleeping);
274 auto working_ms = std::chrono::duration_cast< std::chrono::milliseconds >(working);
275 auto scheduling_ratio = double(working.count() / (scheduling.count() == 0 ? 1 : scheduling.count()));
276 ts_print(
"Complete, Scheduling Overhead=" << scheduling_ms.count() <<
"ms, sleep Time=" << sleeping_ms.count()
277 <<
"ms, Work Time=" << working_ms.count()
278 <<
"ms, Work/Schedule Ratio=" << fixed << setprecision(0)
279 << scheduling_ratio);
282 template <
class TMemInterface >
285 task_type *next_task =
nullptr;
286 for (uint32_t priority = 0; priority < task_type::num_priority; priority++)
289 if (task_queue[priority]->pop_front(next_task))
306 thread_index_type current_thread_index =
thread_index - 1;
309 task_type *stolen_task =
nullptr;
310 bool touched_thread =
false;
313 if (pool.threads[current_thread_index]->task_queue[priority]->pop_front(stolen_task))
315 touched_thread =
true;
319 next_task = stolen_task;
321 << uint32_t(current_thread_index));
327 temporary_task_list.push_back(stolen_task);
330 }
while (!pool.threads[current_thread_index]->task_queue[priority]->empty());
332 for (
auto task_ptr : temporary_task_list)
335 pool.threads[current_thread_index]->task_queue[priority]->push_back(task_ptr);
337 temporary_task_list.clear();
344 pool.threads[current_thread_index]->wake_up();
352 --current_thread_index;
std::condition_variable radio
The radio
Definition: threadpool.h:69
transient_container transient
The transient
Definition: task.h:259
~base_thread()
Finalizes an instance of the base_thread<TMemInterface> class.
Definition: thread.h:158
task_graph_type * task_graph
The task graph
Definition: threadpool.h:117
std::atomic_uint32_t thread_sync
The thread synchronize
Definition: threadpool.h:73
void wake_up()
Wakes up.
Definition: thread.h:206
task_graph_type & task_graph
The task graph
Definition: task.h:267
std::atomic_int64_t num_working
The number working
Definition: task.h:130
void sleep(bool(thread_type::*_wake_up)())
Sleeps the specified wake up.
Definition: thread.h:196
Class stl_allocator.
Definition: allocator.h:16
task_type * dequeue_task(uint32_t _priority)
Dequeues the task.
Definition: taskgraph.h:616
thread_index_type thread_index
The task queue
Definition: thread.h:81
base_thread(thread_num_t _thread_index, thread_pool *_pool)
Initializes a new instance of the base_thread<TMemInterface> struct.
Definition: thread.h:143
void join()
Joins this instance.
Definition: thread.h:211
bool is_set(thread_mask_int_t _other_mask)
Determines whether the specified other mask is set.
Definition: types.h:195
std::atomic< state_selector > request_exit
The request exit
Definition: threadpool.h:77
task_memory_allocator_type allocator
The allocator
Definition: thread.h:93
std::thread::id thread_id
The thread identifier
Definition: thread.h:85
uint64_t thread_affinity
The thread affinity
Definition: task.h:186
setup_container setup
The setup
Definition: threadpool.h:109
thread_local void * current_thread
The current thread
Definition: globals.h:76
Definition: concurrency.h:51
Struct base_thread
Definition: task.h:37
string_type task_name
The task name
Definition: task.h:94
void kick_dependent_tasks()
Kicks the dependent tasks.
Definition: task.h:366
std::thread task_thread
The task thread
Definition: thread.h:89
thread_local tstring thread_name
The thread name
Definition: globals.h:68
Class base_task.
Definition: task.h:44
debug_container debug
The debug
Definition: task.h:255
static thread_type * create_thread(thread_pool *_pool)
Creates the thread.
Definition: thread.h:168
Class base_thread_pool.
Definition: task.h:36
rank_type rank
The rank
Definition: task.h:182
bool is_task_available()
Determines whether [is task available].
Definition: taskgraph.h:623
persistent_container persistent
The persistent
Definition: task.h:263