concurrentqueue.h
1 // Provides a C++11 implementation of a multi-producer, multi-consumer lock-free queue.
3 // An overview, including benchmark results, is provided here:
4 // http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
5 // The full design is also described in excruciating detail at:
6 // http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue
7 
8 // Simplified BSD license:
9 // Copyright (c) 2013-2016, Cameron Desrochers.
10 // All rights reserved.
11 //
12 // Redistribution and use in source and binary forms, with or without modification,
13 // are permitted provided that the following conditions are met:
14 //
15 // - Redistributions of source code must retain the above copyright notice, this list of
16 // conditions and the following disclaimer.
17 // - Redistributions in binary form must reproduce the above copyright notice, this list of
18 // conditions and the following disclaimer in the documentation and/or other materials
19 // provided with the distribution.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
22 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
23 // MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
24 // THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
26 // OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
27 // HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
28 // TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
29 // EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 #ifndef DMLC_CONCURRENTQUEUE_H_
31 #define DMLC_CONCURRENTQUEUE_H_
32 #pragma once
33 
34 #if defined(__GNUC__)
35 // Disable -Wconversion warnings (spuriously triggered when Traits::size_t and
36 // Traits::index_t are set to < 32 bits, causing integer promotion, causing warnings
37 // upon assigning any computed values)
38 #pragma GCC diagnostic push
39 #pragma GCC diagnostic ignored "-Wconversion"
40 
41 #ifdef MCDBGQ_USE_RELACY
42 #pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
43 #endif
44 #endif
45 
46 #if defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__) || defined(_WIN64)
47 #include <windows.h> // for GetCurrentThreadId()
48 #endif
49 
50 #if defined(__APPLE__)
51 #include "TargetConditionals.h"
52 #endif
53 
54 #ifdef MCDBGQ_USE_RELACY
55 #include "relacy/relacy_std.hpp"
56 #include "relacy_shims.h"
57 // We only use malloc/free anyway, and the delete macro messes up `= delete` method declarations.
58 // We'll override the default trait malloc ourselves without a macro.
59 #undef new
60 #undef delete
61 #undef malloc
62 #undef free
63 #else
64 #include <atomic> // Requires C++11. Sorry VS2010.
65 #include <cassert>
66 #endif
67 #include <cstddef> // for max_align_t
68 #include <cstdint>
69 #include <cstdlib>
70 #include <type_traits>
71 #include <algorithm>
72 #include <utility>
73 #include <limits>
74 #include <climits> // for CHAR_BIT
75 #include <array>
76 #include <thread> // partly for __WINPTHREADS_VERSION if on MinGW-w64 w/ POSIX threading
77 
78 namespace dmlc {
79 
80 // Platform-specific definitions of a numeric thread ID type and an invalid value
81 namespace moodycamel { namespace details {
82 template<typename thread_id_t> struct thread_id_converter {
83  typedef thread_id_t thread_id_numeric_size_t;
84  typedef thread_id_t thread_id_hash_t;
85  static thread_id_hash_t prehash(thread_id_t const& x) { return x; }
86 };
87 } }
88 #if defined(MCDBGQ_USE_RELACY)
89 namespace moodycamel { namespace details {
90  typedef std::uint32_t thread_id_t;
91  static const thread_id_t invalid_thread_id = 0xFFFFFFFFU;
92  static const thread_id_t invalid_thread_id2 = 0xFFFFFFFEU;
93  static inline thread_id_t thread_id() { return rl::thread_index(); }
94 } }
95 #elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
96 // There's no sense pulling in windows.h in a header; we'll manually declare the function
97 // we use and rely on backward compatibility for this not to break
98 extern "C" __declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(void);
99 namespace moodycamel { namespace details {
100  static_assert(sizeof(unsigned long) == sizeof(std::uint32_t), "Expected size of unsigned long to be 32 bits on Windows");
101  typedef std::uint32_t thread_id_t;
102  static const thread_id_t invalid_thread_id = 0; // See http://blogs.msdn.com/b/oldnewthing/archive/2004/02/23/78395.aspx
103  static const thread_id_t invalid_thread_id2 = 0xFFFFFFFFU; // Not technically guaranteed to be invalid, but is never used in practice. Note that all Win32 thread IDs are presently multiples of 4.
104  static inline thread_id_t thread_id() { return static_cast<thread_id_t>(::GetCurrentThreadId()); }
105 } }
106 #elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE)
107 namespace moodycamel { namespace details {
108  static_assert(sizeof(std::thread::id) == 4 || sizeof(std::thread::id) == 8, "std::thread::id is expected to be either 4 or 8 bytes");
109 
110  typedef std::thread::id thread_id_t;
111  static const thread_id_t invalid_thread_id; // Default ctor creates invalid ID
112 
113  // Note we don't define an invalid_thread_id2 since std::thread::id doesn't have one; it's
114  // only used if MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is defined anyway, which it won't
115  // be.
116  static inline thread_id_t thread_id() { return std::this_thread::get_id(); }
117 
118  template<std::size_t> struct thread_id_size { };
119  template<> struct thread_id_size<4> { typedef std::uint32_t numeric_t; };
120  template<> struct thread_id_size<8> { typedef std::uint64_t numeric_t; };
121 
122  template<> struct thread_id_converter<thread_id_t> {
123  typedef thread_id_size<sizeof(thread_id_t)>::numeric_t thread_id_numeric_size_t;
124 #ifndef __APPLE__
125  typedef std::size_t thread_id_hash_t;
126 #else
127  typedef thread_id_numeric_size_t thread_id_hash_t;
128 #endif
129 
130  static thread_id_hash_t prehash(thread_id_t const& x)
131  {
132 #ifndef __APPLE__
133  return std::hash<std::thread::id>()(x);
134 #else
135  return *reinterpret_cast<thread_id_hash_t const*>(&x);
136 #endif
137  }
138  };
139 } }
140 #else
141 // Use a nice trick from this answer: http://stackoverflow.com/a/8438730/21475
142 // In order to get a numeric thread ID in a platform-independent way, we use a thread-local
143 // static variable's address as a thread identifier :-)
144 #if defined(__GNUC__) || defined(__INTEL_COMPILER)
145 #define MOODYCAMEL_THREADLOCAL __thread
146 #elif defined(_MSC_VER)
147 #define MOODYCAMEL_THREADLOCAL __declspec(thread)
148 #else
149 // Assume C++11 compliant compiler
150 #define MOODYCAMEL_THREADLOCAL thread_local
151 #endif
152 namespace moodycamel { namespace details {
153 typedef std::uintptr_t thread_id_t;
154 static const thread_id_t invalid_thread_id = 0; // Address can't be nullptr
155 static const thread_id_t invalid_thread_id2 = 1; // Member accesses off a null pointer are also generally invalid. Plus it's not aligned.
156 static inline thread_id_t thread_id() { static MOODYCAMEL_THREADLOCAL int x; return reinterpret_cast<thread_id_t>(&x); }
157 } }
158 #endif
159 
160 // Exceptions
161 #ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
162 #if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
163 #define MOODYCAMEL_EXCEPTIONS_ENABLED
164 #endif
165 #endif
166 #ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
167 #define MOODYCAMEL_TRY try
168 #define MOODYCAMEL_CATCH(...) catch(__VA_ARGS__)
169 #define MOODYCAMEL_RETHROW throw
170 #define MOODYCAMEL_THROW(expr) throw (expr)
171 #else
172 #define MOODYCAMEL_TRY if (true)
173 #define MOODYCAMEL_CATCH(...) else if (false)
174 #define MOODYCAMEL_RETHROW
175 #define MOODYCAMEL_THROW(expr)
176 #endif
177 
178 #ifndef MOODYCAMEL_NOEXCEPT
179 #if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
180 #define MOODYCAMEL_NOEXCEPT
181 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true
182 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true
183 #elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800
184 // VS2012's std::is_nothrow_[move_]constructible is broken and returns true when it shouldn't :-(
185 // We have to assume *all* non-trivial constructors may throw on VS2012!
186 #define MOODYCAMEL_NOEXCEPT _NOEXCEPT
187 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value)
188 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
189 #elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900
190 #define MOODYCAMEL_NOEXCEPT _NOEXCEPT
191 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value || std::is_nothrow_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value || std::is_nothrow_copy_constructible<type>::value)
192 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
193 #else
194 #define MOODYCAMEL_NOEXCEPT noexcept
195 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr)
196 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr)
197 #endif
198 #endif
199 
200 #ifndef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
201 #ifdef MCDBGQ_USE_RELACY
202 #define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
203 #else
204 // VS2013 doesn't support `thread_local`, and MinGW-w64 w/ POSIX threading has a crippling bug: http://sourceforge.net/p/mingw-w64/bugs/445
205 // g++ <=4.7 doesn't support thread_local either.
206 // Finally, iOS/ARM doesn't support it either, and g++/ARM allows it to compile but it's not confirmed to actually work
207 #if (!defined(_MSC_VER) || _MSC_VER >= 1900) && (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__)
208 // Assume `thread_local` is fully supported in all other C++11 compilers/platforms
209 //#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED // always disabled for now since several users report having problems with it on
210 #endif
211 #endif
212 #endif
213 
214 // VS2012 doesn't support deleted functions.
215 // In this case, we declare the function normally but don't define it. A link error will be generated if the function is called.
216 #ifndef MOODYCAMEL_DELETE_FUNCTION
217 #if defined(_MSC_VER) && _MSC_VER < 1800
218 #define MOODYCAMEL_DELETE_FUNCTION
219 #else
220 #define MOODYCAMEL_DELETE_FUNCTION = delete
221 #endif
222 #endif
223 
224 // Compiler-specific likely/unlikely hints
225 namespace moodycamel { namespace details {
226 #if defined(__GNUC__)
227 inline bool likely(bool x) { return __builtin_expect((x), true); }
228 inline bool unlikely(bool x) { return __builtin_expect((x), false); }
229 #else
230 inline bool likely(bool x) { return x; }
231 inline bool unlikely(bool x) { return x; }
232 #endif
233 } }
234 
235 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
236 #include "internal/concurrentqueue_internal_debug.h"
237 #endif
238 
239 namespace moodycamel {
240 namespace details {
241 template<typename T>
242 struct const_numeric_max {
243  static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");
244  static const T value = std::numeric_limits<T>::is_signed
245  ? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
246  : static_cast<T>(-1);
247 };
248 
249 #if defined(__GLIBCXX__)
250 typedef ::max_align_t std_max_align_t; // libstdc++ forgot to add it to std:: for a while
251 #else
252 typedef std::max_align_t std_max_align_t; // Others (e.g. MSVC) insist it can *only* be accessed via std::
253 #endif
254 
255 // Some platforms have incorrectly set max_align_t to a type with <8 bytes alignment even while supporting
256 // 8-byte aligned scalar values (*cough* 32-bit iOS). Work around this with our own union. See issue #64.
257 typedef union {
258  std_max_align_t x;
259  long long y;
260  void* z;
261 } max_align_t;
262 }
263 
264 // Default traits for the ConcurrentQueue. To change some of the
265 // traits without re-implementing all of them, inherit from this
266 // struct and shadow the declarations you wish to be different;
267 // since the traits are used as a template type parameter, the
268 // shadowed declarations will be used where defined, and the defaults
269 // otherwise.
270 struct ConcurrentQueueDefaultTraits
271 {
272  // General-purpose size type. std::size_t is strongly recommended.
273  typedef std::size_t size_t;
274 
275  // The type used for the enqueue and dequeue indices. Must be at least as
276  // large as size_t. Should be significantly larger than the number of elements
277  // you expect to hold at once, especially if you have a high turnover rate;
278  // for example, on 32-bit x86, if you expect to have over a hundred million
279  // elements or pump several million elements through your queue in a very
280  // short space of time, using a 32-bit type *may* trigger a race condition.
281  // A 64-bit int type is recommended in that case, and in practice will
282  // prevent a race condition no matter the usage of the queue. Note that
283  // whether the queue is lock-free with a 64-bit type depends on whether
284  // std::atomic<std::uint64_t> is lock-free, which is platform-specific.
285  typedef std::size_t index_t;
286 
287  // Internally, all elements are enqueued and dequeued from multi-element
288  // blocks; this is the smallest controllable unit. If you expect few elements
289  // but many producers, a smaller block size should be favoured. For few producers
290  // and/or many elements, a larger block size is preferred. A sane default
291  // is provided. Must be a power of 2.
292  static const size_t BLOCK_SIZE = 32;
293 
294  // For explicit producers (i.e. when using a producer token), the block is
295  // checked for being empty by iterating through a list of flags, one per element.
296  // For large block sizes, this is too inefficient, and switching to an atomic
297  // counter-based approach is faster. The switch is made for block sizes strictly
298  // larger than this threshold.
299  static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;
300 
301  // How many full blocks can be expected for a single explicit producer? This should
302  // reflect that number's maximum for optimal performance. Must be a power of 2.
303  static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;
304 
305  // How many full blocks can be expected for a single implicit producer? This should
306  // reflect that number's maximum for optimal performance. Must be a power of 2.
307  static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32;
308 
309  // The initial size of the hash table mapping thread IDs to implicit producers.
310  // Note that the hash is resized every time it becomes half full.
311  // Must be a power of two (or 0). If 0, implicit production
312  // (using the enqueue methods without an explicit producer token) is disabled.
313  static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 32;
314 
315  // Controls the number of items that an explicit consumer (i.e. one with a token)
316  // must consume before it causes all consumers to rotate and move on to the next
317  // internal queue.
318  static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;
319 
320  // The maximum number of elements (inclusive) that can be enqueued to a sub-queue.
321  // Enqueue operations that would cause this limit to be surpassed will fail. Note
322  // that this limit is enforced at the block level (for performance reasons), i.e.
323  // it's rounded up to the nearest block size.
324  static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value;
325 
326 
327 #ifndef MCDBGQ_USE_RELACY
328  // Memory allocation can be customized if needed.
329  // malloc should return nullptr on failure, and handle alignment like std::malloc.
330 #if defined(malloc) || defined(free)
331  // Gah, this is 2015, stop defining macros that break standard code already!
332  // Work around malloc/free being special macros:
333  static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
334  static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
335  static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
336  static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
337 #else
338  static inline void* malloc(size_t size) { return std::malloc(size); }
339  static inline void free(void* ptr) { return std::free(ptr); }
340 #endif
341 #else
342  // Debug versions when running under the Relacy race detector (ignore
343  // these in user code)
344  static inline void* malloc(size_t size) { return rl::rl_malloc(size, $); }
345  static inline void free(void* ptr) { return rl::rl_free(ptr, $); }
346 #endif
347 };
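// Example (illustrative sketch; MyTraits and q are hypothetical names, not part of this
// header): to change a single trait, inherit from the defaults and shadow only the
// members you care about, e.g. a larger block size for bulk-heavy workloads:
//
//   struct MyTraits : public dmlc::moodycamel::ConcurrentQueueDefaultTraits {
//     static const size_t BLOCK_SIZE = 256;  // must remain a power of 2
//   };
//   dmlc::moodycamel::ConcurrentQueue<int, MyTraits> q;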
348 
349 
350 // When producing or consuming many elements, the most efficient way is to:
351 // 1) Use one of the bulk-operation methods of the queue with a token
352 // 2) Failing that, use the bulk-operation methods without a token
353 // 3) Failing that, create a token and use that with the single-item methods
354 // 4) Failing that, use the single-parameter methods of the queue
355 // Having said that, don't create tokens willy-nilly -- ideally there should be
356 // a maximum of one token per thread (of each kind).
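// Usage sketch (illustrative only; q, items, and the counts below are hypothetical):
// a producer token keeps one thread's enqueues on a single internal sub-queue, and the
// bulk methods amortize per-operation overhead, matching case 1) above:
//
//   dmlc::moodycamel::ConcurrentQueue<int> q;
//   dmlc::moodycamel::ProducerToken ptok(q);
//   int items[8] = {0, 1, 2, 3, 4, 5, 6, 7};
//   q.enqueue_bulk(ptok, items, 8);               // bulk enqueue with a token
//
//   dmlc::moodycamel::ConsumerToken ctok(q);
//   int out[8];
//   size_t n = q.try_dequeue_bulk(ctok, out, 8);  // may return fewer than 8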
357 struct ProducerToken;
358 struct ConsumerToken;
359 
360 template<typename T, typename Traits> class ConcurrentQueue;
361 template<typename T, typename Traits> class BlockingConcurrentQueue;
362 class ConcurrentQueueTests;
363 
364 
365 namespace details
366 {
367 struct ConcurrentQueueProducerTypelessBase
368 {
369  ConcurrentQueueProducerTypelessBase* next;
370  std::atomic<bool> inactive;
371  ProducerToken* token;
372 
373  ConcurrentQueueProducerTypelessBase()
374  : next(nullptr), inactive(false), token(nullptr)
375  {
376  }
377 };
378 
379 template<bool use32> struct _hash_32_or_64 {
380  static inline std::uint32_t hash(std::uint32_t h)
381  {
382  // MurmurHash3 finalizer -- see https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp
383  // Since the thread ID is already unique, all we really want to do is propagate that
384  // uniqueness evenly across all the bits, so that we can use a subset of the bits while
385  // reducing collisions significantly
386  h ^= h >> 16;
387  h *= 0x85ebca6b;
388  h ^= h >> 13;
389  h *= 0xc2b2ae35;
390  return h ^ (h >> 16);
391  }
392 };
393 template<> struct _hash_32_or_64<1> {
394  static inline std::uint64_t hash(std::uint64_t h)
395  {
396  h ^= h >> 33;
397  h *= 0xff51afd7ed558ccd;
398  h ^= h >> 33;
399  h *= 0xc4ceb9fe1a85ec53;
400  return h ^ (h >> 33);
401  }
402 };
403 template<std::size_t size> struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> { };
404 
405 static inline size_t hash_thread_id(thread_id_t id)
406 {
407  static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");
408  return static_cast<size_t>(hash_32_or_64<sizeof(thread_id_converter<thread_id_t>::thread_id_hash_t)>::hash(
409  thread_id_converter<thread_id_t>::prehash(id)));
410 }
411 
412 template<typename T>
413 static inline bool circular_less_than(T a, T b)
414 {
415 #ifdef _MSC_VER
416  #pragma warning(push)
417 #pragma warning(disable: 4554)
418 #endif
419  static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "circular_less_than is intended to be used only with unsigned integer types");
420  return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << static_cast<T>(sizeof(T) * CHAR_BIT - 1));
421 #ifdef _MSC_VER
422 #pragma warning(pop)
423 #endif
424 }
425 
426 template<typename U>
427 static inline char* align_for(char* ptr)
428 {
429  const std::size_t alignment = std::alignment_of<U>::value;
430  return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
431 }
432 
433 template<typename T>
434 static inline T ceil_to_pow_2(T x)
435 {
436  static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "ceil_to_pow_2 is intended to be used only with unsigned integer types");
437 
438  // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
439  --x;
440  x |= x >> 1;
441  x |= x >> 2;
442  x |= x >> 4;
443  for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
444  x |= x >> (i << 3);
445  }
446  ++x;
447  return x;
448 }
449 
450 template<typename T>
451 static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
452 {
453  T temp = std::move(left.load(std::memory_order_relaxed));
454  left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
455  right.store(std::move(temp), std::memory_order_relaxed);
456 }
457 
458 template<typename T>
459 static inline T const& nomove(T const& x)
460 {
461  return x;
462 }
463 
464 template<bool Enable>
465 struct nomove_if
466 {
467  template<typename T>
468  static inline T const& eval(T const& x)
469  {
470  return x;
471  }
472 };
473 
474 template<>
475 struct nomove_if<false>
476 {
477  template<typename U>
478  static inline auto eval(U&& x)
479  -> decltype(std::forward<U>(x))
480  {
481  return std::forward<U>(x);
482  }
483 };
484 
485 template<typename It>
486 static inline auto deref_noexcept(It& it) MOODYCAMEL_NOEXCEPT -> decltype(*it)
487 {
488  return *it;
489 }
490 
491 #if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
492 template<typename T> struct is_trivially_destructible : std::is_trivially_destructible<T> { };
493 #else
494 template<typename T> struct is_trivially_destructible : std::has_trivial_destructor<T> { };
495 #endif
496 
497 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
498 #ifdef MCDBGQ_USE_RELACY
499  typedef RelacyThreadExitListener ThreadExitListener;
500  typedef RelacyThreadExitNotifier ThreadExitNotifier;
501 #else
502  struct ThreadExitListener
503  {
504  typedef void (*callback_t)(void*);
505  callback_t callback;
506  void* userData;
507 
508  ThreadExitListener* next; // reserved for use by the ThreadExitNotifier
509  };
510 
511 
512  class ThreadExitNotifier
513  {
514  public:
515  static void subscribe(ThreadExitListener* listener)
516  {
517  auto& tlsInst = instance();
518  listener->next = tlsInst.tail;
519  tlsInst.tail = listener;
520  }
521 
522  static void unsubscribe(ThreadExitListener* listener)
523  {
524  auto& tlsInst = instance();
525  ThreadExitListener** prev = &tlsInst.tail;
526  for (auto ptr = tlsInst.tail; ptr != nullptr; ptr = ptr->next) {
527  if (ptr == listener) {
528  *prev = ptr->next;
529  break;
530  }
531  prev = &ptr->next;
532  }
533  }
534 
535  private:
536  ThreadExitNotifier() : tail(nullptr) { }
537  ThreadExitNotifier(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
538  ThreadExitNotifier& operator=(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
539 
540  ~ThreadExitNotifier()
541  {
542  // This thread is about to exit, let everyone know!
543  assert(this == &instance() && "If this assert fails, you likely have a buggy compiler! Change the preprocessor conditions such that MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
544  for (auto ptr = tail; ptr != nullptr; ptr = ptr->next) {
545  ptr->callback(ptr->userData);
546  }
547  }
548 
549  // Thread-local
550  static inline ThreadExitNotifier& instance()
551  {
552  static thread_local ThreadExitNotifier notifier;
553  return notifier;
554  }
555 
556  private:
557  ThreadExitListener* tail;
558  };
559 #endif
560 #endif
561 
562 template<typename T> struct static_is_lock_free_num { enum { value = 0 }; };
563 template<> struct static_is_lock_free_num<signed char> { enum { value = ATOMIC_CHAR_LOCK_FREE }; };
564 template<> struct static_is_lock_free_num<short> { enum { value = ATOMIC_SHORT_LOCK_FREE }; };
565 template<> struct static_is_lock_free_num<int> { enum { value = ATOMIC_INT_LOCK_FREE }; };
566 template<> struct static_is_lock_free_num<long> { enum { value = ATOMIC_LONG_LOCK_FREE }; };
567 template<> struct static_is_lock_free_num<long long> { enum { value = ATOMIC_LLONG_LOCK_FREE }; };
568 template<typename T> struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> { };
569 template<> struct static_is_lock_free<bool> { enum { value = ATOMIC_BOOL_LOCK_FREE }; };
570 template<typename U> struct static_is_lock_free<U*> { enum { value = ATOMIC_POINTER_LOCK_FREE }; };
571 }
572 
573 
574 struct ProducerToken
575 {
576  template<typename T, typename Traits>
577  explicit ProducerToken(ConcurrentQueue<T, Traits>& queue);
578 
579  template<typename T, typename Traits>
580  explicit ProducerToken(BlockingConcurrentQueue<T, Traits>& queue);
581 
582  ProducerToken(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
583  : producer(other.producer)
584  {
585  other.producer = nullptr;
586  if (producer != nullptr) {
587  producer->token = this;
588  }
589  }
590 
591  inline ProducerToken& operator=(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
592  {
593  swap(other);
594  return *this;
595  }
596 
597  void swap(ProducerToken& other) MOODYCAMEL_NOEXCEPT
598  {
599  std::swap(producer, other.producer);
600  if (producer != nullptr) {
601  producer->token = this;
602  }
603  if (other.producer != nullptr) {
604  other.producer->token = &other;
605  }
606  }
607 
608  // A token is always valid unless:
609  // 1) Memory allocation failed during construction
610  // 2) It was moved via the move constructor
611  // (Note: assignment does a swap, leaving both potentially valid)
612  // 3) The associated queue was destroyed
613  // Note that if valid() returns true, that only indicates
614  // that the token is valid for use with a specific queue,
615  // but not which one; that's up to the user to track.
616  inline bool valid() const { return producer != nullptr; }
617 
618  ~ProducerToken()
619  {
620  if (producer != nullptr) {
621  producer->token = nullptr;
622  producer->inactive.store(true, std::memory_order_release);
623  }
624  }
625 
626  // Disable copying and assignment
627  ProducerToken(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
628  ProducerToken& operator=(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
629 
630  private:
631  template<typename T, typename Traits> friend class ConcurrentQueue;
632  friend class ConcurrentQueueTests;
633 
634  protected:
635  details::ConcurrentQueueProducerTypelessBase* producer;
636 };
637 
638 
639 struct ConsumerToken
640 {
641  template<typename T, typename Traits>
642  explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);
643 
644  template<typename T, typename Traits>
645  explicit ConsumerToken(BlockingConcurrentQueue<T, Traits>& q);
646 
647  ConsumerToken(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
648  : initialOffset(other.initialOffset), lastKnownGlobalOffset(other.lastKnownGlobalOffset), itemsConsumedFromCurrent(other.itemsConsumedFromCurrent), currentProducer(other.currentProducer), desiredProducer(other.desiredProducer)
649  {
650  }
651 
652  inline ConsumerToken& operator=(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
653  {
654  swap(other);
655  return *this;
656  }
657 
658  void swap(ConsumerToken& other) MOODYCAMEL_NOEXCEPT
659  {
660  std::swap(initialOffset, other.initialOffset);
661  std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
662  std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
663  std::swap(currentProducer, other.currentProducer);
664  std::swap(desiredProducer, other.desiredProducer);
665  }
666 
667  // Disable copying and assignment
668  ConsumerToken(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
669  ConsumerToken& operator=(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
670 
671  private:
672  template<typename T, typename Traits> friend class ConcurrentQueue;
673  friend class ConcurrentQueueTests;
674 
675  private: // but shared with ConcurrentQueue
676  std::uint32_t initialOffset;
677  std::uint32_t lastKnownGlobalOffset;
678  std::uint32_t itemsConsumedFromCurrent;
679  details::ConcurrentQueueProducerTypelessBase* currentProducer;
680  details::ConcurrentQueueProducerTypelessBase* desiredProducer;
681 };
682 
683 // Need to forward-declare this swap because it's in a namespace.
684 // See http://stackoverflow.com/questions/4492062/why-does-a-c-friend-class-need-a-forward-declaration-only-in-other-namespaces
685 template<typename T, typename Traits>
686 inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT;
687 
688 
689 template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
690 class ConcurrentQueue {
691  public:
692  typedef ::dmlc::moodycamel::ProducerToken producer_token_t;
693  typedef ::dmlc::moodycamel::ConsumerToken consumer_token_t;
694 
695  typedef typename Traits::index_t index_t;
696  typedef typename Traits::size_t size_t;
697 
698  static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);
699  static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
700  static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
701  static const size_t IMPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::IMPLICIT_INITIAL_INDEX_SIZE);
702  static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = static_cast<size_t>(Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE);
703  static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
704 #ifdef _MSC_VER
705  #pragma warning(push)
706 #pragma warning(disable: 4307) // + integral constant overflow (that's what the ternary expression is for!)
707 #pragma warning(disable: 4309) // static_cast: Truncation of constant value
708 #endif
709  static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max<size_t>::value -
710  static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) <
711  BLOCK_SIZE) ? details::const_numeric_max<size_t>::value
712  : (
713  (static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) +
714  (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE);
715 #ifdef _MSC_VER
716 #pragma warning(pop)
717 #endif
718 
719  static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value,
720  "Traits::size_t must be an unsigned integral type");
721  static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value,
722  "Traits::index_t must be an unsigned integral type");
723  static_assert(sizeof(index_t) >= sizeof(size_t),
724  "Traits::index_t must be at least as wide as Traits::size_t");
725  static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)),
726  "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
727  static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) &&
728  !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD &
729  (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)),
730  "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)");
731  static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) &&
732  !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)),
733  "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
734  static_assert((IMPLICIT_INITIAL_INDEX_SIZE > 1) &&
735  !(IMPLICIT_INITIAL_INDEX_SIZE & (IMPLICIT_INITIAL_INDEX_SIZE - 1)),
736  "Traits::IMPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
737  static_assert((INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) ||
738  !(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE & (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE - 1)),
739  "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be a power of 2");
740  static_assert(
741  INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0 || INITIAL_IMPLICIT_PRODUCER_HASH_SIZE >= 1,
742  "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be at least 1 (or 0 to disable implicit enqueueing)");
743 
744  public:
745  // Creates a queue with at least `capacity` element slots; note that the
746  // actual number of elements that can be inserted without additional memory
747  // allocation depends on the number of producers and the block size (e.g. if
748  // the block size is equal to `capacity`, only a single block will be allocated
749  // up-front, which means only a single producer will be able to enqueue elements
750  // without an extra allocation -- blocks aren't shared between producers).
751  // This method is not thread safe -- it is up to the user to ensure that the
752  // queue is fully constructed before it starts being used by other threads (this
753  // includes making the memory effects of construction visible, possibly with a
754  // memory barrier).
755  explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
756  : producerListTail(nullptr), producerCount(0), initialBlockPoolIndex(0), nextExplicitConsumerId(
757  0), globalExplicitConsumerOffset(0) {
758  implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
759  populate_initial_implicit_producer_hash();
760  populate_initial_block_list(
761  capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
762 
763 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
764  // Track all the producers using a fully-resolved typed list for
765  // each kind; this makes it possible to debug them starting from
766  // the root queue object (otherwise wacky casts are needed that
767  // don't compile in the debugger's expression evaluator).
768  explicitProducers.store(nullptr, std::memory_order_relaxed);
769  implicitProducers.store(nullptr, std::memory_order_relaxed);
770 #endif
771  }
772 
773  // Computes the correct number of pre-allocated blocks for you based
774  // on the minimum number of elements you want available at any given
775  // time, and the maximum concurrent number of each type of producer.
776  ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
777  : producerListTail(nullptr), producerCount(0), initialBlockPoolIndex(0), nextExplicitConsumerId(
778  0), globalExplicitConsumerOffset(0) {
779  implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
780  populate_initial_implicit_producer_hash();
781  size_t blocks =
782  (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) +
783  2 * (maxExplicitProducers + maxImplicitProducers);
784  populate_initial_block_list(blocks);
785 
786 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
787  explicitProducers.store(nullptr, std::memory_order_relaxed);
788  implicitProducers.store(nullptr, std::memory_order_relaxed);
789 #endif
790  }
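// Sizing sketch (illustrative only; the capacity and producer counts are made up):
//
//   // Pre-allocate blocks for roughly 4096 elements, shared among at most 2 explicit
//   // (token-holding) producers and at most 8 implicit (token-less) producer threads.
//   dmlc::moodycamel::ConcurrentQueue<int> q(4096, 2, 8);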
791 
792  // Note: The queue should not be accessed concurrently while it's
793  // being deleted. It's up to the user to synchronize this.
794  // This method is not thread safe.
795  ~ConcurrentQueue() {
796  // Destroy producers
797  auto ptr = producerListTail.load(std::memory_order_relaxed);
798  while (ptr != nullptr) {
799  auto next = ptr->next_prod();
800  if (ptr->token != nullptr) {
801  ptr->token->producer = nullptr;
802  }
803  destroy(ptr);
804  ptr = next;
805  }
806 
807  // Destroy implicit producer hash tables
808  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE != 0) {
809  auto hash = implicitProducerHash.load(std::memory_order_relaxed);
810  while (hash != nullptr) {
811  auto prev = hash->prev;
812  if (prev !=
813  nullptr) { // The last hash is part of this object and was not allocated dynamically
814  for (size_t i = 0; i != hash->capacity; ++i) {
815  hash->entries[i].~ImplicitProducerKVP();
816  }
817  hash->~ImplicitProducerHash();
818  (Traits::free)(hash);
819  }
820  hash = prev;
821  }
822  }
823 
824  // Destroy global free list
825  auto block = freeList.head_unsafe();
826  while (block != nullptr) {
827  auto next = block->freeListNext.load(std::memory_order_relaxed);
828  if (block->dynamicallyAllocated) {
829  destroy(block);
830  }
831  block = next;
832  }
833 
834  // Destroy initial free list
835  destroy_array(initialBlockPool, initialBlockPoolSize);
836  }
837 
838  // Disable copying and copy assignment
839  ConcurrentQueue(ConcurrentQueue const &) MOODYCAMEL_DELETE_FUNCTION;
840 
841  ConcurrentQueue &operator=(ConcurrentQueue const &) MOODYCAMEL_DELETE_FUNCTION;
842 
843  // Moving is supported, but note that it is *not* a thread-safe operation.
844  // Nobody can use the queue while it's being moved, and the memory effects
845  // of that move must be propagated to other threads before they can use it.
846  // Note: When a queue is moved, its tokens are still valid but can only be
847  // used with the destination queue (i.e. semantically they are moved along
848  // with the queue itself).
849  ConcurrentQueue(ConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT
850  : producerListTail(other.producerListTail.load(std::memory_order_relaxed)), producerCount(
851  other.producerCount.load(std::memory_order_relaxed)), initialBlockPoolIndex(
852  other.initialBlockPoolIndex.load(std::memory_order_relaxed)), initialBlockPool(
853  other.initialBlockPool), initialBlockPoolSize(other.initialBlockPoolSize), freeList(
854  std::move(other.freeList)), nextExplicitConsumerId(
855  other.nextExplicitConsumerId.load(std::memory_order_relaxed)), globalExplicitConsumerOffset(
856  other.globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
857  // Move the other one into this, and leave the other one as an empty queue
858  implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
859  populate_initial_implicit_producer_hash();
860  swap_implicit_producer_hashes(other);
861 
862  other.producerListTail.store(nullptr, std::memory_order_relaxed);
863  other.producerCount.store(0, std::memory_order_relaxed);
864  other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
865  other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);
866 
867 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
868  explicitProducers.store(other.explicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
869  other.explicitProducers.store(nullptr, std::memory_order_relaxed);
870  implicitProducers.store(other.implicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
871  other.implicitProducers.store(nullptr, std::memory_order_relaxed);
872 #endif
873 
874  other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
875  other.initialBlockPoolSize = 0;
876  other.initialBlockPool = nullptr;
877 
878  reown_producers();
879  }
880 
881  inline ConcurrentQueue &operator=(ConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT {
882  return swap_internal(other);
883  }
884 
885  // Swaps this queue's state with the other's. Not thread-safe.
886  // Swapping two queues does not invalidate their tokens; however,
887  // the tokens that were created for one queue must be used with
888  // only the swapped queue (i.e. the tokens are tied to the
889  // queue's movable state, not the object itself).
890  inline void swap(ConcurrentQueue &other) MOODYCAMEL_NOEXCEPT {
891  swap_internal(other);
892  }
893 
894  private:
895  ConcurrentQueue &swap_internal(ConcurrentQueue &other) {
896  if (this == &other) {
897  return *this;
898  }
899 
900  details::swap_relaxed(producerListTail, other.producerListTail);
901  details::swap_relaxed(producerCount, other.producerCount);
902  details::swap_relaxed(initialBlockPoolIndex, other.initialBlockPoolIndex);
903  std::swap(initialBlockPool, other.initialBlockPool);
904  std::swap(initialBlockPoolSize, other.initialBlockPoolSize);
905  freeList.swap(other.freeList);
906  details::swap_relaxed(nextExplicitConsumerId, other.nextExplicitConsumerId);
907  details::swap_relaxed(globalExplicitConsumerOffset, other.globalExplicitConsumerOffset);
908 
909  swap_implicit_producer_hashes(other);
910 
911  reown_producers();
912  other.reown_producers();
913 
914 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
915  details::swap_relaxed(explicitProducers, other.explicitProducers);
916  details::swap_relaxed(implicitProducers, other.implicitProducers);
917 #endif
918 
919  return *this;
920  }
921 
922  public:
923  // Enqueues a single item (by copying it).
924  // Allocates memory if required. Only fails if memory allocation fails (or implicit
925  // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
926  // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
927  // Thread-safe.
928  inline bool enqueue(T const &item) {
929  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
930  return inner_enqueue<CanAlloc>(item);
931  }
932 
933  // Enqueues a single item (by moving it, if possible).
934  // Allocates memory if required. Only fails if memory allocation fails (or implicit
935  // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
936  // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
937  // Thread-safe.
938  inline bool enqueue(T &&item) {
939  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
940  return inner_enqueue<CanAlloc>(std::move(item));
941  }
942 
943  // Enqueues a single item (by copying it) using an explicit producer token.
944  // Allocates memory if required. Only fails if memory allocation fails (or
945  // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
946  // Thread-safe.
947  inline bool enqueue(producer_token_t const &token, T const &item) {
948  return inner_enqueue<CanAlloc>(token, item);
949  }
950 
951  // Enqueues a single item (by moving it, if possible) using an explicit producer token.
952  // Allocates memory if required. Only fails if memory allocation fails (or
953  // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
954  // Thread-safe.
955  inline bool enqueue(producer_token_t const &token, T &&item) {
956  return inner_enqueue<CanAlloc>(token, std::move(item));
957  }
958 
959  // Enqueues several items.
960  // Allocates memory if required. Only fails if memory allocation fails (or
961  // implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
962  // is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
963  // Note: Use std::make_move_iterator if the elements should be moved instead of copied.
964  // Thread-safe.
965  template<typename It>
966  bool enqueue_bulk(It itemFirst, size_t count) {
967  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
968  return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
969  }
970 
971  // Enqueues several items using an explicit producer token.
972  // Allocates memory if required. Only fails if memory allocation fails
973  // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
974  // Note: Use std::make_move_iterator if the elements should be moved
975  // instead of copied.
976  // Thread-safe.
977  template<typename It>
978  bool enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count) {
979  return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
980  }
981 
982  // Enqueues a single item (by copying it).
983  // Does not allocate memory. Fails if not enough room to enqueue (or implicit
984  // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
985  // is 0).
986  // Thread-safe.
987  inline bool try_enqueue(T const &item) {
988  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
989  return inner_enqueue<CannotAlloc>(item);
990  }
991 
992  // Enqueues a single item (by moving it, if possible).
993  // Does not allocate memory (except for one-time implicit producer).
994  // Fails if not enough room to enqueue (or implicit production is
995  // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
996  // Thread-safe.
997  inline bool try_enqueue(T &&item) {
998  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
999  return inner_enqueue<CannotAlloc>(std::move(item));
1000  }
1001 
1002  // Enqueues a single item (by copying it) using an explicit producer token.
1003  // Does not allocate memory. Fails if not enough room to enqueue.
1004  // Thread-safe.
1005  inline bool try_enqueue(producer_token_t const &token, T const &item) {
1006  return inner_enqueue<CannotAlloc>(token, item);
1007  }
1008 
1009  // Enqueues a single item (by moving it, if possible) using an explicit producer token.
1010  // Does not allocate memory. Fails if not enough room to enqueue.
1011  // Thread-safe.
1012  inline bool try_enqueue(producer_token_t const &token, T &&item) {
1013  return inner_enqueue<CannotAlloc>(token, std::move(item));
1014  }
1015 
1016  // Enqueues several items.
1017  // Does not allocate memory (except for one-time implicit producer).
1018  // Fails if not enough room to enqueue (or implicit production is
1019  // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
1020  // Note: Use std::make_move_iterator if the elements should be moved
1021  // instead of copied.
1022  // Thread-safe.
1023  template<typename It>
1024  bool try_enqueue_bulk(It itemFirst, size_t count) {
1025  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1026  return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
1027  }
1028 
1029  // Enqueues several items using an explicit producer token.
1030  // Does not allocate memory. Fails if not enough room to enqueue.
1031  // Note: Use std::make_move_iterator if the elements should be moved
1032  // instead of copied.
1033  // Thread-safe.
1034  template<typename It>
1035  bool try_enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count) {
1036  return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
1037  }
1038 
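 // Illustrative sketch of the allocating vs. non-allocating variants (q is hypothetical):
 //
 //   dmlc::moodycamel::ConcurrentQueue<int> q;
 //   q.enqueue(42);            // may allocate a new block; fails only if allocation fails
 //   if (!q.try_enqueue(43)) {
 //     // No room was free and try_enqueue never allocates new blocks, so the item was
 //     // not added; fall back to enqueue() or drop the item, as appropriate.
 //   }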
1039 
1040  // Attempts to dequeue from the queue.
1041  // Returns false if all producer streams appeared empty at the time they
1042  // were checked (so, the queue is likely but not guaranteed to be empty).
1043  // Never allocates. Thread-safe.
1044  template<typename U>
1045  bool try_dequeue(U &item) {
1046  // Instead of simply trying each producer in turn (which could cause needless contention on the first
1047  // producer), we score them heuristically.
1048  size_t nonEmptyCount = 0;
1049  ProducerBase *best = nullptr;
1050  size_t bestSize = 0;
1051  for (auto ptr = producerListTail.load(std::memory_order_acquire);
1052  nonEmptyCount < 3 && ptr != nullptr; ptr = ptr->next_prod()) {
1053  auto size = ptr->size_approx();
1054  if (size > 0) {
1055  if (size > bestSize) {
1056  bestSize = size;
1057  best = ptr;
1058  }
1059  ++nonEmptyCount;
1060  }
1061  }
1062 
1063  // If there was at least one non-empty queue but it appears empty at the time
1064  // we try to dequeue from it, we need to make sure every queue's been tried
1065  if (nonEmptyCount > 0) {
1066  if (details::likely(best->dequeue(item))) {
1067  return true;
1068  }
1069  for (auto ptr = producerListTail.load(std::memory_order_acquire);
1070  ptr != nullptr; ptr = ptr->next_prod()) {
1071  if (ptr != best && ptr->dequeue(item)) {
1072  return true;
1073  }
1074  }
1075  }
1076  return false;
1077  }
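 // Consumer-loop sketch (illustrative only; q, done, and process are hypothetical):
 // try_dequeue returns false when the queue merely *appears* empty, so a polling
 // consumer typically keeps looping until an external "done" flag is set, then drains:
 //
 //   int item;
 //   while (!done.load(std::memory_order_acquire)) {
 //     if (q.try_dequeue(item)) {
 //       process(item);
 //     }
 //   }
 //   while (q.try_dequeue(item)) {  // drain anything enqueued before done was set
 //     process(item);
 //   }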
1078 
1079  // Attempts to dequeue from the queue.
1080  // Returns false if all producer streams appeared empty at the time they
1081  // were checked (so, the queue is likely but not guaranteed to be empty).
1082  // This differs from the try_dequeue(item) method in that this one does
1083  // not attempt to reduce contention by interleaving the order that producer
1084  // streams are dequeued from. So, using this method can reduce overall throughput
1085  // under contention, but will give more predictable results in single-threaded
1086  // consumer scenarios. This is mostly only useful for internal unit tests.
1087  // Never allocates. Thread-safe.
1088  template<typename U>
1089  bool try_dequeue_non_interleaved(U &item) {
1090  for (auto ptr = producerListTail.load(std::memory_order_acquire);
1091  ptr != nullptr; ptr = ptr->next_prod()) {
1092  if (ptr->dequeue(item)) {
1093  return true;
1094  }
1095  }
1096  return false;
1097  }
1098 
1099  // Attempts to dequeue from the queue using an explicit consumer token.
1100  // Returns false if all producer streams appeared empty at the time they
1101  // were checked (so, the queue is likely but not guaranteed to be empty).
1102  // Never allocates. Thread-safe.
1103  template<typename U>
1104  bool try_dequeue(consumer_token_t &token, U &item) {
1105  // The idea is roughly as follows:
1106  // Every 256 items from one producer, make everyone rotate (increase the global offset) -> this means the highest efficiency consumer dictates the rotation speed of everyone else, more or less
1107  // If you see that the global offset has changed, you must reset your consumption counter and move to your designated place
 1108  // If there are no items where you're supposed to be, keep moving until you find a producer with some items
 1109  // If the global offset has not changed but you've run out of items to consume, move on from your current position until you find a producer with something in it
1110 
1111  if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset !=
1112  globalExplicitConsumerOffset.load(
1113  std::memory_order_relaxed)) {
1114  if (!update_current_producer_after_rotation(token)) {
1115  return false;
1116  }
1117  }
1118 
1119  // If there was at least one non-empty queue but it appears empty at the time
1120  // we try to dequeue from it, we need to make sure every queue's been tried
1121  if (static_cast<ProducerBase *>(token.currentProducer)->dequeue(item)) {
1122  if (++token.itemsConsumedFromCurrent == EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1123  globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1124  }
1125  return true;
1126  }
1127 
1128  auto tail = producerListTail.load(std::memory_order_acquire);
1129  auto ptr = static_cast<ProducerBase *>(token.currentProducer)->next_prod();
1130  if (ptr == nullptr) {
1131  ptr = tail;
1132  }
1133  while (ptr != static_cast<ProducerBase *>(token.currentProducer)) {
1134  if (ptr->dequeue(item)) {
1135  token.currentProducer = ptr;
1136  token.itemsConsumedFromCurrent = 1;
1137  return true;
1138  }
1139  ptr = ptr->next_prod();
1140  if (ptr == nullptr) {
1141  ptr = tail;
1142  }
1143  }
1144  return false;
1145  }
1146 
1147  // Attempts to dequeue several elements from the queue.
1148  // Returns the number of items actually dequeued.
1149  // Returns 0 if all producer streams appeared empty at the time they
1150  // were checked (so, the queue is likely but not guaranteed to be empty).
1151  // Never allocates. Thread-safe.
1152  template<typename It>
1153  size_t try_dequeue_bulk(It itemFirst, size_t max) {
1154  size_t count = 0;
1155  for (auto ptr = producerListTail.load(std::memory_order_acquire);
1156  ptr != nullptr; ptr = ptr->next_prod()) {
1157  count += ptr->dequeue_bulk(itemFirst, max - count);
1158  if (count == max) {
1159  break;
1160  }
1161  }
1162  return count;
1163  }
1164 
1165  // Attempts to dequeue several elements from the queue using an explicit consumer token.
1166  // Returns the number of items actually dequeued.
1167  // Returns 0 if all producer streams appeared empty at the time they
1168  // were checked (so, the queue is likely but not guaranteed to be empty).
1169  // Never allocates. Thread-safe.
1170  template<typename It>
1171  size_t try_dequeue_bulk(consumer_token_t &token, It itemFirst, size_t max) {
1172  if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset !=
1173  globalExplicitConsumerOffset.load(
1174  std::memory_order_relaxed)) {
1175  if (!update_current_producer_after_rotation(token)) {
1176  return 0;
1177  }
1178  }
1179 
1180  size_t count = static_cast<ProducerBase *>(token.currentProducer)->dequeue_bulk(itemFirst, max);
1181  if (count == max) {
1182  if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >=
1183  EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1184  globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1185  }
1186  return max;
1187  }
1188  token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
1189  max -= count;
1190 
1191  auto tail = producerListTail.load(std::memory_order_acquire);
1192  auto ptr = static_cast<ProducerBase *>(token.currentProducer)->next_prod();
1193  if (ptr == nullptr) {
1194  ptr = tail;
1195  }
1196  while (ptr != static_cast<ProducerBase *>(token.currentProducer)) {
1197  auto dequeued = ptr->dequeue_bulk(itemFirst, max);
1198  count += dequeued;
1199  if (dequeued != 0) {
1200  token.currentProducer = ptr;
1201  token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
1202  }
1203  if (dequeued == max) {
1204  break;
1205  }
1206  max -= dequeued;
1207  ptr = ptr->next_prod();
1208  if (ptr == nullptr) {
1209  ptr = tail;
1210  }
1211  }
1212  return count;
1213  }
1214 
1215 
1216  // Attempts to dequeue from a specific producer's inner queue.
1217  // If you happen to know which producer you want to dequeue from, this
1218  // is significantly faster than using the general-case try_dequeue methods.
1219  // Returns false if the producer's queue appeared empty at the time it
1220  // was checked (so, the queue is likely but not guaranteed to be empty).
1221  // Never allocates. Thread-safe.
1222  template<typename U>
1223  inline bool try_dequeue_from_producer(producer_token_t const &producer, U &item) {
1224  return static_cast<ExplicitProducer *>(producer.producer)->dequeue(item);
1225  }
1226 
1227  // Attempts to dequeue several elements from a specific producer's inner queue.
1228  // Returns the number of items actually dequeued.
1229  // If you happen to know which producer you want to dequeue from, this
1230  // is significantly faster than using the general-case try_dequeue methods.
1231  // Returns 0 if the producer's queue appeared empty at the time it
1232  // was checked (so, the queue is likely but not guaranteed to be empty).
1233  // Never allocates. Thread-safe.
1234  template<typename It>
1235  inline size_t
1236  try_dequeue_bulk_from_producer(producer_token_t const &producer, It itemFirst, size_t max) {
1237  return static_cast<ExplicitProducer *>(producer.producer)->dequeue_bulk(itemFirst, max);
1238  }
1239 
1240 
1241  // Returns an estimate of the total number of elements currently in the queue. This
1242  // estimate is only accurate if the queue has completely stabilized before it is called
1243  // (i.e. all enqueue and dequeue operations have completed and their memory effects are
1244  // visible on the calling thread, and no further operations start while this method is
1245  // being called).
1246  // Thread-safe.
1247  size_t size_approx() const {
1248  size_t size = 0;
1249  for (auto ptr = producerListTail.load(std::memory_order_acquire);
1250  ptr != nullptr; ptr = ptr->next_prod()) {
1251  size += ptr->size_approx();
1252  }
1253  return size;
1254  }
1255 
1256 
1257  // Returns true if the underlying atomic variables used by
1258  // the queue are lock-free (they should be on most platforms).
1259  // Thread-safe.
1260  static bool is_lock_free() {
1261  return
1262  details::static_is_lock_free<bool>::value == 2 &&
1263  details::static_is_lock_free<size_t>::value == 2 &&
1264  details::static_is_lock_free<std::uint32_t>::value == 2 &&
1265  details::static_is_lock_free<index_t>::value == 2 &&
1266  details::static_is_lock_free<void *>::value == 2 &&
1267  details::static_is_lock_free<typename details::thread_id_converter<details::thread_id_t>::thread_id_numeric_size_t>::value ==
1268  2;
1269  }
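// Example check, assuming only the static member above. On mainstream x86/ARM
// toolchains this typically evaluates to true, but it is a per-platform property:
//
//   if (!dmlc::moodycamel::ConcurrentQueue<int>::is_lock_free()) {
//     // consider a different queue, or accept that some atomics may use locks
//   }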
1270 
1271 
1272  private:
1273  friend struct ProducerToken;
1274  friend struct ConsumerToken;
1275  friend struct ExplicitProducer;
1276 
1277  friend class ConcurrentQueueTests;
1278 
1279  enum AllocationMode {
1280  CanAlloc, CannotAlloc
1281  };
1282 
1283 
1285  // Queue methods
1287 
1288  template<AllocationMode canAlloc, typename U>
1289  inline bool inner_enqueue(producer_token_t const &token, U &&element) {
1290  return static_cast<ExplicitProducer *>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(
1291  std::forward<U>(element));
1292  }
1293 
1294  template<AllocationMode canAlloc, typename U>
1295  inline bool inner_enqueue(U &&element) {
1296  auto producer = get_or_add_implicit_producer();
1297  return producer == nullptr ? false
1298  : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(
1299  std::forward<U>(element));
1300  }
1301 
1302  template<AllocationMode canAlloc, typename It>
1303  inline bool inner_enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count) {
1304  return static_cast<ExplicitProducer *>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<canAlloc>(
1305  itemFirst, count);
1306  }
1307 
1308  template<AllocationMode canAlloc, typename It>
1309  inline bool inner_enqueue_bulk(It itemFirst, size_t count) {
1310  auto producer = get_or_add_implicit_producer();
1311  return producer == nullptr ? false
1312  : producer->ConcurrentQueue::ImplicitProducer::template enqueue_bulk<canAlloc>(
1313  itemFirst, count);
1314  }
1315 
1316  inline bool update_current_producer_after_rotation(consumer_token_t &token) {
1317  // Ah, there's been a rotation, figure out where we should be!
1318  auto tail = producerListTail.load(std::memory_order_acquire);
1319  if (token.desiredProducer == nullptr && tail == nullptr) {
1320  return false;
1321  }
1322  auto prodCount = producerCount.load(std::memory_order_relaxed);
1323  auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
1324  if (details::unlikely(token.desiredProducer == nullptr)) {
1325  // Aha, first time we're dequeueing anything.
1326  // Figure out our local position
1327  // Note: offset is from start, not end, but we're traversing from end -- subtract from count first
1328  std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
1329  token.desiredProducer = tail;
1330  for (std::uint32_t i = 0; i != offset; ++i) {
1331  token.desiredProducer = static_cast<ProducerBase *>(token.desiredProducer)->next_prod();
1332  if (token.desiredProducer == nullptr) {
1333  token.desiredProducer = tail;
1334  }
1335  }
1336  }
1337 
1338  std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
1339  if (delta >= prodCount) {
1340  delta = delta % prodCount;
1341  }
1342  for (std::uint32_t i = 0; i != delta; ++i) {
1343  token.desiredProducer = static_cast<ProducerBase *>(token.desiredProducer)->next_prod();
1344  if (token.desiredProducer == nullptr) {
1345  token.desiredProducer = tail;
1346  }
1347  }
1348 
1349  token.lastKnownGlobalOffset = globalOffset;
1350  token.currentProducer = token.desiredProducer;
1351  token.itemsConsumedFromCurrent = 0;
1352  return true;
1353  }
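// The rotation bookkeeping above is what keeps consumer tokens fair across
// producers; a minimal sketch of the intended call pattern, assuming the public
// ConsumerToken and the try_dequeue(token, item) overload:
//
//   dmlc::moodycamel::ConcurrentQueue<int> q2;
//   dmlc::moodycamel::ConsumerToken ctok(q2);
//   int item;
//   while (q2.try_dequeue(ctok, item)) {
//     // after roughly EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE items from
//     // one producer, the token rotates on to the next producer's inner queue
//   }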
1354 
1355 
1357  // Free list
1359 
1360  template<typename N>
1361  struct FreeListNode {
1362  FreeListNode()
1363  : freeListRefs(0), freeListNext(nullptr) {}
1364 
1365  std::atomic<std::uint32_t> freeListRefs;
1366  std::atomic<N *> freeListNext;
1367  };
1368 
1369  // A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but
1370  // simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly
1371  // speedy under low contention.
1372  template<typename N> // N must inherit FreeListNode or have the same fields (and initialization of them)
1373  struct FreeList {
1374  FreeList()
1375  : freeListHead(nullptr) {}
1376 
1377  FreeList(FreeList &&other)
1378  : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) {
1379  other.freeListHead.store(nullptr, std::memory_order_relaxed);
1380  }
1381 
1382  void swap(FreeList &other) { details::swap_relaxed(freeListHead, other.freeListHead); }
1383 
1384  FreeList(FreeList const &) MOODYCAMEL_DELETE_FUNCTION;
1385 
1386  FreeList &operator=(FreeList const &) MOODYCAMEL_DELETE_FUNCTION;
1387 
1388  inline void add(N *node) {
1389 #if MCDBGQ_NOLOCKFREE_FREELIST
1390  debug::DebugLock lock(mutex);
1391 #endif
1392  // We know that the should-be-on-freelist bit is 0 at this point, so it's safe to
1393  // set it using a fetch_add
1394  if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
1395  // Oh look! We were the last ones referencing this node, and we know
1396  // we want to add it to the free list, so let's do it!
1397  add_knowing_refcount_is_zero(node);
1398  }
1399  }
1400 
1401  inline N *try_get() {
1402 #if MCDBGQ_NOLOCKFREE_FREELIST
1403  debug::DebugLock lock(mutex);
1404 #endif
1405  auto head = freeListHead.load(std::memory_order_acquire);
1406  while (head != nullptr) {
1407  auto prevHead = head;
1408  auto refs = head->freeListRefs.load(std::memory_order_relaxed);
1409  if ((refs & REFS_MASK) == 0 ||
1410  !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire,
1411  std::memory_order_relaxed)) {
1412  head = freeListHead.load(std::memory_order_acquire);
1413  continue;
1414  }
1415 
1416  // Good, reference count has been incremented (it wasn't at zero), which means we can read the
1417  // next and not worry about it changing between now and the time we do the CAS
1418  auto next = head->freeListNext.load(std::memory_order_relaxed);
1419  if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire,
1420  std::memory_order_relaxed)) {
1421  // Yay, got the node. This means it was on the list, which means shouldBeOnFreeList must be false no
1422  // matter the refcount (because nobody else knows it's been taken off yet, it can't have been put back on).
1423  assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);
1424 
1425  // Decrease refcount twice, once for our ref, and once for the list's ref
1426  head->freeListRefs.fetch_sub(2, std::memory_order_release);
1427  return head;
1428  }
1429 
1430  // OK, the head must have changed on us, but we still need to decrease the refcount we increased.
1431  // Note that we don't need to release any memory effects, but we do need to ensure that the reference
1432  // count decrement happens-after the CAS on the head.
1433  refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
1434  if (refs == SHOULD_BE_ON_FREELIST + 1) {
1435  add_knowing_refcount_is_zero(prevHead);
1436  }
1437  }
1438 
1439  return nullptr;
1440  }
1441 
1442  // Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
1443  N *head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }
1444 
1445  private:
1446  inline void add_knowing_refcount_is_zero(N *node) {
1447  // Since the refcount is zero, and nobody can increase it once it's zero (except us, and we run
1448  // only one copy of this method per node at a time, i.e. the single thread case), then we know
1449  // we can safely change the next pointer of the node; however, once the refcount is back above
1450  // zero, then other threads could increase it (happens under heavy contention, when the refcount
1451  // goes to zero in between a load and a refcount increment of a node in try_get, then back up to
1452  // something non-zero, then the refcount increment is done by the other thread) -- so, if the CAS
1453  // to add the node to the actual list fails, decrease the refcount and leave the add operation to
1454  // the next thread who puts the refcount back at zero (which could be us, hence the loop).
1455  auto head = freeListHead.load(std::memory_order_relaxed);
1456  while (true) {
1457  node->freeListNext.store(head, std::memory_order_relaxed);
1458  node->freeListRefs.store(1, std::memory_order_release);
1459  if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release,
1460  std::memory_order_relaxed)) {
1461  // Hmm, the add failed, but we can only try again when the refcount goes back to zero
1462  if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) ==
1463  1) {
1464  continue;
1465  }
1466  }
1467  return;
1468  }
1469  }
1470 
1471  private:
1472  // Implemented like a stack, but where node order doesn't matter (nodes are inserted out of order under contention)
1473  std::atomic<N *> freeListHead;
1474 
1475  static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
1476  static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
1477 
1478 #if MCDBGQ_NOLOCKFREE_FREELIST
1479  debug::DebugMutex mutex;
1480 #endif
1481  };
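// Sketch of how this free list is meant to be used internally (MyNode is a
// hypothetical node type for illustration; Block below is the real client, since it
// carries freeListRefs/freeListNext itself):
//
//   struct MyNode : FreeListNode<MyNode> { int payload; };
//   FreeList<MyNode> freeList;
//   MyNode *n = new MyNode();
//   freeList.add(n);                  // hands the node to the list (refcount-gated)
//   MyNode *got = freeList.try_get(); // nullptr if the list appeared empty
//   // nodes must outlive the FreeList; they are only reclaimed after it is destroyed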
1482 
1483 
1485  // Block
1487 
1488  enum InnerQueueContext {
1489  implicit_context = 0, explicit_context = 1
1490  };
1491 
1492  struct Block {
1493  Block()
1494  : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr)
1495  , shouldBeOnFreeList(false), dynamicallyAllocated(true) {
1496 #if MCDBGQ_TRACKMEM
1497  owner = nullptr;
1498 #endif
1499  }
1500 
1501  template<InnerQueueContext context>
1502  inline bool is_empty() const {
1503  if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1504  // Check flags
1505  for (size_t i = 0; i < BLOCK_SIZE; ++i) {
1506  if (!emptyFlags[i].load(std::memory_order_relaxed)) {
1507  return false;
1508  }
1509  }
1510 
1511  // Aha, empty; make sure we have all other memory effects that happened before the empty flags were set
1512  std::atomic_thread_fence(std::memory_order_acquire);
1513  return true;
1514  } else {
1515  // Check counter
1516  if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
1517  std::atomic_thread_fence(std::memory_order_acquire);
1518  return true;
1519  }
1520  assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
1521  return false;
1522  }
1523  }
1524 
1525  // Returns true if the block is now empty (does not apply in explicit context)
1526  template<InnerQueueContext context>
1527  inline bool set_empty(index_t i) {
1528  if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1529  // Set flag
1530  assert(!emptyFlags[BLOCK_SIZE - 1 -
1531  static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(
1532  std::memory_order_relaxed));
1533  emptyFlags[BLOCK_SIZE - 1 -
1534  static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true,
1535  std::memory_order_release);
1536  return false;
1537  } else {
1538  // Increment counter
1539  auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
1540  assert(prevVal < BLOCK_SIZE);
1541  return prevVal == BLOCK_SIZE - 1;
1542  }
1543  }
1544 
1545  // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0).
1546  // Returns true if the block is now empty (does not apply in explicit context).
1547  template<InnerQueueContext context>
1548  inline bool set_many_empty(index_t i, size_t count) {
1549  if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1550  // Set flags
1551  std::atomic_thread_fence(std::memory_order_release);
1552  i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count +
1553  1;
1554  for (size_t j = 0; j != count; ++j) {
1555  assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
1556  emptyFlags[i + j].store(true, std::memory_order_relaxed);
1557  }
1558  return false;
1559  } else {
1560  // Increment counter
1561  auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
1562  assert(prevVal + count <= BLOCK_SIZE);
1563  return prevVal + count == BLOCK_SIZE;
1564  }
1565  }
1566 
1567  template<InnerQueueContext context>
1568  inline void set_all_empty() {
1569  if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1570  // Set all flags
1571  for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1572  emptyFlags[i].store(true, std::memory_order_relaxed);
1573  }
1574  } else {
1575  // Reset counter
1576  elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
1577  }
1578  }
1579 
1580  template<InnerQueueContext context>
1581  inline void reset_empty() {
1582  if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1583  // Reset flags
1584  for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1585  emptyFlags[i].store(false, std::memory_order_relaxed);
1586  }
1587  } else {
1588  // Reset counter
1589  elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
1590  }
1591  }
1592 
1593  inline T *operator[](index_t idx) MOODYCAMEL_NOEXCEPT {
1594  return static_cast<T *>(static_cast<void *>(elements)) +
1595  static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1));
1596  }
1597 
1598  inline T const *operator[](index_t idx) const MOODYCAMEL_NOEXCEPT {
1599  return static_cast<T const *>(static_cast<void const *>(elements)) +
1600  static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1));
1601  }
1602 
1603  private:
1604  // IMPORTANT: This must be the first member in Block, so that if T depends on the alignment of
1605  // addresses returned by malloc, that alignment will be preserved. Apparently clang actually
1606  // generates code that uses this assumption for AVX instructions in some cases. Ideally, we
1607  // should also align Block to the alignment of T in case it's higher than malloc's 16-byte
1608  // alignment, but this is hard to do in a cross-platform way. Assert for this case:
1609  static_assert(std::alignment_of<T>::value <= std::alignment_of<details::max_align_t>::value,
1610  "The queue does not support super-aligned types at this time");
1611  // Additionally, we need the alignment of Block itself to be a multiple of max_align_t since
1612  // otherwise the appropriate padding will not be added at the end of Block in order to make
1613  // arrays of Blocks all be properly aligned (not just the first one). We use a union to force
1614  // this.
1615  union {
1616  char elements[sizeof(T) * BLOCK_SIZE];
1617  details::max_align_t dummy;
1618  };
1619  public:
1620  Block *next;
1621  std::atomic<size_t> elementsCompletelyDequeued;
1622  std::atomic<bool> emptyFlags[
1623  BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];
1624  public:
1625  std::atomic<std::uint32_t> freeListRefs;
1626  std::atomic<Block *> freeListNext;
1627  std::atomic<bool> shouldBeOnFreeList;
1628  bool dynamicallyAllocated; // Perhaps a better name for this would be 'isNotPartOfInitialBlockPool'
1629 
1630 #if MCDBGQ_TRACKMEM
1631  void* owner;
1632 #endif
1633  };
1634 
1635  static_assert(std::alignment_of<Block>::value >= std::alignment_of<details::max_align_t>::value,
1636  "Internal error: Blocks must be at least as aligned as the type they are wrapping");
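// In short, a Block tracks emptiness in one of two ways: per-slot emptyFlags when
// BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD (both are commonly 32 in the
// default traits) and the context is explicit, or the elementsCompletelyDequeued
// counter otherwise. A worked example of the counter path with a hypothetical
// BLOCK_SIZE of 4: set_many_empty(i, 3) followed by set_empty(j) moves the counter
// 0 -> 3 -> 4 == BLOCK_SIZE, so only the final call reports the block empty and
// eligible for recycling through the free list above.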
1637 
1638 
1639 #if MCDBGQ_TRACKMEM
1640  public:
1641  struct MemStats;
1642  private:
1643 #endif
1644 
1646  // Producer base
1648 
1649  struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase {
1650  ProducerBase(ConcurrentQueue *parent_, bool isExplicit_)
1651  :
1652  tailIndex(0), headIndex(0), dequeueOptimisticCount(0), dequeueOvercommit(0), tailBlock(
1653  nullptr), isExplicit(isExplicit_), parent(parent_) {
1654  }
1655 
1656  virtual ~ProducerBase() {}
1657 
1658  template<typename U>
1659  inline bool dequeue(U &element) {
1660  if (isExplicit) {
1661  return static_cast<ExplicitProducer *>(this)->dequeue(element);
1662  } else {
1663  return static_cast<ImplicitProducer *>(this)->dequeue(element);
1664  }
1665  }
1666 
1667  template<typename It>
1668  inline size_t dequeue_bulk(It &itemFirst, size_t max) {
1669  if (isExplicit) {
1670  return static_cast<ExplicitProducer *>(this)->dequeue_bulk(itemFirst, max);
1671  } else {
1672  return static_cast<ImplicitProducer *>(this)->dequeue_bulk(itemFirst, max);
1673  }
1674  }
1675 
1676  inline ProducerBase *next_prod() const { return static_cast<ProducerBase *>(next); }
1677 
1678  inline size_t size_approx() const {
1679  auto tail = tailIndex.load(std::memory_order_relaxed);
1680  auto head = headIndex.load(std::memory_order_relaxed);
1681  return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
1682  }
1683 
1684  inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }
1685 
1686  protected:
1687  std::atomic<index_t> tailIndex; // Where to enqueue to next
1688  std::atomic<index_t> headIndex; // Where to dequeue from next
1689 
1690  std::atomic<index_t> dequeueOptimisticCount;
1691  std::atomic<index_t> dequeueOvercommit;
1692 
1693  Block *tailBlock;
1694 
1695  public:
1696  bool isExplicit;
1697  ConcurrentQueue *parent;
1698 
1699  protected:
1700 #if MCDBGQ_TRACKMEM
1701  friend struct MemStats;
1702 #endif
1703  };
1704 
1705 
1707  // Explicit queue
1709 
1710  struct ExplicitProducer : public ProducerBase {
1711  explicit ExplicitProducer(ConcurrentQueue *parent)
1712  :
1713  ProducerBase(parent, true), blockIndex(nullptr), pr_blockIndexSlotsUsed(0), pr_blockIndexSize(
1714  EXPLICIT_INITIAL_INDEX_SIZE >> 1), pr_blockIndexFront(0), pr_blockIndexEntries(nullptr)
1715  , pr_blockIndexRaw(nullptr) {
1716  size_t poolBasedIndexSize = details::ceil_to_pow_2(parent->initialBlockPoolSize) >> 1;
1717  if (poolBasedIndexSize > pr_blockIndexSize) {
1718  pr_blockIndexSize = poolBasedIndexSize;
1719  }
1720 
1721  new_block_index(
1722  0); // This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE
1723  }
1724 
1725  ~ExplicitProducer() {
1726  // Destruct any elements not yet dequeued.
1727  // Since we're in the destructor, we can assume all elements
1728  // are either completely dequeued or completely not (no halfways).
1729  if (this->tailBlock != nullptr) { // Note this means there must be a block index too
1730  // First find the block that's partially dequeued, if any
1731  Block *halfDequeuedBlock = nullptr;
1732  if ((this->headIndex.load(std::memory_order_relaxed) &
1733  static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
1734  // The head's not on a block boundary, meaning a block somewhere is partially dequeued
1735  // (or the head block is the tail block and was fully dequeued, but the head/tail are still not on a boundary)
1736  size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1);
1737  while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE,
1738  this->headIndex.load(
1739  std::memory_order_relaxed))) {
1740  i = (i + 1) & (pr_blockIndexSize - 1);
1741  }
1742  assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base,
1743  this->headIndex.load(
1744  std::memory_order_relaxed)));
1745  halfDequeuedBlock = pr_blockIndexEntries[i].block;
1746  }
1747 
1748  // Start at the head block (note the first line in the loop gives us the head from the tail on the first iteration)
1749  auto block = this->tailBlock;
1750  do {
1751  block = block->next;
1752  if (block->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
1753  continue;
1754  }
1755 
1756  size_t i = 0; // Offset into block
1757  if (block == halfDequeuedBlock) {
1758  i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) &
1759  static_cast<index_t>(BLOCK_SIZE - 1));
1760  }
1761 
1762  // Walk through all the items in the block; if this is the tail block, we need to stop when we reach the tail index
1763  auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) &
1764  static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE
1765  : static_cast<size_t>(
1766  this->tailIndex.load(std::memory_order_relaxed) &
1767  static_cast<index_t>(BLOCK_SIZE - 1));
1768  while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
1769  (*block)[i++]->~T();
1770  }
1771  } while (block != this->tailBlock);
1772  }
1773 
1774  // Destroy all blocks that we own
1775  if (this->tailBlock != nullptr) {
1776  auto block = this->tailBlock;
1777  do {
1778  auto nextBlock = block->next;
1779  if (block->dynamicallyAllocated) {
1780  destroy(block);
1781  } else {
1782  this->parent->add_block_to_free_list(block);
1783  }
1784  block = nextBlock;
1785  } while (block != this->tailBlock);
1786  }
1787 
1788  // Destroy the block indices
1789  auto header = static_cast<BlockIndexHeader *>(pr_blockIndexRaw);
1790  while (header != nullptr) {
1791  auto prev = static_cast<BlockIndexHeader *>(header->prev);
1792  header->~BlockIndexHeader();
1793  (Traits::free)(header);
1794  header = prev;
1795  }
1796  }
1797 
1798  template<AllocationMode allocMode, typename U>
1799  inline bool enqueue(U &&element) {
1800  index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
1801  index_t newTailIndex = 1 + currentTailIndex;
1802  if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
1803  // We reached the end of a block, start a new one
1804  auto startBlock = this->tailBlock;
1805  auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
1806  if (this->tailBlock != nullptr &&
1807  this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
1808  // We can re-use the block ahead of us, it's empty!
1809  this->tailBlock = this->tailBlock->next;
1810  this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
1811 
1812  // We'll put the block on the block index (guaranteed to be room since we're conceptually removing the
1813  // last block from it first -- except instead of removing then adding, we can just overwrite).
1814  // Note that there must be a valid block index here, since even if allocation failed in the ctor,
1815  // it would have been re-attempted when adding the first block to the queue; since there is such
1816  // a block, a block index must have been successfully allocated.
1817  } else {
1818  // Whatever head value we see here is >= the last value we saw here (relatively),
1819  // and <= its current value. Since we have the most recent tail, the head must be
1820  // <= to it.
1821  auto head = this->headIndex.load(std::memory_order_relaxed);
1822  assert(!details::circular_less_than<index_t>(currentTailIndex, head));
1823  if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
1824  || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
1825  (MAX_SUBQUEUE_SIZE == 0 ||
1826  MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
1827  // We can't enqueue in another block because there's not enough leeway -- the
1828  // tail could surpass the head by the time the block fills up! (Or we'll exceed
1829  // the size limit, if the second part of the condition was true.)
1830  return false;
1831  }
1832  // We're going to need a new block; check that the block index has room
1833  if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
1834  // Hmm, the circular block index is already full -- we'll need
1835  // to allocate a new index. Note pr_blockIndexRaw can only be nullptr if
1836  // the initial allocation failed in the constructor.
1837 
1838  if (allocMode == CannotAlloc || !new_block_index(pr_blockIndexSlotsUsed)) {
1839  return false;
1840  }
1841  }
1842 
1843  // Insert a new block in the circular linked list
1844  auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
1845  if (newBlock == nullptr) {
1846  return false;
1847  }
1848 #if MCDBGQ_TRACKMEM
1849  newBlock->owner = this;
1850 #endif
1851  newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
1852  if (this->tailBlock == nullptr) {
1853  newBlock->next = newBlock;
1854  } else {
1855  newBlock->next = this->tailBlock->next;
1856  this->tailBlock->next = newBlock;
1857  }
1858  this->tailBlock = newBlock;
1859  ++pr_blockIndexSlotsUsed;
1860  }
1861 
1862  if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new(nullptr) T(std::forward<U>(element)))) {
1863  // The constructor may throw. We want the element not to appear in the queue in
1864  // that case (without corrupting the queue):
1865  MOODYCAMEL_TRY {
1866  new((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
1867  }
1868  MOODYCAMEL_CATCH (...) {
1869  // Revert change to the current block, but leave the new block available
1870  // for next time
1871  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
1872  this->tailBlock = startBlock == nullptr ? this->tailBlock : startBlock;
1873  MOODYCAMEL_RETHROW;
1874  }
1875  } else {
1876  (void) startBlock;
1877  (void) originalBlockIndexSlotsUsed;
1878  }
1879 
1880  // Add block to block index
1881  auto &entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
1882  entry.base = currentTailIndex;
1883  entry.block = this->tailBlock;
1884  blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront,
1885  std::memory_order_release);
1886  pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
1887 
1888  if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new(nullptr) T(std::forward<U>(element)))) {
1889  this->tailIndex.store(newTailIndex, std::memory_order_release);
1890  return true;
1891  }
1892  }
1893 
1894  // Enqueue
1895  new((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
1896 
1897  this->tailIndex.store(newTailIndex, std::memory_order_release);
1898  return true;
1899  }
1900 
1901  template<typename U>
1902  bool dequeue(U &element) {
1903  auto tail = this->tailIndex.load(std::memory_order_relaxed);
1904  auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
1905  if (details::circular_less_than<index_t>(
1906  this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
1907  // Might be something to dequeue, let's give it a try
1908 
1909  // Note that this if is purely for performance purposes in the common case when the queue is
1910  // empty and the values are eventually consistent -- we may enter here spuriously.
1911 
1912  // Note that whatever the values of overcommit and tail are, they are not going to change (unless we
1913  // change them) and must be the same value at this point (inside the if) as when the if condition was
1914  // evaluated.
1915 
1916  // We insert an acquire fence here to synchronize-with the release upon incrementing dequeueOvercommit below.
1917  // This ensures that whatever value we loaded into overcommit, the load of dequeueOptimisticCount in
1918  // the fetch_add below will result in a value at least as recent as that (and therefore at least as large).
1919  // Note that I believe a compiler (signal) fence here would be sufficient due to the nature of fetch_add (all
1920  // read-modify-write operations are guaranteed to work on the latest value in the modification order), but
1921  // unfortunately that can't be shown to be correct using only the C++11 standard.
1922  // See http://stackoverflow.com/questions/18223161/what-are-the-c11-memory-ordering-guarantees-in-this-corner-case
1923  std::atomic_thread_fence(std::memory_order_acquire);
1924 
1925  // Increment optimistic counter, then check if it went over the boundary
1926  auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
1927 
1928  // Note that since dequeueOvercommit must be <= dequeueOptimisticCount (because dequeueOvercommit is only ever
1929  // incremented after dequeueOptimisticCount -- this is enforced in the `else` block below), and since we now
1930  // have a version of dequeueOptimisticCount that is at least as recent as overcommit (due to the release upon
1931  // incrementing dequeueOvercommit and the acquire above that synchronizes with it), overcommit <= myDequeueCount.
1932  assert(overcommit <= myDequeueCount);
1933 
1934  // Note that we reload tail here in case it changed; it will be the same value as before or greater, since
1935  // this load is sequenced after (happens after) the earlier load above. This is supported by read-read
1936  // coherency (as defined in the standard), explained here: http://en.cppreference.com/w/cpp/atomic/memory_order
1937  tail = this->tailIndex.load(std::memory_order_acquire);
1938  if (details::likely(
1939  details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
1940  // Guaranteed to be at least one element to dequeue!
1941 
1942  // Get the index. Note that since there's guaranteed to be at least one element, this
1943  // will never exceed tail. We need to do an acquire-release fence here since it's possible
1944  // that whatever condition got us to this point was for an earlier enqueued element (that
1945  // we already see the memory effects for), but that by the time we increment somebody else
1946  // has incremented it, and we need to see the memory effects for *that* element, which
1947  // in such a case is necessarily visible on the thread that incremented it in the first
1948  // place with the more current condition (they must have acquired a tail that is at least
1949  // as recent).
1950  auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
1951 
1952 
1953  // Determine which block the element is in
1954 
1955  auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
1956  auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
1957 
1958  // We need to be careful here about subtracting and dividing because of index wrap-around.
1959  // When an index wraps, we need to preserve the sign of the offset when dividing it by the
1960  // block size (in order to get a correct signed block count offset in all cases):
1961  auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
1962  auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
1963  auto offset = static_cast<size_t>(
1964  static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase) /
1965  BLOCK_SIZE);
1966  auto block = localBlockIndex->entries[(localBlockIndexHead + offset) &
1967  (localBlockIndex->size - 1)].block;
1968 
1969  // Dequeue
1970  auto &el = *((*block)[index]);
1971  if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T &&, element = std::move(el))) {
1972  // Make sure the element is still fully dequeued and destroyed even if the assignment
1973  // throws
1974  struct Guard {
1975  Block *block;
1976  index_t index;
1977 
1978  ~Guard() {
1979  (*block)[index]->~T();
1980  block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
1981  }
1982  } guard = {block, index};
1983 
1984  element = std::move(el);
1985  } else {
1986  element = std::move(el);
1987  el.~T();
1988  block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
1989  }
1990 
1991  return true;
1992  } else {
1993  // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
1994  this->dequeueOvercommit.fetch_add(1,
1995  std::memory_order_release); // Release so that the fetch_add on dequeueOptimisticCount is guaranteed to happen before this write
1996  }
1997  }
1998 
1999  return false;
2000  }
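// Worked example of the optimistic accounting above (all values hypothetical):
// with tail == 10, dequeueOptimisticCount == 9 and dequeueOvercommit == 2, the
// effective count is 9 - 2 == 7 < 10, so a dequeue is attempted. If the reloaded
// tail still satisfies myDequeueCount - overcommit < tail, we commit by bumping
// headIndex; otherwise dequeueOvercommit is incremented so the effective count
// settles back to match what was actually taken.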
2001 
2002  template<AllocationMode allocMode, typename It>
2003  bool enqueue_bulk(It itemFirst, size_t count) {
2004  // First, we need to make sure we have enough room to enqueue all of the elements;
2005  // this means pre-allocating blocks and putting them in the block index (but only if
2006  // all the allocations succeeded).
2007  index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2008  auto startBlock = this->tailBlock;
2009  auto originalBlockIndexFront = pr_blockIndexFront;
2010  auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
2011 
2012  Block *firstAllocatedBlock = nullptr;
2013 
2014  // Figure out how many blocks we'll need to allocate, and do so
2015  size_t blockBaseDiff =
2016  ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) -
2017  ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2018  index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2019  if (blockBaseDiff > 0) {
2020  // Allocate as many blocks as possible from ahead
2021  while (blockBaseDiff > 0 && this->tailBlock != nullptr &&
2022  this->tailBlock->next != firstAllocatedBlock &&
2023  this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
2024  blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2025  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2026 
2027  this->tailBlock = this->tailBlock->next;
2028  firstAllocatedBlock =
2029  firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2030 
2031  auto &entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2032  entry.base = currentTailIndex;
2033  entry.block = this->tailBlock;
2034  pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2035  }
2036 
2037  // Now allocate as many blocks as necessary from the block pool
2038  while (blockBaseDiff > 0) {
2039  blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2040  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2041 
2042  auto head = this->headIndex.load(std::memory_order_relaxed);
2043  assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2044  bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) ||
2045  (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
2046  (MAX_SUBQUEUE_SIZE == 0 ||
2047  MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2048  if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
2049  if (allocMode == CannotAlloc || full || !new_block_index(originalBlockIndexSlotsUsed)) {
2050  // Failed to allocate, undo changes (but keep injected blocks)
2051  pr_blockIndexFront = originalBlockIndexFront;
2052  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2053  this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2054  return false;
2055  }
2056 
2057  // pr_blockIndexFront is updated inside new_block_index, so we need to
2058  // update our fallback value too (since we keep the new index even if we
2059  // later fail)
2060  originalBlockIndexFront = originalBlockIndexSlotsUsed;
2061  }
2062 
2063  // Insert a new block in the circular linked list
2064  auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2065  if (newBlock == nullptr) {
2066  pr_blockIndexFront = originalBlockIndexFront;
2067  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2068  this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2069  return false;
2070  }
2071 
2072 #if MCDBGQ_TRACKMEM
2073  newBlock->owner = this;
2074 #endif
2075  newBlock->ConcurrentQueue::Block::template set_all_empty<explicit_context>();
2076  if (this->tailBlock == nullptr) {
2077  newBlock->next = newBlock;
2078  } else {
2079  newBlock->next = this->tailBlock->next;
2080  this->tailBlock->next = newBlock;
2081  }
2082  this->tailBlock = newBlock;
2083  firstAllocatedBlock =
2084  firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2085 
2086  ++pr_blockIndexSlotsUsed;
2087 
2088  auto &entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2089  entry.base = currentTailIndex;
2090  entry.block = this->tailBlock;
2091  pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2092  }
2093 
2094  // Excellent, all allocations succeeded. Reset each block's emptiness before we fill them up, and
2095  // publish the new block index front
2096  auto block = firstAllocatedBlock;
2097  while (true) {
2098  block->ConcurrentQueue::Block::template reset_empty<explicit_context>();
2099  if (block == this->tailBlock) {
2100  break;
2101  }
2102  block = block->next;
2103  }
2104 
2105  if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst),
2106  new(nullptr) T(details::deref_noexcept(itemFirst)))) {
2107  blockIndex.load(std::memory_order_relaxed)->front.store(
2108  (pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2109  }
2110  }
2111 
2112  // Enqueue, one block at a time
2113  index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2114  currentTailIndex = startTailIndex;
2115  auto endBlock = this->tailBlock;
2116  this->tailBlock = startBlock;
2117  assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 ||
2118  firstAllocatedBlock != nullptr || count == 0);
2119  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 &&
2120  firstAllocatedBlock != nullptr) {
2121  this->tailBlock = firstAllocatedBlock;
2122  }
2123  while (true) {
2124  auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2125  static_cast<index_t>(BLOCK_SIZE);
2126  if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2127  stopIndex = newTailIndex;
2128  }
2129  if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst),
2130  new(nullptr) T(details::deref_noexcept(itemFirst)))) {
2131  while (currentTailIndex != stopIndex) {
2132  new((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2133  }
2134  } else {
2135  MOODYCAMEL_TRY {
2136  while (currentTailIndex != stopIndex) {
2137  // Must use copy constructor even if move constructor is available
2138  // because we may have to revert if there's an exception.
2139  // Sorry about the horrible templated next line, but it was the only way
2140  // to disable moving *at compile time*, which is important because a type
2141  // may only define a (noexcept) move constructor, and so calls to the
2142  // cctor will not compile, even if they are in an if branch that will never
2143  // be executed
2144  new((*this->tailBlock)[currentTailIndex]) T(
2145  details::nomove_if<(bool) !MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst),
2146  new(nullptr) T(
2147  details::deref_noexcept(
2148  itemFirst)))>::eval(
2149  *itemFirst));
2150  ++currentTailIndex;
2151  ++itemFirst;
2152  }
2153  }
2154  MOODYCAMEL_CATCH (...) {
2155  // Oh dear, an exception's been thrown -- destroy the elements that
2156  // were enqueued so far and revert the entire bulk operation (we'll keep
2157  // any allocated blocks in our linked list for later, though).
2158  auto constructedStopIndex = currentTailIndex;
2159  auto lastBlockEnqueued = this->tailBlock;
2160 
2161  pr_blockIndexFront = originalBlockIndexFront;
2162  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2163  this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2164 
2165  if (!details::is_trivially_destructible<T>::value) {
2166  auto block = startBlock;
2167  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2168  block = firstAllocatedBlock;
2169  }
2170  currentTailIndex = startTailIndex;
2171  while (true) {
2172  stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2173  static_cast<index_t>(BLOCK_SIZE);
2174  if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2175  stopIndex = constructedStopIndex;
2176  }
2177  while (currentTailIndex != stopIndex) {
2178  (*block)[currentTailIndex++]->~T();
2179  }
2180  if (block == lastBlockEnqueued) {
2181  break;
2182  }
2183  block = block->next;
2184  }
2185  }
2186  MOODYCAMEL_RETHROW;
2187  }
2188  }
2189 
2190  if (this->tailBlock == endBlock) {
2191  assert(currentTailIndex == newTailIndex);
2192  break;
2193  }
2194  this->tailBlock = this->tailBlock->next;
2195  }
2196 
2197  if (!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst),
2198  new(nullptr) T(details::deref_noexcept(itemFirst))) &&
2199  firstAllocatedBlock != nullptr) {
2200  blockIndex.load(std::memory_order_relaxed)->front.store(
2201  (pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2202  }
2203 
2204  this->tailIndex.store(newTailIndex, std::memory_order_release);
2205  return true;
2206  }
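// A minimal usage sketch for the bulk enqueue above, assuming the public
// enqueue_bulk(token, itemFirst, count) overload and <vector>:
//
//   dmlc::moodycamel::ConcurrentQueue<int> bq;
//   dmlc::moodycamel::ProducerToken btok(bq);
//   std::vector<int> items = {1, 2, 3, 4};
//   bool ok = bq.enqueue_bulk(btok, items.begin(), items.size());
//   // ok is false only if a required allocation failed or MAX_SUBQUEUE_SIZE would
//   // have been exceeded; otherwise all four items are enqueued, in order, or none are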
2207 
2208  template<typename It>
2209  size_t dequeue_bulk(It &itemFirst, size_t max) {
2210  auto tail = this->tailIndex.load(std::memory_order_relaxed);
2211  auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2212  auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(
2213  std::memory_order_relaxed) - overcommit));
2214  if (details::circular_less_than<size_t>(0, desiredCount)) {
2215  desiredCount = desiredCount < max ? desiredCount : max;
2216  std::atomic_thread_fence(std::memory_order_acquire);
2217 
2218  auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount,
2219  std::memory_order_relaxed);
2220  assert(overcommit <= myDequeueCount);
2221 
2222  tail = this->tailIndex.load(std::memory_order_acquire);
2223  auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2224  if (details::circular_less_than<size_t>(0, actualCount)) {
2225  actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2226  if (actualCount < desiredCount) {
2227  this->dequeueOvercommit.fetch_add(desiredCount - actualCount,
2228  std::memory_order_release);
2229  }
2230 
2231  // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
2232  // will never exceed tail.
2233  auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2234 
2235  // Determine which block the first element is in
2236  auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
2237  auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
2238 
2239  auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2240  auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
2241  auto offset = static_cast<size_t>(
2242  static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) /
2243  BLOCK_SIZE);
2244  auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);
2245 
2246  // Iterate the blocks and dequeue
2247  auto index = firstIndex;
2248  do {
2249  auto firstIndexInBlock = index;
2250  auto endIndex =
2251  (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2252  endIndex = details::circular_less_than<index_t>(
2253  firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex +
2254  static_cast<index_t>(actualCount)
2255  : endIndex;
2256  auto block = localBlockIndex->entries[indexIndex].block;
2257  if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T &&, details::deref_noexcept(itemFirst) = std::move(
2258  (*(*block)[index])))) {
2259  while (index != endIndex) {
2260  auto &el = *((*block)[index]);
2261  *itemFirst++ = std::move(el);
2262  el.~T();
2263  ++index;
2264  }
2265  } else {
2266  MOODYCAMEL_TRY {
2267  while (index != endIndex) {
2268  auto &el = *((*block)[index]);
2269  *itemFirst = std::move(el);
2270  ++itemFirst;
2271  el.~T();
2272  ++index;
2273  }
2274  }
2275  MOODYCAMEL_CATCH (...) {
2276  // It's too late to revert the dequeue, but we can make sure that all
2277  // the dequeued objects are properly destroyed and the block index
2278  // (and empty count) are properly updated before we propagate the exception
2279  do {
2280  block = localBlockIndex->entries[indexIndex].block;
2281  while (index != endIndex) {
2282  (*block)[index++]->~T();
2283  }
2284  block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(
2285  firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
2286  indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2287 
2288  firstIndexInBlock = index;
2289  endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2290  static_cast<index_t>(BLOCK_SIZE);
2291  endIndex = details::circular_less_than<index_t>(
2292  firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex +
2293  static_cast<index_t>(actualCount)
2294  : endIndex;
2295  } while (index != firstIndex + actualCount);
2296 
2297  MOODYCAMEL_RETHROW;
2298  }
2299  }
2300  block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(
2301  firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
2302  indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2303  } while (index != firstIndex + actualCount);
2304 
2305  return actualCount;
2306  } else {
2307  // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
2308  this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2309  }
2310  }
2311 
2312  return 0;
2313  }
2314 
2315  private:
2316  struct BlockIndexEntry {
2317  index_t base;
2318  Block *block;
2319  };
2320 
2321  struct BlockIndexHeader {
2322  size_t size;
2323  std::atomic<size_t> front; // Current slot (not next, like pr_blockIndexFront)
2324  BlockIndexEntry *entries;
2325  void *prev;
2326  };
2327 
2328 
2329  bool new_block_index(size_t numberOfFilledSlotsToExpose) {
2330  auto prevBlockSizeMask = pr_blockIndexSize - 1;
2331 
2332  // Create the new block
2333  pr_blockIndexSize <<= 1;
2334  auto newRawPtr = static_cast<char *>((Traits::malloc)(
2335  sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 +
2336  sizeof(BlockIndexEntry) * pr_blockIndexSize));
2337  if (newRawPtr == nullptr) {
2338  pr_blockIndexSize >>= 1; // Reset to allow graceful retry
2339  return false;
2340  }
2341 
2342  auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry *>(details::align_for<BlockIndexEntry>(
2343  newRawPtr + sizeof(BlockIndexHeader)));
2344 
2345  // Copy in all the old indices, if any
2346  size_t j = 0;
2347  if (pr_blockIndexSlotsUsed != 0) {
2348  auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
2349  do {
2350  newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
2351  i = (i + 1) & prevBlockSizeMask;
2352  } while (i != pr_blockIndexFront);
2353  }
2354 
2355  // Update everything
2356  auto header = new(newRawPtr) BlockIndexHeader;
2357  header->size = pr_blockIndexSize;
2358  header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
2359  header->entries = newBlockIndexEntries;
2360  header->prev = pr_blockIndexRaw; // we link the new block to the old one so we can free it later
2361 
2362  pr_blockIndexFront = j;
2363  pr_blockIndexEntries = newBlockIndexEntries;
2364  pr_blockIndexRaw = newRawPtr;
2365  blockIndex.store(header, std::memory_order_release);
2366 
2367  return true;
2368  }
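// Net effect: the block index doubles each time it fills. The constructor builds the
// first index with EXPLICIT_INITIAL_INDEX_SIZE entries (commonly 32 in the default
// traits); later calls yield 64, 128, ... entries, and each new header keeps a prev
// pointer to the old one purely so the destructor above can walk the chain and free
// every allocation.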
2369 
2370  private:
2371  std::atomic<BlockIndexHeader *> blockIndex;
2372 
2373  // To be used by the producer only -- consumers must use the ones referenced by blockIndex
2374  size_t pr_blockIndexSlotsUsed;
2375  size_t pr_blockIndexSize;
2376  size_t pr_blockIndexFront; // Next slot (not current)
2377  BlockIndexEntry *pr_blockIndexEntries;
2378  void *pr_blockIndexRaw;
2379 
2380 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2381  public:
2382  ExplicitProducer* nextExplicitProducer;
2383  private:
2384 #endif
2385 
2386 #if MCDBGQ_TRACKMEM
2387  friend struct MemStats;
2388 #endif
2389  };
2390 
2391 
2393  // Implicit queue
2395 
2396  struct ImplicitProducer : public ProducerBase {
2397  ImplicitProducer(ConcurrentQueue *parent)
2398  :
2399  ProducerBase(parent, false), nextBlockIndexCapacity(IMPLICIT_INITIAL_INDEX_SIZE), blockIndex(
2400  nullptr) {
2401  new_block_index();
2402  }
2403 
2404  ~ImplicitProducer() {
2405  // Note that since we're in the destructor we can assume that all enqueue/dequeue operations
2406  // completed already; this means that all undequeued elements are placed contiguously across
2407  // contiguous blocks, and that only the first and last remaining blocks can be only partially
2408  // empty (all other remaining blocks must be completely full).
2409 
2410 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2411  // Unregister ourselves for thread termination notification
2412  if (!this->inactive.load(std::memory_order_relaxed)) {
2413  details::ThreadExitNotifier::unsubscribe(&threadExitListener);
2414  }
2415 #endif
2416 
2417  // Destroy all remaining elements!
2418  auto tail = this->tailIndex.load(std::memory_order_relaxed);
2419  auto index = this->headIndex.load(std::memory_order_relaxed);
2420  Block *block = nullptr;
2421  assert(index == tail || details::circular_less_than(index, tail));
2422  bool forceFreeLastBlock =
2423  index != tail; // If we enter the loop, then the last (tail) block will not be freed
2424  while (index != tail) {
2425  if ((index & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 || block == nullptr) {
2426  if (block != nullptr) {
2427  // Free the old block
2428  this->parent->add_block_to_free_list(block);
2429  }
2430 
2431  block = get_block_index_entry_for_index(index)->value.load(std::memory_order_relaxed);
2432  }
2433 
2434  ((*block)[index])->~T();
2435  ++index;
2436  }
2437  // Even if the queue is empty, there's still one block that's not on the free list
2438  // (unless the head index reached the end of it, in which case the tail will be poised
2439  // to create a new block).
2440  if (this->tailBlock != nullptr &&
2441  (forceFreeLastBlock || (tail & static_cast<index_t>(BLOCK_SIZE - 1)) != 0)) {
2442  this->parent->add_block_to_free_list(this->tailBlock);
2443  }
2444 
2445  // Destroy block index
2446  auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2447  if (localBlockIndex != nullptr) {
2448  for (size_t i = 0; i != localBlockIndex->capacity; ++i) {
2449  localBlockIndex->index[i]->~BlockIndexEntry();
2450  }
2451  do {
2452  auto prev = localBlockIndex->prev;
2453  localBlockIndex->~BlockIndexHeader();
2454  (Traits::free)(localBlockIndex);
2455  localBlockIndex = prev;
2456  } while (localBlockIndex != nullptr);
2457  }
2458  }
2459 
2460  template<AllocationMode allocMode, typename U>
2461  inline bool enqueue(U &&element) {
2462  index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2463  index_t newTailIndex = 1 + currentTailIndex;
2464  if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2465  // We reached the end of a block, start a new one
2466  auto head = this->headIndex.load(std::memory_order_relaxed);
2467  assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2468  if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) ||
2469  (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
2470  (MAX_SUBQUEUE_SIZE == 0 ||
2471  MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
2472  return false;
2473  }
2474 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2475  debug::DebugLock lock(mutex);
2476 #endif
2477  // Find out where we'll be inserting this block in the block index
2478  BlockIndexEntry *idxEntry;
2479  if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) {
2480  return false;
2481  }
2482 
2483  // Get ahold of a new block
2484  auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2485  if (newBlock == nullptr) {
2486  rewind_block_index_tail();
2487  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2488  return false;
2489  }
2490 #if MCDBGQ_TRACKMEM
2491  newBlock->owner = this;
2492 #endif
2493  newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2494 
2495  if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new(nullptr) T(std::forward<U>(element)))) {
2496  // May throw, try to insert now before we publish the fact that we have this new block
2497  MOODYCAMEL_TRY {
2498  new((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
2499  }
2500  MOODYCAMEL_CATCH (...) {
2501  rewind_block_index_tail();
2502  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2503  this->parent->add_block_to_free_list(newBlock);
2504  MOODYCAMEL_RETHROW;
2505  }
2506  }
2507 
2508  // Insert the new block into the index
2509  idxEntry->value.store(newBlock, std::memory_order_relaxed);
2510 
2511  this->tailBlock = newBlock;
2512 
2513  if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new(nullptr) T(std::forward<U>(element)))) {
2514  this->tailIndex.store(newTailIndex, std::memory_order_release);
2515  return true;
2516  }
2517  }
2518 
2519  // Enqueue
2520  new((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2521 
2522  this->tailIndex.store(newTailIndex, std::memory_order_release);
2523  return true;
2524  }
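// This is the path taken by token-less enqueues, which route through
// get_or_add_implicit_producer() (see inner_enqueue above); a minimal sketch:
//
//   dmlc::moodycamel::ConcurrentQueue<std::string> sq;
//   sq.enqueue(std::string("hello"));  // an implicit producer is created for this thread
//   std::string s;
//   bool ok = sq.try_dequeue(s);       // in this single-threaded sketch, ok == true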
2525 
2526  template<typename U>
2527  bool dequeue(U &element) {
2528  // See ExplicitProducer::dequeue for rationale and explanation
2529  index_t tail = this->tailIndex.load(std::memory_order_relaxed);
2530  index_t overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2531  if (details::circular_less_than<index_t>(
2532  this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
2533  std::atomic_thread_fence(std::memory_order_acquire);
2534 
2535  index_t myDequeueCount = this->dequeueOptimisticCount.fetch_add(1,
2536  std::memory_order_relaxed);
2537  assert(overcommit <= myDequeueCount);
2538  tail = this->tailIndex.load(std::memory_order_acquire);
2539  if (details::likely(
2540  details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
2541  index_t index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
2542 
2543  // Determine which block the element is in
2544  auto entry = get_block_index_entry_for_index(index);
2545 
2546  // Dequeue
2547  auto block = entry->value.load(std::memory_order_relaxed);
2548  auto &el = *((*block)[index]);
2549 
2550  if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T &&, element = std::move(el))) {
2551 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2552  // Note: Acquiring the mutex with every dequeue instead of only when a block
2553  // is released is very sub-optimal, but it is, after all, purely debug code.
2554  debug::DebugLock lock(producer->mutex);
2555 #endif
2556  struct Guard {
2557  Block *block;
2558  index_t index;
2559  BlockIndexEntry *entry;
2560  ConcurrentQueue *parent;
2561 
2562  ~Guard() {
2563  (*block)[index]->~T();
2564  if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2565  entry->value.store(nullptr, std::memory_order_relaxed);
2566  parent->add_block_to_free_list(block);
2567  }
2568  }
2569  } guard = {block, index, entry, this->parent};
2570 
2571  element = std::move(el);
2572  } else {
2573  element = std::move(el);
2574  el.~T();
2575 
2576  if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2577  {
2578 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2579  debug::DebugLock lock(mutex);
2580 #endif
2581  // Add the block back into the global free pool (and remove from block index)
2582  entry->value.store(nullptr, std::memory_order_relaxed);
2583  }
2584  this->parent->add_block_to_free_list(block); // releases the above store
2585  }
2586  }
2587 
2588  return true;
2589  } else {
2590  this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
2591  }
2592  }
2593 
2594  return false;
2595  }
2596 
2597  template<AllocationMode allocMode, typename It>
2598  bool enqueue_bulk(It itemFirst, size_t count) {
2599  // First, we need to make sure we have enough room to enqueue all of the elements;
2600  // this means pre-allocating blocks and putting them in the block index (but only if
2601  // all the allocations succeeded).
2602 
2603  // Note that the tailBlock we start off with may not be owned by us any more;
2604  // this happens if it was filled up exactly to the top (setting tailIndex to
2605  // the first index of the next block which is not yet allocated), then dequeued
2606  // completely (putting it on the free list) before we enqueue again.
2607 
2608  index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2609  auto startBlock = this->tailBlock;
2610  Block *firstAllocatedBlock = nullptr;
2611  auto endBlock = this->tailBlock;
2612 
2613  // Figure out how many blocks we'll need to allocate, and do so
2614  size_t blockBaseDiff =
2615  ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) -
2616  ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2617  index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2618  if (blockBaseDiff > 0) {
2619 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2620  debug::DebugLock lock(mutex);
2621 #endif
2622  do {
2623  blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2624  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2625 
2626  // Find out where we'll be inserting this block in the block index
2627  BlockIndexEntry *idxEntry = nullptr; // initialization here unnecessary but compiler can't always tell
2628  Block *newBlock;
2629  bool indexInserted = false;
2630  auto head = this->headIndex.load(std::memory_order_relaxed);
2631  assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2632  bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) ||
2633  (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
2634  (MAX_SUBQUEUE_SIZE == 0 ||
2635  MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2636  if (full ||
2637  !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) ||
2638  (newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>()) ==
2639  nullptr) {
2640  // Index allocation or block allocation failed; revert any other allocations
2641  // and index insertions done so far for this operation
2642  if (indexInserted) {
2643  rewind_block_index_tail();
2644  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2645  }
2646  currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2647  for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2648  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2649  idxEntry = get_block_index_entry_for_index(currentTailIndex);
2650  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2651  rewind_block_index_tail();
2652  }
2653  this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2654  this->tailBlock = startBlock;
2655 
2656  return false;
2657  }
2658 
2659 #if MCDBGQ_TRACKMEM
2660  newBlock->owner = this;
2661 #endif
2662  newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2663  newBlock->next = nullptr;
2664 
2665  // Insert the new block into the index
2666  idxEntry->value.store(newBlock, std::memory_order_relaxed);
2667 
2668  // Store the chain of blocks so that we can undo if later allocations fail,
2669  // and so that we can find the blocks when we do the actual enqueueing
2670  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 ||
2671  firstAllocatedBlock != nullptr) {
2672  assert(this->tailBlock != nullptr);
2673  this->tailBlock->next = newBlock;
2674  }
2675  this->tailBlock = newBlock;
2676  endBlock = newBlock;
2677  firstAllocatedBlock = firstAllocatedBlock == nullptr ? newBlock : firstAllocatedBlock;
2678  } while (blockBaseDiff > 0);
2679  }
2680 
2681  // Enqueue, one block at a time
2682  index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2683  currentTailIndex = startTailIndex;
2684  this->tailBlock = startBlock;
2685  assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 ||
2686  firstAllocatedBlock != nullptr || count == 0);
2687  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 &&
2688  firstAllocatedBlock != nullptr) {
2689  this->tailBlock = firstAllocatedBlock;
2690  }
2691  while (true) {
2692  auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2693  static_cast<index_t>(BLOCK_SIZE);
2694  if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2695  stopIndex = newTailIndex;
2696  }
2697  if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst),
2698  new(nullptr) T(details::deref_noexcept(itemFirst)))) {
2699  while (currentTailIndex != stopIndex) {
2700  new((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2701  }
2702  } else {
2703  MOODYCAMEL_TRY {
2704  while (currentTailIndex != stopIndex) {
2705  new((*this->tailBlock)[currentTailIndex]) T(
2706  details::nomove_if<(bool) !MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst),
2707  new(nullptr) T(
2708  details::deref_noexcept(
2709  itemFirst)))>::eval(
2710  *itemFirst));
2711  ++currentTailIndex;
2712  ++itemFirst;
2713  }
2714  }
2715  MOODYCAMEL_CATCH (...) {
2716  auto constructedStopIndex = currentTailIndex;
2717  auto lastBlockEnqueued = this->tailBlock;
2718 
2719  if (!details::is_trivially_destructible<T>::value) {
2720  auto block = startBlock;
2721  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2722  block = firstAllocatedBlock;
2723  }
2724  currentTailIndex = startTailIndex;
2725  while (true) {
2726  stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2727  static_cast<index_t>(BLOCK_SIZE);
2728  if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2729  stopIndex = constructedStopIndex;
2730  }
2731  while (currentTailIndex != stopIndex) {
2732  (*block)[currentTailIndex++]->~T();
2733  }
2734  if (block == lastBlockEnqueued) {
2735  break;
2736  }
2737  block = block->next;
2738  }
2739  }
2740 
2741  currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2742  for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2743  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2744  auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
2745  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2746  rewind_block_index_tail();
2747  }
2748  this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2749  this->tailBlock = startBlock;
2750  MOODYCAMEL_RETHROW;
2751  }
2752  }
2753 
2754  if (this->tailBlock == endBlock) {
2755  assert(currentTailIndex == newTailIndex);
2756  break;
2757  }
2758  this->tailBlock = this->tailBlock->next;
2759  }
2760  this->tailIndex.store(newTailIndex, std::memory_order_release);
2761  return true;
2762  }
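For orientation, a hedged sketch of how this path is reached from the public API: enqueue_bulk takes an iterator and a count, and the code above reserves every block it will need (the blockBaseDiff loop) before constructing a single element, so a failed allocation rolls back and leaves the queue untouched. Names and values below are illustrative; the include path assumes the usual dmlc-core layout.

    #include <dmlc/concurrentqueue.h>
    #include <vector>

    int main() {
      dmlc::moodycamel::ConcurrentQueue<int> q;
      std::vector<int> items = {1, 2, 3, 4, 5};
      // Copies all five elements in one call; blocks are reserved up front as described above.
      bool ok = q.enqueue_bulk(items.begin(), items.size());
      return ok ? 0 : 1;
    }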
2763 
2764  template<typename It>
2765  size_t dequeue_bulk(It &itemFirst, size_t max) {
2766  auto tail = this->tailIndex.load(std::memory_order_relaxed);
2767  auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2768  auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(
2769  std::memory_order_relaxed) - overcommit));
2770  if (details::circular_less_than<size_t>(0, desiredCount)) {
2771  desiredCount = desiredCount < max ? desiredCount : max;
2772  std::atomic_thread_fence(std::memory_order_acquire);
2773 
2774  auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount,
2775  std::memory_order_relaxed);
2776  assert(overcommit <= myDequeueCount);
2777 
2778  tail = this->tailIndex.load(std::memory_order_acquire);
2779  auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2780  if (details::circular_less_than<size_t>(0, actualCount)) {
2781  actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2782  if (actualCount < desiredCount) {
2783  this->dequeueOvercommit.fetch_add(desiredCount - actualCount,
2784  std::memory_order_release);
2785  }
2786 
2787  // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
2788  // will never exceed tail.
2789  auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2790 
2791  // Iterate the blocks and dequeue
2792  auto index = firstIndex;
2793  BlockIndexHeader *localBlockIndex;
2794  auto indexIndex = get_block_index_index_for_index(index, localBlockIndex);
2795  do {
2796  auto blockStartIndex = index;
2797  auto endIndex =
2798  (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2799  endIndex = details::circular_less_than<index_t>(
2800  firstIndex + static_cast<index_t>(actualCount), endIndex)
2801  ? firstIndex + static_cast<index_t>(actualCount)
2802  : endIndex;
2803 
2804  auto entry = localBlockIndex->index[indexIndex];
2805  auto block = entry->value.load(std::memory_order_relaxed);
2806  if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T &&, details::deref_noexcept(itemFirst) = std::move(
2807  (*(*block)[index])))) {
2808  while (index != endIndex) {
2809  auto &el = *((*block)[index]);
2810  *itemFirst++ = std::move(el);
2811  el.~T();
2812  ++index;
2813  }
2814  } else {
2815  MOODYCAMEL_TRY {
2816  while (index != endIndex) {
2817  auto &el = *((*block)[index]);
2818  *itemFirst = std::move(el);
2819  ++itemFirst;
2820  el.~T();
2821  ++index;
2822  }
2823  }
2824  MOODYCAMEL_CATCH (...) {
2825  do {
2826  entry = localBlockIndex->index[indexIndex];
2827  block = entry->value.load(std::memory_order_relaxed);
2828  while (index != endIndex) {
2829  (*block)[index++]->~T();
2830  }
2831 
2832  if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(
2833  blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
2834 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2835  debug::DebugLock lock(mutex);
2836 #endif
2837  entry->value.store(nullptr, std::memory_order_relaxed);
2838  this->parent->add_block_to_free_list(block);
2839  }
2840  indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2841 
2842  blockStartIndex = index;
2843  endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2844  static_cast<index_t>(BLOCK_SIZE);
2845  endIndex = details::circular_less_than<index_t>(
2846  firstIndex + static_cast<index_t>(actualCount), endIndex)
2847  ? firstIndex + static_cast<index_t>(actualCount)
2848  : endIndex;
2849  } while (index != firstIndex + actualCount);
2850 
2851  MOODYCAMEL_RETHROW;
2852  }
2853  }
2854  if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(
2855  blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
2856  {
2857 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2858  debug::DebugLock lock(mutex);
2859 #endif
2860  // Note that the set_many_empty above did a release, meaning that anybody who acquires the block
2861  // we're about to free can use it safely since our writes (and reads!) will have happened-before then.
2862  entry->value.store(nullptr, std::memory_order_relaxed);
2863  }
2864  this->parent->add_block_to_free_list(block); // releases the above store
2865  }
2866  indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2867  } while (index != firstIndex + actualCount);
2868 
2869  return actualCount;
2870  } else {
2871  this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2872  }
2873  }
2874 
2875  return 0;
2876  }
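A hedged sketch of the matching public consumer call: try_dequeue_bulk reports how many elements it actually moved out, which can be fewer than requested; that is the desiredCount versus actualCount adjustment above. Values and include path are illustrative.

    #include <dmlc/concurrentqueue.h>
    #include <cstdio>

    int main() {
      dmlc::moodycamel::ConcurrentQueue<int> q;
      for (int i = 0; i < 100; ++i) q.enqueue(i);
      int buf[32];
      auto n = q.try_dequeue_bulk(buf, 32);          // n <= 32, and may be less if the queue drains
      std::printf("dequeued %zu items\n", n);
      return 0;
    }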
2877 
2878  private:
2879  // The block size must be > 1, so any number with the low bit set is an invalid block base index
2880  static const index_t INVALID_BLOCK_BASE = 1;
2881 
2882  struct BlockIndexEntry {
2883  std::atomic<index_t> key;
2884  std::atomic<Block *> value;
2885  };
2886 
2887  struct BlockIndexHeader {
2888  size_t capacity;
2889  std::atomic<size_t> tail;
2890  BlockIndexEntry *entries;
2891  BlockIndexEntry **index;
2892  BlockIndexHeader *prev;
2893  };
2894 
2895  template<AllocationMode allocMode>
2896  inline bool insert_block_index_entry(BlockIndexEntry *&idxEntry, index_t blockStartIndex) {
2897  auto localBlockIndex = blockIndex.load(
2898  std::memory_order_relaxed); // We're the only writer thread, so relaxed is OK
2899  if (localBlockIndex == nullptr) {
2900  return false; // this can happen if new_block_index failed in the constructor
2901  }
2902  auto newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) &
2903  (localBlockIndex->capacity - 1);
2904  idxEntry = localBlockIndex->index[newTail];
2905  if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE ||
2906  idxEntry->value.load(std::memory_order_relaxed) == nullptr) {
2907 
2908  idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
2909  localBlockIndex->tail.store(newTail, std::memory_order_release);
2910  return true;
2911  }
2912 
2913  // No room in the old block index, try to allocate another one!
2914  if (allocMode == CannotAlloc || !new_block_index()) {
2915  return false;
2916  }
2917  localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2918  newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) &
2919  (localBlockIndex->capacity - 1);
2920  idxEntry = localBlockIndex->index[newTail];
2921  assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE);
2922  idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
2923  localBlockIndex->tail.store(newTail, std::memory_order_release);
2924  return true;
2925  }
2926 
2927  inline void rewind_block_index_tail() {
2928  auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2929  localBlockIndex->tail.store((localBlockIndex->tail.load(std::memory_order_relaxed) - 1) &
2930  (localBlockIndex->capacity - 1), std::memory_order_relaxed);
2931  }
2932 
2933  inline BlockIndexEntry *get_block_index_entry_for_index(index_t index) const {
2934  BlockIndexHeader *localBlockIndex;
2935  auto idx = get_block_index_index_for_index(index, localBlockIndex);
2936  return localBlockIndex->index[idx];
2937  }
2938 
2939  inline size_t
2940  get_block_index_index_for_index(index_t index, BlockIndexHeader *&localBlockIndex) const {
2941 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2942  debug::DebugLock lock(mutex);
2943 #endif
2944  index &= ~static_cast<index_t>(BLOCK_SIZE - 1);
2945  localBlockIndex = blockIndex.load(std::memory_order_acquire);
2946  auto tail = localBlockIndex->tail.load(std::memory_order_acquire);
2947  auto tailBase = localBlockIndex->index[tail]->key.load(std::memory_order_relaxed);
2948  assert(tailBase != INVALID_BLOCK_BASE);
2949  // Note: Must use division instead of shift because the index may wrap around, causing a negative
2950  // offset, whose negativity we want to preserve
2951  auto offset = static_cast<size_t>(
2952  static_cast<typename std::make_signed<index_t>::type>(index - tailBase) / BLOCK_SIZE);
2953  size_t idx = (tail + offset) & (localBlockIndex->capacity - 1);
2954  assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index &&
2955  localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) != nullptr);
2956  return idx;
2957  }
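To make the division-versus-shift comment concrete, here is a small self-contained illustration with made-up values for BLOCK_SIZE, capacity and tail: once the 32-bit index wraps, the signed division yields -1, which after the capacity mask lands one slot before the tail as intended, whereas a plain right shift would produce a huge bogus offset.

    #include <cstddef>
    #include <cstdint>
    #include <cstdio>

    int main() {
      const std::uint32_t BLOCK_SIZE = 32, capacity = 8, tail = 5;  // illustrative values
      std::uint32_t tailBase = 0;
      std::uint32_t index = tailBase - BLOCK_SIZE;                  // wraps to 0xFFFFFFE0
      auto offset = static_cast<std::size_t>(
          static_cast<std::int32_t>(index - tailBase) / static_cast<std::int32_t>(BLOCK_SIZE));
      std::printf("%zu\n", (tail + offset) & (capacity - 1));       // prints 4: one slot before tail
      std::printf("%u\n", static_cast<unsigned>((index - tailBase) >> 5));  // prints 134217727: nonsense
      return 0;
    }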
2958 
2959  bool new_block_index() {
2960  auto prev = blockIndex.load(std::memory_order_relaxed);
2961  size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
2962  auto entryCount = prev == nullptr ? nextBlockIndexCapacity : prevCapacity;
2963  auto raw = static_cast<char *>((Traits::malloc)(
2964  sizeof(BlockIndexHeader) +
2965  std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * entryCount +
2966  std::alignment_of<BlockIndexEntry *>::value - 1 +
2967  sizeof(BlockIndexEntry * ) * nextBlockIndexCapacity));
2968  if (raw == nullptr) {
2969  return false;
2970  }
2971 
2972  auto header = new(raw) BlockIndexHeader;
2973  auto entries = reinterpret_cast<BlockIndexEntry *>(details::align_for<BlockIndexEntry>(
2974  raw + sizeof(BlockIndexHeader)));
2975  auto index = reinterpret_cast<BlockIndexEntry **>(details::align_for<BlockIndexEntry *>(
2976  reinterpret_cast<char *>(entries) + sizeof(BlockIndexEntry) * entryCount));
2977  if (prev != nullptr) {
2978  auto prevTail = prev->tail.load(std::memory_order_relaxed);
2979  auto prevPos = prevTail;
2980  size_t i = 0;
2981  do {
2982  prevPos = (prevPos + 1) & (prev->capacity - 1);
2983  index[i++] = prev->index[prevPos];
2984  } while (prevPos != prevTail);
2985  assert(i == prevCapacity);
2986  }
2987  for (size_t i = 0; i != entryCount; ++i) {
2988  new(entries + i) BlockIndexEntry;
2989  entries[i].key.store(INVALID_BLOCK_BASE, std::memory_order_relaxed);
2990  index[prevCapacity + i] = entries + i;
2991  }
2992  header->prev = prev;
2993  header->entries = entries;
2994  header->index = index;
2995  header->capacity = nextBlockIndexCapacity;
2996  header->tail.store((prevCapacity - 1) & (nextBlockIndexCapacity - 1),
2997  std::memory_order_relaxed);
2998 
2999  blockIndex.store(header, std::memory_order_release);
3000 
3001  nextBlockIndexCapacity <<= 1;
3002 
3003  return true;
3004  }
3005 
3006  private:
3007  size_t nextBlockIndexCapacity;
3008  std::atomic<BlockIndexHeader *> blockIndex;
3009 
3010 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3011  public:
3012  details::ThreadExitListener threadExitListener;
3013  private:
3014 #endif
3015 
3016 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3017  public:
3018  ImplicitProducer* nextImplicitProducer;
3019  private:
3020 #endif
3021 
3022 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3023  mutable debug::DebugMutex mutex;
3024 #endif
3025 #if MCDBGQ_TRACKMEM
3026  friend struct MemStats;
3027 #endif
3028  };
3029 
3030
3031  //////////////////////////////////
3032  // Block pool manipulation
3033  //////////////////////////////////
3034
3035  void populate_initial_block_list(size_t blockCount) {
3036  initialBlockPoolSize = blockCount;
3037  if (initialBlockPoolSize == 0) {
3038  initialBlockPool = nullptr;
3039  return;
3040  }
3041 
3042  initialBlockPool = create_array<Block>(blockCount);
3043  if (initialBlockPool == nullptr) {
3044  initialBlockPoolSize = 0;
3045  }
3046  for (size_t i = 0; i < initialBlockPoolSize; ++i) {
3047  initialBlockPool[i].dynamicallyAllocated = false;
3048  }
3049  }
3050 
3051  inline Block *try_get_block_from_initial_pool() {
3052  if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) {
3053  return nullptr;
3054  }
3055 
3056  auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);
3057 
3058  return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;
3059  }
3060 
3061  inline void add_block_to_free_list(Block *block) {
3062 #if MCDBGQ_TRACKMEM
3063  block->owner = nullptr;
3064 #endif
3065  freeList.add(block);
3066  }
3067 
3068  inline void add_blocks_to_free_list(Block *block) {
3069  while (block != nullptr) {
3070  auto next = block->next;
3071  add_block_to_free_list(block);
3072  block = next;
3073  }
3074  }
3075 
3076  inline Block *try_get_block_from_free_list() {
3077  return freeList.try_get();
3078  }
3079 
3080  // Gets a free block from one of the memory pools, or allocates a new one (if applicable)
3081  template<AllocationMode canAlloc>
3082  Block *requisition_block() {
3083  auto block = try_get_block_from_initial_pool();
3084  if (block != nullptr) {
3085  return block;
3086  }
3087 
3088  block = try_get_block_from_free_list();
3089  if (block != nullptr) {
3090  return block;
3091  }
3092 
3093  if (canAlloc == CanAlloc) {
3094  return create<Block>();
3095  }
3096 
3097  return nullptr;
3098  }
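A practical consequence of the CanAlloc/CannotAlloc split above: the try_ variants of the enqueue methods use CannotAlloc, so with a queue sized up front they draw only on the initial pool and the free list and will not allocate new blocks (the one-time creation of a thread's producer aside); they simply return false when no block is available. A hedged sketch, with the capacity chosen arbitrarily.

    #include <dmlc/concurrentqueue.h>

    int main() {
      // The constructor pre-allocates enough blocks for roughly 1024 elements.
      dmlc::moodycamel::ConcurrentQueue<int> q(1024);
      bool ok = q.try_enqueue(7);   // fails rather than allocating a new block when none is free
      return ok ? 0 : 1;
    }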
3099 
3100 
3101 #if MCDBGQ_TRACKMEM
3102  public:
3103  struct MemStats {
3104  size_t allocatedBlocks;
3105  size_t usedBlocks;
3106  size_t freeBlocks;
3107  size_t ownedBlocksExplicit;
3108  size_t ownedBlocksImplicit;
3109  size_t implicitProducers;
3110  size_t explicitProducers;
3111  size_t elementsEnqueued;
3112  size_t blockClassBytes;
3113  size_t queueClassBytes;
3114  size_t implicitBlockIndexBytes;
3115  size_t explicitBlockIndexBytes;
3116 
3117  friend class ConcurrentQueue;
3118 
3119  private:
3120  static MemStats getFor(ConcurrentQueue* q)
3121  {
3122  MemStats stats = { 0 };
3123 
3124  stats.elementsEnqueued = q->size_approx();
3125 
3126  auto block = q->freeList.head_unsafe();
3127  while (block != nullptr) {
3128  ++stats.allocatedBlocks;
3129  ++stats.freeBlocks;
3130  block = block->freeListNext.load(std::memory_order_relaxed);
3131  }
3132 
3133  for (auto ptr = q->producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
3134  bool implicit = dynamic_cast<ImplicitProducer*>(ptr) != nullptr;
3135  stats.implicitProducers += implicit ? 1 : 0;
3136  stats.explicitProducers += implicit ? 0 : 1;
3137 
3138  if (implicit) {
3139  auto prod = static_cast<ImplicitProducer*>(ptr);
3140  stats.queueClassBytes += sizeof(ImplicitProducer);
3141  auto head = prod->headIndex.load(std::memory_order_relaxed);
3142  auto tail = prod->tailIndex.load(std::memory_order_relaxed);
3143  auto hash = prod->blockIndex.load(std::memory_order_relaxed);
3144  if (hash != nullptr) {
3145  for (size_t i = 0; i != hash->capacity; ++i) {
3146  if (hash->index[i]->key.load(std::memory_order_relaxed) != ImplicitProducer::INVALID_BLOCK_BASE && hash->index[i]->value.load(std::memory_order_relaxed) != nullptr) {
3147  ++stats.allocatedBlocks;
3148  ++stats.ownedBlocksImplicit;
3149  }
3150  }
3151  stats.implicitBlockIndexBytes += hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry);
3152  for (; hash != nullptr; hash = hash->prev) {
3153  stats.implicitBlockIndexBytes += sizeof(typename ImplicitProducer::BlockIndexHeader) + hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry*);
3154  }
3155  }
3156  for (; details::circular_less_than<index_t>(head, tail); head += BLOCK_SIZE) {
3157  //auto block = prod->get_block_index_entry_for_index(head);
3158  ++stats.usedBlocks;
3159  }
3160  }
3161  else {
3162  auto prod = static_cast<ExplicitProducer*>(ptr);
3163  stats.queueClassBytes += sizeof(ExplicitProducer);
3164  auto tailBlock = prod->tailBlock;
3165  bool wasNonEmpty = false;
3166  if (tailBlock != nullptr) {
3167  auto block = tailBlock;
3168  do {
3169  ++stats.allocatedBlocks;
3170  if (!block->ConcurrentQueue::Block::template is_empty<explicit_context>() || wasNonEmpty) {
3171  ++stats.usedBlocks;
3172  wasNonEmpty = wasNonEmpty || block != tailBlock;
3173  }
3174  ++stats.ownedBlocksExplicit;
3175  block = block->next;
3176  } while (block != tailBlock);
3177  }
3178  auto index = prod->blockIndex.load(std::memory_order_relaxed);
3179  while (index != nullptr) {
3180  stats.explicitBlockIndexBytes += sizeof(typename ExplicitProducer::BlockIndexHeader) + index->size * sizeof(typename ExplicitProducer::BlockIndexEntry);
3181  index = static_cast<typename ExplicitProducer::BlockIndexHeader*>(index->prev);
3182  }
3183  }
3184  }
3185 
3186  auto freeOnInitialPool = q->initialBlockPoolIndex.load(std::memory_order_relaxed) >= q->initialBlockPoolSize ? 0 : q->initialBlockPoolSize - q->initialBlockPoolIndex.load(std::memory_order_relaxed);
3187  stats.allocatedBlocks += freeOnInitialPool;
3188  stats.freeBlocks += freeOnInitialPool;
3189 
3190  stats.blockClassBytes = sizeof(Block) * stats.allocatedBlocks;
3191  stats.queueClassBytes += sizeof(ConcurrentQueue);
3192 
3193  return stats;
3194  }
3195  };
3196 
3197  // For debugging only. Not thread-safe.
3198  MemStats getMemStats()
3199  {
3200  return MemStats::getFor(this);
3201  }
3202  private:
3203  friend struct MemStats;
3204 #endif
3205 
3206
3207  //////////////////////////////////
3208  // Producer list manipulation
3209  //////////////////////////////////
3210
3211  ProducerBase *recycle_or_create_producer(bool isExplicit) {
3212  bool recycled;
3213  return recycle_or_create_producer(isExplicit, recycled);
3214  }
3215 
3216  ProducerBase *recycle_or_create_producer(bool isExplicit, bool &recycled) {
3217 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3218  debug::DebugLock lock(implicitProdMutex);
3219 #endif
3220  // Try to re-use one first
3221  for (auto ptr = producerListTail.load(std::memory_order_acquire);
3222  ptr != nullptr; ptr = ptr->next_prod()) {
3223  if (ptr->inactive.load(std::memory_order_relaxed) && ptr->isExplicit == isExplicit) {
3224  bool expected = true;
3225  if (ptr->inactive.compare_exchange_strong(expected, /* desired */ false,
3226  std::memory_order_acquire,
3227  std::memory_order_relaxed)) {
3228  // We caught one! It's been marked as active; the caller can have it
3229  recycled = true;
3230  return ptr;
3231  }
3232  }
3233  }
3234 
3235  recycled = false;
3236  return add_producer(isExplicit ? static_cast<ProducerBase *>(create<ExplicitProducer>(this))
3237  : create<ImplicitProducer>(this));
3238  }
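One observable effect of the recycling above: destroying a ProducerToken does not free its producer, it only marks it inactive, so a token created later can pick the same internal producer back up. A hedged sketch; whether recycling actually occurs depends on producer kind and timing.

    #include <dmlc/concurrentqueue.h>

    int main() {
      dmlc::moodycamel::ConcurrentQueue<int> q;
      {
        dmlc::moodycamel::ProducerToken t1(q);  // creates (or recycles) an explicit producer
        q.enqueue(t1, 1);
      }                                         // t1 destroyed: its producer is marked inactive, not freed
      dmlc::moodycamel::ProducerToken t2(q);    // may reuse the producer t1 was using
      q.enqueue(t2, 2);
      return 0;
    }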
3239 
3240  ProducerBase *add_producer(ProducerBase *producer) {
3241  // Handle failed memory allocation
3242  if (producer == nullptr) {
3243  return nullptr;
3244  }
3245 
3246  producerCount.fetch_add(1, std::memory_order_relaxed);
3247 
3248  // Add it to the lock-free list
3249  auto prevTail = producerListTail.load(std::memory_order_relaxed);
3250  do {
3251  producer->next = prevTail;
3252  } while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release,
3253  std::memory_order_relaxed));
3254 
3255 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3256  if (producer->isExplicit) {
3257  auto prevTailExplicit = explicitProducers.load(std::memory_order_relaxed);
3258  do {
3259  static_cast<ExplicitProducer*>(producer)->nextExplicitProducer = prevTailExplicit;
3260  } while (!explicitProducers.compare_exchange_weak(prevTailExplicit, static_cast<ExplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3261  }
3262  else {
3263  auto prevTailImplicit = implicitProducers.load(std::memory_order_relaxed);
3264  do {
3265  static_cast<ImplicitProducer*>(producer)->nextImplicitProducer = prevTailImplicit;
3266  } while (!implicitProducers.compare_exchange_weak(prevTailImplicit, static_cast<ImplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3267  }
3268 #endif
3269 
3270  return producer;
3271  }
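The producer-list push above is the classic lock-free prepend onto an intrusive singly linked list: link the new node to the observed tail, then publish it with a release CAS, retrying if another thread won the race. A minimal self-contained sketch of just that pattern; the names here are illustrative, not from this file.

    #include <atomic>

    struct Node { Node *next = nullptr; };
    std::atomic<Node *> listTail{nullptr};

    void push(Node *n) {
      Node *prev = listTail.load(std::memory_order_relaxed);
      do {
        n->next = prev;   // link before publishing; prev is refreshed whenever the CAS fails
      } while (!listTail.compare_exchange_weak(prev, n,
                                               std::memory_order_release,
                                               std::memory_order_relaxed));
    }

    int main() {
      Node a, b;
      push(&a);
      push(&b);           // the list is now b -> a
      return 0;
    }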
3272 
3273  void reown_producers() {
3274  // After another instance is moved-into/swapped-with this one, all the
3275  // producers we stole still think their parents are the other queue.
3276  // So fix them up!
3277  for (auto ptr = producerListTail.load(std::memory_order_relaxed);
3278  ptr != nullptr; ptr = ptr->next_prod()) {
3279  ptr->parent = this;
3280  }
3281  }
3282 
3283
3284  //////////////////////////////////
3285  // Implicit producer hash
3286  //////////////////////////////////
3287
3288  struct ImplicitProducerKVP {
3289  std::atomic<details::thread_id_t> key;
3290  ImplicitProducer *value; // No need for atomicity since it's only read by the thread that sets it in the first place
3291 
3292  ImplicitProducerKVP()
3293  : value(nullptr) {}
3294 
3295  ImplicitProducerKVP(ImplicitProducerKVP &&other) MOODYCAMEL_NOEXCEPT {
3296  key.store(other.key.load(std::memory_order_relaxed), std::memory_order_relaxed);
3297  value = other.value;
3298  }
3299 
3300  inline ImplicitProducerKVP &operator=(ImplicitProducerKVP &&other) MOODYCAMEL_NOEXCEPT {
3301  swap(other);
3302  return *this;
3303  }
3304 
3305  inline void swap(ImplicitProducerKVP &other) MOODYCAMEL_NOEXCEPT {
3306  if (this != &other) {
3307  details::swap_relaxed(key, other.key);
3308  std::swap(value, other.value);
3309  }
3310  }
3311  };
3312 
3313  template<typename XT, typename XTraits>
3314  friend void moodycamel::swap(typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP &,
3315  typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP &) MOODYCAMEL_NOEXCEPT;
3316 
3317  struct ImplicitProducerHash {
3318  size_t capacity;
3319  ImplicitProducerKVP *entries;
3320  ImplicitProducerHash *prev;
3321  };
3322 
3323  inline void populate_initial_implicit_producer_hash() {
3324  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return;
3325 
3326  implicitProducerHashCount.store(0, std::memory_order_relaxed);
3327  auto hash = &initialImplicitProducerHash;
3328  hash->capacity = INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
3329  hash->entries = &initialImplicitProducerHashEntries[0];
3330  for (size_t i = 0; i != INITIAL_IMPLICIT_PRODUCER_HASH_SIZE; ++i) {
3331  initialImplicitProducerHashEntries[i].key.store(details::invalid_thread_id,
3332  std::memory_order_relaxed);
3333  }
3334  hash->prev = nullptr;
3335  implicitProducerHash.store(hash, std::memory_order_relaxed);
3336  }
3337 
3338  void swap_implicit_producer_hashes(ConcurrentQueue &other) {
3339  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return;
3340 
3341  // Swap (assumes our implicit producer hash is initialized)
3342  initialImplicitProducerHashEntries.swap(other.initialImplicitProducerHashEntries);
3343  initialImplicitProducerHash.entries = &initialImplicitProducerHashEntries[0];
3344  other.initialImplicitProducerHash.entries = &other.initialImplicitProducerHashEntries[0];
3345 
3346  details::swap_relaxed(implicitProducerHashCount, other.implicitProducerHashCount);
3347 
3348  details::swap_relaxed(implicitProducerHash, other.implicitProducerHash);
3349  if (implicitProducerHash.load(std::memory_order_relaxed) ==
3350  &other.initialImplicitProducerHash) {
3351  implicitProducerHash.store(&initialImplicitProducerHash, std::memory_order_relaxed);
3352  } else {
3353  ImplicitProducerHash *hash;
3354  for (hash = implicitProducerHash.load(std::memory_order_relaxed);
3355  hash->prev != &other.initialImplicitProducerHash; hash = hash->prev) {
3356  continue;
3357  }
3358  hash->prev = &initialImplicitProducerHash;
3359  }
3360  if (other.implicitProducerHash.load(std::memory_order_relaxed) ==
3361  &initialImplicitProducerHash) {
3362  other.implicitProducerHash.store(&other.initialImplicitProducerHash,
3363  std::memory_order_relaxed);
3364  } else {
3365  ImplicitProducerHash *hash;
3366  for (hash = other.implicitProducerHash.load(std::memory_order_relaxed);
3367  hash->prev != &initialImplicitProducerHash; hash = hash->prev) {
3368  continue;
3369  }
3370  hash->prev = &other.initialImplicitProducerHash;
3371  }
3372  }
3373 
3374  // Only fails (returns nullptr) if memory allocation fails
3375  ImplicitProducer *get_or_add_implicit_producer() {
3376  // Note that since the data is essentially thread-local (key is thread ID),
3377  // there's a reduced need for fences (memory ordering is already consistent
3378  // for any individual thread), except for the current table itself.
3379 
3380  // Start by looking for the thread ID in the current and all previous hash tables.
3381  // If it's not found, it must not be in there yet, since this same thread would
3382  // have added it previously to one of the tables that we traversed.
3383 
3384  // Code and algorithm adapted from http://preshing.com/20130605/the-worlds-simplest-lock-free-hash-table
3385 
3386 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3387  debug::DebugLock lock(implicitProdMutex);
3388 #endif
3389 
3390  auto id = details::thread_id();
3391  auto hashedId = details::hash_thread_id(id);
3392 
3393  auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
3394  for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
3395  // Look for the id in this hash
3396  auto index = hashedId;
3397  while (true) { // Not an infinite loop because at least one slot is free in the hash table
3398  index &= hash->capacity - 1;
3399 
3400  auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3401  if (probedKey == id) {
3402  // Found it! If we had to search several hashes deep, though, we should lazily add it
3403  // to the current main hash table to avoid the extended search next time.
3404  // Note there's guaranteed to be room in the current hash table since every subsequent
3405  // table implicitly reserves space for all previous tables (there's only one
3406  // implicitProducerHashCount).
3407  auto value = hash->entries[index].value;
3408  if (hash != mainHash) {
3409  index = hashedId;
3410  while (true) {
3411  index &= mainHash->capacity - 1;
3412  probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
3413  auto empty = details::invalid_thread_id;
3414 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3415  auto reusable = details::invalid_thread_id2;
3416  if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
3417  (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
3418 #else
3419  if ((probedKey == empty &&
3420  mainHash->entries[index].key.compare_exchange_strong(empty, id,
3421  std::memory_order_relaxed,
3422  std::memory_order_relaxed))) {
3423 #endif
3424  mainHash->entries[index].value = value;
3425  break;
3426  }
3427  ++index;
3428  }
3429  }
3430 
3431  return value;
3432  }
3433  if (probedKey == details::invalid_thread_id) {
3434  break; // Not in this hash table
3435  }
3436  ++index;
3437  }
3438  }
3439 
3440  // Insert!
3441  auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
3442  while (true) {
3443  if (newCount >= (mainHash->capacity >> 1) &&
3444  !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) {
3445  // We've acquired the resize lock, try to allocate a bigger hash table.
3446  // Note the acquire fence synchronizes with the release fence at the end of this block, and hence when
3447  // we reload implicitProducerHash it must be the most recent version (it only gets changed within this
3448  // locked block).
3449  mainHash = implicitProducerHash.load(std::memory_order_acquire);
3450  if (newCount >= (mainHash->capacity >> 1)) {
3451  auto newCapacity = mainHash->capacity << 1;
3452  while (newCount >= (newCapacity >> 1)) {
3453  newCapacity <<= 1;
3454  }
3455  auto raw = static_cast<char *>((Traits::malloc)(
3456  sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1 +
3457  sizeof(ImplicitProducerKVP) * newCapacity));
3458  if (raw == nullptr) {
3459  // Allocation failed
3460  implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3461  implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
3462  return nullptr;
3463  }
3464 
3465  auto newHash = new(raw) ImplicitProducerHash;
3466  newHash->capacity = newCapacity;
3467  newHash->entries = reinterpret_cast<ImplicitProducerKVP *>(details::align_for<ImplicitProducerKVP>(
3468  raw + sizeof(ImplicitProducerHash)));
3469  for (size_t i = 0; i != newCapacity; ++i) {
3470  new(newHash->entries + i) ImplicitProducerKVP;
3471  newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
3472  }
3473  newHash->prev = mainHash;
3474  implicitProducerHash.store(newHash, std::memory_order_release);
3475  implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3476  mainHash = newHash;
3477  } else {
3478  implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3479  }
3480  }
3481 
3482  // If it's < three-quarters full, add to the old one anyway so that we don't have to wait for the next table
3483  // to finish being allocated by another thread (and if we just finished allocating above, the condition will
3484  // always be true)
3485  if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
3486  bool recycled;
3487  auto producer = static_cast<ImplicitProducer *>(recycle_or_create_producer(false,
3488  recycled));
3489  if (producer == nullptr) {
3490  implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3491  return nullptr;
3492  }
3493  if (recycled) {
3494  implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3495  }
3496 
3497 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3498  producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
3499  producer->threadExitListener.userData = producer;
3500  details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
3501 #endif
3502 
3503  auto index = hashedId;
3504  while (true) {
3505  index &= mainHash->capacity - 1;
3506  auto probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
3507 
3508  auto empty = details::invalid_thread_id;
3509 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3510  auto reusable = details::invalid_thread_id2;
3511  if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
3512  (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
3513 #else
3514  if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id,
3515  std::memory_order_relaxed,
3516  std::memory_order_relaxed))) {
3517 #endif
3518  mainHash->entries[index].value = producer;
3519  break;
3520  }
3521  ++index;
3522  }
3523  return producer;
3524  }
3525 
3526  // Hmm, the old hash is quite full and somebody else is busy allocating a new one.
3527  // We need to wait for the allocating thread to finish (if it succeeds, we add, if not,
3528  // we try to allocate ourselves).
3529  mainHash = implicitProducerHash.load(std::memory_order_acquire);
3530  }
3531  }
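The lookup-or-insert above follows the lock-free hash table described at the link in the comment: a slot is claimed by CAS-ing its key from "empty" to the thread id, probing linearly on collisions. Below is a stripped-down, fixed-capacity sketch of just that core, with illustrative names, no resizing, no reusable slots, and no hashing of the id; as in the real code, a given id is only ever inserted and read back by its own thread, which is why the plain (non-atomic) value store is safe for that reader.

    #include <atomic>
    #include <cstddef>
    #include <cstdint>

    constexpr std::size_t kCapacity = 64;          // must be a power of two
    std::atomic<std::uint64_t> keys[kCapacity];    // static storage: zero-initialized, 0 means "empty"
    void *values[kCapacity];

    void *get_or_insert(std::uint64_t id, void *value) {
      std::size_t index = static_cast<std::size_t>(id);   // the real code hashes the id first
      while (true) {
        index &= kCapacity - 1;
        std::uint64_t probed = keys[index].load(std::memory_order_relaxed);
        if (probed == id) {
          return values[index];                    // already inserted by this thread
        }
        if (probed == 0) {
          std::uint64_t expected = 0;
          if (keys[index].compare_exchange_strong(expected, id, std::memory_order_relaxed)) {
            values[index] = value;                 // slot claimed; record our value
            return value;
          }
        }
        ++index;                                   // collision: probe the next slot
      }
    }

    int main() {
      int payload = 123;
      void *first  = get_or_insert(42, &payload);  // inserts
      void *second = get_or_insert(42, nullptr);   // finds the existing entry
      return (first == second) ? 0 : 1;
    }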
3532 
3533 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3534  void implicit_producer_thread_exited(ImplicitProducer* producer)
3535  {
3536  // Remove from thread exit listeners
3537  details::ThreadExitNotifier::unsubscribe(&producer->threadExitListener);
3538 
3539  // Remove from hash
3540 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3541  debug::DebugLock lock(implicitProdMutex);
3542 #endif
3543  auto hash = implicitProducerHash.load(std::memory_order_acquire);
3544  assert(hash != nullptr); // The thread exit listener is only registered if we were added to a hash in the first place
3545  auto id = details::thread_id();
3546  auto hashedId = details::hash_thread_id(id);
3547  details::thread_id_t probedKey;
3548 
3549  // We need to traverse all the hashes just in case other threads aren't on the current one yet and are
3550  // trying to add an entry thinking there's a free slot (because they reused a producer)
3551  for (; hash != nullptr; hash = hash->prev) {
3552  auto index = hashedId;
3553  do {
3554  index &= hash->capacity - 1;
3555  probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3556  if (probedKey == id) {
3557  hash->entries[index].key.store(details::invalid_thread_id2, std::memory_order_release);
3558  break;
3559  }
3560  ++index;
3561  } while (probedKey != details::invalid_thread_id); // Can happen if the hash has changed but we weren't put back in it yet, or if we weren't added to this hash in the first place
3562  }
3563 
3564  // Mark the queue as being recyclable
3565  producer->inactive.store(true, std::memory_order_release);
3566  }
3567 
3568  static void implicit_producer_thread_exited_callback(void* userData)
3569  {
3570  auto producer = static_cast<ImplicitProducer*>(userData);
3571  auto queue = producer->parent;
3572  queue->implicit_producer_thread_exited(producer);
3573  }
3574 #endif
3575
3576  //////////////////////////////////
3577  // Utility functions
3578  //////////////////////////////////
3579
3580  template<typename U>
3581  static inline U *create_array(size_t count) {
3582  assert(count > 0);
3583  auto p = static_cast<U *>((Traits::malloc)(sizeof(U) * count));
3584  if (p == nullptr) {
3585  return nullptr;
3586  }
3587 
3588  for (size_t i = 0; i != count; ++i) {
3589  new(p + i) U();
3590  }
3591  return p;
3592  }
3593 
3594  template<typename U>
3595  static inline void destroy_array(U *p, size_t count) {
3596  if (p != nullptr) {
3597  assert(count > 0);
3598  for (size_t i = count; i != 0;) {
3599  (p + --i)->~U();
3600  }
3601  (Traits::free)(p);
3602  }
3603  }
3604 
3605  template<typename U>
3606  static inline U *create() {
3607  auto p = (Traits::malloc)(sizeof(U));
3608  return p != nullptr ? new(p) U : nullptr;
3609  }
3610 
3611  template<typename U, typename A1>
3612  static inline U *create(A1 &&a1) {
3613  auto p = (Traits::malloc)(sizeof(U));
3614  return p != nullptr ? new(p) U(std::forward<A1>(a1)) : nullptr;
3615  }
3616 
3617  template<typename U>
3618  static inline void destroy(U *p) {
3619  if (p != nullptr) {
3620  p->~U();
3621  }
3622  (Traits::free)(p);
3623  }
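Allocation throughout this header funnels through (Traits::malloc) and (Traits::free), as in the helpers above, so memory can be redirected or instrumented by supplying a traits type. A hedged sketch that merely counts the queue's allocations; it assumes the ConcurrentQueueDefaultTraits type declared earlier in this header and the usual dmlc-core include path.

    #include <dmlc/concurrentqueue.h>
    #include <atomic>
    #include <cstdio>
    #include <cstdlib>

    struct CountingTraits : public dmlc::moodycamel::ConcurrentQueueDefaultTraits {
      static std::atomic<std::size_t> allocs;
      static inline void *malloc(std::size_t size) { ++allocs; return std::malloc(size); }
      static inline void free(void *ptr) { std::free(ptr); }
    };
    std::atomic<std::size_t> CountingTraits::allocs{0};

    int main() {
      dmlc::moodycamel::ConcurrentQueue<int, CountingTraits> q;
      q.enqueue(1);
      std::printf("allocations: %zu\n", CountingTraits::allocs.load());  // nonzero: routed through our hook
      return 0;
    }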
3624 
3625  private:
3626  std::atomic<ProducerBase *> producerListTail;
3627  std::atomic<std::uint32_t> producerCount;
3628 
3629  std::atomic<size_t> initialBlockPoolIndex;
3630  Block *initialBlockPool;
3631  size_t initialBlockPoolSize;
3632 
3633 #if !MCDBGQ_USEDEBUGFREELIST
3634  FreeList<Block> freeList;
3635 #else
3636  debug::DebugFreeList<Block> freeList;
3637 #endif
3638 
3639  std::atomic<ImplicitProducerHash *> implicitProducerHash;
3640  std::atomic<size_t> implicitProducerHashCount; // Number of slots logically used
3641  ImplicitProducerHash initialImplicitProducerHash;
3642  std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE> initialImplicitProducerHashEntries;
3643  std::atomic_flag implicitProducerHashResizeInProgress;
3644 
3645  std::atomic<std::uint32_t> nextExplicitConsumerId;
3646  std::atomic<std::uint32_t> globalExplicitConsumerOffset;
3647 
3648 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3649  debug::DebugMutex implicitProdMutex;
3650 #endif
3651 
3652 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3653  std::atomic<ExplicitProducer*> explicitProducers;
3654  std::atomic<ImplicitProducer*> implicitProducers;
3655 #endif
3656 };
3657 
3658 
3659 template<typename T, typename Traits>
3660 ProducerToken::ProducerToken(ConcurrentQueue<T, Traits> &queue)
3661  : producer(queue.recycle_or_create_producer(true)) {
3662  if (producer != nullptr) {
3663  producer->token = this;
3664  }
3665 }
3666 
3667 template<typename T, typename Traits>
3668 ProducerToken::ProducerToken(BlockingConcurrentQueue<T, Traits> &queue)
3669  : producer(
3670  reinterpret_cast<ConcurrentQueue<T, Traits> *>(&queue)->recycle_or_create_producer(true)) {
3671  if (producer != nullptr) {
3672  producer->token = this;
3673  }
3674 }
3675 
3676 template<typename T, typename Traits>
3677 ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits> &queue)
3678  : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr) {
3679  initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
3680  lastKnownGlobalOffset = -1;
3681 }
3682 
3683 template<typename T, typename Traits>
3684 ConsumerToken::ConsumerToken(BlockingConcurrentQueue<T, Traits> &queue)
3685  : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr) {
3686  initialOffset = reinterpret_cast<ConcurrentQueue <T, Traits> *>(&queue)->nextExplicitConsumerId.fetch_add(
3687  1, std::memory_order_release);
3688  lastKnownGlobalOffset = -1;
3689 }
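A hedged usage sketch of the tokens whose constructors appear above: a ProducerToken binds the caller to one explicit producer, while a ConsumerToken keeps per-consumer rotation state so repeated dequeues spread across producers. Values and include path are illustrative.

    #include <dmlc/concurrentqueue.h>
    #include <cassert>

    int main() {
      dmlc::moodycamel::ConcurrentQueue<int> q;
      dmlc::moodycamel::ProducerToken ptok(q);   // dedicated explicit producer (recycled when possible)
      dmlc::moodycamel::ConsumerToken ctok(q);   // remembers which producer to visit next
      q.enqueue(ptok, 17);
      int v = 0;
      bool ok = q.try_dequeue(ctok, v);
      assert(ok && v == 17);
      return 0;
    }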
3690 
3691 template<typename T, typename Traits>
3692 inline void swap(ConcurrentQueue<T, Traits> &a, ConcurrentQueue<T, Traits> &b) MOODYCAMEL_NOEXCEPT {
3693  a.swap(b);
3694 }
3695 
3696 inline void swap(ProducerToken &a, ProducerToken &b) MOODYCAMEL_NOEXCEPT {
3697  a.swap(b);
3698 }
3699 
3700 inline void swap(ConsumerToken &a, ConsumerToken &b) MOODYCAMEL_NOEXCEPT {
3701  a.swap(b);
3702 }
3703 
3704 template<typename T, typename Traits>
3705 inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP &a,
3706  typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP &b) MOODYCAMEL_NOEXCEPT {
3707  a.swap(b);
3708 }
3709 
3710 }
3711 
3712 } // namespace dmlc
3713 
3714 #if defined(__GNUC__)
3715 #pragma GCC diagnostic pop
3716 #endif
3717 
3718 #endif // DMLC_CONCURRENTQUEUE_H_
3719 