7 #ifndef DMLC_CONCURRENCY_H_ 8 #define DMLC_CONCURRENCY_H_ 17 #include <condition_variable> 32 #pragma clang diagnostic push 33 #pragma clang diagnostic ignored "-Wbraced-scalar-init" 36 #pragma clang diagnostic pop 42 inline void lock() noexcept(
true);
46 inline void unlock() noexcept(
true);
49 std::atomic_flag lock_;
84 void Push(E&& e,
int priority = 0);
98 void PushFront(E&& e,
int priority = 0);
113 void SignalForKill();
124 inline bool operator<(
const Entry &b)
const {
125 return priority < b.priority;
130 std::condition_variable cv_;
131 std::atomic<bool> exit_now_;
134 std::vector<Entry> priority_queue_;
136 std::deque<T> fifo_queue_;
144 while (lock_.test_and_set(std::memory_order_acquire)) {
149 lock_.clear(std::memory_order_release);
152 template <
typename T, ConcurrentQueueType type>
154 : exit_now_{
false}, nwait_consumer_{0} {}
156 template <
typename T, ConcurrentQueueType type>
157 template <
typename E>
159 static_assert(std::is_same<
typename std::remove_cv<
160 typename std::remove_reference<E>::type>::type,
162 "Types must match.");
165 std::lock_guard<std::mutex> lock{mutex_};
167 fifo_queue_.emplace_back(std::forward<E>(e));
168 notify = nwait_consumer_ != 0;
171 entry.data = std::move(e);
172 entry.priority = priority;
173 priority_queue_.push_back(std::move(entry));
174 std::push_heap(priority_queue_.begin(), priority_queue_.end());
175 notify = nwait_consumer_ != 0;
178 if (notify) cv_.notify_one();
181 template <
typename T, ConcurrentQueueType type>
182 template <
typename E>
184 static_assert(std::is_same<
typename std::remove_cv<
185 typename std::remove_reference<E>::type>::type,
187 "Types must match.");
190 std::lock_guard<std::mutex> lock{mutex_};
192 fifo_queue_.emplace_front(std::forward<E>(e));
193 notify = nwait_consumer_ != 0;
196 entry.data = std::move(e);
197 entry.priority = priority;
198 priority_queue_.push_back(std::move(entry));
199 std::push_heap(priority_queue_.begin(), priority_queue_.end());
200 notify = nwait_consumer_ != 0;
203 if (notify) cv_.notify_one();
206 template <
typename T, ConcurrentQueueType type>
208 std::unique_lock<std::mutex> lock{mutex_};
211 cv_.wait(lock, [
this] {
212 return !fifo_queue_.empty() || exit_now_.load();
215 if (!exit_now_.load()) {
216 *rv = std::move(fifo_queue_.front());
217 fifo_queue_.pop_front();
224 cv_.wait(lock, [
this] {
225 return !priority_queue_.empty() || exit_now_.load();
228 if (!exit_now_.load()) {
229 std::pop_heap(priority_queue_.begin(), priority_queue_.end());
230 *rv = std::move(priority_queue_.back().data);
231 priority_queue_.pop_back();
239 template <
typename T, ConcurrentQueueType type>
242 std::lock_guard<std::mutex> lock{mutex_};
243 exit_now_.store(
true);
248 template <
typename T, ConcurrentQueueType type>
250 std::lock_guard<std::mutex> lock{mutex_};
252 return fifo_queue_.size();
254 return priority_queue_.size();
258 #endif // DMLC_USE_CXX11 259 #endif // DMLC_CONCURRENCY_H_ void lock() noexcept(true)
Acquire lock.
Definition: concurrency.h:143
bool Pop(T *rv)
Pop element from the queue.
Definition: concurrency.h:207
ConcurrentBlockingQueue()
Definition: concurrency.h:153
size_t Size()
Get the size of the queue.
Definition: concurrency.h:249
ConcurrentQueueType
type of concurrent queue
Definition: concurrency.h:57
void unlock() noexcept(true)
Release lock.
Definition: concurrency.h:148
void Push(E &&e, int priority=0)
Push element to the end of the queue.
Definition: concurrency.h:158
void SignalForKill()
Signal the queue for destruction.
Definition: concurrency.h:240
namespace for dmlc
Definition: array_view.h:12
#define DISALLOW_COPY_AND_ASSIGN(T)
Disable copy constructor and assignment operator.
Definition: base.h:165
void PushFront(E &&e, int priority=0)
Push element to the front of the queue. Only works for FIFO queue. For priority queue it is the same ...
Definition: concurrency.h:183
Spinlock()
Definition: concurrency.h:34
Simple userspace spinlock implementation.
Definition: concurrency.h:25
Cocurrent blocking queue.
Definition: concurrency.h:69