mxnet
concurrency.h
Go to the documentation of this file.
1 
7 #ifndef DMLC_CONCURRENCY_H_
8 #define DMLC_CONCURRENCY_H_
9 // this code depends on c++11
10 #if DMLC_USE_CXX11
11 #include <atomic>
12 #include <deque>
13 #include <queue>
14 #include <mutex>
15 #include <vector>
16 #include <utility>
17 #include <condition_variable>
18 #include "dmlc/base.h"
19 
20 namespace dmlc {
21 
25 class Spinlock {
26  public:
27 #ifdef _MSC_VER
28  Spinlock() {
29  lock_.clear();
30  }
31 #else
32 #pragma clang diagnostic push
33 #pragma clang diagnostic ignored "-Wbraced-scalar-init"
34  Spinlock() : lock_(ATOMIC_FLAG_INIT) {
35  }
36 #pragma clang diagnostic pop
37 #endif
38  ~Spinlock() = default;
42  inline void lock() noexcept(true);
46  inline void unlock() noexcept(true);
47 
48  private:
49  std::atomic_flag lock_;
53  DISALLOW_COPY_AND_ASSIGN(Spinlock);
54 };
55 
/*! \brief Selects the ordering discipline of a ConcurrentBlockingQueue. */
enum class ConcurrentQueueType {
  /*! \brief Elements are kept in a deque and popped from its front. */
  kFIFO,
  /*! \brief Elements are kept in a heap and popped largest-priority first. */
  kPriority
};
63 
67 template <typename T,
70  public:
72  ~ConcurrentBlockingQueue() = default;
83  template <typename E>
84  void Push(E&& e, int priority = 0);
85 
97  template <typename E>
98  void PushFront(E&& e, int priority = 0);
106  bool Pop(T* rv);
113  void SignalForKill();
118  size_t Size();
119 
120  private:
121  struct Entry {
122  T data;
123  int priority;
124  inline bool operator<(const Entry &b) const {
125  return priority < b.priority;
126  }
127  };
128 
129  std::mutex mutex_;
130  std::condition_variable cv_;
131  std::atomic<bool> exit_now_;
132  int nwait_consumer_;
133  // a priority queue
134  std::vector<Entry> priority_queue_;
135  // a FIFO queue
136  std::deque<T> fifo_queue_;
141 };
142 
143 inline void Spinlock::lock() noexcept(true) {
144  while (lock_.test_and_set(std::memory_order_acquire)) {
145  }
146 }
147 
148 inline void Spinlock::unlock() noexcept(true) {
149  lock_.clear(std::memory_order_release);
150 }
151 
152 template <typename T, ConcurrentQueueType type>
154  : exit_now_{false}, nwait_consumer_{0} {}
155 
156 template <typename T, ConcurrentQueueType type>
157 template <typename E>
158 void ConcurrentBlockingQueue<T, type>::Push(E&& e, int priority) {
159  static_assert(std::is_same<typename std::remove_cv<
160  typename std::remove_reference<E>::type>::type,
161  T>::value,
162  "Types must match.");
163  bool notify;
164  {
165  std::lock_guard<std::mutex> lock{mutex_};
166  if (type == ConcurrentQueueType::kFIFO) {
167  fifo_queue_.emplace_back(std::forward<E>(e));
168  notify = nwait_consumer_ != 0;
169  } else {
170  Entry entry;
171  entry.data = std::move(e);
172  entry.priority = priority;
173  priority_queue_.push_back(std::move(entry));
174  std::push_heap(priority_queue_.begin(), priority_queue_.end());
175  notify = nwait_consumer_ != 0;
176  }
177  }
178  if (notify) cv_.notify_one();
179 }
180 
181 template <typename T, ConcurrentQueueType type>
182 template <typename E>
184  static_assert(std::is_same<typename std::remove_cv<
185  typename std::remove_reference<E>::type>::type,
186  T>::value,
187  "Types must match.");
188  bool notify;
189  {
190  std::lock_guard<std::mutex> lock{mutex_};
191  if (type == ConcurrentQueueType::kFIFO) {
192  fifo_queue_.emplace_front(std::forward<E>(e));
193  notify = nwait_consumer_ != 0;
194  } else {
195  Entry entry;
196  entry.data = std::move(e);
197  entry.priority = priority;
198  priority_queue_.push_back(std::move(entry));
199  std::push_heap(priority_queue_.begin(), priority_queue_.end());
200  notify = nwait_consumer_ != 0;
201  }
202  }
203  if (notify) cv_.notify_one();
204 }
205 
206 template <typename T, ConcurrentQueueType type>
208  std::unique_lock<std::mutex> lock{mutex_};
209  if (type == ConcurrentQueueType::kFIFO) {
210  ++nwait_consumer_;
211  cv_.wait(lock, [this] {
212  return !fifo_queue_.empty() || exit_now_.load();
213  });
214  --nwait_consumer_;
215  if (!exit_now_.load()) {
216  *rv = std::move(fifo_queue_.front());
217  fifo_queue_.pop_front();
218  return true;
219  } else {
220  return false;
221  }
222  } else {
223  ++nwait_consumer_;
224  cv_.wait(lock, [this] {
225  return !priority_queue_.empty() || exit_now_.load();
226  });
227  --nwait_consumer_;
228  if (!exit_now_.load()) {
229  std::pop_heap(priority_queue_.begin(), priority_queue_.end());
230  *rv = std::move(priority_queue_.back().data);
231  priority_queue_.pop_back();
232  return true;
233  } else {
234  return false;
235  }
236  }
237 }
238 
239 template <typename T, ConcurrentQueueType type>
241  {
242  std::lock_guard<std::mutex> lock{mutex_};
243  exit_now_.store(true);
244  }
245  cv_.notify_all();
246 }
247 
248 template <typename T, ConcurrentQueueType type>
250  std::lock_guard<std::mutex> lock{mutex_};
251  if (type == ConcurrentQueueType::kFIFO) {
252  return fifo_queue_.size();
253  } else {
254  return priority_queue_.size();
255  }
256 }
257 } // namespace dmlc
258 #endif // DMLC_USE_CXX11
259 #endif // DMLC_CONCURRENCY_H_
void lock() noexcept(true)
Acquire lock.
Definition: concurrency.h:143
bool Pop(T *rv)
Pop element from the queue.
Definition: concurrency.h:207
ConcurrentBlockingQueue()
Definition: concurrency.h:153
size_t Size()
Get the size of the queue.
Definition: concurrency.h:249
ConcurrentQueueType
type of concurrent queue
Definition: concurrency.h:57
void unlock() noexcept(true)
Release lock.
Definition: concurrency.h:148
void Push(E &&e, int priority=0)
Push element to the end of the queue.
Definition: concurrency.h:158
~Spinlock()=default
void SignalForKill()
Signal the queue for destruction.
Definition: concurrency.h:240
namespace for dmlc
Definition: array_view.h:12
#define DISALLOW_COPY_AND_ASSIGN(T)
Disable copy constructor and assignment operator.
Definition: base.h:165
void PushFront(E &&e, int priority=0)
Push element to the front of the queue. Only works for FIFO queue. For priority queue it is the same as Push.
Definition: concurrency.h:183
Spinlock()
Definition: concurrency.h:34
Simple userspace spinlock implementation.
Definition: concurrency.h:25
Concurrent blocking queue.
Definition: concurrency.h:69