Lockless Task Scheduler  v1.0a
A lockless task scheduler
thread.h
1 // ***********************************************************************
2 // Assembly : task_scheduler
3 // Author : viknash
4 // ***********************************************************************
5 // <copyright file="thread.h" >
6 // Copyright (c) viknash. All rights reserved.
7 // </copyright>
8 // <summary></summary>
9 // ***********************************************************************
10 #pragma once
11 
12 #include <condition_variable>
13 #include <iomanip>
14 
15 #include "containers.h"
16 #include "globals.h"
17 #include "print.h"
18 #include "profile.h"
19 #include "concurrency.h"
20 
24 namespace task_scheduler
25 {
26 
27  template < class TMemInterface > class base_task;
28  template < class TMemInterface > class base_task_graph;
29  template < class TMemInterface > class base_thread_pool;
30 
34  template < class TMemInterface > struct base_thread : public TMemInterface
35  {
36  typedef base_task_graph< TMemInterface > task_graph_type;
37  typedef typename base_task< TMemInterface >::task_type task_type;
38  typedef typename task_graph_type::task_queue_type task_queue_type;
39  typedef base_thread< TMemInterface > thread_type;
40  typedef base_thread_pool< TMemInterface > thread_pool;
41  typedef typename task_graph_type::task_memory_allocator_type task_memory_allocator_type;
42  typedef thread_index_t< TMemInterface > thread_index_type;
43  typedef typename task_graph_type::task_vector task_vector;
44 
50  base_thread(thread_num_t _thread_index, thread_pool *_pool);
54  ~base_thread();
55 
61  static thread_type *create_thread(thread_pool *_pool);
62 
67  void sleep(bool (thread_type::*_wake_up)());
71  void wake_up();
75  void join();
76 
77  task_queue_type *task_queue[task_type::num_priority];
81  thread_index_type thread_index;
85  std::thread::id thread_id;
89  std::thread task_thread;
93  task_memory_allocator_type allocator;
94 
95  friend class std::thread;
96  template < class TMemInterface > friend class base_thread_pool;
97 
98  private:
102  void init();
106  void start();
110  void run();
115  bool is_task_available();
120  task_type *get_task();
121 
125  thread_pool &pool;
129  std::mutex signal;
133  std::condition_variable radio;
134 
135  event sync_point;
136 
137  alarm execution;
138 
139  task_vector temporary_task_list;
140  };
141 
142  template < class TMemInterface >
143  base_thread< TMemInterface >::base_thread(thread_num_t _thread_index, thread_pool *_pool)
144  : thread_index(_pool, _thread_index)
145  , pool(*_pool)
146  {
147  for (auto &queue : task_queue)
148  {
149  queue = new task_queue_type(&allocator);
150  }
151 
152  task_thread = std::thread([&] {
153  init();
154  run();
155  });
156  }
157 
158  template < class TMemInterface > base_thread< TMemInterface >::~base_thread()
159  {
160  for (auto &queue : task_queue)
161  {
162  delete queue;
163  queue = nullptr;
164  }
165  }
166 
167  template < class TMemInterface >
169  {
170  static thread_num_t next_thread_num = 0;
171  return new thread_type(next_thread_num++, _pool);
172  }
173 
174  template < class TMemInterface > void base_thread< TMemInterface >::init()
175  {
176  thread_id = std::this_thread::get_id();
177  current_thread = this;
178  tostringstream stringStream;
179  stringStream << uint32_t(thread_index);
180  thread_name = stringStream.str();
181  //profile::thread::set_name(thread_name.c_str());
182 
183  // Signal thread_type has started
184  --pool.setup.thread_sync;
185  pool.setup.radio.notify_one();
186 
187  // wake_up when all threads have started
188  sync_point.wait();
189  }
190 
191  template < class TMemInterface > void base_thread< TMemInterface >::start()
192  {
193  sync_point.signal();
194  }
195 
196  template < class TMemInterface > void base_thread< TMemInterface >::sleep(bool (thread_type::*_wake_up)())
197  {
198  execution.sleep(
199  [&]()->bool
200  {
201  return ((this->*_wake_up)() || pool.setup.request_exit == thread_pool::request_stop);
202  }
203  );
204  }
205 
206  template < class TMemInterface > void base_thread< TMemInterface >::wake_up()
207  {
208  execution.wake_up();
209  }
210 
211  template < class TMemInterface > void base_thread< TMemInterface >::join() { task_thread.join(); }
212 
213  template < class TMemInterface > bool base_thread< TMemInterface >::is_task_available()
214  {
215  for (auto queue : task_queue)
216  {
217  if (!queue->empty())
218  {
219  return true;
220  }
221  }
222 
223  return pool.task_graph->is_task_available();
224  }
225 
226  template < class TMemInterface > void base_thread< TMemInterface >::run()
227  {
228  using namespace std;
229  profile::time scheduling(0ms), sleeping(0ms), working(0ms);
230 
231  ts_print("ready");
232 
233  while (pool.setup.request_exit != thread_pool::request_stop)
234  {
235  // Steal task_type
236  task_type *run_task = profile::instrument< task_type *, thread_type, task_type *(thread_type::*)() >(
237  scheduling, this, &thread_type::get_task);
238 
239  if (run_task)
240  {
241  // Run task_type
242  profile::time task_time(0ms);
243  ++pool.num_working;
244  ++run_task->transient.num_working;
245  while (
246  profile::instrument< bool, task_type, bool (task_type::*)() >(task_time, run_task, &task_type::operator()))
247  {
248  };
249  --run_task->transient.num_working;
250  --pool.num_working;
251  ts_print("run " << run_task->debug.task_name << "("
252  << chrono::duration_cast< chrono::milliseconds >(task_time).count() << "ms)");
253  working += task_time;
254 
255  if (run_task->transient.num_working == 0)
256  {
257  // Donate More Tasks
258  profile::instrument< void, task_type, void (task_type::*)() >(scheduling, run_task,
260  }
261  }
262  else if (!is_task_available())
263  {
264  // Go to sleep if there is no task to run
265  profile::instrument< void, thread_type, void (thread_type::*)(bool (thread_type::*)()) >(
266  sleeping, this, &thread_type::sleep, &thread_type::is_task_available);
267  }
268  };
269 
270  assert(!is_task_available());
271 
272  auto scheduling_ms = std::chrono::duration_cast< std::chrono::milliseconds >(scheduling);
273  auto sleeping_ms = std::chrono::duration_cast< std::chrono::milliseconds >(sleeping);
274  auto working_ms = std::chrono::duration_cast< std::chrono::milliseconds >(working);
275  auto scheduling_ratio = double(working.count() / (scheduling.count() == 0 ? 1 : scheduling.count()));
276  ts_print("Complete, Scheduling Overhead=" << scheduling_ms.count() << "ms, sleep Time=" << sleeping_ms.count()
277  << "ms, Work Time=" << working_ms.count()
278  << "ms, Work/Schedule Ratio=" << fixed << setprecision(0)
279  << scheduling_ratio);
280  }
281 
282  template < class TMemInterface >
284  {
285  task_type *next_task = nullptr;
286  for (uint32_t priority = 0; priority < task_type::num_priority; priority++)
287  {
288  // Try to get a task from the thread queue
289  if (task_queue[priority]->pop_front(next_task))
290  {
291  break;
292  }
293 
294  // Try to get a new manually scheduled task from the global queue
295  next_task = pool.task_graph->dequeue_task(priority);
296  if (next_task)
297  {
298  pool.queue_rank[priority][thread_index].fetch_add(next_task->persistent.rank);
299  break;
300  }
301 
302  // Try to steal task from other threads, starting from the previous thread
303  // In practice the spinning of the lock free queue, in a way locks the current thread we are trying to steal
304  // from
305  // Disabling thread affinities will make the stealing much more efficient
306  thread_index_type current_thread_index = thread_index - 1;
307  do
308  {
309  task_type *stolen_task = nullptr;
310  bool touched_thread = false;
311  do
312  {
313  if (pool.threads[current_thread_index]->task_queue[priority]->pop_front(stolen_task))
314  {
315  touched_thread = true;
316  if (current_thread_index.is_set(stolen_task->persistent.thread_affinity))
317  {
318  // Steal task
319  next_task = stolen_task;
320  ts_print("stole " << next_task->debug.task_name << " <- "
321  << uint32_t(current_thread_index));
322  break;
323  }
324  else
325  {
326  // Steal wrong task
327  temporary_task_list.push_back(stolen_task);
328  }
329  }
330  } while (!pool.threads[current_thread_index]->task_queue[priority]->empty());
331 
332  for (auto task_ptr : temporary_task_list)
333  {
334  // Return tasks
335  pool.threads[current_thread_index]->task_queue[priority]->push_back(task_ptr);
336  }
337  temporary_task_list.clear();
338 
339  // We have to wakeup the thread as long as its task queue is touched
340  // There could be task that was stolen and returned, but the thread went to sleep during the theft and
341  // before the return
342  if (touched_thread)
343  {
344  pool.threads[current_thread_index]->wake_up();
345  }
346 
347  if (next_task)
348  {
349  break;
350  }
351 
352  --current_thread_index;
353  } while (current_thread_index != thread_index);
354 
355  if (next_task)
356  {
357  break;
358  }
359  }
360 
361  return next_task;
362  }
363 };
std::condition_variable radio
The radio
Definition: threadpool.h:69
transient_container transient
The transient
Definition: task.h:259
~base_thread()
Finalizes an instance of the base_thread<TMemInterface> class.
Definition: thread.h:158
task_graph_type * task_graph
The task graph
Definition: threadpool.h:117
std::atomic_uint32_t thread_sync
The thread synchronization counter
Definition: threadpool.h:73
void wake_up()
Wakes up.
Definition: thread.h:206
task_graph_type & task_graph
The task graph
Definition: task.h:267
std::atomic_int64_t num_working
The number working
Definition: task.h:130
void sleep(bool(thread_type::*_wake_up)())
Sleeps until the specified wake-up predicate returns true or the pool requests a stop.
Definition: thread.h:196
Class stl_allocator.
Definition: allocator.h:16
task_type * dequeue_task(uint32_t _priority)
Dequeues the task.
Definition: taskgraph.h:616
thread_index_type thread_index
The thread index
Definition: thread.h:81
base_thread(thread_num_t _thread_index, thread_pool *_pool)
Initializes a new instance of the base_thread<TMemInterface> struct.
Definition: thread.h:143
void join()
Joins this instance.
Definition: thread.h:211
bool is_set(thread_mask_int_t _other_mask)
Determines whether the specified other mask is set.
Definition: types.h:195
std::atomic< state_selector > request_exit
The request exit
Definition: threadpool.h:77
task_memory_allocator_type allocator
The allocator
Definition: thread.h:93
std::thread::id thread_id
The thread identifier
Definition: thread.h:85
uint64_t thread_affinity
The thread affinity
Definition: task.h:186
setup_container setup
The setup
Definition: threadpool.h:109
thread_local void * current_thread
The current thread
Definition: globals.h:76
Definition: concurrency.h:51
Struct base_thread
Definition: task.h:37
string_type task_name
The task name
Definition: task.h:94
void kick_dependent_tasks()
Kicks the dependent tasks.
Definition: task.h:366
std::thread task_thread
The task thread
Definition: thread.h:89
thread_local tstring thread_name
The thread name
Definition: globals.h:68
Class base_task.
Definition: task.h:44
debug_container debug
The debug
Definition: task.h:255
static thread_type * create_thread(thread_pool *_pool)
Creates the thread.
Definition: thread.h:168
Class base_thread_pool.
Definition: task.h:36
rank_type rank
The rank
Definition: task.h:182
bool is_task_available()
Determines whether a task is available.
Definition: taskgraph.h:623
persistent_container persistent
The persistent
Definition: task.h:263