31 virtual void before_scheduled(thread_num_t _scheduled_on_num_workers) = 0;
32 virtual void after_run() = 0;
53 typedef std::vector< string_type, stl_allocator< string_type, TMemInterface > > string_vector;
54 typedef std::vector< task_type *, stl_allocator< task_type *, TMemInterface > > task_vector;
56 typedef std::function< void() > function_type;
57 typedef int64_t rank_type;
58 typedef std::vector< function_type > task_work_vector;
64 function_type *, TMemInterface, work_memory_allocator_type * >
210 void set_thread_affinity(thread_mask_int_t _mask);
215 void set_thread_exclusion(thread_mask_int_t _mask);
220 void set_num_workers(thread_num_t _num_workers);
229 void kick_dependent_tasks();
235 bool add_task_parallel_work(function_type _work_function);
241 bool link_task(task_type *_next_task);
251 virtual thread_num_t get_recommended_num_workers();
280 virtual bool run() = 0;
284 virtual void before_scheduled(thread_num_t _scheduled_on_num_workers);
288 virtual void after_run();
292 template <
class TMemInterface >
295 static const tchar_t *priority_to_string[] = {_t(
"REALTIME"), _t(
"HIGH"), _t(
"NORMAL"), _t(
"LOW")};
297 return priority_to_string[uint32_t(priority)];
300 template <
class TMemInterface >
302 : task_priority(normal)
306 , num_workers(
std::numeric_limits<thread_num_t>::max())
312 template <
class TMemInterface >
314 : work_queue(nullptr)
316 , minimum_batch_size(1)
318 using namespace std::chrono_literals;
351 template <
class TMemInterface >
357 template <
class TMemInterface >
382 if (--dependent_task->transient.start_gate == 0)
384 thread_num_t requested_workers = dependent_task->get_recommended_num_workers();
385 uint32_t dependent_task_priority = dependent_task->persistent.task_priority;
390 dependent_task->before_scheduled(requested_workers);
394 rank_type best_rank = std::numeric_limits< rank_type >::max();
397 best_thread =
nullptr;
398 best_rank = std::numeric_limits< rank_type >::max();
401 ++current_thread_index, ++iterations)
403 if (!current_thread_index.
is_set(dependent_task->persistent.thread_affinity))
406 int64_t current_thread_rank =
407 task_graph.
pool.queue_rank[dependent_task->persistent.task_priority][current_thread_index]
409 if (current_thread_rank < best_rank)
411 best_rank = current_thread_rank;
417 .compare_exchange_weak(best_rank, best_rank + dependent_task->persistent.rank));
420 uint32_t current_task_priority = dependent_task_priority;
423 }
while (!best_thread->task_queue[current_task_priority]->push_back(dependent_task) &&
424 ++current_task_priority < task_type::num_priority);
426 ts_print(
"schedule " << dependent_task->debug.task_name <<
" -> " 431 reduce_starvation(new_search_index) best_search_index = best_thread->
thread_index;
438 ++current_thread_index)
440 uint32_t current_task_priority = dependent_task_priority;
443 }
while (!
task_graph.
pool.threads[current_thread_index]->task_queue[current_task_priority]->push_back(dependent_task) &&
444 ++current_task_priority < task_type::num_priority);
445 ts_print(
"schedule " << dependent_task->debug.task_name <<
" -> " 469 bool initializedSubGraph =
false;
472 if (!initializedSubGraph)
475 initializedSubGraph =
true;
477 uint64_t start_gate = kick_task->transient.start_gate.load();
478 ts_assert(start_gate == 0);
488 template <
class TMemInterface >
492 assert(
transient.num_working == 0);
498 template <
class TMemInterface >
501 ts_unused(_scheduled_on_num_workers);
504 template <
class TMemInterface >
510 transient.work_queue->push_back(&work);
514 template <
class TMemInterface >
void wake_up(thread_num_t _num_threads_to_wake_up=max_num_threads, uint64_t _thread_affinity_mask=std::numeric_limits< uint64_t >::max())
Wakes up.
Definition: threadpool.h:191
void set_num_workers(thread_num_t _num_workers)
Sets the number of workers.
Definition: task.h:341
task_vector kick_tasks
The kick tasks
Definition: task.h:174
std::atomic_uint32_t thread_sync
The thread synchronization value
Definition: threadpool.h:73
void wake_up()
Wakes up.
Definition: thread.h:206
work_queue_type * work_queue
The work queue
Definition: task.h:122
base_task(task_graph_type &_task_graph)
Initializes a new instance of the base_task class.
Definition: task.h:352
task_graph_type & task_graph
The task graph
Definition: task.h:267
void queue_task(task_type *_task, thread_num_t _num_threads_to_wake_up=1)
Queues the task.
Definition: taskgraph.h:596
virtual ~base_task()
Finalizes an instance of the base_task class.
Definition: task.h:364
profile::time task_time
Total time spent running all work functions in this task
Definition: task.h:134
std::atomic_int64_t num_working
The number working
Definition: task.h:130
transient_container()
Initializes a new instance of the base_task<TMemInterface>.transient_container struct.
Definition: task.h:313
thread_type * get_current_thread()
Gets the current thread.
Definition: threadpool.h:207
void set_task_thread_exclusion(task_type *_task, uint64_t _mask)
Sets the task thread exclusion.
Definition: taskgraph.h:479
Class stl_allocator.
Definition: allocator.h:16
virtual void after_run()
Callback is called after a task is run
Definition: task.h:505
std::atomic_int64_t num_runned
Total number of times work function was called
Definition: task.h:138
thread_index_type thread_index
The thread index
Definition: thread.h:81
thread_pool & pool
The pool
Definition: taskgraph.h:285
bool link_task(task_type *_parent_task, task_type *_dependent_task)
Links the task.
Definition: taskgraph.h:636
~transient_container()
Finalizes an instance of the base_task<TMemInterface>.transient_container class.
Definition: task.h:324
Struct base_sub_graph
Definition: task.h:38
bool link_task(task_type *_next_task)
Links the task.
Definition: task.h:483
Definition: allocator.h:17
string_vector dependent_task_names
The dependent task names
Definition: task.h:98
Class constrained.
Definition: types.h:25
void setup(sub_graph_type *_sub_graph=nullptr)
Sets up the specified sub graph.
Definition: taskgraph.h:307
bool is_set(thread_mask_int_t _other_mask)
Determines whether the specified other mask is set.
Definition: types.h:195
std::atomic< state_selector > request_exit
The request exit
Definition: threadpool.h:77
priority_selector task_priority
The task priority
Definition: task.h:162
Class scoped_enter_exit.
Definition: utils.h:140
void set_thread_affinity(thread_mask_int_t _mask)
Sets the thread affinity.
Definition: task.h:331
uint64_t thread_affinity
The thread affinity
Definition: task.h:186
Struct persistent_container
Definition: task.h:148
virtual void before_scheduled(thread_num_t _scheduled_on_num_workers)
Callback is called when a task is scheduled
Definition: task.h:499
setup_container setup
The setup
Definition: threadpool.h:109
sub_graph_type * sub_graph
The sub graph
Definition: task.h:178
bool add_task_parallel_work(function_type _work_function)
Adds the task parallel work.
Definition: task.h:489
std::atomic_int64_t start_gate
The start gate
Definition: task.h:118
~persistent_container()
Finalizes an instance of the base_task<TMemInterface>.persistent_container class. ...
Definition: task.h:310
work_memory_allocator_type work_allocator
The work allocator
Definition: task.h:126
uint32_t minimum_batch_size
Calculated minimum batch size
Definition: task.h:142
task_work_vector task_work
The task work
Definition: task.h:190
thread_unsafe_access_storage add_task_parallel_work_detector
The add task parallel work detector
Definition: task.h:272
Struct base_thread
Definition: task.h:37
thread_num_t num_threads
The number threads
Definition: threadpool.h:113
const tchar_t * priority_to_string(priority_selector _priority) const
Converts a priority to its string name.
Definition: task.h:293
void set_thread_exclusion(thread_mask_int_t _mask)
Sets the thread exclusion.
Definition: task.h:336
string_type task_name
The task name
Definition: task.h:94
bool operator()()
Function-call operator for this instance.
Definition: task.h:358
void kick_dependent_tasks()
Kicks the dependent tasks.
Definition: task.h:366
virtual bool run()=0
Calls the working function internally
virtual thread_num_t get_recommended_num_workers()
Gets the best number of workers for the task every frame
Definition: task.h:515
void set_num_workers(task_type *_task, thread_num_t _num_workers)
Sets the number of workers.
Definition: taskgraph.h:489
Definition: lockfreequeue.h:20
Struct debug_container
Definition: task.h:82
Definition: lockfreequeue.h:235
void set_percentage_of_workers(task_type *_task, float _percentage_workers)
Sets the percentage of workers.
Definition: taskgraph.h:495
Class base_task.
Definition: task.h:44
Class base_task_graph.
Definition: task.h:35
persistent_container()
Initializes a new instance of the base_task<TMemInterface>.persistent_container struct.
Definition: task.h:301
debug_container debug
The debug
Definition: task.h:255
void set_task_thread_affinity(task_type *_task, uint64_t _mask)
Sets the task thread affinity.
Definition: taskgraph.h:466
Struct transient_container
Definition: task.h:104
task_vector parent_tasks
The parent tasks
Definition: task.h:166
thread_num_t num_workers
User set number of workers to use to run this task
Definition: task.h:194
Struct thread_unsafe_access_storage
Definition: utils.h:195
Class base_thread_pool.
Definition: task.h:36
rank_type rank
The rank
Definition: task.h:182
task_vector dependent_tasks
The dependent tasks
Definition: task.h:170
Definition: profileitt.h:173
persistent_container persistent
The persistent
Definition: task.h:263
priority_selector
Enum priority_selector
Definition: task.h:70