Lockless Task Scheduler v1.0a — a lock-free task scheduler.
task.h
1 // ***********************************************************************
2 // Assembly : task_scheduler
3 // Author : viknash
4 // ***********************************************************************
5 // <copyright file="task.h" >
6 // Copyright (c) viknash. All rights reserved.
7 // </copyright>
8 // <summary></summary>
9 // ***********************************************************************
10 #pragma once
11 
12 #include <atomic>
13 #include <cinttypes>
14 #include <vector>
15 
16 #include "memory.h"
17 #include "meta.h"
18 #include "print.h"
19 #include "types.h"
20 #include "profile.h"
21 
25 namespace task_scheduler
26 {
27 
29  {
30  public:
31  virtual void before_scheduled(thread_num_t _scheduled_on_num_workers) = 0;
32  virtual void after_run() = 0;
33  };
34 
35  template < class TMemInterface > class base_task_graph;
36  template < class TMemInterface > class base_thread_pool;
37  template < class TMemInterface > struct base_thread;
38  template < class task_type, class TMemInterface > struct base_sub_graph;
39 
44  template < class TMemInterface > class base_task : public TMemInterface, public base_task_events
45  {
46  public:
47 
52  typedef std::basic_string< tchar_t, std::char_traits< tchar_t >, stl_allocator< tchar_t, TMemInterface > > string_type;
53  typedef std::vector< string_type, stl_allocator< string_type, TMemInterface > > string_vector;
54  typedef std::vector< task_type *, stl_allocator< task_type *, TMemInterface > > task_vector;
56  typedef std::function< void() > function_type;
57  typedef int64_t rank_type;
58  typedef std::vector< function_type > task_work_vector;
60 
62  typedef lock_free_queue<
64  function_type *, TMemInterface, work_memory_allocator_type * >
66 
71  {
72  realtime,
73  high,
74  normal,
75  low,
76  num_priority
77  };
78 
83  {
89  const tchar_t* priority_to_string(priority_selector _priority) const;
90 
94  string_type task_name;
98  string_vector dependent_task_names;
99  };
100 
105  {
114 
118  std::atomic_int64_t start_gate;
122  work_queue_type *work_queue;
126  work_memory_allocator_type work_allocator;
130  std::atomic_int64_t num_working;
134  profile::time task_time;
138  std::atomic_int64_t num_runned;
143  };
144 
149  {
158 
166  task_vector parent_tasks;
170  task_vector dependent_tasks;
174  task_vector kick_tasks;
178  sub_graph_type *sub_graph;
182  rank_type rank;
186  uint64_t thread_affinity;
190  task_work_vector task_work;
194  thread_num_t num_workers;
195  };
196 
201  base_task(task_graph_type &_task_graph);
205  virtual ~base_task();
210  void set_thread_affinity(thread_mask_int_t _mask);
215  void set_thread_exclusion(thread_mask_int_t _mask);
220  void set_num_workers(thread_num_t _num_workers);
225  void set_num_workers(percentage_t _percentage_workers);
229  void kick_dependent_tasks();
235  bool add_task_parallel_work(function_type _work_function);
241  bool link_task(task_type *_next_task);
246  bool operator()();
251  virtual thread_num_t get_recommended_num_workers();
267  task_graph_type &task_graph;
268 
273 
274  protected:
275  // Overridable functions
280  virtual bool run() = 0;
284  virtual void before_scheduled(thread_num_t _scheduled_on_num_workers);
288  virtual void after_run();
289 
290  };
291 
292  template < class TMemInterface >
294  {
295  static const tchar_t *priority_to_string[] = {_t("REALTIME"), _t("HIGH"), _t("NORMAL"), _t("LOW")};
296 
297  return priority_to_string[uint32_t(priority)];
298  }
299 
300  template < class TMemInterface >
302  : task_priority(normal)
303  , sub_graph(nullptr)
304  , rank(0)
305  , thread_affinity(0)
306  , num_workers(std::numeric_limits<thread_num_t>::max())
307  {
308  }
309 
311 
312  template < class TMemInterface >
314  : work_queue(nullptr)
315  , num_working(0)
316  , minimum_batch_size(1)
317  {
318  using namespace std::chrono_literals;
319 
320  task_time = 0ms;
322  }
323 
325  {
326  assert(work_queue);
327  delete work_queue;
328  work_queue = nullptr;
329  }
330 
331  template < class TMemInterface > void base_task< TMemInterface >::set_thread_affinity(thread_mask_int_t _mask)
332  {
334  }
335 
336  template < class TMemInterface > void base_task< TMemInterface >::set_thread_exclusion(thread_mask_int_t _mask)
337  {
339  }
340 
341  template < class TMemInterface > void base_task< TMemInterface >::set_num_workers(thread_num_t _num_workers)
342  {
343  task_graph.set_num_workers(this, _num_workers);
344  }
345 
346  template < class TMemInterface > void base_task< TMemInterface >::set_num_workers(percentage_t _percentage_workers)
347  {
348  task_graph.set_percentage_of_workers(this, _percentage_workers);
349  }
350 
351  template < class TMemInterface >
353  : task_graph(_task_graph)
354  {
355  }
356 
357  template < class TMemInterface >
359  {
360  profile::task_scoped_instrument profile_point(profile::task_param(nullptr, debug.task_name.c_str(), nullptr, debug.task_name.c_str()));
361  return this->run();
362  }
363 
364  template < class TMemInterface > base_task< TMemInterface >::~base_task() { persistent.task_work.clear(); }
365 
366  template < class TMemInterface > void base_task< TMemInterface >::kick_dependent_tasks()
367  {
368  // Reduce queue rank of queue that the current task is running on
370  .fetch_sub(persistent.rank);
371 
372  // Queue dependent tasks only when their start gates are 0
373  // i.e. all parent tasks have been executed
374 
375  // If we are scheduling many tasks at once search for the next best ranked queue, starting from just after the
376  // queue that was just scheduled
377  reduce_starvation(new_search_index) thread_index_type best_search_index =
379 
380  for (auto dependent_task : persistent.dependent_tasks)
381  {
382  if (--dependent_task->transient.start_gate == 0)
383  {
384  thread_num_t requested_workers = dependent_task->get_recommended_num_workers();
385  uint32_t dependent_task_priority = dependent_task->persistent.task_priority;
386 
387  //Search for best threads to run on only we do not require all workers
388  if (requested_workers < task_graph.pool.num_threads)
389  {
390  dependent_task->before_scheduled(requested_workers);
391 
392  // Find lowest ranking queue, aka best queue and increment its rank with dependent task rank
393  thread_type *best_thread = nullptr;
394  rank_type best_rank = std::numeric_limits< rank_type >::max();
395  do
396  {
397  best_thread = nullptr;
398  best_rank = std::numeric_limits< rank_type >::max();
399  thread_index_type current_thread_index = best_search_index;
400  for (thread_num_t iterations = 0; iterations < task_graph.pool.num_threads;
401  ++current_thread_index, ++iterations)
402  {
403  if (!current_thread_index.is_set(dependent_task->persistent.thread_affinity))
404  continue; // Skip threads the task should not run on
405 
406  int64_t current_thread_rank =
407  task_graph.pool.queue_rank[dependent_task->persistent.task_priority][current_thread_index]
408  .load();
409  if (current_thread_rank < best_rank)
410  {
411  best_rank = current_thread_rank;
412  best_thread = task_graph.pool.threads[current_thread_index];
413  }
414  }
415  } while (
416  !task_graph.pool.queue_rank[dependent_task->persistent.task_priority][best_thread->thread_index]
417  .compare_exchange_weak(best_rank, best_rank + dependent_task->persistent.rank));
418 
419  // Push task into the best queue
420  uint32_t current_task_priority = dependent_task_priority;
421  do
422  {
423  } while (!best_thread->task_queue[current_task_priority]->push_back(dependent_task) &&
424  ++current_task_priority < task_type::num_priority);
425 
426  ts_print("schedule " << dependent_task->debug.task_name << " -> "
427  << uint32_t(best_thread->thread_index));
428  // Wake up thread if its sleeping
429  best_thread->wake_up();
430 
431  reduce_starvation(new_search_index) best_search_index = best_thread->thread_index;
432  }
433  else
434  {
435  //Schedule task on all threads
436  dependent_task->before_scheduled(task_graph.pool.num_threads);
437  for (thread_num_t current_thread_index = 0; current_thread_index < task_graph.pool.num_threads;
438  ++current_thread_index)
439  {
440  uint32_t current_task_priority = dependent_task_priority;
441  do
442  {
443  } while (!task_graph.pool.threads[current_thread_index]->task_queue[current_task_priority]->push_back(dependent_task) &&
444  ++current_task_priority < task_type::num_priority);
445  ts_print("schedule " << dependent_task->debug.task_name << " -> "
446  << uint32_t(task_graph.pool.threads[current_thread_index]->thread_index));
447  }
448 
449  //Note: We do not modify best_search_index here
450  }
451  }
452  }
453 
454  // Stop kicking tasks when a request to pause has been received
455  // If all tail kickers have paused, then request the threads to stop
456  if (persistent.kick_tasks.size() && task_graph.pool.setup.request_exit == thread_pool::request_pause)
457  {
460  {
461  task_graph.pool.setup.request_exit.store(thread_pool::request_stop);
463  }
464  return;
465  }
466 
467  // Only Tail task_type Nodes should have kick tasks
468  // kick tasks are Head Tasks for the next frame
469  bool initializedSubGraph = false;
470  for (auto kick_task : persistent.kick_tasks)
471  {
472  if (!initializedSubGraph)
473  {
474  task_graph.setup(kick_task->persistent.sub_graph);
475  initializedSubGraph = true;
476  }
477  uint64_t start_gate = kick_task->transient.start_gate.load();
478  ts_assert(start_gate == 0);
479  task_graph.queue_task(kick_task);
480  }
481  }
482 
483  template < class TMemInterface > bool base_task< TMemInterface >::link_task(task_type *_next_task)
484  {
485  return task_graph.link_task(this, _next_task);
486  }
487 
488  template < class TMemInterface >
489  bool base_task< TMemInterface >::add_task_parallel_work(function_type _work_function)
490  {
492  assert(transient.num_working == 0);
493  persistent.task_work.push_back(_work_function);
494  transient.work_queue->push_back(&persistent.task_work.back());
495  return true;
496  }
497 
498  template < class TMemInterface >
499  void base_task< TMemInterface >::before_scheduled(thread_num_t _scheduled_on_num_workers)
500  {
501  ts_unused(_scheduled_on_num_workers);
502  }
503 
504  template < class TMemInterface >
506  {
507  //Repopulate task parallel work functions for next run
508  for (auto &work : persistent.task_work)
509  {
510  transient.work_queue->push_back(&work);
511  }
512  }
513 
514  template < class TMemInterface >
516  {
517  return persistent.num_workers;
518  }
519 
520 };
void wake_up(thread_num_t _num_threads_to_wake_up=max_num_threads, uint64_t _thread_affinity_mask=std::numeric_limits< uint64_t >::max())
Wakes up.
Definition: threadpool.h:191
void set_num_workers(thread_num_t _num_workers)
Sets the number of workers.
Definition: task.h:341
Definition: types.h:66
task_vector kick_tasks
The kick tasks
Definition: task.h:174
std::atomic_uint32_t thread_sync
The thread synchronize
Definition: threadpool.h:73
void wake_up()
Wakes up.
Definition: thread.h:206
work_queue_type * work_queue
The work queue
Definition: task.h:122
base_task(task_graph_type &_task_graph)
Initializes a new instance of the base_task class.
Definition: task.h:352
task_graph_type & task_graph
The task graph
Definition: task.h:267
void queue_task(task_type *_task, thread_num_t _num_threads_to_wake_up=1)
Queues the task.
Definition: taskgraph.h:596
virtual ~base_task()
Finalizes an instance of the base_task class.
Definition: task.h:364
profile::time task_time
Total time spent running all work functions in this task
Definition: task.h:134
std::atomic_int64_t num_working
The number working
Definition: task.h:130
transient_container()
Initializes a new instance of the base_task<TMemInterface>.transient_container struct.
Definition: task.h:313
thread_type * get_current_thread()
Gets the current thread.
Definition: threadpool.h:207
void set_task_thread_exclusion(task_type *_task, uint64_t _mask)
Sets the task thread exclusion.
Definition: taskgraph.h:479
Class stl_allocator.
Definition: allocator.h:16
virtual void after_run()
Callback is called after a task is run
Definition: task.h:505
std::atomic_int64_t num_runned
Total number of times work function was called
Definition: task.h:138
thread_index_type thread_index
The task queue
Definition: thread.h:81
thread_pool & pool
The pool
Definition: taskgraph.h:285
bool link_task(task_type *_parent_task, task_type *_dependent_task)
Links the task.
Definition: taskgraph.h:636
~transient_container()
Finalizes an instance of the base_task<TMemInterface>.transient_container class.
Definition: task.h:324
Struct base_sub_graph
Definition: task.h:38
bool link_task(task_type *_next_task)
Links the task.
Definition: task.h:483
Definition: allocator.h:17
string_vector dependent_task_names
The dependent task names
Definition: task.h:98
Class constrained.
Definition: types.h:25
void setup(sub_graph_type *_sub_graph=nullptr)
Setups the specified sub graph.
Definition: taskgraph.h:307
bool is_set(thread_mask_int_t _other_mask)
Determines whether the specified other mask is set.
Definition: types.h:195
std::atomic< state_selector > request_exit
The request exit
Definition: threadpool.h:77
priority_selector task_priority
The task priority
Definition: task.h:162
Class scoped_enter_exit.
Definition: utils.h:140
void set_thread_affinity(thread_mask_int_t _mask)
Sets the thread affinity.
Definition: task.h:331
uint64_t thread_affinity
The thread affinity
Definition: task.h:186
Struct persistent_container
Definition: task.h:148
virtual void before_scheduled(thread_num_t _scheduled_on_num_workers)
Callback is called when a task is scheduled
Definition: task.h:499
setup_container setup
The setup
Definition: threadpool.h:109
sub_graph_type * sub_graph
The sub graph
Definition: task.h:178
bool add_task_parallel_work(function_type _work_function)
Adds the task parallel work.
Definition: task.h:489
std::atomic_int64_t start_gate
The start gate
Definition: task.h:118
~persistent_container()
Finalizes an instance of the base_task<TMemInterface>.persistent_container class. ...
Definition: task.h:310
work_memory_allocator_type work_allocator
The work allocator
Definition: task.h:126
uint32_t minimum_batch_size
Calculated minimum batch size
Definition: task.h:142
task_work_vector task_work
The task work
Definition: task.h:190
thread_unsafe_access_storage add_task_parallel_work_detector
The add task parallel work detector
Definition: task.h:272
Struct base_thread
Definition: task.h:37
thread_num_t num_threads
The number of threads
Definition: threadpool.h:113
const tchar_t * priority_to_string(priority_selector _priority) const
Priorities to string.
Definition: task.h:293
void set_thread_exclusion(thread_mask_int_t _mask)
Sets the thread exclusion.
Definition: task.h:336
string_type task_name
The task name
Definition: task.h:94
bool operator()()
Operator()s this instance.
Definition: task.h:358
void kick_dependent_tasks()
Kicks the dependent tasks.
Definition: task.h:366
virtual bool run()=0
Calls the working function internally
virtual thread_num_t get_recommended_num_workers()
Gets the best number of workers for the task every frame
Definition: task.h:515
void set_num_workers(task_type *_task, thread_num_t _num_workers)
Sets the number of workers.
Definition: taskgraph.h:489
Definition: lockfreequeue.h:20
Struct debug_container
Definition: task.h:82
Definition: lockfreequeue.h:235
void set_percentage_of_workers(task_type *_task, float _percentage_workers)
Sets the percentage of workers.
Definition: taskgraph.h:495
Class base_task.
Definition: task.h:44
Class base_task_graph.
Definition: task.h:35
persistent_container()
Initializes a new instance of the base_task<TMemInterface>.persistent_container struct.
Definition: task.h:301
debug_container debug
The debug
Definition: task.h:255
void set_task_thread_affinity(task_type *_task, uint64_t _mask)
Sets the task thread affinity.
Definition: taskgraph.h:466
Struct transient_container
Definition: task.h:104
task_vector parent_tasks
The parent tasks
Definition: task.h:166
thread_num_t num_workers
User set number of workers to use to run this task
Definition: task.h:194
Struct thread_unsafe_access_storage
Definition: utils.h:195
Class base_thread_pool.
Definition: task.h:36
rank_type rank
The rank
Definition: task.h:182
task_vector dependent_tasks
The dependent tasks
Definition: task.h:170
Definition: profileitt.h:173
persistent_container persistent
The persistent
Definition: task.h:263
priority_selector
Enum priority_selector
Definition: task.h:70