// Lockless Task Scheduler v1.0a
// A lockless task scheduler
// taskgraph.h
1 // ***********************************************************************
2 // Assembly : task_scheduler
3 // Author : viknash
4 // ***********************************************************************
5 // <copyright file="taskgraph.h" >
6 // Copyright (c) viknash. All rights reserved.
7 // </copyright>
8 // <summary></summary>
9 // ***********************************************************************
10 #pragma once
11 
12 #include <algorithm>
13 #include <cinttypes>
14 #include <fstream>
15 #include <iostream>
16 #include <iterator>
17 #include <set>
18 #include <sstream>
19 #include <unordered_map>
20 
21 #include "containers.h"
22 
26 namespace task_scheduler
27 {
28 
29  template < class TMemInterface > class base_task;
30  template < class TMemInterface > class base_thread_pool;
31 
35  template < class task_type, class TMemInterface > struct base_sub_graph : public TMemInterface
36  {
37  typedef std::vector< task_type *, stl_allocator< task_type *, TMemInterface > > task_vector;
41  task_vector head_tasks;
45  task_vector tail_tasks;
49  task_vector task_list;
50 
51  // Frame Rate
52  // Periodic Updates
53  };
54 
    /// Class base_task_graph.
    /// Owns a graph of tasks: their dependency edges, sub-graph partitions,
    /// one lock-free work queue per priority level, and a reference to the
    /// thread pool that executes the tasks.
    template < class TMemInterface > class base_task_graph : public TMemInterface
    {
      public:
        // Convenience aliases over the memory-interface-parameterized types.
        typedef base_task< TMemInterface > task_type;
        typedef std::basic_string< tchar_t, std::char_traits< tchar_t >, stl_allocator< tchar_t, TMemInterface > > string_type;
        typedef base_sub_graph< task_type, TMemInterface > sub_graph_type;
        typedef lock_free_node_dispenser< task_type *, TMemInterface > task_memory_allocator_type;
        typedef lock_free_queue<
            typename base_task< TMemInterface >::task_type *, TMemInterface, task_memory_allocator_type * >
            base_task_queue_type;
        // NOTE(review): the allocator argument and closing '>' of this
        // unordered_map typedef were lost in extraction -- restore from the
        // original header before compiling.
        typedef std::unordered_map< string_type, task_type *, std::hash< string_type >, std::equal_to< string_type >,
            task_name_to_task_map;
        typedef std::vector< sub_graph_type *, stl_allocator< sub_graph_type *, TMemInterface > > sub_graph_vector;
        typedef std::vector< task_type *, stl_allocator< task_type *, TMemInterface > > task_vector;
        typedef base_thread_pool< TMemInterface > thread_pool;
        // Head pointer of an intrusive singly-linked list chained through
        // transient.next_task (see size()).
        typedef task_type *task_list;
        typedef std::function< void(task_type *, void *&) > traversal_function_type;

        // Per-priority work queue; thin wrapper over the lock-free queue base.
        class task_queue_type : public base_task_queue_type
        {
            typedef base_task_queue_type super;
          public:
            task_queue_type(task_memory_allocator_type *allocator);
            bool push_back(typename base_task< TMemInterface >::task_type * _new_task);
            virtual ~task_queue_type() {}
        };

        // State that survives across graph runs.
        // NOTE(review): the 'struct persistent_container' header line was
        // lost in extraction here.
        {
            task_vector head_tasks;      // Tasks with no parents; queued directly by kick().
            task_vector tail_tasks;      // Leaf tasks; they kick the heads to restart the graph.
            sub_graph_vector sub_graphs; // Owned sub-graph partitions (deleted in the destructor).
        };

        // Per-run state: the priority work queues.
        // NOTE(review): the 'struct transient_container' header line was
        // lost in extraction here.
        {
            // Creates one queue per priority level, all sharing one node allocator.
            transient_container(task_memory_allocator_type *allocator)
            {
                for (auto &queue : task_queue)
                {
                    queue = new task_queue_type(allocator);
                }
            }
            // Destructor: tears the queues down again.
            // NOTE(review): the '~transient_container()' line was lost in extraction.
            {
                for (auto &queue : task_queue)
                {
                    delete queue;
                    queue = nullptr;
                }
            }
            // One lock-free queue per task priority level.
            task_queue_type *task_queue[task_type::num_priority];
        };

        // Debugging aids: name lookup plus the owning list of all tasks.
        // NOTE(review): the 'struct debug_container' header line was lost in
        // extraction here.
        {
            task_name_to_task_map task_name_to_task; // Task name -> task pointer.
            task_vector task_list;                   // Every task (owned; deleted in the destructor).
        };

      public:
        base_task_graph(thread_pool &_pool);
        ~base_task_graph();

        // Graph preparation.
        void setup(sub_graph_type *_sub_graph = nullptr);
        void load(string_type _file_name);
        void initialize();
        void set_task_thread_affinity(task_type *_task, uint64_t _mask);
        void set_task_thread_exclusion(task_type *_task, uint64_t _mask);
        void set_num_workers(task_type *_task, thread_num_t _num_workers);
        void set_percentage_of_workers(task_type *_task, float _percentage_workers);
        void setup_tail_kickers();
        void depth_first_visitor(task_type *_task, traversal_function_type _preorder_functor,
            traversal_function_type _inorder_functor, traversal_function_type _post_order_functor,
            traversal_function_type _tail_functor, void *_param, bool _bottom_up = false);
        void kick();

        // Run-time queue operations.
        void queue_task(task_type *_task, thread_num_t _num_threads_to_wake_up = 1);
        task_type *dequeue_task(uint32_t _priority);
        bool is_task_available();
        bool link_task(task_type *_parent_task, task_type *_dependent_task);

      private:
        bool find_head(task_vector &_head_list);
        size_t size(task_list _task_list) const;
        task_memory_allocator_type task_memory_allocator; // Shared queue-node allocator.

      public:
        // NOTE(review): the member declarations 'persistent_container persistent;',
        // 'transient_container transient;' and 'debug_container debug;' were
        // lost in extraction here (they are referenced throughout the
        // definitions below).
        thread_pool &pool; // Thread pool that runs the tasks (reference; not owned).
    };
287 
    // task_queue_type constructor: forwards the shared node allocator to the
    // lock-free queue base class.
    // NOTE(review): the out-of-line signature line
    // (base_task_graph< TMemInterface >::task_queue_type::task_queue_type(
    //  task_memory_allocator_type *allocator)) was lost in extraction; only
    // the mem-initializer and body remain.
    template < class TMemInterface >
    : base_task_queue_type(allocator)
    {
    }
293 
    // task_queue_type::push_back: delegates to the lock-free queue base and
    // returns whatever success/failure the base reports.
    // NOTE(review): the signature line was lost in extraction; per the
    // in-class declaration it is
    //   bool push_back(typename base_task< TMemInterface >::task_type *_new_task).
    template < class TMemInterface >
    {
        return super::push_back(_new_task);
    }
299 
    // base_task_graph constructor: wires the transient priority queues to the
    // graph's shared task memory allocator and stores the pool reference.
    // NOTE(review): the signature line
    // (base_task_graph< TMemInterface >::base_task_graph(thread_pool &_pool))
    // was lost in extraction; only the mem-initializer list and body remain.
    template < class TMemInterface >
    : transient(&task_memory_allocator)
    , pool(_pool)
    {
    }
306 
307  template < class TMemInterface > void base_task_graph< TMemInterface >::setup(sub_graph_type *graph)
308  {
309  using namespace std::chrono_literals;
310 
311  task_vector *task_list;
312  if (graph)
313  {
314  task_list = &(graph->task_list);
315  }
316  else
317  {
318  task_list = &(debug.task_list);
319  }
320  for (auto task : *task_list)
321  {
322  // memset(&(task->transient), 0x0, sizeof(task_type::transient_container));
323  task->transient.start_gate = 0;
324  task->transient.num_working = 0;
325  task->transient.task_time = 0ms;
326  task->transient.num_runned = 0;
327  }
328  for (auto task : *task_list)
329  {
330  for (auto dependent_task : task->persistent.dependent_tasks)
331  {
332  ++dependent_task->transient.start_gate;
333  }
334  }
335  }
336 
337  template < class TMemInterface > bool base_task_graph< TMemInterface >::find_head(task_vector &_head_list)
338  {
339  auto found_at_least_one_head = false;
340  for (auto task : debug.task_list)
341  {
342  uint64_t start_gate = task->transient.start_gate.load();
343  if (start_gate == 0)
344  {
345  _head_list.push_back(task);
346  found_at_least_one_head = true;
347  }
348  }
349  return found_at_least_one_head;
350  }
351 
352  template < class TMemInterface > size_t base_task_graph< TMemInterface >::size(task_list _task_list) const
353  {
354  size_t count = 0;
355  task_type *itr = _task_list;
356  while (itr)
357  {
358  count++;
359  itr = itr->transient.next_task.load();
360  }
361  return count;
362  }
363 
    // Prepares the graph for a run: resets task state, discovers head tasks,
    // partitions the graph into sub-graphs, ranks nodes, sorts dependency
    // lists by rank, and fixes up thread affinity masks.
    template < class TMemInterface > void base_task_graph< TMemInterface >::initialize()
    {
        using namespace std::placeholders;

        // initialize tasks' queue pointers and start gates
        setup();

        // initialize head tasks
        auto found = find_head(persistent.head_tasks);
        // Check if we have at least one head
        ts_assert(found);

        // Setup end nodes to start head nodes
        // NOTE(review): a statement was lost here in extraction -- most
        // likely the setup_tail_kickers() call that populates
        // persistent.tail_tasks consumed below; confirm against the original.

#if defined(DEBUG)
        // Check if there are more than one tail nodes per head node
        // NOTE(review): 'tail_tasks' and 'tail_task->kick_tasks' are accessed
        // without the 'persistent.' qualifier used everywhere else in this
        // file -- this debug-only block may not compile as-is; verify.
        std::set< task_type * > head_nodes_with_a_tail_node;
        for (auto tail_task : tail_tasks)
        {
            for (auto kick_task : tail_task->kick_tasks)
            {
                auto result = head_nodes_with_a_tail_node.insert(kick_task);
                assert(result.second);
            }
        }
#endif // DEBUG

        typedef std::set< task_type *, std::less< task_type * >, stl_allocator< task_type *, TMemInterface > > task_set;

        // Setup Subgraphs
        // Each subgraph has only 1 tail, so we get the heads in the tail to traverse
        // all nodes to build a graph
        for (auto tail_task : persistent.tail_tasks)
        {
            auto *sub_graph = new sub_graph_type();
            sub_graph->tail_tasks.push_back(tail_task);
            task_set sub_graph_set;
            for (auto kick_task : tail_task->persistent.kick_tasks)
            {
                sub_graph->head_tasks.push_back(kick_task);
                void *param = nullptr;
                // Collect every node reachable from this head (the set
                // de-duplicates multi-path visits) and tag each node with its
                // owning sub-graph.
                depth_first_visitor(kick_task, bind(
                    [](task_type *node, void *&_param, task_set *_sub_graph_set,
                        sub_graph_type *_sub_graph) {
                        (void)_param;
                        _sub_graph_set->insert(node);
                        node->persistent.sub_graph = _sub_graph;
                    },
                    _1, _2, &sub_graph_set, sub_graph),
                    nullptr, nullptr, nullptr, param);
            }
            copy(sub_graph_set.begin(), sub_graph_set.end(), back_inserter(sub_graph->task_list));
            persistent.sub_graphs.push_back(sub_graph);
        }

        task_set ranked_tasks;
        // Pre-order functor for the bottom-up walk below: each visit adds 1
        // to the node's rank. The traversal visits a node once per path to
        // it, so rank accumulates across paths.
        traversal_function_type ranking_func = [&](task_type *nodeTask, void *&param) {
            param = (void *)1;
            nodeTask->persistent.rank += reinterpret_cast< typename task_type::rank_type >(param);
            ts_print(nodeTask->debug.task_name << " - rank_type :" << int32_t(nodeTask->persistent.rank));
        };

        // rank_type Nodes
        for (auto sub_graph : persistent.sub_graphs)
        {
            for (auto bottomHeadTask : sub_graph->tail_tasks)
            {
                void *param = nullptr;
                // Bottom-up: walk parent edges starting from each tail.
                depth_first_visitor(bottomHeadTask, ranking_func, nullptr, nullptr, nullptr, param, true);
            }
        }

        // Orders tasks by ascending rank.
        struct rank_sorter
        {
            static bool compare(task_type *&_a, task_type *&_b)
            {
                return (_a->persistent.rank < _b->persistent.rank ? true : false);
            }
        };

        // Sort dependent tasks for each task based on rank
        for (auto &task : debug.task_list)
        {
            sort(task->persistent.dependent_tasks.begin(), task->persistent.dependent_tasks.end(),
                rank_sorter::compare);
        }

        // Correct thread affinity masks of all tasks
        for (auto head_task : persistent.head_tasks)
        {
            void *param = nullptr;
            depth_first_visitor(head_task,
                [&](task_type *_task, void *&_param) {
                    (void)_param;
                    // NOTE(review): the body of this fix-up lambda was lost
                    // in extraction (presumably a set_task_thread_affinity
                    // call on _task); confirm against the original.
                },
                nullptr, nullptr, nullptr, param);
        }
    }
464 
    // Restricts a task to the worker threads selected in _mask.
    // NOTE(review): the signature line was lost in extraction; per the
    // in-class declaration it is
    //   void set_task_thread_affinity(task_type *_task, uint64_t _mask).
    template < class TMemInterface >
    {
        _task->persistent.thread_affinity = 0;
        // Mask with the lowest pool.num_threads bits set (all valid workers).
        // NOTE(review): '1ull << pool.num_threads' is undefined behavior when
        // num_threads >= 64 -- confirm the pool caps thread counts below 64.
        thread_mask_int_t valid_thread_mask = 1ull << pool.num_threads;
        valid_thread_mask = valid_thread_mask - 1;
        // Fold each num_threads-wide slice of _mask onto the valid range so
        // bits naming non-existent threads wrap onto real ones.
        // NOTE(review): if pool.num_threads were 0 this loop would never
        // terminate (shift by zero); presumably the pool guarantees >= 1.
        while (_mask)
        {
            _task->persistent.thread_affinity |= _mask & valid_thread_mask;
            _mask = _mask >> pool.num_threads;
        }
    }
477 
    // Excludes the threads selected in _mask by granting the task affinity
    // for every *other* valid worker thread.
    // NOTE(review): the signature line was lost in extraction; per the
    // in-class declaration it is
    //   void set_task_thread_exclusion(task_type *_task, uint64_t _mask).
    template < class TMemInterface >
    {
        _mask = ~_mask; // invert: excluded threads -> allowed threads
        // NOTE(review): same shift caveat as set_task_thread_affinity --
        // undefined when pool.num_threads >= 64.
        thread_mask_int_t valid_thread_mask = 1ull << pool.num_threads;
        valid_thread_mask = valid_thread_mask - 1;
        _mask = _mask & valid_thread_mask; // clamp to existing threads
        set_task_thread_affinity(_task, _mask);
    }
487 
488  template < class TMemInterface >
489  void base_task_graph< TMemInterface >::set_num_workers(task_type *_task, thread_num_t _num_workers)
490  {
491  _task->persistent.num_workers = min(pool.num_threads, _num_workers);
492  }
493 
    // Sets a task's worker count as a fraction of the pool size.
    // NOTE(review): the signature line was lost in extraction; per the
    // in-class declaration it is
    //   void set_percentage_of_workers(task_type *_task, float _percentage_workers).
    template < class TMemInterface >
    {
        // The fraction must lie in (0, 1].
        assert(_percentage_workers > .0f && _percentage_workers <= 1.0f);
        // NOTE(review): 'ceil' is unqualified (std::ceil?) and the float
        // result is implicitly narrowed to thread_num_t -- confirm intended.
        _task->persistent.num_workers = ceil(_percentage_workers * (float)pool.num_threads);
    }
500 
    // Walks the graph from every head task and records the tail (leaf) tasks
    // it reaches: each tail learns which head it should kick to restart the
    // graph (persistent.kick_tasks), and each tail is registered once in
    // persistent.tail_tasks.
    template < class TMemInterface > void base_task_graph< TMemInterface >::setup_tail_kickers()
    {
        using namespace std::placeholders;

        for (auto head_task : persistent.head_tasks)
        {
            void *param = nullptr;
            // NOTE(review): the call line 'depth_first_visitor(' was lost in
            // extraction; only its argument list remains below (nullptr
            // pre/in/post-order functors, a bound tail functor, then param).
                head_task, nullptr, nullptr, nullptr,
                bind(
                    // Tail functor: runs once per leaf reached from head_task.
                    [](task_type *_tail_task, void *&_param, task_type *_head_task, task_vector *_tail_tasks) {
                        (void)_param;
                        // Only add unique items
                        auto result = find(begin(*_tail_tasks), end(*_tail_tasks), _tail_task);
                        if (result == end(*_tail_tasks))
                        {
                            _tail_task->persistent.kick_tasks.push_back(_head_task);
                            _tail_tasks->push_back(_tail_task);
                        }
                    },
                    _1, _2, head_task, &persistent.tail_tasks),
                param);
        }
    }
525 
    // Recursive depth-first traversal of the task graph.
    //   _pre_order_functor  : invoked on a node before its edges are walked.
    //   _in_order_functor   : invoked on the node after each child/parent recursion.
    //   _post_order_functor : invoked on the node after all edges were walked.
    //   _tail_functor       : invoked on nodes with no outgoing edges, then returns.
    //   _bottom_up          : walk parent_tasks (true) or dependent_tasks (false).
    // NOTE(review): the signature line was lost in extraction; the in-class
    // declaration shows the full parameter list.
    // NOTE(review): there is no visited-set or cycle guard -- nodes reachable
    // via several paths are visited once per path, and a cyclic graph would
    // recurse forever. The ranking pass in initialize() appears to rely on
    // the per-path behavior; confirm before changing.
    template < class TMemInterface >
        traversal_function_type _pre_order_functor,
        traversal_function_type _in_order_functor,
        traversal_function_type _post_order_functor,
        traversal_function_type _tail_functor, void *_param,
        bool _bottom_up)
    {
        if (_pre_order_functor)
            _pre_order_functor(_task, _param);
        if (_bottom_up)
        {
            // Bottom-up: leaves are nodes with no parents.
            if (_task->persistent.parent_tasks.size() == 0)
            {
                if (_tail_functor)
                    _tail_functor(_task, _param);
                return;
            }
            for (auto parent_task : _task->persistent.parent_tasks)
            {
                depth_first_visitor(parent_task, _pre_order_functor, _in_order_functor, _post_order_functor,
                    _tail_functor, _param, _bottom_up);
                // In-order runs on the *current* node after each parent visit.
                if (_in_order_functor)
                    _in_order_functor(_task, _param);
            }
        }
        else
        {
            // Top-down: leaves are nodes with no dependents.
            if (_task->persistent.dependent_tasks.size() == 0)
            {
                if (_tail_functor)
                    _tail_functor(_task, _param);
                return;
            }
            for (auto dependent_task : _task->persistent.dependent_tasks)
            {
                depth_first_visitor(dependent_task, _pre_order_functor, _in_order_functor, _post_order_functor,
                    _tail_functor, _param, _bottom_up);
                // In-order runs on the *current* node after each child visit.
                if (_in_order_functor)
                    _in_order_functor(_task, _param);
            }
        }
        if (_post_order_functor)
            _post_order_functor(_task, _param);
    }
571 
572  template < class TMemInterface > base_task_graph< TMemInterface >::~base_task_graph()
573  {
574  for (auto &it : persistent.sub_graphs)
575  {
576  delete it;
577  }
578  persistent.sub_graphs.clear();
579  for (auto &it : debug.task_list)
580  {
581  delete it;
582  }
583  debug.task_list.clear();
584  }
585 
586  template < class TMemInterface > void base_task_graph< TMemInterface >::kick()
587  {
588  initialize();
589  for (auto head_task : persistent.head_tasks)
590  {
591  queue_task(head_task);
592  }
593  }
594 
595  template < class TMemInterface >
596  void base_task_graph< TMemInterface >::queue_task(task_type *_task, thread_num_t _num_threads_to_wake_up)
597  {
598  using namespace std;
599 
600  uint32_t priority = _task->persistent.task_priority;
601 
602  thread_num_t requested_workers = min(min(_task->get_recommended_num_workers(), _num_threads_to_wake_up), pool.num_threads);
603  for (thread_num_t count = 0; count < requested_workers; ++count)
604  {
605  do
606  {
607  } while (!transient.task_queue[priority]->push_back(_task) && ++priority < task_type::num_priority);
608  assert(priority < task_type::num_priority);
609  }
610 
611  pool.wake_up(_num_threads_to_wake_up, _task->persistent.thread_affinity);
612  }
613 
    // Pops the next task from the queue at the given priority level.
    // NOTE(review): the signature line was lost in extraction; per the
    // in-class declaration it is
    //   task_type *dequeue_task(uint32_t _priority).
    template < class TMemInterface >
    {
        task_type *next_task = nullptr;
        // NOTE(review): assumes pop_front leaves next_task untouched
        // (nullptr) when the queue is empty -- confirm against
        // lock_free_queue::pop_front.
        transient.task_queue[_priority]->pop_front(next_task);
        return next_task;
    }
622 
623  template < class TMemInterface > bool base_task_graph< TMemInterface >::is_task_available()
624  {
625  for (auto queue : transient.task_queue)
626  {
627  if (!queue->empty())
628  {
629  return true;
630  }
631  }
632  return false;
633  }
634 
635  template < class TMemInterface >
637  {
638  _parent_task->dependent_tasks.push_back(_dependent_task);
639  _dependent_task->parent_tasks.push_back(_parent_task);
640  return true;
641  }
642 };
void wake_up(thread_num_t _num_threads_to_wake_up=max_num_threads, uint64_t _thread_affinity_mask=std::numeric_limits< uint64_t >::max())
Wakes up.
Definition: threadpool.h:191
transient_container transient
The transient
Definition: task.h:259
task_vector kick_tasks
The kick tasks
Definition: task.h:174
void queue_task(task_type *_task, thread_num_t _num_threads_to_wake_up=1)
Queues the task.
Definition: taskgraph.h:596
persistent_container persistent
The persistent
Definition: taskgraph.h:273
void depth_first_visitor(task_type *_task, traversal_function_type _preorder_functor, traversal_function_type _inorder_functor, traversal_function_type _post_order_functor, traversal_function_type _tail_functor, void *_param, bool _bottom_up=false)
Depthes the first visitor.
Definition: taskgraph.h:527
void set_task_thread_exclusion(task_type *_task, uint64_t _mask)
Sets the task thread exclusion.
Definition: taskgraph.h:479
Class stl_allocator.
Definition: allocator.h:16
task_vector task_list
The task list
Definition: taskgraph.h:49
task_type * dequeue_task(uint32_t _priority)
Dequeues the task.
Definition: taskgraph.h:616
Struct debug_container
Definition: taskgraph.h:143
thread_pool & pool
The pool
Definition: taskgraph.h:285
bool link_task(task_type *_parent_task, task_type *_dependent_task)
Links the task.
Definition: taskgraph.h:636
~base_task_graph()
Finalizes an instance of the base_task_graph class.
Definition: taskgraph.h:572
void setup_tail_kickers()
Setups the tail kickers.
Definition: taskgraph.h:501
Struct base_sub_graph
Definition: task.h:38
Definition: allocator.h:17
task_vector task_list
The task list
Definition: taskgraph.h:152
Struct transient_container
Definition: taskgraph.h:110
void setup(sub_graph_type *_sub_graph=nullptr)
Setups the specified sub graph.
Definition: taskgraph.h:307
priority_selector task_priority
The task priority
Definition: task.h:162
transient_container transient
The transient
Definition: taskgraph.h:277
uint64_t thread_affinity
The thread affinity
Definition: task.h:186
transient_container(task_memory_allocator_type *allocator)
Initializes a new instance of the base_task_graph<TMemInterface>.transient_container struct...
Definition: taskgraph.h:116
sub_graph_type * sub_graph
The sub graph
Definition: task.h:178
std::atomic_int64_t start_gate
The start gate
Definition: task.h:118
sub_graph_vector sub_graphs
The sub graphs
Definition: taskgraph.h:104
task_vector tail_tasks
The tail tasks
Definition: taskgraph.h:100
thread_num_t num_threads
The number threads
Definition: threadpool.h:113
string_type task_name
The task name
Definition: task.h:94
virtual thread_num_t get_recommended_num_workers()
Gets the best number of workers for the task every frame
Definition: task.h:515
void set_num_workers(task_type *_task, thread_num_t _num_workers)
Sets the number workers.
Definition: taskgraph.h:489
void kick()
Kicks this instance.
Definition: taskgraph.h:586
Definition: lockfreequeue.h:20
Struct persistent_container
Definition: taskgraph.h:91
debug_container debug
The debug
Definition: taskgraph.h:281
task_vector head_tasks
The head tasks
Definition: taskgraph.h:41
Definition: lockfreequeue.h:235
task_vector head_tasks
The head tasks
Definition: taskgraph.h:96
void set_percentage_of_workers(task_type *_task, float _percentage_workers)
Sets the percentage of workers.
Definition: taskgraph.h:495
Class base_task.
Definition: task.h:44
Class base_task_graph.
Definition: task.h:35
debug_container debug
The debug
Definition: task.h:255
task_vector tail_tasks
The tail tasks
Definition: taskgraph.h:45
void set_task_thread_affinity(task_type *_task, uint64_t _mask)
Sets the task thread affinity.
Definition: taskgraph.h:466
task_vector parent_tasks
The parent tasks
Definition: task.h:166
void initialize()
Initializes this instance.
Definition: taskgraph.h:364
thread_num_t num_workers
User set number of workers to use to run this task
Definition: task.h:194
Class base_thread_pool.
Definition: task.h:36
rank_type rank
The rank
Definition: task.h:182
task_name_to_task_map task_name_to_task
The task name to task
Definition: taskgraph.h:148
~transient_container()
Finalizes an instance of the base_task_graph<TMemInterface>.transient_container class.
Definition: taskgraph.h:126
base_task_graph(thread_pool &_pool)
Initializes a new instance of the base_task_graph class.
Definition: taskgraph.h:301
bool is_task_available()
Determines whether [is task available].
Definition: taskgraph.h:623
task_vector dependent_tasks
The dependent tasks
Definition: task.h:170
persistent_container persistent
The persistent
Definition: task.h:263