8 #ifndef DMLC_BLOCKINGCONCURRENTQUEUE_H_ 9 #define DMLC_BLOCKINGCONCURRENTQUEUE_H_ 14 #include <type_traits> 27 struct _SECURITY_ATTRIBUTES;
28 __declspec(dllimport) void* __stdcall CreateSemaphoreW(_SECURITY_ATTRIBUTES* lpSemaphoreAttributes,
long lInitialCount,
long lMaximumCount, const
wchar_t* lpName);
29 __declspec(dllimport)
int __stdcall CloseHandle(
void* hObject);
30 __declspec(dllimport)
unsigned long __stdcall WaitForSingleObject(
void* hHandle,
unsigned long dwMilliseconds);
31 __declspec(dllimport)
int __stdcall ReleaseSemaphore(
void* hSemaphore,
long lReleaseCount,
long* lpPreviousCount);
33 #elif defined(__MACH__) 34 #include <mach/mach.h> 35 #elif defined(__unix__) 36 #include <semaphore.h> 74 Semaphore(
const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
75 Semaphore& operator=(
const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
78 Semaphore(
int initialCount = 0)
80 assert(initialCount >= 0);
81 const long maxLong = 0x7fffffff;
82 m_hSema = CreateSemaphoreW(
nullptr, initialCount, maxLong,
nullptr);
92 const unsigned long infinite = 0xffffffff;
93 WaitForSingleObject(m_hSema, infinite);
98 const unsigned long RC_WAIT_TIMEOUT = 0x00000102;
99 return WaitForSingleObject(m_hSema, 0) != RC_WAIT_TIMEOUT;
102 bool timed_wait(std::uint64_t usecs)
104 const unsigned long RC_WAIT_TIMEOUT = 0x00000102;
105 return WaitForSingleObject(m_hSema, (
unsigned long)(usecs / 1000)) != RC_WAIT_TIMEOUT;
108 void signal(
int count = 1)
110 ReleaseSemaphore(m_hSema, count,
nullptr);
113 #elif defined(__MACH__) 123 Semaphore(
const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
124 Semaphore& operator=(
const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
127 Semaphore(
int initialCount = 0)
129 assert(initialCount >= 0);
130 semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount);
135 semaphore_destroy(mach_task_self(), m_sema);
140 semaphore_wait(m_sema);
145 return timed_wait(0);
148 bool timed_wait(std::uint64_t timeout_usecs)
151 ts.tv_sec =
static_cast<unsigned int>(timeout_usecs / 1000000);
152 ts.tv_nsec = (timeout_usecs % 1000000) * 1000;
155 kern_return_t rc = semaphore_timedwait(m_sema, ts);
157 return rc != KERN_OPERATION_TIMED_OUT;
162 semaphore_signal(m_sema);
165 void signal(
int count)
169 semaphore_signal(m_sema);
173 #elif defined(__unix__) 182 Semaphore(
const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
183 Semaphore& operator=(
const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
186 Semaphore(
int initialCount = 0)
188 assert(initialCount >= 0);
189 sem_init(&m_sema, 0, initialCount);
194 sem_destroy(&m_sema);
202 rc = sem_wait(&m_sema);
203 }
while (rc == -1 && errno == EINTR);
210 rc = sem_trywait(&m_sema);
211 }
while (rc == -1 && errno == EINTR);
212 return !(rc == -1 && errno == EAGAIN);
215 bool timed_wait(std::uint64_t usecs)
218 const int usecs_in_1_sec = 1000000;
219 const int nsecs_in_1_sec = 1000000000;
220 clock_gettime(CLOCK_REALTIME, &ts);
221 ts.tv_sec += usecs / usecs_in_1_sec;
222 ts.tv_nsec += (usecs % usecs_in_1_sec) * 1000;
225 if (ts.tv_nsec >= nsecs_in_1_sec) {
226 ts.tv_nsec -= nsecs_in_1_sec;
232 rc = sem_timedwait(&m_sema, &ts);
233 }
while (rc == -1 && errno == EINTR);
234 return !(rc == -1 && errno == ETIMEDOUT);
242 void signal(
int count)
251 #error Unsupported platform! (No semaphore wrapper available) 257 class LightweightSemaphore
260 typedef std::make_signed<std::size_t>::type ssize_t;
263 std::atomic<ssize_t> m_count;
// Acquires one unit, first via the atomic counter and then, if needed, via m_sema.
// timeout_usecs: a negative value (the default) selects the untimed branch below;
// otherwise the value is handed to m_sema.timed_wait() as a microsecond count.
// NOTE(review): the enclosing spin loop and the return statements are not visible
// in this listing; the comments below describe only the statements that are.
266 bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1)
// Optimistic path: relaxed read of the counter, then a CAS that takes one unit
// only when the counter is positive (acquire ordering on success).
275 oldCount = m_count.load(std::memory_order_relaxed);
276 if ((oldCount > 0) && m_count.compare_exchange_strong(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
// Compiler-level fence only; it emits no hardware barrier.
278 std::atomic_signal_fence(std::memory_order_acquire);
// Slow path: decrement unconditionally; oldCount holds the value before the decrement.
280 oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
283 if (timeout_usecs < 0)
288 if (m_sema.timed_wait((std::uint64_t)timeout_usecs))
// Reached after the timed wait did not succeed: the counter still carries this
// caller's decrement. Either consume a unit that was posted in the meantime
// (counter no longer negative and try_wait succeeds) or add the decrement back.
297 oldCount = m_count.load(std::memory_order_acquire);
298 if (oldCount >= 0 && m_sema.try_wait())
300 if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed))
// Acquires between one and `max` units, first via the atomic counter and then,
// if needed, via m_sema.
// timeout_usecs: a negative value (the default) selects the untimed branch below;
// otherwise the value is handed to m_sema.timed_wait() as a microsecond count.
// NOTE(review): the enclosing spin loop and several guards/returns are not visible
// in this listing; the comments below describe only the statements that are.
305 ssize_t waitManyWithPartialSpinning(ssize_t max, std::int64_t timeout_usecs = -1)
312 oldCount = m_count.load(std::memory_order_relaxed);
// Take min(oldCount, max) units in one CAS: the counter drops by `max`, or to zero
// when fewer than `max` are available.
315 ssize_t newCount = oldCount > max ? oldCount - max : 0;
316 if (m_count.compare_exchange_strong(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
// Number of units actually taken by the successful CAS.
317 return oldCount - newCount;
// Compiler-level fence only; it emits no hardware barrier.
319 std::atomic_signal_fence(std::memory_order_acquire);
// Slow path: decrement by exactly one; oldCount holds the value before the decrement.
321 oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
324 if (timeout_usecs < 0)
326 else if (!m_sema.timed_wait((std::uint64_t)timeout_usecs))
// Timed wait did not succeed: the counter still carries this caller's decrement.
// Either consume a unit posted in the meantime or add the decrement back.
330 oldCount = m_count.load(std::memory_order_acquire);
331 if (oldCount >= 0 && m_sema.try_wait())
333 if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed))
// One unit is already held at this point; opportunistically grab up to max - 1 more
// without blocking.
339 return 1 + tryWaitMany(max - 1);
344 LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount)
346 assert(initialCount >= 0);
351 ssize_t oldCount = m_count.load(std::memory_order_relaxed);
354 if (m_count.compare_exchange_weak(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
363 waitWithPartialSpinning();
366 bool wait(std::int64_t timeout_usecs)
368 return tryWait() || waitWithPartialSpinning(timeout_usecs);
372 ssize_t tryWaitMany(ssize_t max)
375 ssize_t oldCount = m_count.load(std::memory_order_relaxed);
378 ssize_t newCount = oldCount > max ? oldCount - max : 0;
379 if (m_count.compare_exchange_weak(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
380 return oldCount - newCount;
386 ssize_t waitMany(ssize_t max, std::int64_t timeout_usecs)
389 ssize_t result = tryWaitMany(max);
390 if (result == 0 && max > 0)
391 result = waitManyWithPartialSpinning(max, timeout_usecs);
395 ssize_t waitMany(ssize_t max)
397 ssize_t result = waitMany(max, -1);
402 void signal(ssize_t count = 1)
405 ssize_t oldCount = m_count.fetch_add(count, std::memory_order_release);
406 ssize_t toRelease = -oldCount < count ? -oldCount : count;
409 m_sema.signal((
int)toRelease);
413 ssize_t availableApprox()
const 415 ssize_t count = m_count.load(std::memory_order_relaxed);
416 return count > 0 ? count : 0;
426 template<
typename T,
typename Traits = ConcurrentQueueDefaultTraits>
427 class BlockingConcurrentQueue
430 typedef ::dmlc::moodycamel::ConcurrentQueue<T, Traits> ConcurrentQueue;
431 typedef details::mpmc_sema::LightweightSemaphore LightweightSemaphore;
434 typedef typename ConcurrentQueue::producer_token_t producer_token_t;
435 typedef typename ConcurrentQueue::consumer_token_t consumer_token_t;
438 typedef typename ConcurrentQueue::size_t size_t;
439 typedef typename std::make_signed<size_t>::type ssize_t;
441 static const size_t BLOCK_SIZE = ConcurrentQueue::BLOCK_SIZE;
442 static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = ConcurrentQueue::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD;
443 static const size_t EXPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::EXPLICIT_INITIAL_INDEX_SIZE;
444 static const size_t IMPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::IMPLICIT_INITIAL_INDEX_SIZE;
445 static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = ConcurrentQueue::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
446 static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = ConcurrentQueue::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE;
447 static const size_t MAX_SUBQUEUE_SIZE = ConcurrentQueue::MAX_SUBQUEUE_SIZE;
// Constructs the queue with `capacity` forwarded to the inner ConcurrentQueue
// (default: six blocks' worth) and allocates the LightweightSemaphore through
// create<>(), registering destroy<>() as its deleter.
460 explicit BlockingConcurrentQueue(
size_t capacity = 6 * BLOCK_SIZE)
461 : inner(capacity), sema(create<LightweightSemaphore>(), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
// Layout check: `inner` must sit at offset 0 of this class.
463 assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner &&
"BlockingConcurrentQueue must have ConcurrentQueue as its first member");
// NOTE(review): the guard for this throw is not visible in this listing; presumably
// it fires when create<>() returned nullptr - confirm against the full source.
465 MOODYCAMEL_THROW(std::bad_alloc());
// Constructs the queue by forwarding the three sizing arguments to the inner
// ConcurrentQueue, and allocates the LightweightSemaphore through create<>(),
// registering destroy<>() as its deleter.
469 BlockingConcurrentQueue(
size_t minCapacity,
size_t maxExplicitProducers,
size_t maxImplicitProducers)
470 : inner(minCapacity, maxExplicitProducers, maxImplicitProducers), sema(create<LightweightSemaphore>(), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
// Layout check: `inner` must sit at offset 0 of this class.
472 assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner &&
"BlockingConcurrentQueue must have ConcurrentQueue as its first member");
// NOTE(review): the guard for this throw is not visible in this listing; presumably
// it fires when create<>() returned nullptr - confirm against the full source.
474 MOODYCAMEL_THROW(std::bad_alloc());
479 BlockingConcurrentQueue(BlockingConcurrentQueue
const&) MOODYCAMEL_DELETE_FUNCTION;
480 BlockingConcurrentQueue& operator=(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
488 BlockingConcurrentQueue(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
489 : inner(
std::move(other.inner)), sema(
std::move(other.sema))
492 inline BlockingConcurrentQueue& operator=(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
494 return swap_internal(other);
502 inline void swap(BlockingConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
504 swap_internal(other);
508 BlockingConcurrentQueue& swap_internal(BlockingConcurrentQueue& other)
510 if (
this == &other) {
514 inner.swap(other.inner);
515 sema.swap(other.sema);
// enqueue overloads: each forwards its arguments to the matching inner.enqueue()
// overload; the visible branch is the one taken when that call reports success.
// NOTE(review): the statements inside the branch are not visible in this listing.

// Copying overload without a producer token.
525 inline bool enqueue(T
const& item)
527 if (details::likely(inner.enqueue(item))) {
// Moving overload without a producer token.
539 inline bool enqueue(T&& item)
541 if (details::likely(inner.enqueue(std::move(item)))) {
// Copying overload that uses the caller's producer token.
552 inline bool enqueue(producer_token_t
const& token, T
const& item)
554 if (details::likely(inner.enqueue(token, item))) {
// Moving overload that uses the caller's producer token.
565 inline bool enqueue(producer_token_t
const& token, T&& item)
567 if (details::likely(inner.enqueue(token, std::move(item)))) {
// enqueue_bulk overloads: forward `count` items starting at `itemFirst` to
// inner.enqueue_bulk(); on success the semaphore is signalled with `count`.

// Overload without a producer token.
580 template<
typename It>
581 inline bool enqueue_bulk(It itemFirst,
size_t count)
583 if (details::likely(inner.enqueue_bulk(std::forward<It>(itemFirst), count))) {
// The unsigned count is converted to the semaphore's signed counter type.
584 sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
// Overload that uses the caller's producer token.
596 template<
typename It>
597 inline bool enqueue_bulk(producer_token_t
const& token, It itemFirst,
size_t count)
599 if (details::likely(inner.enqueue_bulk(token, std::forward<It>(itemFirst), count))) {
// The unsigned count is converted to the semaphore's signed counter type.
600 sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
// try_enqueue overloads: each forwards its arguments to the matching
// inner.try_enqueue() overload; the visible branch is the one taken on success.
// NOTE(review): the statements inside the branch are not visible in this listing.

// Copying overload without a producer token.
611 inline bool try_enqueue(T
const& item)
613 if (inner.try_enqueue(item)) {
// Moving overload without a producer token.
625 inline bool try_enqueue(T&& item)
627 if (inner.try_enqueue(std::move(item))) {
// Copying overload that uses the caller's producer token.
637 inline bool try_enqueue(producer_token_t
const& token, T
const& item)
639 if (inner.try_enqueue(token, item)) {
// Moving overload that uses the caller's producer token.
649 inline bool try_enqueue(producer_token_t
const& token, T&& item)
651 if (inner.try_enqueue(token, std::move(item))) {
// try_enqueue_bulk overloads: forward `count` items starting at `itemFirst` to
// inner.try_enqueue_bulk(); on success the semaphore is signalled with `count`.

// Overload without a producer token.
665 template<
typename It>
666 inline bool try_enqueue_bulk(It itemFirst,
size_t count)
668 if (inner.try_enqueue_bulk(std::forward<It>(itemFirst), count)) {
// The unsigned count is converted to the semaphore's signed counter type.
669 sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
// Overload that uses the caller's producer token.
680 template<
typename It>
681 inline bool try_enqueue_bulk(producer_token_t
const& token, It itemFirst,
size_t count)
683 if (inner.try_enqueue_bulk(token, std::forward<It>(itemFirst), count)) {
// The unsigned count is converted to the semaphore's signed counter type.
684 sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
696 inline bool try_dequeue(U& item)
698 if (sema->tryWait()) {
699 while (!inner.try_dequeue(item)) {
712 inline bool try_dequeue(consumer_token_t& token, U& item)
714 if (sema->tryWait()) {
715 while (!inner.try_dequeue(token, item)) {
728 template<
typename It>
729 inline size_t try_dequeue_bulk(It itemFirst,
size_t max)
732 max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
733 while (count != max) {
734 count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
744 template<
typename It>
745 inline size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst,
size_t max)
748 max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
749 while (count != max) {
750 count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
// Dequeues one element into `item`.
// NOTE(review): the name suggests this blocks until an element is available, but the
// blocking call itself is not visible in this listing - confirm against the full source.
761 inline void wait_dequeue(U& item)
// Retry the inner dequeue until it reports success.
764 while (!inner.try_dequeue(item)) {
777 inline bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs)
779 if (!sema->wait(timeout_usecs)) {
782 while (!inner.try_dequeue(item)) {
// Convenience overload taking any std::chrono duration; the timeout is converted to
// whole microseconds and handed to the integer-microsecond overload.
template<typename U, typename Rep, typename Period>
inline bool wait_dequeue_timed(U& item, std::chrono::duration<Rep, Period> const& timeout)
{
    const auto timeout_usecs = std::chrono::duration_cast<std::chrono::microseconds>(timeout).count();
    return wait_dequeue_timed(item, timeout_usecs);
}
// Dequeues one element into `item` using the caller's consumer token.
// NOTE(review): the name suggests this blocks until an element is available, but the
// blocking call itself is not visible in this listing - confirm against the full source.
802 inline void wait_dequeue(consumer_token_t& token, U& item)
// Retry the inner dequeue until it reports success.
805 while (!inner.try_dequeue(token, item)) {
818 inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::int64_t timeout_usecs)
820 if (!sema->wait(timeout_usecs)) {
823 while (!inner.try_dequeue(token, item)) {
833 template<
typename U,
typename Rep,
typename Period>
834 inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::chrono::duration<Rep, Period>
const& timeout)
836 return wait_dequeue_timed(token, item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
844 template<
typename It>
845 inline size_t wait_dequeue_bulk(It itemFirst,
size_t max)
848 max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
849 while (count != max) {
850 count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
862 template<
typename It>
863 inline size_t wait_dequeue_bulk_timed(It itemFirst,
size_t max, std::int64_t timeout_usecs)
866 max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
867 while (count != max) {
868 count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
// Convenience overload taking any std::chrono duration; the timeout is converted to
// whole microseconds and handed to the integer-microsecond overload.
template<typename It, typename Rep, typename Period>
inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::chrono::duration<Rep, Period> const& timeout)
{
    const auto timeout_usecs = std::chrono::duration_cast<std::chrono::microseconds>(timeout).count();
    return wait_dequeue_bulk_timed<It&>(itemFirst, max, timeout_usecs);
}
889 template<
typename It>
890 inline size_t wait_dequeue_bulk(consumer_token_t& token, It itemFirst,
size_t max)
893 max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
894 while (count != max) {
895 count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
907 template<
typename It>
908 inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst,
size_t max, std::int64_t timeout_usecs)
911 max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
912 while (count != max) {
913 count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
923 template<
typename It,
typename Rep,
typename Period>
924 inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst,
size_t max, std::chrono::duration<Rep, Period>
const& timeout)
926 return wait_dequeue_bulk_timed<It&>(token, itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
936 inline size_t size_approx()
const 938 return (
size_t)sema->availableApprox();
945 static bool is_lock_free()
947 return ConcurrentQueue::is_lock_free();
953 static inline U* create()
955 auto p = (Traits::malloc)(
sizeof(U));
956 return p !=
nullptr ?
new (p) U :
nullptr;
959 template<
typename U,
typename A1>
960 static inline U* create(A1&& a1)
962 auto p = (Traits::malloc)(
sizeof(U));
963 return p !=
nullptr ?
new (p) U(std::forward<A1>(a1)) :
nullptr;
967 static inline void destroy(U* p)
976 ConcurrentQueue inner;
977 std::unique_ptr<LightweightSemaphore, void (*)(LightweightSemaphore*)> sema;
981 template<
typename T,
typename Traits>
982 inline void swap(BlockingConcurrentQueue<T, Traits>& a, BlockingConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
990 #endif // DMLC_BLOCKINGCONCURRENTQUEUE_H_ 991 Definition: optional.h:241
unsigned index_t
this defines the unsigned integer type that can normally be used to store feature index ...
Definition: data.h:32
namespace for dmlc
Definition: array_view.h:12