Lockless Task Scheduler  v1.0a
A lockless task scheduler
threadpool.h
1 // ***********************************************************************
2 // Assembly : task_scheduler
3 // Author : viknash
4 // ***********************************************************************
5 // <copyright file="threadpool.h" >
6 // Copyright (c) viknash. All rights reserved.
7 // </copyright>
8 // <summary></summary>
9 // ***********************************************************************
10 #pragma once
11 
12 #include <algorithm>
13 #include <atomic>
14 #include <condition_variable>
15 #include <iostream>
16 
17 #include "containers.h"
18 #include "globals.h"
19 #include "meta.h"
20 #include "profile.h"
21 
25 namespace task_scheduler
26 {
27 
// Forward declarations of the scheduler's class templates, each parameterised
// on the memory interface type. base_thread is declared as a struct; the
// others are classes.
template < class TMemInterface > class base_task;
template < class TMemInterface > class base_task_graph;
template < class TMemInterface > class base_thread_pool;
template < class TMemInterface > struct base_thread;
32 
36  template < class TMemInterface > class base_thread_pool
37  {
38  typedef base_thread< TMemInterface > thread_type;
39  typedef base_task_graph< TMemInterface > task_graph_type;
40  typedef base_thread_pool< TMemInterface > thread_pool;
41  typedef typename task_graph_type::task_type task_type;
42  typedef typename task_graph_type::task_queue_type task_queue_type;
43  typedef typename task_graph_type::task_memory_allocator_type task_memory_allocator_type;
44  typedef typename thread_type::thread_index_type thread_index_type;
45 
46  public:
51  {
52  run = 0,
53  request_pause,
54  request_stop
55  };
56 
61  {
65  std::mutex signal;
69  std::condition_variable radio;
73  std::atomic_uint32_t thread_sync;
77  std::atomic< state_selector > request_exit;
78  };
79 
80  public:
85  base_thread_pool(thread_num_t _num_threads = max_num_threads);
90  void start(task_graph_type &task_graph);
94  void stop();
99  void wake_up(thread_num_t _num_threads_to_wake_up = max_num_threads, uint64_t _thread_affinity_mask = std::numeric_limits<uint64_t>::max());
104  thread_type *get_current_thread();
105 
113  thread_num_t num_threads;
117  task_graph_type *task_graph;
118  thread_type *threads[max_num_threads];
122  task_memory_allocator_type task_memory_allocator;
126  std::atomic< uint32_t > num_working;
127 
128  optimization std::atomic< typename task_type::rank_type > queue_rank[task_type::num_priority][max_num_threads];
129  };
130 
131  template < class TMemInterface >
133  : num_threads(
134  std::min(std::min(static_cast< thread_num_t >(std::thread::hardware_concurrency()), max_num_threads),
135  _num_threads))
136  , task_graph(nullptr)
137  {
138  memset(threads, 0, sizeof(threads));
139  for (auto &priority_queue : queue_rank)
140  {
141  for (auto &rank : priority_queue)
142  {
143  rank.store(0);
144  }
145  }
146  }
147 
148  template < class TMemInterface > void base_thread_pool< TMemInterface >::start(task_graph_type &_task_graph)
149  {
150  task_graph = &_task_graph;
151  setup.thread_sync.store(uint32_t(num_threads));
152 
153  for (uint32_t thread_idx = 0; thread_idx < num_threads; thread_idx++)
154  {
155  threads[thread_idx] = thread_type::create_thread(this);
156  }
157 
158  // Wait until all threads are started
159  while (setup.thread_sync != 0)
160  {
161  std::unique_lock< std::mutex > signal(setup.signal);
162  setup.radio.wait(signal);
163  }
164 
165  // kick all sleeping threads
166  for (uint32_t iterations = 0; iterations < num_threads; ++iterations)
167  {
168  threads[iterations]->start();
169  }
170  }
171 
172  template < class TMemInterface > void base_thread_pool< TMemInterface >::stop()
173  {
174  setup.thread_sync = (uint32_t)task_graph->persistent.sub_graphs.size();
175  setup.request_exit.store(request_pause);
176 
177  for (uint32_t thread_idx = 0; thread_idx < num_threads; thread_idx++)
178  {
179  threads[thread_idx]->join();
180  }
181 
182  for (uint32_t thread_idx = 0; thread_idx < num_threads; thread_idx++)
183  {
184  delete threads[thread_idx];
185  }
186 
187  memset(threads, 0, sizeof(threads));
188  }
189 
190  template < class TMemInterface >
191  void base_thread_pool< TMemInterface >::wake_up(thread_num_t _num_threads_to_wake_up, uint64_t _thread_affinity_mask)
192  {
193  _num_threads_to_wake_up = std::min(num_threads, _num_threads_to_wake_up);
194  reduce_starvation(always_different_thread_woken_up_first) static thread_index_type next_thread_index(this, 0);
195  for (uint32_t iterations = 0, woke_up = 0; woke_up < _num_threads_to_wake_up && iterations < num_threads; ++next_thread_index, ++iterations)
196  {
197  if (!next_thread_index.is_set(_thread_affinity_mask))
198  continue; // Skip threads the task should not run on
199  threads[next_thread_index]->wake_up();
200  ++woke_up;
201  }
202 
203  reduce_starvation(always_different_thread_woken_up_first)++ next_thread_index;
204  }
205 
206  template < class TMemInterface >
208  {
209  return task_scheduler::get_current_thread< typename thread_pool::thread_type >();
210  }
211 };
void wake_up(thread_num_t _num_threads_to_wake_up=max_num_threads, uint64_t _thread_affinity_mask=std::numeric_limits< uint64_t >::max())
Wakes up.
Definition: threadpool.h:191
std::mutex signal
The signal
Definition: threadpool.h:65
std::condition_variable radio
The radio
Definition: threadpool.h:69
std::atomic< uint32_t > num_working
The number working
Definition: threadpool.h:126
task_graph_type * task_graph
The task graph
Definition: threadpool.h:117
std::atomic_uint32_t thread_sync
The thread synchronization counter
Definition: threadpool.h:73
const thread_num_t max_num_threads
The maximum number of threads
Definition: globals.h:29
void wake_up()
Wakes up.
Definition: thread.h:206
Struct setup_container
Definition: threadpool.h:60
persistent_container persistent
The persistent
Definition: taskgraph.h:273
thread_type * get_current_thread()
Gets the current thread.
Definition: threadpool.h:207
Class stl_allocator.
Definition: allocator.h:16
void join()
Joins this instance.
Definition: thread.h:211
bool is_set(thread_mask_int_t _other_mask)
Determines whether the specified other mask is set.
Definition: types.h:195
std::atomic< state_selector > request_exit
The request exit
Definition: threadpool.h:77
setup_container setup
The setup
Definition: threadpool.h:109
sub_graph_vector sub_graphs
The sub graphs
Definition: taskgraph.h:104
task_memory_allocator_type task_memory_allocator
The task memory allocator
Definition: threadpool.h:122
void stop()
Stops this instance.
Definition: threadpool.h:172
Struct base_thread
Definition: task.h:37
thread_num_t num_threads
The number of threads
Definition: threadpool.h:113
void start(task_graph_type &task_graph)
Starts the specified task graph.
Definition: threadpool.h:148
state_selector
Enum state_selector
Definition: threadpool.h:50
static thread_type * create_thread(thread_pool *_pool)
Creates the thread.
Definition: thread.h:168
base_thread_pool(thread_num_t _num_threads=max_num_threads)
Initializes a new instance of the base_thread_pool class.
Definition: threadpool.h:132