blockingconcurrentqueue.h
// Provides an efficient blocking version of moodycamel::ConcurrentQueue.

// ©2015-2016 Cameron Desrochers. Distributed under the terms of the simplified
// BSD license, available at the top of concurrentqueue.h.
// Uses Jeff Preshing's semaphore implementation (under the terms of its
// separate zlib license, embedded below).

#ifndef DMLC_BLOCKINGCONCURRENTQUEUE_H_
#define DMLC_BLOCKINGCONCURRENTQUEUE_H_

#pragma once

#include "concurrentqueue.h"
#include <type_traits>
#include <cerrno>
#include <memory>
#include <chrono>
#include <ctime>

#if defined(_WIN32)
// Avoid including windows.h in a header; we only need a handful of
// items, so we'll redeclare them here (this is relatively safe since
// the API generally has to remain stable between Windows versions).
// I know this is an ugly hack but it still beats polluting the global
// namespace with thousands of generic names or adding a .cpp for nothing.
extern "C" {
	struct _SECURITY_ATTRIBUTES;
	__declspec(dllimport) void* __stdcall CreateSemaphoreW(_SECURITY_ATTRIBUTES* lpSemaphoreAttributes, long lInitialCount, long lMaximumCount, const wchar_t* lpName);
	__declspec(dllimport) int __stdcall CloseHandle(void* hObject);
	__declspec(dllimport) unsigned long __stdcall WaitForSingleObject(void* hHandle, unsigned long dwMilliseconds);
	__declspec(dllimport) int __stdcall ReleaseSemaphore(void* hSemaphore, long lReleaseCount, long* lpPreviousCount);
}
#elif defined(__MACH__)
#include <mach/mach.h>
#elif defined(__unix__)
#include <semaphore.h>
#endif

namespace dmlc {

namespace moodycamel
{
namespace details
{
	// Code in the mpmc_sema namespace below is an adaptation of Jeff Preshing's
	// portable + lightweight semaphore implementations, originally from
	// https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h
	// LICENSE:
	// Copyright (c) 2015 Jeff Preshing
	//
	// This software is provided 'as-is', without any express or implied
	// warranty. In no event will the authors be held liable for any damages
	// arising from the use of this software.
	//
	// Permission is granted to anyone to use this software for any purpose,
	// including commercial applications, and to alter it and redistribute it
	// freely, subject to the following restrictions:
	//
	// 1. The origin of this software must not be misrepresented; you must not
	//    claim that you wrote the original software. If you use this software
	//    in a product, an acknowledgement in the product documentation would be
	//    appreciated but is not required.
	// 2. Altered source versions must be plainly marked as such, and must not be
	//    misrepresented as being the original software.
	// 3. This notice may not be removed or altered from any source distribution.
	namespace mpmc_sema
	{
#if defined(_WIN32)
	class Semaphore
	{
	private:
		void* m_hSema;

		Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
		Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;

	public:
		Semaphore(int initialCount = 0)
		{
			assert(initialCount >= 0);
			const long maxLong = 0x7fffffff;
			m_hSema = CreateSemaphoreW(nullptr, initialCount, maxLong, nullptr);
		}

		~Semaphore()
		{
			CloseHandle(m_hSema);
		}

		void wait()
		{
			const unsigned long infinite = 0xffffffff;
			WaitForSingleObject(m_hSema, infinite);
		}

		bool try_wait()
		{
			const unsigned long RC_WAIT_TIMEOUT = 0x00000102;
			return WaitForSingleObject(m_hSema, 0) != RC_WAIT_TIMEOUT;
		}

		bool timed_wait(std::uint64_t usecs)
		{
			const unsigned long RC_WAIT_TIMEOUT = 0x00000102;
			return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) != RC_WAIT_TIMEOUT;
		}

		void signal(int count = 1)
		{
			ReleaseSemaphore(m_hSema, count, nullptr);
		}
	};
#elif defined(__MACH__)
	//---------------------------------------------------------
	// Semaphore (Apple iOS and OSX)
	// Can't use POSIX semaphores due to http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html
	//---------------------------------------------------------
	class Semaphore
	{
	private:
		semaphore_t m_sema;

		Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
		Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;

	public:
		Semaphore(int initialCount = 0)
		{
			assert(initialCount >= 0);
			semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount);
		}

		~Semaphore()
		{
			semaphore_destroy(mach_task_self(), m_sema);
		}

		void wait()
		{
			semaphore_wait(m_sema);
		}

		bool try_wait()
		{
			return timed_wait(0);
		}

		bool timed_wait(std::uint64_t timeout_usecs)
		{
			mach_timespec_t ts;
			ts.tv_sec = static_cast<unsigned int>(timeout_usecs / 1000000);
			ts.tv_nsec = (timeout_usecs % 1000000) * 1000;

			// added in OSX 10.10: https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html
			kern_return_t rc = semaphore_timedwait(m_sema, ts);

			return rc != KERN_OPERATION_TIMED_OUT;
		}

		void signal()
		{
			semaphore_signal(m_sema);
		}

		void signal(int count)
		{
			while (count-- > 0)
			{
				semaphore_signal(m_sema);
			}
		}
	};
#elif defined(__unix__)
	//---------------------------------------------------------
	// Semaphore (POSIX, Linux)
	//---------------------------------------------------------
	class Semaphore
	{
	private:
		sem_t m_sema;

		Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
		Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;

	public:
		Semaphore(int initialCount = 0)
		{
			assert(initialCount >= 0);
			sem_init(&m_sema, 0, initialCount);
		}

		~Semaphore()
		{
			sem_destroy(&m_sema);
		}

		void wait()
		{
			// http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error
			int rc;
			do {
				rc = sem_wait(&m_sema);
			} while (rc == -1 && errno == EINTR);
		}

		bool try_wait()
		{
			int rc;
			do {
				rc = sem_trywait(&m_sema);
			} while (rc == -1 && errno == EINTR);
			return !(rc == -1 && errno == EAGAIN);
		}

		bool timed_wait(std::uint64_t usecs)
		{
			struct timespec ts;
			const int usecs_in_1_sec = 1000000;
			const int nsecs_in_1_sec = 1000000000;
			clock_gettime(CLOCK_REALTIME, &ts);
			ts.tv_sec += usecs / usecs_in_1_sec;
			ts.tv_nsec += (usecs % usecs_in_1_sec) * 1000;
			// sem_timedwait bombs if you have more than 1e9 in tv_nsec
			// so we have to clean things up before passing it in
			if (ts.tv_nsec >= nsecs_in_1_sec) {
				ts.tv_nsec -= nsecs_in_1_sec;
				++ts.tv_sec;
			}

			int rc;
			do {
				rc = sem_timedwait(&m_sema, &ts);
			} while (rc == -1 && errno == EINTR);
			return !(rc == -1 && errno == ETIMEDOUT);
		}

		void signal()
		{
			sem_post(&m_sema);
		}

		void signal(int count)
		{
			while (count-- > 0)
			{
				sem_post(&m_sema);
			}
		}
	};
#else
#error Unsupported platform! (No semaphore wrapper available)
#endif
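	// Editor's note (illustrative sketch, not part of the original header): the POSIX
	// timed_wait above converts a relative timeout in microseconds into the absolute
	// CLOCK_REALTIME deadline that sem_timedwait() expects, carrying any nanosecond
	// overflow into tv_sec. A standalone version of that computation (helper name is
	// hypothetical) looks like this:
	//
	//   static timespec deadline_from_usecs(std::uint64_t usecs)
	//   {
	//       timespec ts;
	//       clock_gettime(CLOCK_REALTIME, &ts);        // absolute "now"
	//       ts.tv_sec  += usecs / 1000000;             // whole seconds
	//       ts.tv_nsec += (usecs % 1000000) * 1000;    // leftover microseconds -> nanoseconds
	//       if (ts.tv_nsec >= 1000000000) {            // normalize: tv_nsec must stay below 1e9
	//           ts.tv_nsec -= 1000000000;
	//           ++ts.tv_sec;
	//       }
	//       return ts;
	//   }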

	//---------------------------------------------------------
	// LightweightSemaphore
	//---------------------------------------------------------
	class LightweightSemaphore
	{
	public:
		typedef std::make_signed<std::size_t>::type ssize_t;

	private:
		std::atomic<ssize_t> m_count;
		Semaphore m_sema;

		bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1)
		{
			ssize_t oldCount;
			// Is there a better way to set the initial spin count?
			// If we lower it to 1000, testBenaphore becomes 15x slower on my Core i7-5930K Windows PC,
			// as threads start hitting the kernel semaphore.
			int spin = 10000;
			while (--spin >= 0)
			{
				oldCount = m_count.load(std::memory_order_relaxed);
				if ((oldCount > 0) && m_count.compare_exchange_strong(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
					return true;
				std::atomic_signal_fence(std::memory_order_acquire);  // Prevent the compiler from collapsing the loop.
			}
			oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
			if (oldCount > 0)
				return true;
			if (timeout_usecs < 0)
			{
				m_sema.wait();
				return true;
			}
			if (m_sema.timed_wait((std::uint64_t)timeout_usecs))
				return true;
			// At this point, we've timed out waiting for the semaphore, but the
			// count is still decremented indicating we may still be waiting on
			// it. So we have to re-adjust the count, but only if the semaphore
			// wasn't signaled enough times for us too since then. If it was, we
			// need to release the semaphore too.
			while (true)
			{
				oldCount = m_count.load(std::memory_order_acquire);
				if (oldCount >= 0 && m_sema.try_wait())
					return true;
				if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed))
					return false;
			}
		}

		ssize_t waitManyWithPartialSpinning(ssize_t max, std::int64_t timeout_usecs = -1)
		{
			assert(max > 0);
			ssize_t oldCount;
			int spin = 10000;
			while (--spin >= 0)
			{
				oldCount = m_count.load(std::memory_order_relaxed);
				if (oldCount > 0)
				{
					ssize_t newCount = oldCount > max ? oldCount - max : 0;
					if (m_count.compare_exchange_strong(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
						return oldCount - newCount;
				}
				std::atomic_signal_fence(std::memory_order_acquire);
			}
			oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
			if (oldCount <= 0)
			{
				if (timeout_usecs < 0)
					m_sema.wait();
				else if (!m_sema.timed_wait((std::uint64_t)timeout_usecs))
				{
					while (true)
					{
						oldCount = m_count.load(std::memory_order_acquire);
						if (oldCount >= 0 && m_sema.try_wait())
							break;
						if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed))
							return 0;
					}
				}
			}
			if (max > 1)
				return 1 + tryWaitMany(max - 1);
			return 1;
		}

	public:
		LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount)
		{
			assert(initialCount >= 0);
		}

		bool tryWait()
		{
			ssize_t oldCount = m_count.load(std::memory_order_relaxed);
			while (oldCount > 0)
			{
				if (m_count.compare_exchange_weak(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
					return true;
			}
			return false;
		}

		void wait()
		{
			if (!tryWait())
				waitWithPartialSpinning();
		}

		bool wait(std::int64_t timeout_usecs)
		{
			return tryWait() || waitWithPartialSpinning(timeout_usecs);
		}

		// Acquires between 0 and (greedily) max, inclusive
		ssize_t tryWaitMany(ssize_t max)
		{
			assert(max >= 0);
			ssize_t oldCount = m_count.load(std::memory_order_relaxed);
			while (oldCount > 0)
			{
				ssize_t newCount = oldCount > max ? oldCount - max : 0;
				if (m_count.compare_exchange_weak(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
					return oldCount - newCount;
			}
			return 0;
		}

		// Acquires at least one, and (greedily) at most max
		ssize_t waitMany(ssize_t max, std::int64_t timeout_usecs)
		{
			assert(max >= 0);
			ssize_t result = tryWaitMany(max);
			if (result == 0 && max > 0)
				result = waitManyWithPartialSpinning(max, timeout_usecs);
			return result;
		}

		ssize_t waitMany(ssize_t max)
		{
			ssize_t result = waitMany(max, -1);
			assert(result > 0);
			return result;
		}

		void signal(ssize_t count = 1)
		{
			assert(count >= 0);
			ssize_t oldCount = m_count.fetch_add(count, std::memory_order_release);
			ssize_t toRelease = -oldCount < count ? -oldCount : count;
			if (toRelease > 0)
			{
				m_sema.signal((int)toRelease);
			}
		}

		ssize_t availableApprox() const
		{
			ssize_t count = m_count.load(std::memory_order_relaxed);
			return count > 0 ? count : 0;
		}
	};
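	// Editor's note (illustrative sketch, not part of the original header): LightweightSemaphore
	// keeps an atomic count that is claimed with a short spin before falling back to the kernel
	// Semaphore above, so uncontended signal/wait pairs stay in user space. A minimal usage
	// example, assuming <thread> is available (names are hypothetical):
	//
	//   LightweightSemaphore items;              // count starts at 0
	//
	//   std::thread producer([&] {
	//       // ... publish some work, then make it claimable:
	//       items.signal();                      // wakes one waiter, or banks a count
	//   });
	//
	//   std::thread consumer([&] {
	//       items.wait();                        // spins briefly, then blocks if nothing arrives
	//       // ... consume the published work
	//   });
	//
	//   producer.join();
	//   consumer.join();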
	}  // end namespace mpmc_sema
}  // end namespace details


// This is a blocking version of the queue. It has an almost identical interface to
// the normal non-blocking version, with the addition of various wait_dequeue() methods
// and the removal of producer-specific dequeue methods.
template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
class BlockingConcurrentQueue
{
private:
	typedef ::dmlc::moodycamel::ConcurrentQueue<T, Traits> ConcurrentQueue;
	typedef details::mpmc_sema::LightweightSemaphore LightweightSemaphore;

public:
	typedef typename ConcurrentQueue::producer_token_t producer_token_t;
	typedef typename ConcurrentQueue::consumer_token_t consumer_token_t;

	typedef typename ConcurrentQueue::index_t index_t;
	typedef typename ConcurrentQueue::size_t size_t;
	typedef typename std::make_signed<size_t>::type ssize_t;

	static const size_t BLOCK_SIZE = ConcurrentQueue::BLOCK_SIZE;
	static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = ConcurrentQueue::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD;
	static const size_t EXPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::EXPLICIT_INITIAL_INDEX_SIZE;
	static const size_t IMPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::IMPLICIT_INITIAL_INDEX_SIZE;
	static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = ConcurrentQueue::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
	static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = ConcurrentQueue::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE;
	static const size_t MAX_SUBQUEUE_SIZE = ConcurrentQueue::MAX_SUBQUEUE_SIZE;

public:
	// Creates a queue with at least `capacity` element slots; note that the
	// actual number of elements that can be inserted without additional memory
	// allocation depends on the number of producers and the block size (e.g. if
	// the block size is equal to `capacity`, only a single block will be allocated
	// up-front, which means only a single producer will be able to enqueue elements
	// without an extra allocation -- blocks aren't shared between producers).
	// This method is not thread safe -- it is up to the user to ensure that the
	// queue is fully constructed before it starts being used by other threads (this
	// includes making the memory effects of construction visible, possibly with a
	// memory barrier).
	explicit BlockingConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
		: inner(capacity), sema(create<LightweightSemaphore>(), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
	{
		assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
		if (!sema) {
			MOODYCAMEL_THROW(std::bad_alloc());
		}
	}
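	// Editor's note (illustrative sketch, not part of the original header): a queue can be
	// sized up-front for an expected burst; the pre-allocated storage is made up of whole
	// blocks of BLOCK_SIZE elements and is shared out per producer as described in the
	// comment above. The value here is only an example:
	//
	//   dmlc::moodycamel::BlockingConcurrentQueue<int> q(1024);  // at least 1024 slots pre-allocated
	//   // enqueue() will still grow the queue later if this capacity is exceeded.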

	BlockingConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
		: inner(minCapacity, maxExplicitProducers, maxImplicitProducers), sema(create<LightweightSemaphore>(), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
	{
		assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
		if (!sema) {
			MOODYCAMEL_THROW(std::bad_alloc());
		}
	}

	// Disable copying and copy assignment
	BlockingConcurrentQueue(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
	BlockingConcurrentQueue& operator=(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;

	// Moving is supported, but note that it is *not* a thread-safe operation.
	// Nobody can use the queue while it's being moved, and the memory effects
	// of that move must be propagated to other threads before they can use it.
	// Note: When a queue is moved, its tokens are still valid but can only be
	// used with the destination queue (i.e. semantically they are moved along
	// with the queue itself).
	BlockingConcurrentQueue(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
		: inner(std::move(other.inner)), sema(std::move(other.sema))
	{ }

	inline BlockingConcurrentQueue& operator=(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
	{
		return swap_internal(other);
	}

	// Swaps this queue's state with the other's. Not thread-safe.
	// Swapping two queues does not invalidate their tokens, however
	// the tokens that were created for one queue must be used with
	// only the swapped queue (i.e. the tokens are tied to the
	// queue's movable state, not the object itself).
	inline void swap(BlockingConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
	{
		swap_internal(other);
	}

private:
	BlockingConcurrentQueue& swap_internal(BlockingConcurrentQueue& other)
	{
		if (this == &other) {
			return *this;
		}

		inner.swap(other.inner);
		sema.swap(other.sema);
		return *this;
	}

public:
	// Enqueues a single item (by copying it).
	// Allocates memory if required. Only fails if memory allocation fails (or implicit
	// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
	// or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
	// Thread-safe.
	inline bool enqueue(T const& item)
	{
		if (details::likely(inner.enqueue(item))) {
			sema->signal();
			return true;
		}
		return false;
	}

	// Enqueues a single item (by moving it, if possible).
	// Allocates memory if required. Only fails if memory allocation fails (or implicit
	// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
	// or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
	// Thread-safe.
	inline bool enqueue(T&& item)
	{
		if (details::likely(inner.enqueue(std::move(item)))) {
			sema->signal();
			return true;
		}
		return false;
	}

	// Enqueues a single item (by copying it) using an explicit producer token.
	// Allocates memory if required. Only fails if memory allocation fails (or
	// Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
	// Thread-safe.
	inline bool enqueue(producer_token_t const& token, T const& item)
	{
		if (details::likely(inner.enqueue(token, item))) {
			sema->signal();
			return true;
		}
		return false;
	}

	// Enqueues a single item (by moving it, if possible) using an explicit producer token.
	// Allocates memory if required. Only fails if memory allocation fails (or
	// Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
	// Thread-safe.
	inline bool enqueue(producer_token_t const& token, T&& item)
	{
		if (details::likely(inner.enqueue(token, std::move(item)))) {
			sema->signal();
			return true;
		}
		return false;
	}

	// Enqueues several items.
	// Allocates memory if required. Only fails if memory allocation fails (or
	// implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
	// is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
	// Note: Use std::make_move_iterator if the elements should be moved instead of copied.
	// Thread-safe.
	template<typename It>
	inline bool enqueue_bulk(It itemFirst, size_t count)
	{
		if (details::likely(inner.enqueue_bulk(std::forward<It>(itemFirst), count))) {
			sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
			return true;
		}
		return false;
	}
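	// Editor's note (illustrative sketch, not part of the original header): enqueueing a
	// batch in one call, using std::make_move_iterator so elements are moved rather than
	// copied (assumes a queue `q` of std::string plus <vector> and <iterator>):
	//
	//   std::vector<std::string> batch = {"a", "b", "c"};
	//   bool ok = q.enqueue_bulk(std::make_move_iterator(batch.begin()), batch.size());
	//   // On success the semaphore is signalled once per element, so consumers blocked
	//   // in wait_dequeue() are woken for each enqueued item.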

	// Enqueues several items using an explicit producer token.
	// Allocates memory if required. Only fails if memory allocation fails
	// (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
	// Note: Use std::make_move_iterator if the elements should be moved
	// instead of copied.
	// Thread-safe.
	template<typename It>
	inline bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
	{
		if (details::likely(inner.enqueue_bulk(token, std::forward<It>(itemFirst), count))) {
			sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
			return true;
		}
		return false;
	}

	// Enqueues a single item (by copying it).
	// Does not allocate memory. Fails if not enough room to enqueue (or implicit
	// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
	// is 0).
	// Thread-safe.
	inline bool try_enqueue(T const& item)
	{
		if (inner.try_enqueue(item)) {
			sema->signal();
			return true;
		}
		return false;
	}

	// Enqueues a single item (by moving it, if possible).
	// Does not allocate memory (except for one-time implicit producer).
	// Fails if not enough room to enqueue (or implicit production is
	// disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
	// Thread-safe.
	inline bool try_enqueue(T&& item)
	{
		if (inner.try_enqueue(std::move(item))) {
			sema->signal();
			return true;
		}
		return false;
	}

	// Enqueues a single item (by copying it) using an explicit producer token.
	// Does not allocate memory. Fails if not enough room to enqueue.
	// Thread-safe.
	inline bool try_enqueue(producer_token_t const& token, T const& item)
	{
		if (inner.try_enqueue(token, item)) {
			sema->signal();
			return true;
		}
		return false;
	}

	// Enqueues a single item (by moving it, if possible) using an explicit producer token.
	// Does not allocate memory. Fails if not enough room to enqueue.
	// Thread-safe.
	inline bool try_enqueue(producer_token_t const& token, T&& item)
	{
		if (inner.try_enqueue(token, std::move(item))) {
			sema->signal();
			return true;
		}
		return false;
	}

	// Enqueues several items.
	// Does not allocate memory (except for one-time implicit producer).
	// Fails if not enough room to enqueue (or implicit production is
	// disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
	// Note: Use std::make_move_iterator if the elements should be moved
	// instead of copied.
	// Thread-safe.
	template<typename It>
	inline bool try_enqueue_bulk(It itemFirst, size_t count)
	{
		if (inner.try_enqueue_bulk(std::forward<It>(itemFirst), count)) {
			sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
			return true;
		}
		return false;
	}

	// Enqueues several items using an explicit producer token.
	// Does not allocate memory. Fails if not enough room to enqueue.
	// Note: Use std::make_move_iterator if the elements should be moved
	// instead of copied.
	// Thread-safe.
	template<typename It>
	inline bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
	{
		if (inner.try_enqueue_bulk(token, std::forward<It>(itemFirst), count)) {
			sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
			return true;
		}
		return false;
	}


	// Attempts to dequeue from the queue.
	// Returns false if all producer streams appeared empty at the time they
	// were checked (so, the queue is likely but not guaranteed to be empty).
	// Never allocates. Thread-safe.
	template<typename U>
	inline bool try_dequeue(U& item)
	{
		if (sema->tryWait()) {
			while (!inner.try_dequeue(item)) {
				continue;
			}
			return true;
		}
		return false;
	}

	// Attempts to dequeue from the queue using an explicit consumer token.
	// Returns false if all producer streams appeared empty at the time they
	// were checked (so, the queue is likely but not guaranteed to be empty).
	// Never allocates. Thread-safe.
	template<typename U>
	inline bool try_dequeue(consumer_token_t& token, U& item)
	{
		if (sema->tryWait()) {
			while (!inner.try_dequeue(token, item)) {
				continue;
			}
			return true;
		}
		return false;
	}

	// Attempts to dequeue several elements from the queue.
	// Returns the number of items actually dequeued.
	// Returns 0 if all producer streams appeared empty at the time they
	// were checked (so, the queue is likely but not guaranteed to be empty).
	// Never allocates. Thread-safe.
	template<typename It>
	inline size_t try_dequeue_bulk(It itemFirst, size_t max)
	{
		size_t count = 0;
		max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
		while (count != max) {
			count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
		}
		return count;
	}

	// Attempts to dequeue several elements from the queue using an explicit consumer token.
	// Returns the number of items actually dequeued.
	// Returns 0 if all producer streams appeared empty at the time they
	// were checked (so, the queue is likely but not guaranteed to be empty).
	// Never allocates. Thread-safe.
	template<typename It>
	inline size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
	{
		size_t count = 0;
		max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
		while (count != max) {
			count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
		}
		return count;
	}



	// Blocks the current thread until there's something to dequeue, then
	// dequeues it.
	// Never allocates. Thread-safe.
	template<typename U>
	inline void wait_dequeue(U& item)
	{
		sema->wait();
		while (!inner.try_dequeue(item)) {
			continue;
		}
	}
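	// Editor's note (illustrative sketch, not part of the original header): a minimal
	// blocking producer/consumer pair built on enqueue() and wait_dequeue(), assuming
	// <thread> is available (names are hypothetical):
	//
	//   dmlc::moodycamel::BlockingConcurrentQueue<int> q;
	//
	//   std::thread producer([&] {
	//       for (int i = 0; i != 100; ++i)
	//           q.enqueue(i);
	//   });
	//
	//   std::thread consumer([&] {
	//       for (int i = 0; i != 100; ++i) {
	//           int item;
	//           q.wait_dequeue(item);          // blocks until an element is available
	//       }
	//   });
	//
	//   producer.join();
	//   consumer.join();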

	// Blocks the current thread until either there's something to dequeue
	// or the timeout (specified in microseconds) expires. Returns false
	// without setting `item` if the timeout expires, otherwise assigns
	// to `item` and returns true.
	// Using a negative timeout indicates an indefinite timeout,
	// and is thus functionally equivalent to calling wait_dequeue.
	// Never allocates. Thread-safe.
	template<typename U>
	inline bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs)
	{
		if (!sema->wait(timeout_usecs)) {
			return false;
		}
		while (!inner.try_dequeue(item)) {
			continue;
		}
		return true;
	}

	// Blocks the current thread until either there's something to dequeue
	// or the timeout expires. Returns false without setting `item` if the
	// timeout expires, otherwise assigns to `item` and returns true.
	// Never allocates. Thread-safe.
	template<typename U, typename Rep, typename Period>
	inline bool wait_dequeue_timed(U& item, std::chrono::duration<Rep, Period> const& timeout)
	{
		return wait_dequeue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
	}
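	// Editor's note (illustrative sketch, not part of the original header): dequeueing with a
	// bounded wait via the std::chrono overload (assumes a queue `q` of int):
	//
	//   int item;
	//   if (q.wait_dequeue_timed(item, std::chrono::milliseconds(5))) {
	//       // got an element within 5 ms
	//   } else {
	//       // timed out; `item` was left untouched
	//   }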

	// Blocks the current thread until there's something to dequeue, then
	// dequeues it using an explicit consumer token.
	// Never allocates. Thread-safe.
	template<typename U>
	inline void wait_dequeue(consumer_token_t& token, U& item)
	{
		sema->wait();
		while (!inner.try_dequeue(token, item)) {
			continue;
		}
	}

	// Blocks the current thread until either there's something to dequeue
	// or the timeout (specified in microseconds) expires. Returns false
	// without setting `item` if the timeout expires, otherwise assigns
	// to `item` and returns true.
	// Using a negative timeout indicates an indefinite timeout,
	// and is thus functionally equivalent to calling wait_dequeue.
	// Never allocates. Thread-safe.
	template<typename U>
	inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::int64_t timeout_usecs)
	{
		if (!sema->wait(timeout_usecs)) {
			return false;
		}
		while (!inner.try_dequeue(token, item)) {
			continue;
		}
		return true;
	}

	// Blocks the current thread until either there's something to dequeue
	// or the timeout expires. Returns false without setting `item` if the
	// timeout expires, otherwise assigns to `item` and returns true.
	// Never allocates. Thread-safe.
	template<typename U, typename Rep, typename Period>
	inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::chrono::duration<Rep, Period> const& timeout)
	{
		return wait_dequeue_timed(token, item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
	}
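	// Editor's note (illustrative sketch, not part of the original header): a dedicated
	// consumer thread can amortize per-dequeue overhead with an explicit consumer token;
	// a token must only be used with the queue it was created from. Assuming the
	// ConsumerToken constructor overload for the blocking queue from concurrentqueue.h,
	// and a queue `q` of int:
	//
	//   dmlc::moodycamel::ConsumerToken tok(q);
	//   int item;
	//   q.wait_dequeue(tok, item);
	//   // or with a timeout:
	//   q.wait_dequeue_timed(tok, item, std::chrono::milliseconds(5));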

	// Attempts to dequeue several elements from the queue.
	// Returns the number of items actually dequeued, which will
	// always be at least one (this method blocks until the queue
	// is non-empty) and at most max.
	// Never allocates. Thread-safe.
	template<typename It>
	inline size_t wait_dequeue_bulk(It itemFirst, size_t max)
	{
		size_t count = 0;
		max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
		while (count != max) {
			count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
		}
		return count;
	}
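	// Editor's note (illustrative sketch, not part of the original header): draining up to a
	// fixed batch of elements in one blocking call (assumes a queue `q` of int):
	//
	//   int buffer[32];
	//   size_t n = q.wait_dequeue_bulk(buffer, 32);  // blocks until >= 1 element; n is in [1, 32]
	//   for (size_t i = 0; i != n; ++i) {
	//       // process buffer[i]
	//   }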

	// Attempts to dequeue several elements from the queue.
	// Returns the number of items actually dequeued, which can
	// be 0 if the timeout expires while waiting for elements,
	// and at most max.
	// Using a negative timeout indicates an indefinite timeout,
	// and is thus functionally equivalent to calling wait_dequeue_bulk.
	// Never allocates. Thread-safe.
	template<typename It>
	inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::int64_t timeout_usecs)
	{
		size_t count = 0;
		max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
		while (count != max) {
			count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
		}
		return count;
	}

	// Attempts to dequeue several elements from the queue.
	// Returns the number of items actually dequeued, which can
	// be 0 if the timeout expires while waiting for elements,
	// and at most max.
	// Never allocates. Thread-safe.
	template<typename It, typename Rep, typename Period>
	inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::chrono::duration<Rep, Period> const& timeout)
	{
		return wait_dequeue_bulk_timed<It&>(itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
	}

	// Attempts to dequeue several elements from the queue using an explicit consumer token.
	// Returns the number of items actually dequeued, which will
	// always be at least one (this method blocks until the queue
	// is non-empty) and at most max.
	// Never allocates. Thread-safe.
	template<typename It>
	inline size_t wait_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
	{
		size_t count = 0;
		max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
		while (count != max) {
			count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
		}
		return count;
	}

	// Attempts to dequeue several elements from the queue using an explicit consumer token.
	// Returns the number of items actually dequeued, which can
	// be 0 if the timeout expires while waiting for elements,
	// and at most max.
	// Using a negative timeout indicates an indefinite timeout,
	// and is thus functionally equivalent to calling wait_dequeue_bulk.
	// Never allocates. Thread-safe.
	template<typename It>
	inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::int64_t timeout_usecs)
	{
		size_t count = 0;
		max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
		while (count != max) {
			count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
		}
		return count;
	}

	// Attempts to dequeue several elements from the queue using an explicit consumer token.
	// Returns the number of items actually dequeued, which can
	// be 0 if the timeout expires while waiting for elements,
	// and at most max.
	// Never allocates. Thread-safe.
	template<typename It, typename Rep, typename Period>
	inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::chrono::duration<Rep, Period> const& timeout)
	{
		return wait_dequeue_bulk_timed<It&>(token, itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
	}


	// Returns an estimate of the total number of elements currently in the queue. This
	// estimate is only accurate if the queue has completely stabilized before it is called
	// (i.e. all enqueue and dequeue operations have completed and their memory effects are
	// visible on the calling thread, and no further operations start while this method is
	// being called).
	// Thread-safe.
	inline size_t size_approx() const
	{
		return (size_t)sema->availableApprox();
	}
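	// Editor's note (illustrative sketch, not part of the original header): because the
	// value is only a hint while other threads are active, size_approx() is best used for
	// monitoring rather than control flow:
	//
	//   size_t backlog = q.size_approx();  // e.g. exported periodically as a gauge metric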


	// Returns true if the underlying atomic variables used by
	// the queue are lock-free (they should be on most platforms).
	// Thread-safe.
	static bool is_lock_free()
	{
		return ConcurrentQueue::is_lock_free();
	}


private:
	template<typename U>
	static inline U* create()
	{
		auto p = (Traits::malloc)(sizeof(U));
		return p != nullptr ? new (p) U : nullptr;
	}

	template<typename U, typename A1>
	static inline U* create(A1&& a1)
	{
		auto p = (Traits::malloc)(sizeof(U));
		return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
	}

	template<typename U>
	static inline void destroy(U* p)
	{
		if (p != nullptr) {
			p->~U();
		}
		(Traits::free)(p);
	}

private:
	ConcurrentQueue inner;
	std::unique_ptr<LightweightSemaphore, void (*)(LightweightSemaphore*)> sema;
};


template<typename T, typename Traits>
inline void swap(BlockingConcurrentQueue<T, Traits>& a, BlockingConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
{
	a.swap(b);
}

}  // end namespace moodycamel
}  // namespace dmlc

#endif  // DMLC_BLOCKINGCONCURRENTQUEUE_H_