Lockless Task Scheduler  v1.0a
A lockless task scheduler
datatask.h
1 // ***********************************************************************
2 // Assembly : task_scheduler
3 // Author : viknash
4 // ***********************************************************************
5 // <copyright file="datatask.h" >
6 // Copyright (c) viknash. All rights reserved.
7 // </copyright>
8 // <summary></summary>
9 // ***********************************************************************
10 #pragma once
11 
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cinttypes>
#include <vector>
15 
16 #include "memory.h"
17 #include "meta.h"
18 #include "print.h"
19 #include "types.h"
20 #include "task.h"
21 
22 namespace task_scheduler
23 {
24 
25  template < class TMemInterface, class TDataType > class base_data_task : public base_task<TMemInterface>
26  {
31 
32  struct transient_data_container
33  {
37  transient_data_container(size_t _max_data_parallel_workload);
41  ~transient_data_container();
45  data_vector data_workload;
49  data_dispatcher_type *data_dispatcher;
53  uint32_t minimum_batch_size;
54  };
55 
56  public:
60  transient_data_container data_transient;
65  base_data_task(typename super::task_graph_type &_task_graph, size_t _max_data_parallel_workload = 0);
75  bool add_data_parallel_work(typename data_vector::iterator _begin, typename data_vector::iterator _end);
76 
77  protected:
81  bool run() override;
85  void before_scheduled(thread_num_t _scheduled_on_num_workers) override;
89  void after_run() override;
94  virtual thread_num_t get_recommended_num_workers() override;
95 
96  private:
101  void run_internal(typename super::function_type* _work_function, TDataType& _data);
105  thread_unsafe_access_storage add_data_parallel_work_detector;
106  };
107 
108  template < class TMemInterface, class TDataType >
110  : data_workload(_max_data_parallel_workload)
111  , data_dispatcher(nullptr)
112  {
113  }
114 
115  template < class TMemInterface, class TDataType >
117  {
118  assert(data_dispatcher == nullptr);
119  }
120 
121  template < class TMemInterface, class TDataType >
122  base_data_task< TMemInterface, TDataType >::base_data_task(typename super::task_graph_type &_task_graph, size_t _max_data_parallel_workload)
123  : super(_task_graph)
124  , data_transient(_max_data_parallel_workload)
125  {
126  }
127 
128  template < class TMemInterface, class TDataType >
130  {
131  }
132 
133  template < class TMemInterface, class TDataType >
134  bool base_data_task< TMemInterface, TDataType >::add_data_parallel_work(typename data_vector::iterator _begin, typename data_vector::iterator _end)
135  {
136  thread_unsafe_access_guard guard(add_data_parallel_work_detector);
137  assert(super::transient.num_working == 0);
138  assert(super::persistent.task_work.size() <= 1);
139  super::transient.data_workload.insert(_begin, _end);
140  }
141 
142  template < class TMemInterface, class TDataType >
144  {
145  assert(data_transient.data_workload.size());
146  assert(!data_transient.data_workload.is_locked());
147  typename super::function_type *work_function = nullptr;
148  if (super::transient.work_queue->pop_front(this->work_function))
149  {
150  size_t available_batch_size = 0;
151  TDataType* batch = data_transient.data_dispatcher->get_next_batch(data_transient.minimum_batch_size, available_batch_size);
152  if (batch)
153  {
154  for (uint32_t batch_index; batch_index < available_batch_size; ++batch_index)
155  {
156  profile::instrument< void, data_task_type, void (data_task_type::*)(typename super::function_type*) >(super::transient.task_time, this, &data_task_type::run_internal, work_function, *(batch + batch_index));
157  }
158  }
160  return true;
161  }
162  return false;
163  }
164 
165  template < class TMemInterface, class TDataType >
166  void base_data_task< TMemInterface, TDataType >::run_internal(typename super::function_type* _work_function, TDataType& _data)
167  {
168  (*_work_function)(_data);
169  }
170 
171  template < class TMemInterface, class TDataType >
172  void base_data_task< TMemInterface, TDataType >::before_scheduled(thread_num_t _scheduled_on_num_workers)
173  {
175 
176  assert(super::persistent.task_work.size() == 1); //Only supports one work function
177  assert(super::transient.work_queue->empty());
178  for (thread_num_t count = 0; count < _scheduled_on_num_workers; ++count)
179  {
180  super::transient.work_queue->push_back(super::persistent.task_work[0]);
181  }
182 
183  if (data_transient.data_workload.size() && !data_transient.data_workload.is_locked())
184  {
185  assert(data_transient.data_dispatcher == nullptr);
186  data_transient.data_dispatcher = new data_dispatcher_type(data_transient.data_workload);
187  }
188  }
189 
190  template < class TMemInterface, class TDataType >
192  {
194  assert(super::persistent.task_work.size() == 1); //Only supports one work function
195  assert(super::transient.work_queue->empty());
196  assert(super::transient.data_dispatcher);
197  delete super::transient.data_dispatcher;
198  }
199 
200  template < class TMemInterface, class TDataType >
202  {
203  size_t optimum_num_threads = data_transient.data_workload.size() / data_transient.minimum_batch_size;
204  return std::min(super::persistent.num_workers, optimum_num_threads);
205  }
206 
207 }
Definition: datatask.h:25
transient_container transient
The task's transient (per-run) state, inherited from base_task
Definition: task.h:259
bool run() override
Callback to run task
Definition: datatask.h:143
work_queue_type * work_queue
The work queue
Definition: task.h:122
Definition: guarded.h:11
profile::time task_time
Total time spent running all work functions in this task
Definition: task.h:134
Class stl_allocator.
Definition: allocator.h:16
virtual void after_run()
Callback is called after a task is run
Definition: task.h:505
std::atomic_int64_t num_runned
Total number of times work function was called
Definition: task.h:138
bool add_data_parallel_work(typename data_vector::iterator _begin, typename data_vector::iterator _end)
Add data parallel work class.
Definition: datatask.h:134
void after_run() override
Callback is called after a task is run
Definition: datatask.h:191
virtual thread_num_t get_recommended_num_workers() override
Gets the best number of workers for the task every frame
Definition: datatask.h:201
Class scoped_enter_exit.
Definition: utils.h:140
virtual void before_scheduled(thread_num_t _scheduled_on_num_workers)
Callback is called when a task is scheduled
Definition: task.h:499
Class lock_free_batch_dispatcher.
Definition: lockfreebatchdispenser.h:17
base_data_task(typename super::task_graph_type &_task_graph, size_t _max_data_parallel_workload=0)
Initializes a new instance of the base_task class.
Definition: datatask.h:122
Class base_task.
Definition: task.h:44
Class base_task_graph.
Definition: task.h:35
transient_data_container data_transient
The transient container holding the data-parallel workload and its batch dispatcher
Definition: datatask.h:60
void before_scheduled(thread_num_t _scheduled_on_num_workers) override
Callback is called when a task is scheduled
Definition: datatask.h:172
Struct thread_unsafe_access_storage
Definition: utils.h:195
persistent_container persistent
The persistent
Definition: task.h:263
~base_data_task()
Finalizes an instance of the base_task class.
Definition: datatask.h:129