Lockless Task Scheduler  v1.0a
A lockless task scheduler
lockfreequeue.h
1 // ***********************************************************************
2 // <copyright file="lockfreequeue.h" >
3 // Copyright (c) viknash. All rights reserved.
4 // </copyright>
5 // <summary></summary>
6 // ***********************************************************************
7 #pragma once
8 
9 #include <atomic>
10 
11 #include "atomics.h"
12 #include "lockfreenode.h"
13 
14 namespace task_scheduler {
15 
16  // Multi-producer, multi-consumer lock-free queue implementation.
17  // Elements are stored in singly-linked nodes provided by a lock_free_node_dispenser.
18  // param T Name of the type of object in the container
19 
20  template < typename T, class TMemInterface, class TDispenser > class multi_producer_multi_consumer
21  {
25 
26  public:
27  multi_producer_multi_consumer(TDispenser *_dispenser = 0)
28  : dispenser(_dispenser)
29  {
30  end_node.node = (node_type *)(this); // Magic Value for the end node
31  ts_debug_only(debug.counter = 1);
32  node_type *sentinelle = dispenser->new_node();
33  head.data.points_to.node = sentinelle;
34  head.data.access.as_atomic = 0;
35  tail.data.points_to.node = sentinelle;
36  tail.data.access.as_atomic = 0;
37  sentinelle->next.node = end_node.node;
38  }
39 
41  {
42  clear();
43  assert(head.data.points_to.node == tail.data.points_to.node);
44  assert(head.data.points_to.node->next.node == end_node.node); // Check if list is empty
45  ts_debug_only(head.data.points_to.node->next.node = nullptr;)
46  dispenser->free_node(head.data.points_to.node);
47  }
48 
52  inline bool push_back(const T &_value)
53  {
54  atomic_node new_node;
55  new_node.node = dispenser->new_node();
56  new_node.node->store(_value);
57  assert(new_node.node->next.node == nullptr);
58  new_node.node->next.node = end_node.node;
59 
60  atomic_node_ptr tail_snapshot;
61  tail_snapshot = tail;
62  ts_debug_only(atomics::increment(debug.counter););
63  while (end_node.as_atomic != atomics::compare_exchange_weak(tail.data.points_to.node->next.as_atomic,
64  end_node.as_atomic, new_node.as_atomic))
65  {
66  ts_debug_only(atomics::decrement(debug.counter););
67  update_tail(tail_snapshot);
68  tail_snapshot = tail;
69  ts_debug_only(atomics::increment(debug.counter););
70  }
71 
72  // If the tail remains the same then update the tail
73  atomic_node_ptr newTail;
74  newTail.data.access.as_atomic = tail_snapshot.data.access.as_atomic + 1;
75  newTail.data.points_to.node = new_node.node;
76  tail.compare_exchange_weak(tail_snapshot, newTail);
77 
78  return true;
79  }
80 
81  inline bool peek(T &_out)
82  {
83  T value = T();
84  atomic_node_ptr headSnapshot;
85  node_type *nextSnapshot;
86  bool ret = true;
87  bool skip = false;
88 
89  do
90  {
91  skip = false;
92  headSnapshot = head;
93  nextSnapshot = headSnapshot.data.points_to.node->next;
94 
95  if (headSnapshot.data.access != head.data.access)
96  {
97  skip = true;
98  continue;
99  }
100 
101  if (nextSnapshot == end_node.node)
102  {
103  _out = T();
104  ret = false;
105  break;
106  }
107 
108  ts_debug_only(if (!nextSnapshot) continue;);
109  value = nextSnapshot->load();
110 
111  } while (skip || !head.compare_exchange_weak(headSnapshot, headSnapshot));
112 
113  _out = value;
114 
115  return ret;
116  }
117 
118  inline bool pop_front(T &_out)
119  {
120  node_type *node = internal_remove();
121  if (node == nullptr)
122  return false;
123 
124  _out = node->load();
125  dispenser->free_node(node);
126 
127  return true;
128  }
129 
130  // Returns true if the queue is empty
131  inline bool empty() const { return head.data.points_to.node->next.node == end_node.node; }
132 
133  // Remove all objects from the queue.
134  void clear()
135  {
136  node_type *node = internal_remove();
137  while (node)
138  {
139  dispenser->free_node(node);
140  node = internal_remove();
141  }
142  }
143 
144  private:
145  inline void update_tail(atomic_node_ptr &_tail)
146  {
147  atomic_node_ptr newTail;
148  newTail.data.access.as_atomic = _tail.data.access.as_atomic + 1;
149  newTail.data.points_to.node = _tail.data.points_to.node->next.node;
150  if (newTail.data.points_to.node != end_node.node)
151  {
152  tail.compare_exchange_weak(_tail, newTail);
153  }
154  }
155 
156  node_type *internal_remove()
157  {
158  T value = T();
159  atomic_node_ptr ptr_head, ptr_tail, new_head;
160  node_type *ptr_next;
161  bool success = true;
162  bool skip = false;
163 
164  do
165  {
166  skip = false;
167  // Save a local copy of pointers to update them locally
168  ptr_head = head;
169  ptr_next = ptr_head.data.points_to.node->next.node;
170  ptr_tail = tail;
171 
172  // Early abort and retry if some other thread has already modified the head
173  if (ptr_head.data.access.as_atomic != head.data.access.as_atomic)
174  {
175  skip = true;
176  continue;
177  }
178 
179  // Abort if head points to the sentinelle
180  if (ptr_next == end_node.node)
181  {
182  ptr_head.data.points_to.node = nullptr;
183  success = false;
184  break;
185  }
186 
187  // Early abort and retry if the queue is empty
188  if (ptr_head.data.points_to.node == ptr_tail.data.points_to.node)
189  {
190  // Update tail for the next retry as it could have changed
191  update_tail(ptr_tail);
192  skip = true;
193  continue;
194  }
195 
196  ts_debug_only(if (!ptr_next) continue;);
197  // Save the value to be return if this try is successful
198  value = ptr_next->load();
199 
200  // Assemble a new head
201  new_head.data.access.as_atomic = ptr_head.data.access.as_atomic + 1;
202  new_head.data.points_to.node = ptr_next;
203 
204  // if the head we worked is the same as the current head, replace it with a new head
205  } while (skip || !head.compare_exchange_weak(ptr_head, new_head));
206 
207  if (success)
208  {
209  assert(atomics::decrement(debug.counter));
210  }
211 
212  // If we succeeded then return the new value the removed node
213  if (ptr_head.data.points_to.node != nullptr)
214  {
215  ptr_head.data.points_to.node->store(value);
216  ts_debug_only(ptr_head.data.points_to.node->next.node = nullptr;);
217  }
218 
219  return ptr_head.data.points_to.node;
220  }
221 
private:
    atomic_node_ptr volatile head; // points at the sentinel; versioned (access counter) against ABA
    atomic_node_ptr volatile tail; // may lag behind the true last node; fixed up by update_tail()
    atomic_node end_node;          // end-of-chain marker; holds the queue's own address (see ctor)
    TDispenser *dispenser;         // caller-owned node allocator/recycler
    struct debug_container
    {
        volatile int32_t counter;  // debug-only node count; starts at 1 for the sentinel
    };
    ts_debug_only(debug_container debug;); // present only in debug builds
};
233 
// Thin virtual facade over a concrete lock-free queue policy (TPolicy).
// Every operation delegates to the embedded policy object `queue`.
template < class TPolicy, class T, class TMemInterface, class TParam = void * >
class lock_free_queue : public TMemInterface
{
  public:
    lock_free_queue() {}

    // Forwards a construction parameter to the underlying policy.
    lock_free_queue(TParam param)
        : queue(param)
    {
    }

    // Enqueue at the back of the queue.
    virtual bool push_back(T newData) { return queue.push_back(newData); }

    //virtual bool push_front(T newData) { return queue.push_front(newData); }

    // Dequeue from the front of the queue.
    virtual bool pop_front(T &val) { return queue.pop_front(val); }

    // NOTE(review): pop_back delegates to pop_front — the underlying
    // policy exposes FIFO removal only.
    virtual bool pop_back(T &val) { return queue.pop_front(val); }

    // True when the underlying policy reports no queued elements.
    virtual bool empty() const { return queue.empty(); }

    virtual ~lock_free_queue() {}

    TPolicy queue; // the concrete queue implementation
};
259 
260 }
int64_t increment(volatile int64_t &_data)
Increments the specified data.
Definition: atomics.h:32
Class stl_allocator.
Definition: allocator.h:16
Definition: lockfreenode.h:26
int64_t decrement(volatile int64_t &_data)
Decrements the specified data.
Definition: atomics.h:39
Definition: lockfreequeue.h:20
Definition: lockfreequeue.h:235
Definition: lockfreenode.h:24
int32_t compare_exchange_weak(volatile int32_t &_data, int32_t _comperand, int32_t _value)
Performs a weak atomic compare-and-exchange on _data.
Definition: atomics.h:68
Definition: lockfreenode.h:55
bool push_back(const T &_value)
Definition: lockfreequeue.h:52