AllianceDB 0.0.1
AllianceDB is an open-source suite of benchmarks and libraries for evaluating and improving stream operation algorithms on modern hardware.
concurrentqueue.h
1 // Provides a C++11 implementation of a multi-producer, multi-consumer lock-free queue.
2 // An overview, including benchmark results, is provided here:
3 // http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
4 // The full design is also described in excruciating detail at:
5 // http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue
6 
7 // Simplified BSD license:
8 // Copyright (c) 2013-2020, Cameron Desrochers.
9 // All rights reserved.
10 //
11 // Redistribution and use in source and binary forms, with or without modification,
12 // are permitted provided that the following conditions are met:
13 //
14 // - Redistributions of source code must retain the above copyright notice, this list of
15 // conditions and the following disclaimer.
16 // - Redistributions in binary form must reproduce the above copyright notice, this list of
17 // conditions and the following disclaimer in the documentation and/or other materials
18 // provided with the distribution.
19 //
20 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
21 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
22 // MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
23 // THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
25 // OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
26 // HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
27 // TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
28 // EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 
30 // Also dual-licensed under the Boost Software License (see LICENSE.md)
31 
32 #pragma once
33 
34 #if defined(__GNUC__) && !defined(__INTEL_COMPILER)
35 // Disable -Wconversion warnings (spuriously triggered when Traits::size_t and
36 // Traits::index_t are set to < 32 bits, causing integer promotion, causing warnings
37 // upon assigning any computed values)
38 #pragma GCC diagnostic push
39 #pragma GCC diagnostic ignored "-Wconversion"
40 
41 #ifdef MCDBGQ_USE_RELACY
42 #pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
43 #endif
44 #endif
45 
46 #if defined(_MSC_VER) && (!defined(_HAS_CXX17) || !_HAS_CXX17)
47  // VS2019 with /W4 warns about constant conditional expressions, but unless /std:c++17 or higher
48 // is used it does not support `if constexpr`, so we have no choice but to simply disable the warning
49 #pragma warning(push)
50 #pragma warning(disable: 4127) // conditional expression is constant
51 #endif
52 
53 #if defined(__APPLE__)
54 #include "TargetConditionals.h"
55 #endif
56 
57 #ifdef MCDBGQ_USE_RELACY
58  #include "relacy/relacy_std.hpp"
59 #include "relacy_shims.h"
60 // We only use malloc/free anyway, and the delete macro messes up `= delete` method declarations.
61 // We'll override the default trait malloc ourselves without a macro.
62 #undef new
63 #undef delete
64 #undef malloc
65 #undef free
66 #else
67 #include <atomic> // Requires C++11. Sorry VS2010.
68 #include <cassert>
69 #endif
70 #include <cstddef> // for max_align_t
71 #include <cstdint>
72 #include <cstdlib>
73 #include <type_traits>
74 #include <algorithm>
75 #include <utility>
76 #include <limits>
77 #include <climits> // for CHAR_BIT
78 #include <array>
79 #include <thread> // partly for __WINPTHREADS_VERSION if on MinGW-w64 w/ POSIX threading
80 
81 // Platform-specific definitions of a numeric thread ID type and an invalid value
82 namespace moodycamel {
83 namespace details {
84 template<typename thread_id_t>
85 struct thread_id_converter {
86  typedef thread_id_t thread_id_numeric_size_t;
87  typedef thread_id_t thread_id_hash_t;
88  static thread_id_hash_t prehash(thread_id_t const &x) { return x; }
89 };
90 }
91 }
92 #if defined(MCDBGQ_USE_RELACY)
93  namespace moodycamel { namespace details {
94  typedef std::uint32_t thread_id_t;
95  static const thread_id_t invalid_thread_id = 0xFFFFFFFFU;
96  static const thread_id_t invalid_thread_id2 = 0xFFFFFFFEU;
97  static inline thread_id_t thread_id() { return rl::thread_index(); }
98 } }
99 #elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
100  // No sense pulling in windows.h in a header, we'll manually declare the function
101 // we use and rely on backwards-compatibility for this not to break
102 extern "C" __declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(void);
103 namespace moodycamel { namespace details {
104  static_assert(sizeof(unsigned long) == sizeof(std::uint32_t), "Expected size of unsigned long to be 32 bits on Windows");
105  typedef std::uint32_t thread_id_t;
106  static const thread_id_t invalid_thread_id = 0; // See http://blogs.msdn.com/b/oldnewthing/archive/2004/02/23/78395.aspx
107  static const thread_id_t invalid_thread_id2 = 0xFFFFFFFFU; // Not technically guaranteed to be invalid, but is never used in practice. Note that all Win32 thread IDs are presently multiples of 4.
108  static inline thread_id_t thread_id() { return static_cast<thread_id_t>(::GetCurrentThreadId()); }
109 } }
110 #elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE) || defined(MOODYCAMEL_NO_THREAD_LOCAL)
111  namespace moodycamel { namespace details {
112  static_assert(sizeof(std::thread::id) == 4 || sizeof(std::thread::id) == 8, "std::thread::id is expected to be either 4 or 8 bytes");
113 
114  typedef std::thread::id thread_id_t;
115  static const thread_id_t invalid_thread_id; // Default ctor creates invalid ID
116 
117  // Note we don't define an invalid_thread_id2 since std::thread::id doesn't have one; it's
118  // only used if MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is defined anyway, which it won't
119  // be.
120  static inline thread_id_t thread_id() { return std::this_thread::get_id(); }
121 
122  template<std::size_t> struct thread_id_size { };
123  template<> struct thread_id_size<4> { typedef std::uint32_t numeric_t; };
124  template<> struct thread_id_size<8> { typedef std::uint64_t numeric_t; };
125 
126  template<> struct thread_id_converter<thread_id_t> {
127  typedef thread_id_size<sizeof(thread_id_t)>::numeric_t thread_id_numeric_size_t;
128 #ifndef __APPLE__
129  typedef std::size_t thread_id_hash_t;
130 #else
131  typedef thread_id_numeric_size_t thread_id_hash_t;
132 #endif
133 
134  static thread_id_hash_t prehash(thread_id_t const& x)
135  {
136 #ifndef __APPLE__
137  return std::hash<std::thread::id>()(x);
138 #else
139  return *reinterpret_cast<thread_id_hash_t const*>(&x);
140 #endif
141  }
142  };
143 } }
144 #else
145 // Use a nice trick from this answer: http://stackoverflow.com/a/8438730/21475
146 // In order to get a numeric thread ID in a platform-independent way, we use a thread-local
147 // static variable's address as a thread identifier :-)
148 #if defined(__GNUC__) || defined(__INTEL_COMPILER)
149 #define MOODYCAMEL_THREADLOCAL __thread
150 #elif defined(_MSC_VER)
151  #define MOODYCAMEL_THREADLOCAL __declspec(thread)
152 #else
153 // Assume C++11 compliant compiler
154 #define MOODYCAMEL_THREADLOCAL thread_local
155 #endif
156 namespace moodycamel {
157 namespace details {
158 typedef std::uintptr_t thread_id_t;
159 static const thread_id_t invalid_thread_id = 0; // Address can't be nullptr
160 static const thread_id_t invalid_thread_id2 =
161  1; // Member accesses off a null pointer are also generally invalid. Plus it's not aligned.
162 inline thread_id_t thread_id() {
163  static MOODYCAMEL_THREADLOCAL int x;
164  return reinterpret_cast<thread_id_t>(&x);
165 }
166 }
167 }
168 #endif
169 
170 // Constexpr if
171 #ifndef MOODYCAMEL_CONSTEXPR_IF
172 #if (defined(_MSC_VER) && defined(_HAS_CXX17) && _HAS_CXX17) || __cplusplus > 201402L
173 #define MOODYCAMEL_CONSTEXPR_IF if constexpr
174 #define MOODYCAMEL_MAYBE_UNUSED [[maybe_unused]]
175 #else
176  #define MOODYCAMEL_CONSTEXPR_IF if
177 #define MOODYCAMEL_MAYBE_UNUSED
178 #endif
179 #endif
180 
181 // Exceptions
182 #ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
183 #if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
184 #define MOODYCAMEL_EXCEPTIONS_ENABLED
185 #endif
186 #endif
187 #ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
188 #define MOODYCAMEL_TRY try
189 #define MOODYCAMEL_CATCH(...) catch(__VA_ARGS__)
190 #define MOODYCAMEL_RETHROW throw
191 #define MOODYCAMEL_THROW(expr) throw (expr)
192 #else
193  #define MOODYCAMEL_TRY MOODYCAMEL_CONSTEXPR_IF (true)
194 #define MOODYCAMEL_CATCH(...) else MOODYCAMEL_CONSTEXPR_IF (false)
195 #define MOODYCAMEL_RETHROW
196 #define MOODYCAMEL_THROW(expr)
197 #endif
198 
199 #ifndef MOODYCAMEL_NOEXCEPT
200 #if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
201  #define MOODYCAMEL_NOEXCEPT
202 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true
203 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true
204 #elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800
205  // VS2012's std::is_nothrow_[move_]constructible is broken and returns true when it shouldn't :-(
206 // We have to assume *all* non-trivial constructors may throw on VS2012!
207 #define MOODYCAMEL_NOEXCEPT _NOEXCEPT
208 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value)
209 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
210 #elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900
211  #define MOODYCAMEL_NOEXCEPT _NOEXCEPT
212 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value || std::is_nothrow_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value || std::is_nothrow_copy_constructible<type>::value)
213 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
214 #else
215 #define MOODYCAMEL_NOEXCEPT noexcept
216 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr)
217 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr)
218 #endif
219 #endif
220 
221 #ifndef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
222 #ifdef MCDBGQ_USE_RELACY
223 #define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
224 #else
225 // VS2013 doesn't support `thread_local`, and MinGW-w64 w/ POSIX threading has a crippling bug: http://sourceforge.net/p/mingw-w64/bugs/445
226 // g++ <=4.7 doesn't support thread_local either.
227 // Finally, iOS/ARM doesn't have support for it either, and g++/ARM allows it to compile but it's unconfirmed to actually work
228 #if (!defined(_MSC_VER) || _MSC_VER >= 1900) && (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__)
229 // Assume `thread_local` is fully supported in all other C++11 compilers/platforms
230 //#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED // always disabled for now since several users report having problems with it on
231 #endif
232 #endif
233 #endif
234 
235 // VS2012 doesn't support deleted functions.
236 // In this case, we declare the function normally but don't define it. A link error will be generated if the function is called.
237 #ifndef MOODYCAMEL_DELETE_FUNCTION
238 #if defined(_MSC_VER) && _MSC_VER < 1800
239 #define MOODYCAMEL_DELETE_FUNCTION
240 #else
241 #define MOODYCAMEL_DELETE_FUNCTION = delete
242 #endif
243 #endif
244 
245 namespace moodycamel {
246 namespace details {
247 #ifndef MOODYCAMEL_ALIGNAS
248 // VS2013 doesn't support alignas or alignof, and align() requires a constant literal
249 #if defined(_MSC_VER) && _MSC_VER <= 1800
250  #define MOODYCAMEL_ALIGNAS(alignment) __declspec(align(alignment))
251 #define MOODYCAMEL_ALIGNOF(obj) __alignof(obj)
252 #define MOODYCAMEL_ALIGNED_TYPE_LIKE(T, obj) typename details::Vs2013Aligned<std::alignment_of<obj>::value, T>::type
253  template<int Align, typename T> struct Vs2013Aligned { }; // default, unsupported alignment
254  template<typename T> struct Vs2013Aligned<1, T> { typedef __declspec(align(1)) T type; };
255  template<typename T> struct Vs2013Aligned<2, T> { typedef __declspec(align(2)) T type; };
256  template<typename T> struct Vs2013Aligned<4, T> { typedef __declspec(align(4)) T type; };
257  template<typename T> struct Vs2013Aligned<8, T> { typedef __declspec(align(8)) T type; };
258  template<typename T> struct Vs2013Aligned<16, T> { typedef __declspec(align(16)) T type; };
259  template<typename T> struct Vs2013Aligned<32, T> { typedef __declspec(align(32)) T type; };
260  template<typename T> struct Vs2013Aligned<64, T> { typedef __declspec(align(64)) T type; };
261  template<typename T> struct Vs2013Aligned<128, T> { typedef __declspec(align(128)) T type; };
262  template<typename T> struct Vs2013Aligned<256, T> { typedef __declspec(align(256)) T type; };
263 #else
264 template<typename T>
265 struct identity { typedef T type; };
266 #define MOODYCAMEL_ALIGNAS(alignment) alignas(alignment)
267 #define MOODYCAMEL_ALIGNOF(obj) alignof(obj)
268 #define MOODYCAMEL_ALIGNED_TYPE_LIKE(T, obj) alignas(alignof(obj)) typename details::identity<T>::type
269 #endif
270 #endif
271 }
272 }
273 
274 
275 // TSAN can falsely report races in lock-free code. To enable TSAN to be used from projects that use this one,
276 // we can apply per-function compile-time suppression.
277 // See https://clang.llvm.org/docs/ThreadSanitizer.html#has-feature-thread-sanitizer
278 #define MOODYCAMEL_NO_TSAN
279 #if defined(__has_feature)
280  #if __has_feature(thread_sanitizer)
281  #undef MOODYCAMEL_NO_TSAN
282  #define MOODYCAMEL_NO_TSAN __attribute__((no_sanitize("thread")))
283  #endif // TSAN
284 #endif // TSAN
285 
286 // Compiler-specific likely/unlikely hints
287 namespace moodycamel {
288 namespace details {
289 #if defined(__GNUC__)
290 static inline bool (likely)(bool x) { return __builtin_expect((x), true); }
291 static inline bool (unlikely)(bool x) { return __builtin_expect((x), false); }
292 #else
293  static inline bool (likely)(bool x) { return x; }
294  static inline bool (unlikely)(bool x) { return x; }
295 #endif
296 }
297 }
298 
299 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
300 #include "internal/concurrentqueue_internal_debug.h"
301 #endif
302 
303 namespace moodycamel {
304 namespace details {
305 template<typename T>
306 struct const_numeric_max {
307  static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");
308  static const T value = std::numeric_limits<T>::is_signed
309  ? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
310  : static_cast<T>(-1);
311 };
312 
313 #if defined(__GLIBCXX__)
314 typedef ::max_align_t std_max_align_t; // libstdc++ forgot to add it to std:: for a while
315 #else
316 typedef std::max_align_t std_max_align_t; // Others (e.g. MSVC) insist it can *only* be accessed via std::
317 #endif
318 
319 // Some platforms have incorrectly set max_align_t to a type with <8 bytes alignment even while supporting
320 // 8-byte aligned scalar values (*cough* 32-bit iOS). Work around this with our own union. See issue #64.
321 typedef union {
322  std_max_align_t x;
323  long long y;
324  void *z;
325 } max_align_t;
326 }
327 
328 // Default traits for the ConcurrentQueue. To change some of the
329 // traits without re-implementing all of them, inherit from this
330 // struct and shadow the declarations you wish to be different;
331 // since the traits are used as a template type parameter, the
332 // shadowed declarations will be used where defined, and the defaults
333 // otherwise.
334 struct ConcurrentQueueDefaultTraits {
335  // General-purpose size type. std::size_t is strongly recommended.
336  typedef std::size_t size_t;
337 
338  // The type used for the enqueue and dequeue indices. Must be at least as
339  // large as size_t. Should be significantly larger than the number of elements
340  // you expect to hold at once, especially if you have a high turnover rate;
341  // for example, on 32-bit x86, if you expect to have over a hundred million
342  // elements or pump several million elements through your queue in a very
343  // short space of time, using a 32-bit type *may* trigger a race condition.
344  // A 64-bit int type is recommended in that case, and in practice will
345  // prevent a race condition no matter the usage of the queue. Note that
346  // whether the queue is lock-free with a 64-bit type depends on whether
347  // std::atomic<std::uint64_t> is lock-free, which is platform-specific.
348  typedef std::size_t index_t;
349 
350  // Internally, all elements are enqueued and dequeued from multi-element
351  // blocks; this is the smallest controllable unit. If you expect few elements
352  // but many producers, a smaller block size should be favoured. For few producers
353  // and/or many elements, a larger block size is preferred. A sane default
354  // is provided. Must be a power of 2.
355  static const size_t BLOCK_SIZE = 32;
356 
357  // For explicit producers (i.e. when using a producer token), the block is
358  // checked for being empty by iterating through a list of flags, one per element.
359  // For large block sizes, this is too inefficient, and switching to an atomic
360  // counter-based approach is faster. The switch is made for block sizes strictly
361  // larger than this threshold.
362  static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;
363 
364  // How many full blocks can be expected for a single explicit producer? This should
365  // reflect that number's maximum for optimal performance. Must be a power of 2.
366  static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;
367 
368  // How many full blocks can be expected for a single implicit producer? This should
369  // reflect that number's maximum for optimal performance. Must be a power of 2.
370  static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32;
371 
372  // The initial size of the hash table mapping thread IDs to implicit producers.
373  // Note that the hash is resized every time it becomes half full.
374  // Must be a power of two, and either 0 or at least 1. If 0, implicit production
375  // (using the enqueue methods without an explicit producer token) is disabled.
376  static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 32;
377 
378  // Controls the number of items that an explicit consumer (i.e. one with a token)
379  // must consume before it causes all consumers to rotate and move on to the next
380  // internal queue.
381  static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;
382 
383  // The maximum number of elements (inclusive) that can be enqueued to a sub-queue.
384  // Enqueue operations that would cause this limit to be surpassed will fail. Note
385  // that this limit is enforced at the block level (for performance reasons), i.e.
386  // it's rounded up to the nearest block size.
387  static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value;
388 
389  // The number of times to spin before sleeping when waiting on a semaphore.
390  // Recommended values are on the order of 1000-10000 unless the number of
391  // consumer threads exceeds the number of idle cores (in which case try 0-100).
392  // Only affects instances of the BlockingConcurrentQueue.
393  static const int MAX_SEMA_SPINS = 10000;
394 
395 #ifndef MCDBGQ_USE_RELACY
396  // Memory allocation can be customized if needed.
397  // malloc should return nullptr on failure, and handle alignment like std::malloc.
398 #if defined(malloc) || defined(free)
399  // Gah, this is 2015, stop defining macros that break standard code already!
400  // Work around malloc/free being special macros:
401  static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
402  static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
403  static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
404  static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
405 #else
406  static inline void *malloc(size_t size) { return std::malloc(size); }
407  static inline void free(void *ptr) { return std::free(ptr); }
408 #endif
409 #else
410  // Debug versions when running under the Relacy race detector (ignore
411  // these in user code)
412  static inline void* malloc(size_t size) { return rl::rl_malloc(size, $); }
413  static inline void free(void* ptr) { return rl::rl_free(ptr, $); }
414 #endif
415 };
416 
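// Example (sketch): overriding individual traits by inheriting from
// ConcurrentQueueDefaultTraits and shadowing only the members you want to
// change, as the comment above describes. The MyTraits name and the values
// below are illustrative assumptions, not part of this header.
//
//   struct MyTraits : public moodycamel::ConcurrentQueueDefaultTraits {
//     static const size_t BLOCK_SIZE = 256;             // must be a power of 2
//     static const size_t MAX_SUBQUEUE_SIZE = 1 << 20;  // cap each sub-queue
//   };
//   moodycamel::ConcurrentQueue<int, MyTraits> q;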
417 // When producing or consuming many elements, the most efficient way is to:
418 // 1) Use one of the bulk-operation methods of the queue with a token
419 // 2) Failing that, use the bulk-operation methods without a token
420 // 3) Failing that, create a token and use that with the single-item methods
421 // 4) Failing that, use the single-parameter methods of the queue
422 // Having said that, don't create tokens willy-nilly -- ideally there should be
423 // a maximum of one token per thread (of each kind).
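// A sketch of the preferred pattern described above, using bulk operations
// with tokens. Assumes a queue of ints with default traits; the array sizes
// and values are illustrative only.
//
//   moodycamel::ConcurrentQueue<int> q;
//   moodycamel::ProducerToken ptok(q);
//   moodycamel::ConsumerToken ctok(q);
//   int in[64] = {0}, out[64];
//   q.enqueue_bulk(ptok, in, 64);                  // 1) bulk method with a token
//   size_t n = q.try_dequeue_bulk(ctok, out, 64);  // 1) bulk method with a token
//   q.enqueue(ptok, 42);                           // 3) single item with a token
//   int item;
//   bool ok = q.try_dequeue(item);                 // 4) single-parameter method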
424 struct ProducerToken;
425 struct ConsumerToken;
426 
427 template<typename T, typename Traits>
428 class ConcurrentQueue;
429 template<typename T, typename Traits>
430 class BlockingConcurrentQueue;
431 class ConcurrentQueueTests;
432 
433 namespace details {
434 struct ConcurrentQueueProducerTypelessBase {
435  ConcurrentQueueProducerTypelessBase *next;
436  std::atomic<bool> inactive;
437  ProducerToken *token;
438 
439  ConcurrentQueueProducerTypelessBase()
440  : next(nullptr), inactive(false), token(nullptr) {
441  }
442 };
443 
444 template<bool use32>
445 struct _hash_32_or_64 {
446  static inline std::uint32_t hash(std::uint32_t h) {
447  // MurmurHash3 finalizer -- see https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp
448  // Since the thread ID is already unique, all we really want to do is propagate that
449  // uniqueness evenly across all the bits, so that we can use a subset of the bits while
450  // reducing collisions significantly
451  h ^= h >> 16;
452  h *= 0x85ebca6b;
453  h ^= h >> 13;
454  h *= 0xc2b2ae35;
455  return h ^ (h >> 16);
456  }
457 };
458 template<>
459 struct _hash_32_or_64<1> {
460  static inline std::uint64_t hash(std::uint64_t h) {
461  h ^= h >> 33;
462  h *= 0xff51afd7ed558ccd;
463  h ^= h >> 33;
464  h *= 0xc4ceb9fe1a85ec53;
465  return h ^ (h >> 33);
466  }
467 };
468 template<std::size_t size>
469 struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> {};
470 
471 static inline size_t hash_thread_id(thread_id_t id) {
472  static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");
473  return static_cast<size_t>(hash_32_or_64<sizeof(thread_id_converter<thread_id_t>::thread_id_hash_t)>::hash(
474  thread_id_converter<thread_id_t>::prehash(id)));
475 }
476 
477 template<typename T>
478 static inline bool circular_less_than(T a, T b) {
479 #ifdef _MSC_VER
480  #pragma warning(push)
481 #pragma warning(disable: 4554)
482 #endif
483  static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed,
484  "circular_less_than is intended to be used only with unsigned integer types");
485  return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << static_cast<T>(sizeof(T) * CHAR_BIT - 1));
486 #ifdef _MSC_VER
487 #pragma warning(pop)
488 #endif
489 }
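// For example (illustrative values): with T = std::uint8_t,
// circular_less_than<std::uint8_t>(250, 5) is true, i.e. index 250 precedes
// index 5 once the 8-bit counter wraps around, because
// static_cast<std::uint8_t>(250 - 5) == 245, which is greater than 128.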
490 
491 template<typename U>
492 static inline char *align_for(char *ptr) {
493  const std::size_t alignment = std::alignment_of<U>::value;
494  return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
495 }
496 
497 template<typename T>
498 static inline T ceil_to_pow_2(T x) {
499  static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed,
500  "ceil_to_pow_2 is intended to be used only with unsigned integer types");
501 
502  // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
503  --x;
504  x |= x >> 1;
505  x |= x >> 2;
506  x |= x >> 4;
507  for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
508  x |= x >> (i << 3);
509  }
510  ++x;
511  return x;
512 }
513 
514 template<typename T>
515 static inline void swap_relaxed(std::atomic<T> &left, std::atomic<T> &right) {
516  T temp = std::move(left.load(std::memory_order_relaxed));
517  left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
518  right.store(std::move(temp), std::memory_order_relaxed);
519 }
520 
521 template<typename T>
522 static inline T const &nomove(T const &x) {
523  return x;
524 }
525 
526 template<bool Enable>
527 struct nomove_if {
528  template<typename T>
529  static inline T const &eval(T const &x) {
530  return x;
531  }
532 };
533 
534 template<>
535 struct nomove_if<false> {
536  template<typename U>
537  static inline auto eval(U &&x)
538  -> decltype(std::forward<U>(x)) {
539  return std::forward<U>(x);
540  }
541 };
542 
543 template<typename It>
544 static inline auto deref_noexcept(It &it) MOODYCAMEL_NOEXCEPT -> decltype(*it) {
545  return *it;
546 }
547 
548 #if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
549 template<typename T>
550 struct is_trivially_destructible : std::is_trivially_destructible<T> {};
551 #else
552 template<typename T> struct is_trivially_destructible : std::has_trivial_destructor<T> { };
553 #endif
554 
555 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
556  #ifdef MCDBGQ_USE_RELACY
557  typedef RelacyThreadExitListener ThreadExitListener;
558  typedef RelacyThreadExitNotifier ThreadExitNotifier;
559 #else
560  struct ThreadExitListener
561  {
562  typedef void (*callback_t)(void*);
563  callback_t callback;
564  void* userData;
565 
566  ThreadExitListener* next; // reserved for use by the ThreadExitNotifier
567  };
568 
569 
570  class ThreadExitNotifier
571  {
572  public:
573  static void subscribe(ThreadExitListener* listener)
574  {
575  auto& tlsInst = instance();
576  listener->next = tlsInst.tail;
577  tlsInst.tail = listener;
578  }
579 
580  static void unsubscribe(ThreadExitListener* listener)
581  {
582  auto& tlsInst = instance();
583  ThreadExitListener** prev = &tlsInst.tail;
584  for (auto ptr = tlsInst.tail; ptr != nullptr; ptr = ptr->next) {
585  if (ptr == listener) {
586  *prev = ptr->next;
587  break;
588  }
589  prev = &ptr->next;
590  }
591  }
592 
593  private:
594  ThreadExitNotifier() : tail(nullptr) { }
595  ThreadExitNotifier(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
596  ThreadExitNotifier& operator=(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
597 
598  ~ThreadExitNotifier()
599  {
600  // This thread is about to exit, let everyone know!
601  assert(this == &instance() && "If this assert fails, you likely have a buggy compiler! Change the preprocessor conditions such that MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
602  for (auto ptr = tail; ptr != nullptr; ptr = ptr->next) {
603  ptr->callback(ptr->userData);
604  }
605  }
606 
607  // Thread-local
608  static inline ThreadExitNotifier& instance()
609  {
610  static thread_local ThreadExitNotifier notifier;
611  return notifier;
612  }
613 
614  private:
615  ThreadExitListener* tail;
616  };
617 #endif
618 #endif
619 
620 template<typename T>
621 struct static_is_lock_free_num { enum { value = 0 }; };
622 template<>
623 struct static_is_lock_free_num<signed char> { enum { value = ATOMIC_CHAR_LOCK_FREE }; };
624 template<>
625 struct static_is_lock_free_num<short> { enum { value = ATOMIC_SHORT_LOCK_FREE }; };
626 template<>
627 struct static_is_lock_free_num<int> { enum { value = ATOMIC_INT_LOCK_FREE }; };
628 template<>
629 struct static_is_lock_free_num<long> { enum { value = ATOMIC_LONG_LOCK_FREE }; };
630 template<>
631 struct static_is_lock_free_num<long long> { enum { value = ATOMIC_LLONG_LOCK_FREE }; };
632 template<typename T>
633 struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> {};
634 template<>
635 struct static_is_lock_free<bool> { enum { value = ATOMIC_BOOL_LOCK_FREE }; };
636 template<typename U>
637 struct static_is_lock_free<U *> { enum { value = ATOMIC_POINTER_LOCK_FREE }; };
638 }
639 
640 struct ProducerToken {
641  template<typename T, typename Traits>
642  explicit ProducerToken(ConcurrentQueue<T, Traits> &queue);
643 
644  template<typename T, typename Traits>
645  explicit ProducerToken(BlockingConcurrentQueue<T, Traits> &queue);
646 
647  ProducerToken(ProducerToken &&other) MOODYCAMEL_NOEXCEPT
648  : producer(other.producer) {
649  other.producer = nullptr;
650  if (producer != nullptr) {
651  producer->token = this;
652  }
653  }
654 
655  inline ProducerToken &operator=(ProducerToken &&other) MOODYCAMEL_NOEXCEPT {
656  swap(other);
657  return *this;
658  }
659 
660  void swap(ProducerToken &other) MOODYCAMEL_NOEXCEPT {
661  std::swap(producer, other.producer);
662  if (producer != nullptr) {
663  producer->token = this;
664  }
665  if (other.producer != nullptr) {
666  other.producer->token = &other;
667  }
668  }
669 
670  // A token is always valid unless:
671  // 1) Memory allocation failed during construction
672  // 2) It was moved via the move constructor
673  // (Note: assignment does a swap, leaving both potentially valid)
674  // 3) The associated queue was destroyed
675  // Note that if valid() returns true, that only indicates
676  // that the token is valid for use with a specific queue,
677  // but not which one; that's up to the user to track.
678  inline bool valid() const { return producer != nullptr; }
679 
680  ~ProducerToken() {
681  if (producer != nullptr) {
682  producer->token = nullptr;
683  producer->inactive.store(true, std::memory_order_release);
684  }
685  }
686 
687  // Disable copying and assignment
688  ProducerToken(ProducerToken const &) MOODYCAMEL_DELETE_FUNCTION;
689  ProducerToken &operator=(ProducerToken const &) MOODYCAMEL_DELETE_FUNCTION;
690 
691  private:
692  template<typename T, typename Traits> friend
693  class ConcurrentQueue;
694  friend class ConcurrentQueueTests;
695 
696  protected:
697  details::ConcurrentQueueProducerTypelessBase *producer;
698 };
699 
700 struct ConsumerToken {
701  template<typename T, typename Traits>
702  explicit ConsumerToken(ConcurrentQueue<T, Traits> &q);
703 
704  template<typename T, typename Traits>
705  explicit ConsumerToken(BlockingConcurrentQueue<T, Traits> &q);
706 
707  ConsumerToken(ConsumerToken &&other) MOODYCAMEL_NOEXCEPT
708  : initialOffset(other.initialOffset),
709  lastKnownGlobalOffset(other.lastKnownGlobalOffset),
710  itemsConsumedFromCurrent(other.itemsConsumedFromCurrent),
711  currentProducer(other.currentProducer),
712  desiredProducer(other.desiredProducer) {
713  }
714 
715  inline ConsumerToken &operator=(ConsumerToken &&other) MOODYCAMEL_NOEXCEPT {
716  swap(other);
717  return *this;
718  }
719 
720  void swap(ConsumerToken &other) MOODYCAMEL_NOEXCEPT {
721  std::swap(initialOffset, other.initialOffset);
722  std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
723  std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
724  std::swap(currentProducer, other.currentProducer);
725  std::swap(desiredProducer, other.desiredProducer);
726  }
727 
728  // Disable copying and assignment
729  ConsumerToken(ConsumerToken const &) MOODYCAMEL_DELETE_FUNCTION;
730  ConsumerToken &operator=(ConsumerToken const &) MOODYCAMEL_DELETE_FUNCTION;
731 
732  private:
733  template<typename T, typename Traits> friend
734  class ConcurrentQueue;
735  friend class ConcurrentQueueTests;
736 
737  private: // but shared with ConcurrentQueue
738  std::uint32_t initialOffset;
739  std::uint32_t lastKnownGlobalOffset;
740  std::uint32_t itemsConsumedFromCurrent;
741  details::ConcurrentQueueProducerTypelessBase *currentProducer;
742  details::ConcurrentQueueProducerTypelessBase *desiredProducer;
743 };
744 
745 // Need to forward-declare this swap because it's in a namespace.
746 // See http://stackoverflow.com/questions/4492062/why-does-a-c-friend-class-need-a-forward-declaration-only-in-other-namespaces
747 template<typename T, typename Traits>
748 inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP &a,
749  typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP &b) MOODYCAMEL_NOEXCEPT;
750 
751 template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
752 class ConcurrentQueue {
753  public:
754  typedef ::moodycamel::ProducerToken producer_token_t;
755  typedef ::moodycamel::ConsumerToken consumer_token_t;
756 
757  typedef typename Traits::index_t index_t;
758  typedef typename Traits::size_t size_t;
759 
760  static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);
761  static const size_t
762  EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
763  static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
764  static const size_t IMPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::IMPLICIT_INITIAL_INDEX_SIZE);
765  static const size_t
766  INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = static_cast<size_t>(Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE);
767  static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE =
768  static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
769 #ifdef _MSC_VER
770  #pragma warning(push)
771 #pragma warning(disable: 4307) // + integral constant overflow (that's what the ternary expression is for!)
772 #pragma warning(disable: 4309) // static_cast: Truncation of constant value
773 #endif
774  static const size_t MAX_SUBQUEUE_SIZE =
775  (details::const_numeric_max<size_t>::value - static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE)
776  ? details::const_numeric_max<size_t>::value : ((static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1))
777  / BLOCK_SIZE * BLOCK_SIZE);
778 #ifdef _MSC_VER
779 #pragma warning(pop)
780 #endif
781 
782  static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value,
783  "Traits::size_t must be an unsigned integral type");
784  static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value,
785  "Traits::index_t must be an unsigned integral type");
786  static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t");
787  static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)),
788  "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
789  static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1)
790  && !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD & (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)),
791  "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)");
792  static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) && !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)),
793  "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
794  static_assert((IMPLICIT_INITIAL_INDEX_SIZE > 1) && !(IMPLICIT_INITIAL_INDEX_SIZE & (IMPLICIT_INITIAL_INDEX_SIZE - 1)),
795  "Traits::IMPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
796  static_assert((INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
797  || !(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE & (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE - 1)),
798  "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be a power of 2");
799  static_assert(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0 || INITIAL_IMPLICIT_PRODUCER_HASH_SIZE >= 1,
800  "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be at least 1 (or 0 to disable implicit enqueueing)");
801 
802  public:
803  // Creates a queue with at least `capacity` element slots; note that the
804  // actual number of elements that can be inserted without additional memory
805  // allocation depends on the number of producers and the block size (e.g. if
806  // the block size is equal to `capacity`, only a single block will be allocated
807  // up-front, which means only a single producer will be able to enqueue elements
808  // without an extra allocation -- blocks aren't shared between producers).
809  // This method is not thread safe -- it is up to the user to ensure that the
810  // queue is fully constructed before it starts being used by other threads (this
811  // includes making the memory effects of construction visible, possibly with a
812  // memory barrier).
813  explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
814  : producerListTail(nullptr),
815  producerCount(0),
816  initialBlockPoolIndex(0),
817  nextExplicitConsumerId(0),
818  globalExplicitConsumerOffset(0) {
819  implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
820  populate_initial_implicit_producer_hash();
821  populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
822 
823 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
824  // Track all the producers using a fully-resolved typed list for
825  // each kind; this makes it possible to debug them starting from
826  // the root queue object (otherwise wacky casts are needed that
827  // don't compile in the debugger's expression evaluator).
828  explicitProducers.store(nullptr, std::memory_order_relaxed);
829  implicitProducers.store(nullptr, std::memory_order_relaxed);
830 #endif
831  }
832 
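 // Example (sketch), assuming the default BLOCK_SIZE of 32: the line below
 // pre-allocates ceil(100 / 32) = 4 blocks (128 element slots), but whether
 // all 128 can be filled without further allocation depends on how many
 // producers share those blocks, as noted above.
 //
 //   moodycamel::ConcurrentQueue<int> q(100);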
833  // Computes the correct amount of pre-allocated blocks for you based
834  // on the minimum number of elements you want available at any given
835  // time, and the maximum concurrent number of each type of producer.
836  ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
837  : producerListTail(nullptr),
838  producerCount(0),
839  initialBlockPoolIndex(0),
840  nextExplicitConsumerId(0),
841  globalExplicitConsumerOffset(0) {
842  implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
843  populate_initial_implicit_producer_hash();
844  size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1)
845  + 2 * (maxExplicitProducers + maxImplicitProducers);
846  populate_initial_block_list(blocks);
847 
848 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
849  explicitProducers.store(nullptr, std::memory_order_relaxed);
850  implicitProducers.store(nullptr, std::memory_order_relaxed);
851 #endif
852  }
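 // Worked example (sketch), assuming the default BLOCK_SIZE of 32: for
 // ConcurrentQueue<int> q(1000, 2, 4) the constructor above reserves
 // (ceil(1000 / 32) - 1) * (2 + 1) + 2 * (2 + 4) = 31 * 3 + 12 = 105 blocks.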
853 
854  // Note: The queue should not be accessed concurrently while it's
855  // being deleted. It's up to the user to synchronize this.
856  // This method is not thread safe.
857  ~ConcurrentQueue() {
858  // Destroy producers
859  auto ptr = producerListTail.load(std::memory_order_relaxed);
860  while (ptr != nullptr) {
861  auto next = ptr->next_prod();
862  if (ptr->token != nullptr) {
863  ptr->token->producer = nullptr;
864  }
865  destroy(ptr);
866  ptr = next;
867  }
868 
869  // Destroy implicit producer hash tables
870  MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE != 0) {
871  auto hash = implicitProducerHash.load(std::memory_order_relaxed);
872  while (hash != nullptr) {
873  auto prev = hash->prev;
874  if (prev != nullptr) { // The last hash is part of this object and was not allocated dynamically
875  for (size_t i = 0; i != hash->capacity; ++i) {
876  hash->entries[i].~ImplicitProducerKVP();
877  }
878  hash->~ImplicitProducerHash();
879  (Traits::free)(hash);
880  }
881  hash = prev;
882  }
883  }
884 
885  // Destroy global free list
886  auto block = freeList.head_unsafe();
887  while (block != nullptr) {
888  auto next = block->freeListNext.load(std::memory_order_relaxed);
889  if (block->dynamicallyAllocated) {
890  destroy(block);
891  }
892  block = next;
893  }
894 
895  // Destroy initial free list
896  destroy_array(initialBlockPool, initialBlockPoolSize);
897  }
898 
899  // Disable copying and copy assignment
900  ConcurrentQueue(ConcurrentQueue const &) MOODYCAMEL_DELETE_FUNCTION;
901  ConcurrentQueue &operator=(ConcurrentQueue const &) MOODYCAMEL_DELETE_FUNCTION;
902 
903  // Moving is supported, but note that it is *not* a thread-safe operation.
904  // Nobody can use the queue while it's being moved, and the memory effects
905  // of that move must be propagated to other threads before they can use it.
906  // Note: When a queue is moved, its tokens are still valid but can only be
907  // used with the destination queue (i.e. semantically they are moved along
908  // with the queue itself).
909  ConcurrentQueue(ConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT
910  : producerListTail(other.producerListTail.load(std::memory_order_relaxed)),
911  producerCount(other.producerCount.load(std::memory_order_relaxed)),
912  initialBlockPoolIndex(other.initialBlockPoolIndex.load(std::memory_order_relaxed)),
913  initialBlockPool(other.initialBlockPool),
914  initialBlockPoolSize(other.initialBlockPoolSize),
915  freeList(std::move(other.freeList)),
916  nextExplicitConsumerId(other.nextExplicitConsumerId.load(std::memory_order_relaxed)),
917  globalExplicitConsumerOffset(other.globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
918  // Move the other one into this, and leave the other one as an empty queue
919  implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
920  populate_initial_implicit_producer_hash();
921  swap_implicit_producer_hashes(other);
922 
923  other.producerListTail.store(nullptr, std::memory_order_relaxed);
924  other.producerCount.store(0, std::memory_order_relaxed);
925  other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
926  other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);
927 
928 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
929  explicitProducers.store(other.explicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
930  other.explicitProducers.store(nullptr, std::memory_order_relaxed);
931  implicitProducers.store(other.implicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
932  other.implicitProducers.store(nullptr, std::memory_order_relaxed);
933 #endif
934 
935  other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
936  other.initialBlockPoolSize = 0;
937  other.initialBlockPool = nullptr;
938 
939  reown_producers();
940  }
941 
942  inline ConcurrentQueue &operator=(ConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT {
943  return swap_internal(other);
944  }
945 
946  // Swaps this queue's state with the other's. Not thread-safe.
947  // Swapping two queues does not invalidate their tokens, however
948  // the tokens that were created for one queue must be used with
949  // only the swapped queue (i.e. the tokens are tied to the
950  // queue's movable state, not the object itself).
951  inline void swap(ConcurrentQueue &other) MOODYCAMEL_NOEXCEPT {
952  swap_internal(other);
953  }
954 
955  private:
956  ConcurrentQueue &swap_internal(ConcurrentQueue &other) {
957  if (this == &other) {
958  return *this;
959  }
960 
961  details::swap_relaxed(producerListTail, other.producerListTail);
962  details::swap_relaxed(producerCount, other.producerCount);
963  details::swap_relaxed(initialBlockPoolIndex, other.initialBlockPoolIndex);
964  std::swap(initialBlockPool, other.initialBlockPool);
965  std::swap(initialBlockPoolSize, other.initialBlockPoolSize);
966  freeList.swap(other.freeList);
967  details::swap_relaxed(nextExplicitConsumerId, other.nextExplicitConsumerId);
968  details::swap_relaxed(globalExplicitConsumerOffset, other.globalExplicitConsumerOffset);
969 
970  swap_implicit_producer_hashes(other);
971 
972  reown_producers();
973  other.reown_producers();
974 
975 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
976  details::swap_relaxed(explicitProducers, other.explicitProducers);
977  details::swap_relaxed(implicitProducers, other.implicitProducers);
978 #endif
979 
980  return *this;
981  }
982 
983  public:
984  // Enqueues a single item (by copying it).
985  // Allocates memory if required. Only fails if memory allocation fails (or implicit
986  // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
987  // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
988  // Thread-safe.
989  inline bool enqueue(T const &item) {
990  MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
991  else return inner_enqueue<CanAlloc>(item);
992  }
993 
994  // Enqueues a single item (by moving it, if possible).
995  // Allocates memory if required. Only fails if memory allocation fails (or implicit
996  // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
997  // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
998  // Thread-safe.
999  inline bool enqueue(T &&item) {
1000  MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1001  else return inner_enqueue<CanAlloc>(std::move(item));
1002  }
1003 
1004  // Enqueues a single item (by copying it) using an explicit producer token.
1005  // Allocates memory if required. Only fails if memory allocation fails (or
1006  // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1007  // Thread-safe.
1008  inline bool enqueue(producer_token_t const &token, T const &item) {
1009  return inner_enqueue<CanAlloc>(token, item);
1010  }
1011 
1012  // Enqueues a single item (by moving it, if possible) using an explicit producer token.
1013  // Allocates memory if required. Only fails if memory allocation fails (or
1014  // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1015  // Thread-safe.
1016  inline bool enqueue(producer_token_t const &token, T &&item) {
1017  return inner_enqueue<CanAlloc>(token, std::move(item));
1018  }
1019 
1020  // Enqueues several items.
1021  // Allocates memory if required. Only fails if memory allocation fails (or
1022  // implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
1023  // is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1024  // Note: Use std::make_move_iterator if the elements should be moved instead of copied.
1025  // Thread-safe.
1026  template<typename It>
1027  bool enqueue_bulk(It itemFirst, size_t count) {
1028  MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1029  else return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
1030  }
1031 
1032  // Enqueues several items using an explicit producer token.
1033  // Allocates memory if required. Only fails if memory allocation fails
1034  // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1035  // Note: Use std::make_move_iterator if the elements should be moved
1036  // instead of copied.
1037  // Thread-safe.
1038  template<typename It>
1039  bool enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count) {
1040  return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
1041  }
1042 
1043  // Enqueues a single item (by copying it).
1044  // Does not allocate memory. Fails if not enough room to enqueue (or implicit
1045  // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
1046  // is 0).
1047  // Thread-safe.
1048  inline bool try_enqueue(T const &item) {
1049  MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1050  else return inner_enqueue<CannotAlloc>(item);
1051  }
1052 
1053  // Enqueues a single item (by moving it, if possible).
1054  // Does not allocate memory (except for one-time implicit producer).
1055  // Fails if not enough room to enqueue (or implicit production is
1056  // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
1057  // Thread-safe.
1058  inline bool try_enqueue(T &&item) {
1059  MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1060  else return inner_enqueue<CannotAlloc>(std::move(item));
1061  }
1062 
1063  // Enqueues a single item (by copying it) using an explicit producer token.
1064  // Does not allocate memory. Fails if not enough room to enqueue.
1065  // Thread-safe.
1066  inline bool try_enqueue(producer_token_t const &token, T const &item) {
1067  return inner_enqueue<CannotAlloc>(token, item);
1068  }
1069 
1070  // Enqueues a single item (by moving it, if possible) using an explicit producer token.
1071  // Does not allocate memory. Fails if not enough room to enqueue.
1072  // Thread-safe.
1073  inline bool try_enqueue(producer_token_t const &token, T &&item) {
1074  return inner_enqueue<CannotAlloc>(token, std::move(item));
1075  }
1076 
1077  // Enqueues several items.
1078  // Does not allocate memory (except for one-time implicit producer).
1079  // Fails if not enough room to enqueue (or implicit production is
1080  // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
1081  // Note: Use std::make_move_iterator if the elements should be moved
1082  // instead of copied.
1083  // Thread-safe.
1084  template<typename It>
1085  bool try_enqueue_bulk(It itemFirst, size_t count) {
1086  MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1087  else return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
1088  }
1089 
1090  // Enqueues several items using an explicit producer token.
1091  // Does not allocate memory. Fails if not enough room to enqueue.
1092  // Note: Use std::make_move_iterator if the elements should be moved
1093  // instead of copied.
1094  // Thread-safe.
1095  template<typename It>
1096  bool try_enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count) {
1097  return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
1098  }
1099 
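 // Example (sketch) contrasting the allocating and non-allocating paths above;
 // assumes default traits, values are illustrative only.
 //
 //   moodycamel::ConcurrentQueue<int> q(32);  // one pre-allocated block
 //   bool a = q.enqueue(1);      // may allocate; fails only if allocation fails
 //   bool b = q.try_enqueue(2);  // never allocates a new block; fails when there
 //                               // is no room left in the producer's blocks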
1100  // Attempts to dequeue from the queue.
1101  // Returns false if all producer streams appeared empty at the time they
1102  // were checked (so, the queue is likely but not guaranteed to be empty).
1103  // Never allocates. Thread-safe.
1104  template<typename U>
1105  bool try_dequeue(U &item) {
1106  // Instead of simply trying each producer in turn (which could cause needless contention on the first
1107  // producer), we score them heuristically.
1108  size_t nonEmptyCount = 0;
1109  ProducerBase *best = nullptr;
1110  size_t bestSize = 0;
1111  for (auto ptr = producerListTail.load(std::memory_order_acquire); nonEmptyCount < 3 && ptr != nullptr;
1112  ptr = ptr->next_prod()) {
1113  auto size = ptr->size_approx();
1114  if (size > 0) {
1115  if (size > bestSize) {
1116  bestSize = size;
1117  best = ptr;
1118  }
1119  ++nonEmptyCount;
1120  }
1121  }
1122 
1123  // If there was at least one non-empty queue but it appears empty at the time
1124  // we try to dequeue from it, we need to make sure every queue's been tried
1125  if (nonEmptyCount > 0) {
1126  if ((details::likely)(best->dequeue(item))) {
1127  return true;
1128  }
1129  for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1130  if (ptr != best && ptr->dequeue(item)) {
1131  return true;
1132  }
1133  }
1134  }
1135  return false;
1136  }
1137 
1138  // Attempts to dequeue from the queue.
1139  // Returns false if all producer streams appeared empty at the time they
1140  // were checked (so, the queue is likely but not guaranteed to be empty).
1141  // This differs from the try_dequeue(item) method in that this one does
1142  // not attempt to reduce contention by interleaving the order that producer
1143  // streams are dequeued from. So, using this method can reduce overall throughput
1144  // under contention, but will give more predictable results in single-threaded
1145  // consumer scenarios. This is mostly only useful for internal unit tests.
1146  // Never allocates. Thread-safe.
1147  template<typename U>
1148  bool try_dequeue_non_interleaved(U &item) {
1149  for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1150  if (ptr->dequeue(item)) {
1151  return true;
1152  }
1153  }
1154  return false;
1155  }
1156 
1157  // Attempts to dequeue from the queue using an explicit consumer token.
1158  // Returns false if all producer streams appeared empty at the time they
1159  // were checked (so, the queue is likely but not guaranteed to be empty).
1160  // Never allocates. Thread-safe.
1161  template<typename U>
1162  bool try_dequeue(consumer_token_t &token, U &item) {
1163  // The idea is roughly as follows:
1164  // Every 256 items from one producer, make everyone rotate (increase the global offset) -> this means the highest efficiency consumer dictates the rotation speed of everyone else, more or less
1165  // If you see that the global offset has changed, you must reset your consumption counter and move to your designated place
1166  // If there's no items where you're supposed to be, keep moving until you find a producer with some items
1167  // If the global offset has not changed but you've run out of items to consume, move over from your current position until you find a producer with something in it
1168 
1169  if (token.desiredProducer == nullptr
1170  || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1171  if (!update_current_producer_after_rotation(token)) {
1172  return false;
1173  }
1174  }
1175 
1176  // If there was at least one non-empty queue but it appears empty at the time
1177  // we try to dequeue from it, we need to make sure every queue's been tried
1178  if (static_cast<ProducerBase *>(token.currentProducer)->dequeue(item)) {
1179  if (++token.itemsConsumedFromCurrent == EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1180  globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1181  }
1182  return true;
1183  }
1184 
1185  auto tail = producerListTail.load(std::memory_order_acquire);
1186  auto ptr = static_cast<ProducerBase *>(token.currentProducer)->next_prod();
1187  if (ptr == nullptr) {
1188  ptr = tail;
1189  }
1190  while (ptr != static_cast<ProducerBase *>(token.currentProducer)) {
1191  if (ptr->dequeue(item)) {
1192  token.currentProducer = ptr;
1193  token.itemsConsumedFromCurrent = 1;
1194  return true;
1195  }
1196  ptr = ptr->next_prod();
1197  if (ptr == nullptr) {
1198  ptr = tail;
1199  }
1200  }
1201  return false;
1202  }
1203 
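 // Sketch of the rotation scheme above from a consumer's point of view: each
 // consumer thread keeps one ConsumerToken and simply calls try_dequeue in a
 // loop; the token tracks its position and rotates automatically after
 // EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE items. `q` is assumed to
 // be an existing ConcurrentQueue<int>; names are illustrative.
 //
 //   moodycamel::ConsumerToken ctok(q);
 //   int item;
 //   while (q.try_dequeue(ctok, item)) {
 //     /* process item */
 //   }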
1204  // Attempts to dequeue several elements from the queue.
1205  // Returns the number of items actually dequeued.
1206  // Returns 0 if all producer streams appeared empty at the time they
1207  // were checked (so, the queue is likely but not guaranteed to be empty).
1208  // Never allocates. Thread-safe.
1209  template<typename It>
1210  size_t try_dequeue_bulk(It itemFirst, size_t max) {
1211  size_t count = 0;
1212  for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1213  count += ptr->dequeue_bulk(itemFirst, max - count);
1214  if (count == max) {
1215  break;
1216  }
1217  }
1218  return count;
1219  }
1220 
1221  // Attempts to dequeue several elements from the queue using an explicit consumer token.
1222  // Returns the number of items actually dequeued.
1223  // Returns 0 if all producer streams appeared empty at the time they
1224  // were checked (so, the queue is likely but not guaranteed to be empty).
1225  // Never allocates. Thread-safe.
1226  template<typename It>
1227  size_t try_dequeue_bulk(consumer_token_t &token, It itemFirst, size_t max) {
1228  if (token.desiredProducer == nullptr
1229  || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1230  if (!update_current_producer_after_rotation(token)) {
1231  return 0;
1232  }
1233  }
1234 
1235  size_t count = static_cast<ProducerBase *>(token.currentProducer)->dequeue_bulk(itemFirst, max);
1236  if (count == max) {
1237  if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max))
1238  >= EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1239  globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1240  }
1241  return max;
1242  }
1243  token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
1244  max -= count;
1245 
1246  auto tail = producerListTail.load(std::memory_order_acquire);
1247  auto ptr = static_cast<ProducerBase *>(token.currentProducer)->next_prod();
1248  if (ptr == nullptr) {
1249  ptr = tail;
1250  }
1251  while (ptr != static_cast<ProducerBase *>(token.currentProducer)) {
1252  auto dequeued = ptr->dequeue_bulk(itemFirst, max);
1253  count += dequeued;
1254  if (dequeued != 0) {
1255  token.currentProducer = ptr;
1256  token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
1257  }
1258  if (dequeued == max) {
1259  break;
1260  }
1261  max -= dequeued;
1262  ptr = ptr->next_prod();
1263  if (ptr == nullptr) {
1264  ptr = tail;
1265  }
1266  }
1267  return count;
1268  }
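 // Illustrative usage sketch, assuming the public API above: a batch-oriented consumer loop
 // that pairs a ConsumerToken with bulk dequeues, so the rotation logic is paid once per batch
 // rather than once per item.
 //
 //   moodycamel::ConcurrentQueue<int> q;       // shared with the producers
 //   moodycamel::ConsumerToken ctok(q);
 //   std::vector<int> batch(256);
 //   for (size_t n; (n = q.try_dequeue_bulk(ctok, batch.begin(), batch.size())) != 0; ) {
 //     for (size_t i = 0; i != n; ++i) {
 //       process(batch[i]);                    // process() is a hypothetical handler
 //     }
 //   }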
1269 
1270  // Attempts to dequeue from a specific producer's inner queue.
1271  // If you happen to know which producer you want to dequeue from, this
1272  // is significantly faster than using the general-case try_dequeue methods.
1273  // Returns false if the producer's queue appeared empty at the time it
1274  // was checked (so, the queue is likely but not guaranteed to be empty).
1275  // Never allocates. Thread-safe.
1276  template<typename U>
1277  inline bool try_dequeue_from_producer(producer_token_t const &producer, U &item) {
1278  return static_cast<ExplicitProducer *>(producer.producer)->dequeue(item);
1279  }
1280 
1281  // Attempts to dequeue several elements from a specific producer's inner queue.
1282  // Returns the number of items actually dequeued.
1283  // If you happen to know which producer you want to dequeue from, this
1284  // is significantly faster than using the general-case try_dequeue methods.
1285  // Returns 0 if the producer's queue appeared empty at the time it
1286  // was checked (so, the queue is likely but not guaranteed to be empty).
1287  // Never allocates. Thread-safe.
1288  template<typename It>
1289  inline size_t try_dequeue_bulk_from_producer(producer_token_t const &producer, It itemFirst, size_t max) {
1290  return static_cast<ExplicitProducer *>(producer.producer)->dequeue_bulk(itemFirst, max);
1291  }
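 // Illustrative usage sketch, assuming the public API above: a single-producer/single-consumer
 // pipeline stage where the consumer knows the producer's token and so can use the faster
 // *_from_producer variants directly.
 //
 //   moodycamel::ConcurrentQueue<int> q;
 //   moodycamel::ProducerToken ptok(q);
 //   q.enqueue(ptok, 42);                                  // producer side
 //   int item;
 //   bool ok = q.try_dequeue_from_producer(ptok, item);    // consumer side, same token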
1292 
1293  // Returns an estimate of the total number of elements currently in the queue. This
1294  // estimate is only accurate if the queue has completely stabilized before it is called
1295  // (i.e. all enqueue and dequeue operations have completed and their memory effects are
1296  // visible on the calling thread, and no further operations start while this method is
1297  // being called).
1298  // Thread-safe.
1299  size_t size_approx() const {
1300  size_t size = 0;
1301  for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1302  size += ptr->size_approx();
1303  }
1304  return size;
1305  }
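 // Illustrative usage sketch, given some moodycamel::ConcurrentQueue<int> q as above: because
 // the count is only an estimate under concurrent use, it is best suited to coarse decisions
 // such as back-pressure rather than exact accounting.
 //
 //   if (q.size_approx() > kHighWatermark) {   // kHighWatermark is a hypothetical threshold
 //     throttle_producers();                   // hypothetical reaction to a long queue
 //   }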
1306 
1307  // Returns true if the underlying atomic variables used by
1308  // the queue are lock-free (they should be on most platforms).
1309  // Thread-safe.
1310  static constexpr bool is_lock_free() {
1311  return
1312  details::static_is_lock_free<bool>::value == 2
1313  && details::static_is_lock_free<size_t>::value == 2
1314  && details::static_is_lock_free<std::uint32_t>::value == 2
1315  && details::static_is_lock_free<index_t>::value == 2
1316  && details::static_is_lock_free<void *>::value == 2
1317  && details::static_is_lock_free<typename details::thread_id_converter<details::thread_id_t>::thread_id_numeric_size_t>::value
1318  == 2;
1319  }
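 // Illustrative usage sketch: since is_lock_free() is declared constexpr in this header, a build
 // can assert the lock-free property at compile time (namespace name assumed to be the usual one).
 //
 //   static_assert(moodycamel::ConcurrentQueue<int>::is_lock_free(),
 //                 "expected lock-free atomics for this element/index configuration");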
1320 
1321  private:
1322  friend struct ProducerToken;
1323  friend struct ConsumerToken;
1324  struct ExplicitProducer;
1325  friend struct ExplicitProducer;
1326  struct ImplicitProducer;
1327  friend struct ImplicitProducer;
1328  friend class ConcurrentQueueTests;
1329 
1330  enum AllocationMode { CanAlloc, CannotAlloc };
1331 
1332 
1333  //////////////////////////////////
1334  // Queue methods
1335  //////////////////////////////////
1336 
1337  template<AllocationMode canAlloc, typename U>
1338  inline bool inner_enqueue(producer_token_t const &token, U &&element) {
1339  return static_cast<ExplicitProducer *>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(
1340  std::forward<U>(element));
1341  }
1342 
1343  template<AllocationMode canAlloc, typename U>
1344  inline bool inner_enqueue(U &&element) {
1345  auto producer = get_or_add_implicit_producer();
1346  return producer == nullptr ? false
1347  : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(
1348  element));
1349  }
1350 
1351  template<AllocationMode canAlloc, typename It>
1352  inline bool inner_enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count) {
1353  return static_cast<ExplicitProducer *>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<
1354  canAlloc>(itemFirst, count);
1355  }
1356 
1357  template<AllocationMode canAlloc, typename It>
1358  inline bool inner_enqueue_bulk(It itemFirst, size_t count) {
1359  auto producer = get_or_add_implicit_producer();
1360  return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue_bulk<canAlloc>(
1361  itemFirst,
1362  count);
1363  }
1364 
1365  inline bool update_current_producer_after_rotation(consumer_token_t &token) {
1366  // Ah, there's been a rotation, figure out where we should be!
1367  auto tail = producerListTail.load(std::memory_order_acquire);
1368  if (token.desiredProducer == nullptr && tail == nullptr) {
1369  return false;
1370  }
1371  auto prodCount = producerCount.load(std::memory_order_relaxed);
1372  auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
1373  if ((details::unlikely)(token.desiredProducer == nullptr)) {
1374  // Aha, first time we're dequeueing anything.
1375  // Figure out our local position
1376  // Note: offset is from start, not end, but we're traversing from end -- subtract from count first
1377  std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
1378  token.desiredProducer = tail;
1379  for (std::uint32_t i = 0; i != offset; ++i) {
1380  token.desiredProducer = static_cast<ProducerBase *>(token.desiredProducer)->next_prod();
1381  if (token.desiredProducer == nullptr) {
1382  token.desiredProducer = tail;
1383  }
1384  }
1385  }
1386 
1387  std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
1388  if (delta >= prodCount) {
1389  delta = delta % prodCount;
1390  }
1391  for (std::uint32_t i = 0; i != delta; ++i) {
1392  token.desiredProducer = static_cast<ProducerBase *>(token.desiredProducer)->next_prod();
1393  if (token.desiredProducer == nullptr) {
1394  token.desiredProducer = tail;
1395  }
1396  }
1397 
1398  token.lastKnownGlobalOffset = globalOffset;
1399  token.currentProducer = token.desiredProducer;
1400  token.itemsConsumedFromCurrent = 0;
1401  return true;
1402  }
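 // Worked example of the rotation above (with assumed numbers): suppose prodCount == 3 and the
 // producers reachable from the list tail are P0 -> P1 -> P2. A token whose initialOffset % 3 == 1
 // starts at offset 3 - 1 - 1 == 1, i.e. one hop from the tail. If the global offset later
 // advances by 4 before this token dequeues again, delta % 3 == 1, so the token walks one more
 // producer along the circular list before resuming consumption there.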
1403 
1404 
1405  //////////////////////////////////
1406  // Free list
1407  //////////////////////////////////
1408 
1409  template<typename N>
1410  struct FreeListNode {
1411  FreeListNode() : freeListRefs(0), freeListNext(nullptr) {}
1412 
1413  std::atomic<std::uint32_t> freeListRefs;
1414  std::atomic<N *> freeListNext;
1415  };
1416 
1417  // A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but
1418  // simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly
1419  // speedy under low contention.
1420  template<typename N> // N must inherit FreeListNode or have the same fields (and initialization of them)
1421  struct FreeList {
1422  FreeList() : freeListHead(nullptr) {}
1423  FreeList(FreeList &&other)
1424  : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) {
1425  other.freeListHead.store(nullptr,
1426  std::memory_order_relaxed);
1427  }
1428  void swap(FreeList &other) { details::swap_relaxed(freeListHead, other.freeListHead); }
1429 
1430  FreeList(FreeList const &) MOODYCAMEL_DELETE_FUNCTION;
1431  FreeList &operator=(FreeList const &) MOODYCAMEL_DELETE_FUNCTION;
1432 
1433  inline void add(N *node) {
1434 #ifdef MCDBGQ_NOLOCKFREE_FREELIST
1435  debug::DebugLock lock(mutex);
1436 #endif
1437  // We know that the should-be-on-freelist bit is 0 at this point, so it's safe to
1438  // set it using a fetch_add
1439  if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
1440  // Oh look! We were the last ones referencing this node, and we know
1441  // we want to add it to the free list, so let's do it!
1442  add_knowing_refcount_is_zero(node);
1443  }
1444  }
1445 
1446  inline N *try_get() {
1447 #ifdef MCDBGQ_NOLOCKFREE_FREELIST
1448  debug::DebugLock lock(mutex);
1449 #endif
1450  auto head = freeListHead.load(std::memory_order_acquire);
1451  while (head != nullptr) {
1452  auto prevHead = head;
1453  auto refs = head->freeListRefs.load(std::memory_order_relaxed);
1454  if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs,
1455  refs + 1,
1456  std::memory_order_acquire,
1457  std::memory_order_relaxed)) {
1458  head = freeListHead.load(std::memory_order_acquire);
1459  continue;
1460  }
1461 
1462  // Good, reference count has been incremented (it wasn't at zero), which means we can read the
1463  // next and not worry about it changing between now and the time we do the CAS
1464  auto next = head->freeListNext.load(std::memory_order_relaxed);
1465  if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
1466  // Yay, got the node. This means it was on the list, which means shouldBeOnFreeList must be false no
1467  // matter the refcount (because nobody else knows it's been taken off yet, it can't have been put back on).
1468  assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);
1469 
1470  // Decrease refcount twice, once for our ref, and once for the list's ref
1471  head->freeListRefs.fetch_sub(2, std::memory_order_release);
1472  return head;
1473  }
1474 
1475  // OK, the head must have changed on us, but we still need to decrease the refcount we increased.
1476  // Note that we don't need to release any memory effects, but we do need to ensure that the reference
1477  // count decrement happens-after the CAS on the head.
1478  refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
1479  if (refs == SHOULD_BE_ON_FREELIST + 1) {
1480  add_knowing_refcount_is_zero(prevHead);
1481  }
1482  }
1483 
1484  return nullptr;
1485  }
1486 
1487  // Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
1488  N *head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }
1489 
1490  private:
1491  inline void add_knowing_refcount_is_zero(N *node) {
1492  // Since the refcount is zero, and nobody can increase it once it's zero (except us, and we run
1493  // only one copy of this method per node at a time, i.e. the single thread case), then we know
1494  // we can safely change the next pointer of the node; however, once the refcount is back above
1495  // zero, then other threads could increase it (happens under heavy contention, when the refcount
1496  // goes to zero in between a load and a refcount increment of a node in try_get, then back up to
1497  // something non-zero, then the refcount increment is done by the other thread) -- so, if the CAS
1498  // to add the node to the actual list fails, decrease the refcount and leave the add operation to
1499  // the next thread who puts the refcount back at zero (which could be us, hence the loop).
1500  auto head = freeListHead.load(std::memory_order_relaxed);
1501  while (true) {
1502  node->freeListNext.store(head, std::memory_order_relaxed);
1503  node->freeListRefs.store(1, std::memory_order_release);
1504  if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) {
1505  // Hmm, the add failed, but we can only try again when the refcount goes back to zero
1506  if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
1507  continue;
1508  }
1509  }
1510  return;
1511  }
1512  }
1513 
1514  private:
1515  // Implemented like a stack, but where node order doesn't matter (nodes are inserted out of order under contention)
1516  std::atomic<N *> freeListHead;
1517 
1518  static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
1519  static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
1520 
1521 #ifdef MCDBGQ_NOLOCKFREE_FREELIST
1522  debug::DebugMutex mutex;
1523 #endif
1524  };
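 // Illustrative sketch of how a node type satisfies FreeList<N>'s requirements (hypothetical
 // names, for illustration only): inherit FreeListNode so the refcount and next pointer are
 // laid out as the free list expects, and keep the node alive until the free list is destroyed.
 //
 //   struct MyNode : FreeListNode<MyNode> { int payload; };
 //   FreeList<MyNode> freeList;
 //   MyNode *node = new MyNode();            // caller-managed lifetime
 //   freeList.add(node);                     // hand the node to the free list
 //   MyNode *reused = freeList.try_get();    // nullptr if the list appeared empty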
1525 
1526 
1527  //////////////////////////////////
1528  // Block
1529  //////////////////////////////////
1530 
1531  enum InnerQueueContext { implicit_context = 0, explicit_context = 1 };
1532 
1533  struct Block {
1534  Block()
1535  : next(nullptr),
1536  elementsCompletelyDequeued(0),
1537  freeListRefs(0),
1538  freeListNext(nullptr),
1539  shouldBeOnFreeList(false),
1540  dynamicallyAllocated(true) {
1541 #ifdef MCDBGQ_TRACKMEM
1542  owner = nullptr;
1543 #endif
1544  }
1545 
1546  template<InnerQueueContext context>
1547  inline bool is_empty() const {
1548  MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1549  // Check flags
1550  for (size_t i = 0; i < BLOCK_SIZE; ++i) {
1551  if (!emptyFlags[i].load(std::memory_order_relaxed)) {
1552  return false;
1553  }
1554  }
1555 
1556  // Aha, empty; make sure we have all other memory effects that happened before the empty flags were set
1557  std::atomic_thread_fence(std::memory_order_acquire);
1558  return true;
1559  } else {
1560  // Check counter
1561  if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
1562  std::atomic_thread_fence(std::memory_order_acquire);
1563  return true;
1564  }
1565  assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
1566  return false;
1567  }
1568  }
1569 
1570  // Returns true if the block is now empty (does not apply in explicit context)
1571  template<InnerQueueContext context>
1572  inline bool set_empty(MOODYCAMEL_MAYBE_UNUSED index_t i) {
1573  MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1574  // Set flag
1575  assert(!emptyFlags[BLOCK_SIZE - 1
1576  - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(std::memory_order_relaxed));
1577  emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true,
1578  std::memory_order_release);
1579  return false;
1580  } else {
1581  // Increment counter
1582  auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
1583  assert(prevVal < BLOCK_SIZE);
1584  return prevVal == BLOCK_SIZE - 1;
1585  }
1586  }
1587 
1588  // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0).
1589  // Returns true if the block is now empty (does not apply in explicit context).
1590  template<InnerQueueContext context>
1591  inline bool set_many_empty(MOODYCAMEL_MAYBE_UNUSED index_t i, size_t count) {
1592  MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1593  // Set flags
1594  std::atomic_thread_fence(std::memory_order_release);
1595  i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1;
1596  for (size_t j = 0; j != count; ++j) {
1597  assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
1598  emptyFlags[i + j].store(true, std::memory_order_relaxed);
1599  }
1600  return false;
1601  } else {
1602  // Increment counter
1603  auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
1604  assert(prevVal + count <= BLOCK_SIZE);
1605  return prevVal + count == BLOCK_SIZE;
1606  }
1607  }
1608 
1609  template<InnerQueueContext context>
1610  inline void set_all_empty() {
1611  MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1612  // Set all flags
1613  for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1614  emptyFlags[i].store(true, std::memory_order_relaxed);
1615  }
1616  } else {
1617  // Reset counter
1618  elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
1619  }
1620  }
1621 
1622  template<InnerQueueContext context>
1623  inline void reset_empty() {
1624  MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1625  // Reset flags
1626  for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1627  emptyFlags[i].store(false, std::memory_order_relaxed);
1628  }
1629  } else {
1630  // Reset counter
1631  elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
1632  }
1633  }
1634 
1635  inline T *operator[](index_t idx) MOODYCAMEL_NOEXCEPT {
1636  return static_cast<T *>(static_cast<void *>(elements))
1637  + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1));
1638  }
1639  inline T const *operator[](index_t idx) const MOODYCAMEL_NOEXCEPT {
1640  return static_cast<T const *>(static_cast<void const *>(elements))
1641  + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1));
1642  }
1643 
1644  private:
1645  static_assert(std::alignment_of<T>::value <= sizeof(T),
1646  "The queue does not support types with an alignment greater than their size at this time");
1647  MOODYCAMEL_ALIGNED_TYPE_LIKE(char[sizeof(T) * BLOCK_SIZE], T) elements;
1648  public:
1649  Block *next;
1650  std::atomic<size_t> elementsCompletelyDequeued;
1651  std::atomic<bool> emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];
1652  public:
1653  std::atomic<std::uint32_t> freeListRefs;
1654  std::atomic<Block *> freeListNext;
1655  std::atomic<bool> shouldBeOnFreeList;
1656  bool dynamicallyAllocated; // Perhaps a better name for this would be 'isNotPartOfInitialBlockPool'
1657 
1658 #ifdef MCDBGQ_TRACKMEM
1659  void* owner;
1660 #endif
1661  };
1662  static_assert(std::alignment_of<Block>::value >= std::alignment_of<T>::value,
1663  "Internal error: Blocks must be at least as aligned as the type they are wrapping");
1664 
1665 #ifdef MCDBGQ_TRACKMEM
1666  public:
1667  struct MemStats;
1668 private:
1669 #endif
1670 
1671  //////////////////////////////////
1672  // Producer base
1673  //////////////////////////////////
1674 
1675  struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase {
1676  ProducerBase(ConcurrentQueue *parent_, bool isExplicit_) :
1677  tailIndex(0),
1678  headIndex(0),
1679  dequeueOptimisticCount(0),
1680  dequeueOvercommit(0),
1681  tailBlock(nullptr),
1682  isExplicit(isExplicit_),
1683  parent(parent_) {
1684  }
1685 
1686  virtual ~ProducerBase() {}
1687 
1688  template<typename U>
1689  inline bool dequeue(U &element) {
1690  if (isExplicit) {
1691  return static_cast<ExplicitProducer *>(this)->dequeue(element);
1692  } else {
1693  return static_cast<ImplicitProducer *>(this)->dequeue(element);
1694  }
1695  }
1696 
1697  template<typename It>
1698  inline size_t dequeue_bulk(It &itemFirst, size_t max) {
1699  if (isExplicit) {
1700  return static_cast<ExplicitProducer *>(this)->dequeue_bulk(itemFirst, max);
1701  } else {
1702  return static_cast<ImplicitProducer *>(this)->dequeue_bulk(itemFirst, max);
1703  }
1704  }
1705 
1706  inline ProducerBase *next_prod() const { return static_cast<ProducerBase *>(next); }
1707 
1708  inline size_t size_approx() const {
1709  auto tail = tailIndex.load(std::memory_order_relaxed);
1710  auto head = headIndex.load(std::memory_order_relaxed);
1711  return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
1712  }
1713 
1714  inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }
1715  protected:
1716  std::atomic<index_t> tailIndex; // Where to enqueue to next
1717  std::atomic<index_t> headIndex; // Where to dequeue from next
1718 
1719  std::atomic<index_t> dequeueOptimisticCount;
1720  std::atomic<index_t> dequeueOvercommit;
1721 
1722  Block *tailBlock;
1723 
1724  public:
1725  bool isExplicit;
1726  ConcurrentQueue *parent;
1727 
1728  protected:
1729 #ifdef MCDBGQ_TRACKMEM
1730  friend struct MemStats;
1731 #endif
1732  };
1733 
1734 
1735  //////////////////////////////////
1736  // Explicit queue
1737  //////////////////////////////////
1738 
1739  struct ExplicitProducer : public ProducerBase {
1740  explicit ExplicitProducer(ConcurrentQueue *parent_) :
1741  ProducerBase(parent_, true),
1742  blockIndex(nullptr),
1743  pr_blockIndexSlotsUsed(0),
1744  pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1),
1745  pr_blockIndexFront(0),
1746  pr_blockIndexEntries(nullptr),
1747  pr_blockIndexRaw(nullptr) {
1748  size_t poolBasedIndexSize = details::ceil_to_pow_2(parent_->initialBlockPoolSize) >> 1;
1749  if (poolBasedIndexSize > pr_blockIndexSize) {
1750  pr_blockIndexSize = poolBasedIndexSize;
1751  }
1752 
1753  new_block_index(0); // This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE
1754  }
1755 
1756  ~ExplicitProducer() {
1757  // Destruct any elements not yet dequeued.
1758  // Since we're in the destructor, we can assume all elements
1759  // are either completely dequeued or completely not (no halfways).
1760  if (this->tailBlock != nullptr) { // Note this means there must be a block index too
1761  // First find the block that's partially dequeued, if any
1762  Block *halfDequeuedBlock = nullptr;
1763  if ((this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
1764  // The head's not on a block boundary, meaning a block somewhere is partially dequeued
1765  // (or the head block is the tail block and was fully dequeued, but the head/tail are still not on a boundary)
1766  size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1);
1767  while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE,
1768  this->headIndex.load(std::memory_order_relaxed))) {
1769  i = (i + 1) & (pr_blockIndexSize - 1);
1770  }
1771  assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base,
1772  this->headIndex.load(std::memory_order_relaxed)));
1773  halfDequeuedBlock = pr_blockIndexEntries[i].block;
1774  }
1775 
1776  // Start at the head block (note the first line in the loop gives us the head from the tail on the first iteration)
1777  auto block = this->tailBlock;
1778  do {
1779  block = block->next;
1780  if (block->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
1781  continue;
1782  }
1783 
1784  size_t i = 0; // Offset into block
1785  if (block == halfDequeuedBlock) {
1786  i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed)
1787  & static_cast<index_t>(BLOCK_SIZE - 1));
1788  }
1789 
1790  // Walk through all the items in the block; if this is the tail block, we need to stop when we reach the tail index
1791  auto lastValidIndex =
1792  (this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE
1793  : static_cast<size_t>(
1794  this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
1795  while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
1796  (*block)[i++]->~T();
1797  }
1798  } while (block != this->tailBlock);
1799  }
1800 
1801  // Destroy all blocks that we own
1802  if (this->tailBlock != nullptr) {
1803  auto block = this->tailBlock;
1804  do {
1805  auto nextBlock = block->next;
1806  if (block->dynamicallyAllocated) {
1807  destroy(block);
1808  } else {
1809  this->parent->add_block_to_free_list(block);
1810  }
1811  block = nextBlock;
1812  } while (block != this->tailBlock);
1813  }
1814 
1815  // Destroy the block indices
1816  auto header = static_cast<BlockIndexHeader *>(pr_blockIndexRaw);
1817  while (header != nullptr) {
1818  auto prev = static_cast<BlockIndexHeader *>(header->prev);
1819  header->~BlockIndexHeader();
1820  (Traits::free)(header);
1821  header = prev;
1822  }
1823  }
1824 
1825  template<AllocationMode allocMode, typename U>
1826  inline bool enqueue(U &&element) {
1827  index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
1828  index_t newTailIndex = 1 + currentTailIndex;
1829  if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
1830  // We reached the end of a block, start a new one
1831  auto startBlock = this->tailBlock;
1832  auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
1833  if (this->tailBlock != nullptr
1834  && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
1835  // We can re-use the block ahead of us, it's empty!
1836  this->tailBlock = this->tailBlock->next;
1837  this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
1838 
1839  // We'll put the block on the block index (guaranteed to be room since we're conceptually removing the
1840  // last block from it first -- except instead of removing then adding, we can just overwrite).
1841  // Note that there must be a valid block index here, since even if allocation failed in the ctor,
1842  // it would have been re-attempted when adding the first block to the queue; since there is such
1843  // a block, a block index must have been successfully allocated.
1844  } else {
1845  // Whatever head value we see here is >= the last value we saw here (relatively),
1846  // and <= its current value. Since we have the most recent tail, the head must be
1847  // <= to it.
1848  auto head = this->headIndex.load(std::memory_order_relaxed);
1849  assert(!details::circular_less_than<index_t>(currentTailIndex, head));
1850  if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
1851  || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value
1852  && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
1853  // We can't enqueue in another block because there's not enough leeway -- the
1854  // tail could surpass the head by the time the block fills up! (Or we'll exceed
1855  // the size limit, if the second part of the condition was true.)
1856  return false;
1857  }
1858  // We're going to need a new block; check that the block index has room
1859  if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
1860  // Hmm, the circular block index is already full -- we'll need
1861  // to allocate a new index. Note pr_blockIndexRaw can only be nullptr if
1862  // the initial allocation failed in the constructor.
1863 
1864  MOODYCAMEL_CONSTEXPR_IF (allocMode == CannotAlloc) {
1865  return false;
1866  } else if (!new_block_index(pr_blockIndexSlotsUsed)) {
1867  return false;
1868  }
1869  }
1870 
1871  // Insert a new block in the circular linked list
1872  auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
1873  if (newBlock == nullptr) {
1874  return false;
1875  }
1876 #ifdef MCDBGQ_TRACKMEM
1877  newBlock->owner = this;
1878 #endif
1879  newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
1880  if (this->tailBlock == nullptr) {
1881  newBlock->next = newBlock;
1882  } else {
1883  newBlock->next = this->tailBlock->next;
1884  this->tailBlock->next = newBlock;
1885  }
1886  this->tailBlock = newBlock;
1887  ++pr_blockIndexSlotsUsed;
1888  }
1889 
1890  MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T,
1891  U,
1892  new(static_cast<T *>(nullptr)) T(std::forward<U>(element)))) {
1893  // The constructor may throw. We want the element not to appear in the queue in
1894  // that case (without corrupting the queue):
1895  MOODYCAMEL_TRY {
1896  new((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
1897  }
1898  MOODYCAMEL_CATCH (...) {
1899  // Revert change to the current block, but leave the new block available
1900  // for next time
1901  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
1902  this->tailBlock = startBlock == nullptr ? this->tailBlock : startBlock;
1903  MOODYCAMEL_RETHROW;
1904  }
1905  } else {
1906  (void) startBlock;
1907  (void) originalBlockIndexSlotsUsed;
1908  }
1909 
1910  // Add block to block index
1911  auto &entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
1912  entry.base = currentTailIndex;
1913  entry.block = this->tailBlock;
1914  blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
1915  pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
1916 
1917  MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T,
1918  U,
1919  new(static_cast<T *>(nullptr)) T(std::forward<U>(element)))) {
1920  this->tailIndex.store(newTailIndex, std::memory_order_release);
1921  return true;
1922  }
1923  }
1924 
1925  // Enqueue
1926  new((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
1927 
1928  this->tailIndex.store(newTailIndex, std::memory_order_release);
1929  return true;
1930  }
1931 
1932  template<typename U>
1933  bool dequeue(U &element) {
1934  auto tail = this->tailIndex.load(std::memory_order_relaxed);
1935  auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
1936  if (details::circular_less_than<index_t>(
1937  this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit,
1938  tail)) {
1939  // Might be something to dequeue, let's give it a try
1940 
1941  // Note that this if is purely for performance purposes in the common case when the queue is
1942  // empty and the values are eventually consistent -- we may enter here spuriously.
1943 
1944  // Note that whatever the values of overcommit and tail are, they are not going to change (unless we
1945  // change them) and must be the same value at this point (inside the if) as when the if condition was
1946  // evaluated.
1947 
1948  // We insert an acquire fence here to synchronize-with the release upon incrementing dequeueOvercommit below.
1949  // This ensures that whatever value we loaded into overcommit, the load of dequeueOptimisticCount in
1950  // the fetch_add below will result in a value at least as recent as that (and therefore at least as large).
1951  // Note that I believe a compiler (signal) fence here would be sufficient due to the nature of fetch_add (all
1952  // read-modify-write operations are guaranteed to work on the latest value in the modification order), but
1953  // unfortunately that can't be shown to be correct using only the C++11 standard.
1954  // See http://stackoverflow.com/questions/18223161/what-are-the-c11-memory-ordering-guarantees-in-this-corner-case
1955  std::atomic_thread_fence(std::memory_order_acquire);
1956 
1957  // Increment optimistic counter, then check if it went over the boundary
1958  auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
1959 
1960  // Note that since dequeueOvercommit must be <= dequeueOptimisticCount (because dequeueOvercommit is only ever
1961  // incremented after dequeueOptimisticCount -- this is enforced in the `else` block below), and since we now
1962  // have a version of dequeueOptimisticCount that is at least as recent as overcommit (due to the release upon
1963  // incrementing dequeueOvercommit and the acquire above that synchronizes with it), overcommit <= myDequeueCount.
1964  // However, we can't assert this since both dequeueOptimisticCount and dequeueOvercommit may (independently)
1965  // overflow; in such a case, though, the logic still holds since the difference between the two is maintained.
1966 
1967  // Note that we reload tail here in case it changed; it will be the same value as before or greater, since
1968  // this load is sequenced after (happens after) the earlier load above. This is supported by read-read
1969  // coherency (as defined in the standard), explained here: http://en.cppreference.com/w/cpp/atomic/memory_order
1970  tail = this->tailIndex.load(std::memory_order_acquire);
1971  if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
1972  // Guaranteed to be at least one element to dequeue!
1973 
1974  // Get the index. Note that since there's guaranteed to be at least one element, this
1975  // will never exceed tail. We need to do an acquire-release fence here since it's possible
1976  // that whatever condition got us to this point was for an earlier enqueued element (that
1977  // we already see the memory effects for), but that by the time we increment somebody else
1978  // has incremented it, and we need to see the memory effects for *that* element, which
1979  // in such a case is necessarily visible on the thread that incremented it in the first
1980  // place with the more current condition (they must have acquired a tail that is at least
1981  // as recent).
1982  auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
1983 
1984 
1985  // Determine which block the element is in
1986 
1987  auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
1988  auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
1989 
1990  // We need to be careful here about subtracting and dividing because of index wrap-around.
1991  // When an index wraps, we need to preserve the sign of the offset when dividing it by the
1992  // block size (in order to get a correct signed block count offset in all cases):
1993  auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
1994  auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
1995  auto offset =
1996  static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase)
1997  / BLOCK_SIZE);
1998  auto block = localBlockIndex->entries[(localBlockIndexHead + offset) & (localBlockIndex->size - 1)].block;
1999 
2000  // Dequeue
2001  auto &el = *((*block)[index]);
2002  if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T &&, element = std::move(el))) {
2003  // Make sure the element is still fully dequeued and destroyed even if the assignment
2004  // throws
2005  struct Guard {
2006  Block *block;
2007  index_t index;
2008 
2009  ~Guard() {
2010  (*block)[index]->~T();
2011  block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
2012  }
2013  } guard = {block, index};
2014 
2015  element = std::move(el); // NOLINT
2016  } else {
2017  element = std::move(el); // NOLINT
2018  el.~T(); // NOLINT
2019  block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
2020  }
2021 
2022  return true;
2023  } else {
2024  // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
2025  this->dequeueOvercommit.fetch_add(1,
2026  std::memory_order_release); // Release so that the fetch_add on dequeueOptimisticCount is guaranteed to happen before this write
2027  }
2028  }
2029 
2030  return false;
2031  }
2032 
2033  template<AllocationMode allocMode, typename It>
2034  bool MOODYCAMEL_NO_TSAN enqueue_bulk(It itemFirst, size_t count) {
2035  // First, we need to make sure we have enough room to enqueue all of the elements;
2036  // this means pre-allocating blocks and putting them in the block index (but only if
2037  // all the allocations succeeded).
2038  index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2039  auto startBlock = this->tailBlock;
2040  auto originalBlockIndexFront = pr_blockIndexFront;
2041  auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
2042 
2043  Block *firstAllocatedBlock = nullptr;
2044 
2045  // Figure out how many blocks we'll need to allocate, and do so
2046  size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1))
2047  - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2048  index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2049  if (blockBaseDiff > 0) {
2050  // Allocate as many blocks as possible from ahead
2051  while (blockBaseDiff > 0 && this->tailBlock != nullptr && this->tailBlock->next != firstAllocatedBlock
2052  && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
2053  blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2054  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2055 
2056  this->tailBlock = this->tailBlock->next;
2057  firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2058 
2059  auto &entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2060  entry.base = currentTailIndex;
2061  entry.block = this->tailBlock;
2062  pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2063  }
2064 
2065  // Now allocate as many blocks as necessary from the block pool
2066  while (blockBaseDiff > 0) {
2067  blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2068  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2069 
2070  auto head = this->headIndex.load(std::memory_order_relaxed);
2071  assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2072  bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
2073  || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value
2074  && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2075  if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
2076  MOODYCAMEL_CONSTEXPR_IF (allocMode == CannotAlloc) {
2077  // Failed to allocate, undo changes (but keep injected blocks)
2078  pr_blockIndexFront = originalBlockIndexFront;
2079  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2080  this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2081  return false;
2082  } else if (full || !new_block_index(originalBlockIndexSlotsUsed)) {
2083  // Failed to allocate, undo changes (but keep injected blocks)
2084  pr_blockIndexFront = originalBlockIndexFront;
2085  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2086  this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2087  return false;
2088  }
2089 
2090  // pr_blockIndexFront is updated inside new_block_index, so we need to
2091  // update our fallback value too (since we keep the new index even if we
2092  // later fail)
2093  originalBlockIndexFront = originalBlockIndexSlotsUsed;
2094  }
2095 
2096  // Insert a new block in the circular linked list
2097  auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2098  if (newBlock == nullptr) {
2099  pr_blockIndexFront = originalBlockIndexFront;
2100  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2101  this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2102  return false;
2103  }
2104 
2105 #ifdef MCDBGQ_TRACKMEM
2106  newBlock->owner = this;
2107 #endif
2108  newBlock->ConcurrentQueue::Block::template set_all_empty<explicit_context>();
2109  if (this->tailBlock == nullptr) {
2110  newBlock->next = newBlock;
2111  } else {
2112  newBlock->next = this->tailBlock->next;
2113  this->tailBlock->next = newBlock;
2114  }
2115  this->tailBlock = newBlock;
2116  firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2117 
2118  ++pr_blockIndexSlotsUsed;
2119 
2120  auto &entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2121  entry.base = currentTailIndex;
2122  entry.block = this->tailBlock;
2123  pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2124  }
2125 
2126  // Excellent, all allocations succeeded. Reset each block's emptiness before we fill them up, and
2127  // publish the new block index front
2128  auto block = firstAllocatedBlock;
2129  while (true) {
2130  block->ConcurrentQueue::Block::template reset_empty<explicit_context>();
2131  if (block == this->tailBlock) {
2132  break;
2133  }
2134  block = block->next;
2135  }
2136 
2137  MOODYCAMEL_CONSTEXPR_IF (MOODYCAMEL_NOEXCEPT_CTOR(T,
2138  decltype(*itemFirst),
2139  new(static_cast<T *>(nullptr)) T(details::deref_noexcept(
2140  itemFirst)))) {
2141  blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1),
2142  std::memory_order_release);
2143  }
2144  }
2145 
2146  // Enqueue, one block at a time
2147  index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2148  currentTailIndex = startTailIndex;
2149  auto endBlock = this->tailBlock;
2150  this->tailBlock = startBlock;
2151  assert(
2152  (startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
2153  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
2154  this->tailBlock = firstAllocatedBlock;
2155  }
2156  while (true) {
2157  index_t
2158  stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2159  if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2160  stopIndex = newTailIndex;
2161  }
2162  MOODYCAMEL_CONSTEXPR_IF (MOODYCAMEL_NOEXCEPT_CTOR(T,
2163  decltype(*itemFirst),
2164  new(static_cast<T *>(nullptr)) T(details::deref_noexcept(
2165  itemFirst)))) {
2166  while (currentTailIndex != stopIndex) {
2167  new((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2168  }
2169  } else {
2170  MOODYCAMEL_TRY {
2171  while (currentTailIndex != stopIndex) {
2172  // Must use copy constructor even if move constructor is available
2173  // because we may have to revert if there's an exception.
2174  // Sorry about the horrible templated next line, but it was the only way
2175  // to disable moving *at compile time*, which is important because a type
2176  // may only define a (noexcept) move constructor, and so calls to the
2177  // cctor will not compile, even if they are in an if branch that will never
2178  // be executed
2179  new((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<!MOODYCAMEL_NOEXCEPT_CTOR(T,
2180  decltype(*itemFirst),
2181  new(static_cast<T *>(nullptr)) T(
2182  details::deref_noexcept(
2183  itemFirst)))>::eval(
2184  *itemFirst));
2185  ++currentTailIndex;
2186  ++itemFirst;
2187  }
2188  }
2189  MOODYCAMEL_CATCH (...) {
2190  // Oh dear, an exception's been thrown -- destroy the elements that
2191  // were enqueued so far and revert the entire bulk operation (we'll keep
2192  // any allocated blocks in our linked list for later, though).
2193  auto constructedStopIndex = currentTailIndex;
2194  auto lastBlockEnqueued = this->tailBlock;
2195 
2196  pr_blockIndexFront = originalBlockIndexFront;
2197  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2198  this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2199 
2200  if (!details::is_trivially_destructible<T>::value) {
2201  auto block = startBlock;
2202  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2203  block = firstAllocatedBlock;
2204  }
2205  currentTailIndex = startTailIndex;
2206  while (true) {
2207  stopIndex =
2208  (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2209  if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2210  stopIndex = constructedStopIndex;
2211  }
2212  while (currentTailIndex != stopIndex) {
2213  (*block)[currentTailIndex++]->~T();
2214  }
2215  if (block == lastBlockEnqueued) {
2216  break;
2217  }
2218  block = block->next;
2219  }
2220  }
2221  MOODYCAMEL_RETHROW;
2222  }
2223  }
2224 
2225  if (this->tailBlock == endBlock) {
2226  assert(currentTailIndex == newTailIndex);
2227  break;
2228  }
2229  this->tailBlock = this->tailBlock->next;
2230  }
2231 
2232  MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T,
2233  decltype(*itemFirst),
2234  new(static_cast<T *>(nullptr)) T(details::deref_noexcept(
2235  itemFirst)))) {
2236  if (firstAllocatedBlock != nullptr)
2237  blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1),
2238  std::memory_order_release);
2239  }
2240 
2241  this->tailIndex.store(newTailIndex, std::memory_order_release);
2242  return true;
2243  }
2244 
2245  template<typename It>
2246  size_t dequeue_bulk(It &itemFirst, size_t max) {
2247  auto tail = this->tailIndex.load(std::memory_order_relaxed);
2248  auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2249  auto desiredCount =
2250  static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2251  if (details::circular_less_than<size_t>(0, desiredCount)) {
2252  desiredCount = desiredCount < max ? desiredCount : max;
2253  std::atomic_thread_fence(std::memory_order_acquire);
2254 
2255  auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
2256 
2257  tail = this->tailIndex.load(std::memory_order_acquire);
2258  auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2259  if (details::circular_less_than<size_t>(0, actualCount)) {
2260  actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2261  if (actualCount < desiredCount) {
2262  this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2263  }
2264 
2265  // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
2266  // will never exceed tail.
2267  auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2268 
2269  // Determine which block the first element is in
2270  auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
2271  auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
2272 
2273  auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2274  auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
2275  auto offset =
2276  static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase)
2277  / BLOCK_SIZE);
2278  auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);
2279 
2280  // Iterate the blocks and dequeue
2281  auto index = firstIndex;
2282  do {
2283  auto firstIndexInBlock = index;
2284  index_t endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2285  endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ?
2286  firstIndex + static_cast<index_t>(actualCount) : endIndex;
2287  auto block = localBlockIndex->entries[indexIndex].block;
2288  if (MOODYCAMEL_NOEXCEPT_ASSIGN(T,
2289  T &&,
2290  details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
2291  while (index != endIndex) {
2292  auto &el = *((*block)[index]);
2293  *itemFirst++ = std::move(el);
2294  el.~T();
2295  ++index;
2296  }
2297  } else {
2298  MOODYCAMEL_TRY {
2299  while (index != endIndex) {
2300  auto &el = *((*block)[index]);
2301  *itemFirst = std::move(el);
2302  ++itemFirst;
2303  el.~T();
2304  ++index;
2305  }
2306  }
2307  MOODYCAMEL_CATCH (...) {
2308  // It's too late to revert the dequeue, but we can make sure that all
2309  // the dequeued objects are properly destroyed and the block index
2310  // (and empty count) are properly updated before we propagate the exception
2311  do {
2312  block = localBlockIndex->entries[indexIndex].block;
2313  while (index != endIndex) {
2314  (*block)[index++]->~T();
2315  }
2316  block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock,
2317  static_cast<size_t>(endIndex
2318  - firstIndexInBlock));
2319  indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2320 
2321  firstIndexInBlock = index;
2322  endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2323  endIndex =
2324  details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ?
2325  firstIndex + static_cast<index_t>(actualCount) : endIndex;
2326  } while (index != firstIndex + actualCount);
2327 
2328  MOODYCAMEL_RETHROW;
2329  }
2330  }
2331  block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock,
2332  static_cast<size_t>(endIndex
2333  - firstIndexInBlock));
2334  indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2335  } while (index != firstIndex + actualCount);
2336 
2337  return actualCount;
2338  } else {
2339  // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
2340  this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2341  }
2342  }
2343 
2344  return 0;
2345  }
2346 
2347  private:
2348  struct BlockIndexEntry {
2349  index_t base;
2350  Block *block;
2351  };
2352 
2353  struct BlockIndexHeader {
2354  size_t size;
2355  std::atomic<size_t> front; // Current slot (not next, like pr_blockIndexFront)
2356  BlockIndexEntry *entries;
2357  void *prev;
2358  };
2359 
2360  bool new_block_index(size_t numberOfFilledSlotsToExpose) {
2361  auto prevBlockSizeMask = pr_blockIndexSize - 1;
2362 
2363  // Create the new block
2364  pr_blockIndexSize <<= 1;
2365  auto newRawPtr = static_cast<char *>((Traits::malloc)(
2366  sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1
2367  + sizeof(BlockIndexEntry) * pr_blockIndexSize));
2368  if (newRawPtr == nullptr) {
2369  pr_blockIndexSize >>= 1; // Reset to allow graceful retry
2370  return false;
2371  }
2372 
2373  auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry *>(details::align_for<BlockIndexEntry>(
2374  newRawPtr + sizeof(BlockIndexHeader)));
2375 
2376  // Copy in all the old indices, if any
2377  size_t j = 0;
2378  if (pr_blockIndexSlotsUsed != 0) {
2379  auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
2380  do {
2381  newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
2382  i = (i + 1) & prevBlockSizeMask;
2383  } while (i != pr_blockIndexFront);
2384  }
2385 
2386  // Update everything
2387  auto header = new(newRawPtr) BlockIndexHeader;
2388  header->size = pr_blockIndexSize;
2389  header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
2390  header->entries = newBlockIndexEntries;
2391  header->prev = pr_blockIndexRaw; // we link the new block to the old one so we can free it later
2392 
2393  pr_blockIndexFront = j;
2394  pr_blockIndexEntries = newBlockIndexEntries;
2395  pr_blockIndexRaw = newRawPtr;
2396  blockIndex.store(header, std::memory_order_release);
2397 
2398  return true;
2399  }
2400 
2401  private:
2402  std::atomic<BlockIndexHeader *> blockIndex;
2403 
2404  // To be used by producer only -- consumer must use the ones referenced by blockIndex
2405  size_t pr_blockIndexSlotsUsed;
2406  size_t pr_blockIndexSize;
2407  size_t pr_blockIndexFront; // Next slot (not current)
2408  BlockIndexEntry *pr_blockIndexEntries;
2409  void *pr_blockIndexRaw;
2410 
2411 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2412  public:
2413  ExplicitProducer* nextExplicitProducer;
2414  private:
2415 #endif
2416 
2417 #ifdef MCDBGQ_TRACKMEM
2418  friend struct MemStats;
2419 #endif
2420  };
2421 
2422 
2423  //////////////////////////////////
2424  // Implicit queue
2425  //////////////////////////////////
2426 
2427  struct ImplicitProducer : public ProducerBase {
2428  ImplicitProducer(ConcurrentQueue *parent_) :
2429  ProducerBase(parent_, false),
2430  nextBlockIndexCapacity(IMPLICIT_INITIAL_INDEX_SIZE),
2431  blockIndex(nullptr) {
2432  new_block_index();
2433  }
2434 
2435  ~ImplicitProducer() {
2436  // Note that since we're in the destructor we can assume that all enqueue/dequeue operations
2437  // completed already; this means that all undequeued elements are placed contiguously across
2438  // contiguous blocks, and that only the first and last remaining blocks can be only partially
2439  // empty (all other remaining blocks must be completely full).
2440 
2441 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2442  // Unregister ourselves for thread termination notification
2443  if (!this->inactive.load(std::memory_order_relaxed)) {
2444  details::ThreadExitNotifier::unsubscribe(&threadExitListener);
2445  }
2446 #endif
2447 
2448  // Destroy all remaining elements!
2449  auto tail = this->tailIndex.load(std::memory_order_relaxed);
2450  auto index = this->headIndex.load(std::memory_order_relaxed);
2451  Block *block = nullptr;
2452  assert(index == tail || details::circular_less_than(index, tail));
2453  bool forceFreeLastBlock =
2454  index != tail; // If we enter the loop, then the last (tail) block will not be freed
2455  while (index != tail) {
2456  if ((index & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 || block == nullptr) {
2457  if (block != nullptr) {
2458  // Free the old block
2459  this->parent->add_block_to_free_list(block);
2460  }
2461 
2462  block = get_block_index_entry_for_index(index)->value.load(std::memory_order_relaxed);
2463  }
2464 
2465  ((*block)[index])->~T();
2466  ++index;
2467  }
2468  // Even if the queue is empty, there's still one block that's not on the free list
2469  // (unless the head index reached the end of it, in which case the tail will be poised
2470  // to create a new block).
2471  if (this->tailBlock != nullptr && (forceFreeLastBlock || (tail & static_cast<index_t>(BLOCK_SIZE - 1)) != 0)) {
2472  this->parent->add_block_to_free_list(this->tailBlock);
2473  }
2474 
2475  // Destroy block index
2476  auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2477  if (localBlockIndex != nullptr) {
2478  for (size_t i = 0; i != localBlockIndex->capacity; ++i) {
2479  localBlockIndex->index[i]->~BlockIndexEntry();
2480  }
2481  do {
2482  auto prev = localBlockIndex->prev;
2483  localBlockIndex->~BlockIndexHeader();
2484  (Traits::free)(localBlockIndex);
2485  localBlockIndex = prev;
2486  } while (localBlockIndex != nullptr);
2487  }
2488  }
2489 
2490  template<AllocationMode allocMode, typename U>
2491  inline bool enqueue(U &&element) {
2492  index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2493  index_t newTailIndex = 1 + currentTailIndex;
2494  if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2495  // We reached the end of a block, start a new one
2496  auto head = this->headIndex.load(std::memory_order_relaxed);
2497  assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2498  if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
2499  || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value
2500  && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
2501  return false;
2502  }
2503 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2504  debug::DebugLock lock(mutex);
2505 #endif
2506  // Find out where we'll be inserting this block in the block index
2507  BlockIndexEntry *idxEntry;
2508  if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) {
2509  return false;
2510  }
2511 
2512  // Get ahold of a new block
2513  auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2514  if (newBlock == nullptr) {
2515  rewind_block_index_tail();
2516  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2517  return false;
2518  }
2519 #ifdef MCDBGQ_TRACKMEM
2520  newBlock->owner = this;
2521 #endif
2522  newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2523 
2524  MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T,
2525  U,
2526  new(static_cast<T *>(nullptr)) T(std::forward<U>(element)))) {
2527  // May throw, try to insert now before we publish the fact that we have this new block
2528  MOODYCAMEL_TRY {
2529  new((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
2530  }
2531  MOODYCAMEL_CATCH (...) {
2532  rewind_block_index_tail();
2533  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2534  this->parent->add_block_to_free_list(newBlock);
2535  MOODYCAMEL_RETHROW;
2536  }
2537  }
2538 
2539  // Insert the new block into the index
2540  idxEntry->value.store(newBlock, std::memory_order_relaxed);
2541 
2542  this->tailBlock = newBlock;
2543 
2544  MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T,
2545  U,
2546  new(static_cast<T *>(nullptr)) T(std::forward<U>(element)))) {
2547  this->tailIndex.store(newTailIndex, std::memory_order_release);
2548  return true;
2549  }
2550  }
2551 
2552  // Enqueue
2553  new((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2554 
2555  this->tailIndex.store(newTailIndex, std::memory_order_release);
2556  return true;
2557  }
2558 
2559  template<typename U>
2560  bool dequeue(U &element) {
2561  // See ExplicitProducer::dequeue for rationale and explanation
2562  index_t tail = this->tailIndex.load(std::memory_order_relaxed);
2563  index_t overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2564  if (details::circular_less_than<index_t>(
2565  this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit,
2566  tail)) {
2567  std::atomic_thread_fence(std::memory_order_acquire);
2568 
2569  index_t myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
2570  tail = this->tailIndex.load(std::memory_order_acquire);
2571  if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
2572  index_t index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
2573 
2574  // Determine which block the element is in
2575  auto entry = get_block_index_entry_for_index(index);
2576 
2577  // Dequeue
2578  auto block = entry->value.load(std::memory_order_relaxed);
2579  auto &el = *((*block)[index]);
2580 
2581  if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T &&, element = std::move(el))) {
2582 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2583  // Note: Acquiring the mutex with every dequeue instead of only when a block
2584  // is released is very sub-optimal, but it is, after all, purely debug code.
2585  debug::DebugLock lock(producer->mutex);
2586 #endif
2587  struct Guard {
2588  Block *block;
2589  index_t index;
2590  BlockIndexEntry *entry;
2591  ConcurrentQueue *parent;
2592 
2593  ~Guard() {
2594  (*block)[index]->~T();
2595  if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2596  entry->value.store(nullptr, std::memory_order_relaxed);
2597  parent->add_block_to_free_list(block);
2598  }
2599  }
2600  } guard = {block, index, entry, this->parent};
2601 
2602  element = std::move(el); // NOLINT
2603  } else {
2604  element = std::move(el); // NOLINT
2605  el.~T(); // NOLINT
2606 
2607  if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2608  {
2609 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2610  debug::DebugLock lock(mutex);
2611 #endif
2612  // Add the block back into the global free pool (and remove from block index)
2613  entry->value.store(nullptr, std::memory_order_relaxed);
2614  }
2615  this->parent->add_block_to_free_list(block); // releases the above store
2616  }
2617  }
2618 
2619  return true;
2620  } else {
2621  this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
2622  }
2623  }
2624 
2625  return false;
2626  }
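// Usage sketch: the tokenless consumer API ultimately lands in the dequeue()
// above (the parent queue picks a producer's sub-queue, then calls its
// dequeue()). Assuming the standard public interface and a
// moodycamel::ConcurrentQueue<int> q:
//
//   int item;
//   if (q.try_dequeue(item)) {
//     // consumed one element; try_dequeue returns false when q appears empty
//   }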
2627 
2628 #ifdef _MSC_VER
2629  #pragma warning(push)
2630 #pragma warning(disable: 4706) // assignment within conditional expression
2631 #endif
2632  template<AllocationMode allocMode, typename It>
2633  bool enqueue_bulk(It itemFirst, size_t count) {
2634  // First, we need to make sure we have enough room to enqueue all of the elements;
2635  // this means pre-allocating blocks and putting them in the block index (but only if
2636  // all the allocations succeeded).
2637 
2638  // Note that the tailBlock we start off with may not be owned by us any more;
2639  // this happens if it was filled up exactly to the top (setting tailIndex to
2640  // the first index of the next block which is not yet allocated), then dequeued
2641  // completely (putting it on the free list) before we enqueue again.
2642 
2643  index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2644  auto startBlock = this->tailBlock;
2645  Block *firstAllocatedBlock = nullptr;
2646  auto endBlock = this->tailBlock;
2647 
2648  // Figure out how many blocks we'll need to allocate, and do so
2649  size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1))
2650  - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2651  index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2652  if (blockBaseDiff > 0) {
2653 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2654  debug::DebugLock lock(mutex);
2655 #endif
2656  do {
2657  blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2658  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2659 
2660  // Find out where we'll be inserting this block in the block index
2661  BlockIndexEntry *idxEntry = nullptr; // initialization here unnecessary but compiler can't always tell
2662  Block *newBlock;
2663  bool indexInserted = false;
2664  auto head = this->headIndex.load(std::memory_order_relaxed);
2665  assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2666  bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
2667  || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value
2668  && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2669 
2670  if (full || !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex))
2671  || (newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>()) == nullptr) {
2672  // Index allocation or block allocation failed; revert any other allocations
2673  // and index insertions done so far for this operation
2674  if (indexInserted) {
2675  rewind_block_index_tail();
2676  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2677  }
2678  currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2679  for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2680  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2681  idxEntry = get_block_index_entry_for_index(currentTailIndex);
2682  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2683  rewind_block_index_tail();
2684  }
2685  this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2686  this->tailBlock = startBlock;
2687 
2688  return false;
2689  }
2690 
2691 #ifdef MCDBGQ_TRACKMEM
2692  newBlock->owner = this;
2693 #endif
2694  newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2695  newBlock->next = nullptr;
2696 
2697  // Insert the new block into the index
2698  idxEntry->value.store(newBlock, std::memory_order_relaxed);
2699 
2700  // Store the chain of blocks so that we can undo if later allocations fail,
2701  // and so that we can find the blocks when we do the actual enqueueing
2702  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr) {
2703  assert(this->tailBlock != nullptr);
2704  this->tailBlock->next = newBlock;
2705  }
2706  this->tailBlock = newBlock;
2707  endBlock = newBlock;
2708  firstAllocatedBlock = firstAllocatedBlock == nullptr ? newBlock : firstAllocatedBlock;
2709  } while (blockBaseDiff > 0);
2710  }
2711 
2712  // Enqueue, one block at a time
2713  index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2714  currentTailIndex = startTailIndex;
2715  this->tailBlock = startBlock;
2716  assert(
2717  (startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
2718  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
2719  this->tailBlock = firstAllocatedBlock;
2720  }
2721  while (true) {
2722  index_t
2723  stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2724  if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2725  stopIndex = newTailIndex;
2726  }
2727  MOODYCAMEL_CONSTEXPR_IF (MOODYCAMEL_NOEXCEPT_CTOR(T,
2728  decltype(*itemFirst),
2729  new(static_cast<T *>(nullptr)) T(details::deref_noexcept(
2730  itemFirst)))) {
2731  while (currentTailIndex != stopIndex) {
2732  new((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2733  }
2734  } else {
2735  MOODYCAMEL_TRY {
2736  while (currentTailIndex != stopIndex) {
2737  new((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<!MOODYCAMEL_NOEXCEPT_CTOR(T,
2738  decltype(*itemFirst),
2739  new(static_cast<T *>(nullptr)) T(
2740  details::deref_noexcept(
2741  itemFirst)))>::eval(
2742  *itemFirst));
2743  ++currentTailIndex;
2744  ++itemFirst;
2745  }
2746  }
2747  MOODYCAMEL_CATCH (...) {
2748  auto constructedStopIndex = currentTailIndex;
2749  auto lastBlockEnqueued = this->tailBlock;
2750 
2751  if (!details::is_trivially_destructible<T>::value) {
2752  auto block = startBlock;
2753  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2754  block = firstAllocatedBlock;
2755  }
2756  currentTailIndex = startTailIndex;
2757  while (true) {
2758  stopIndex =
2759  (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2760  if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2761  stopIndex = constructedStopIndex;
2762  }
2763  while (currentTailIndex != stopIndex) {
2764  (*block)[currentTailIndex++]->~T();
2765  }
2766  if (block == lastBlockEnqueued) {
2767  break;
2768  }
2769  block = block->next;
2770  }
2771  }
2772 
2773  currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2774  for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2775  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2776  auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
2777  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2778  rewind_block_index_tail();
2779  }
2780  this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2781  this->tailBlock = startBlock;
2782  MOODYCAMEL_RETHROW;
2783  }
2784  }
2785 
2786  if (this->tailBlock == endBlock) {
2787  assert(currentTailIndex == newTailIndex);
2788  break;
2789  }
2790  this->tailBlock = this->tailBlock->next;
2791  }
2792  this->tailIndex.store(newTailIndex, std::memory_order_release);
2793  return true;
2794  }
2795 #ifdef _MSC_VER
2796 #pragma warning(pop)
2797 #endif
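// Usage sketch for the bulk enqueue path above, assuming the standard public
// interface and a moodycamel::ConcurrentQueue<int> q. All needed blocks are
// reserved up front, so either every element is published or none is:
//
//   std::vector<int> items = {1, 2, 3, 4, 5};
//   bool ok = q.enqueue_bulk(items.begin(), items.size());
//   // ok is false only if block or index allocation failed, in which case
//   // the code above has already rolled the reservation back.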
2798 
2799  template<typename It>
2800  size_t dequeue_bulk(It &itemFirst, size_t max) {
2801  auto tail = this->tailIndex.load(std::memory_order_relaxed);
2802  auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2803  auto desiredCount =
2804  static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2805  if (details::circular_less_than<size_t>(0, desiredCount)) {
2806  desiredCount = desiredCount < max ? desiredCount : max;
2807  std::atomic_thread_fence(std::memory_order_acquire);
2808 
2809  auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
2810 
2811  tail = this->tailIndex.load(std::memory_order_acquire);
2812  auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2813  if (details::circular_less_than<size_t>(0, actualCount)) {
2814  actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2815  if (actualCount < desiredCount) {
2816  this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2817  }
2818 
2819  // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
2820  // will never exceed tail.
2821  auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2822 
2823  // Iterate the blocks and dequeue
2824  auto index = firstIndex;
2825  BlockIndexHeader *localBlockIndex;
2826  auto indexIndex = get_block_index_index_for_index(index, localBlockIndex);
2827  do {
2828  auto blockStartIndex = index;
2829  index_t endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2830  endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ?
2831  firstIndex + static_cast<index_t>(actualCount) : endIndex;
2832 
2833  auto entry = localBlockIndex->index[indexIndex];
2834  auto block = entry->value.load(std::memory_order_relaxed);
2835  if (MOODYCAMEL_NOEXCEPT_ASSIGN(T,
2836  T &&,
2837  details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
2838  while (index != endIndex) {
2839  auto &el = *((*block)[index]);
2840  *itemFirst++ = std::move(el);
2841  el.~T();
2842  ++index;
2843  }
2844  } else {
2845  MOODYCAMEL_TRY {
2846  while (index != endIndex) {
2847  auto &el = *((*block)[index]);
2848  *itemFirst = std::move(el);
2849  ++itemFirst;
2850  el.~T();
2851  ++index;
2852  }
2853  }
2854  MOODYCAMEL_CATCH (...) {
2855  do {
2856  entry = localBlockIndex->index[indexIndex];
2857  block = entry->value.load(std::memory_order_relaxed);
2858  while (index != endIndex) {
2859  (*block)[index++]->~T();
2860  }
2861 
2862  if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex,
2863  static_cast<size_t>(
2864  endIndex
2865  - blockStartIndex))) {
2866 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2867  debug::DebugLock lock(mutex);
2868 #endif
2869  entry->value.store(nullptr, std::memory_order_relaxed);
2870  this->parent->add_block_to_free_list(block);
2871  }
2872  indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2873 
2874  blockStartIndex = index;
2875  endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2876  endIndex =
2877  details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ?
2878  firstIndex + static_cast<index_t>(actualCount) : endIndex;
2879  } while (index != firstIndex + actualCount);
2880 
2881  MOODYCAMEL_RETHROW;
2882  }
2883  }
2884  if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex,
2885  static_cast<size_t>(endIndex
2886  - blockStartIndex))) {
2887  {
2888 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2889  debug::DebugLock lock(mutex);
2890 #endif
2891  // Note that the set_many_empty above did a release, meaning that anybody who acquires the block
2892  // we're about to free can use it safely since our writes (and reads!) will have happened-before then.
2893  entry->value.store(nullptr, std::memory_order_relaxed);
2894  }
2895  this->parent->add_block_to_free_list(block); // releases the above store
2896  }
2897  indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2898  } while (index != firstIndex + actualCount);
2899 
2900  return actualCount;
2901  } else {
2902  this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2903  }
2904  }
2905 
2906  return 0;
2907  }
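// Usage sketch for the bulk dequeue path above, assuming the standard public
// interface and a moodycamel::ConcurrentQueue<int> q. The return value is the
// number of elements actually taken (anywhere from 0 up to the requested max):
//
//   int buf[64];
//   std::size_t n = q.try_dequeue_bulk(buf, 64);
//   for (std::size_t i = 0; i != n; ++i) { /* process buf[i] */ }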
2908 
2909  private:
2910  // The block size must be > 1, so any number with the low bit set is an invalid block base index
2911  static const index_t INVALID_BLOCK_BASE = 1;
2912 
2913  struct BlockIndexEntry {
2914  std::atomic<index_t> key;
2915  std::atomic<Block *> value;
2916  };
2917 
2918  struct BlockIndexHeader {
2919  size_t capacity;
2920  std::atomic<size_t> tail;
2921  BlockIndexEntry *entries;
2922  BlockIndexEntry **index;
2923  BlockIndexHeader *prev;
2924  };
2925 
2926  template<AllocationMode allocMode>
2927  inline bool insert_block_index_entry(BlockIndexEntry *&idxEntry, index_t blockStartIndex) {
2928  auto localBlockIndex =
2929  blockIndex.load(std::memory_order_relaxed); // We're the only writer thread, relaxed is OK
2930  if (localBlockIndex == nullptr) {
2931  return false; // this can happen if new_block_index failed in the constructor
2932  }
2933  size_t newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
2934  idxEntry = localBlockIndex->index[newTail];
2935  if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE ||
2936  idxEntry->value.load(std::memory_order_relaxed) == nullptr) {
2937 
2938  idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
2939  localBlockIndex->tail.store(newTail, std::memory_order_release);
2940  return true;
2941  }
2942 
2943  // No room in the old block index, try to allocate another one!
2944  MOODYCAMEL_CONSTEXPR_IF (allocMode == CannotAlloc) {
2945  return false;
2946  } else if (!new_block_index()) {
2947  return false;
2948  } else {
2949  localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2950  newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
2951  idxEntry = localBlockIndex->index[newTail];
2952  assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE);
2953  idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
2954  localBlockIndex->tail.store(newTail, std::memory_order_release);
2955  return true;
2956  }
2957  }
2958 
2959  inline void rewind_block_index_tail() {
2960  auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2961  localBlockIndex->tail.store(
2962  (localBlockIndex->tail.load(std::memory_order_relaxed) - 1) & (localBlockIndex->capacity - 1),
2963  std::memory_order_relaxed);
2964  }
2965 
2966  inline BlockIndexEntry *get_block_index_entry_for_index(index_t index) const {
2967  BlockIndexHeader *localBlockIndex;
2968  auto idx = get_block_index_index_for_index(index, localBlockIndex);
2969  return localBlockIndex->index[idx];
2970  }
2971 
2972  inline size_t get_block_index_index_for_index(index_t index, BlockIndexHeader *&localBlockIndex) const {
2973 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2974  debug::DebugLock lock(mutex);
2975 #endif
2976  index &= ~static_cast<index_t>(BLOCK_SIZE - 1);
2977  localBlockIndex = blockIndex.load(std::memory_order_acquire);
2978  auto tail = localBlockIndex->tail.load(std::memory_order_acquire);
2979  auto tailBase = localBlockIndex->index[tail]->key.load(std::memory_order_relaxed);
2980  assert(tailBase != INVALID_BLOCK_BASE);
2981  // Note: Must use division instead of shift because the index may wrap around, causing a negative
2982  // offset, whose negativity we want to preserve
2983  auto offset =
2984  static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(index - tailBase) / BLOCK_SIZE);
2985  size_t idx = (tail + offset) & (localBlockIndex->capacity - 1);
2986  assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index
2987  && localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) != nullptr);
2988  return idx;
2989  }
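// Worked illustration of the signed-division note above, with assumed small
// values (not code from this header): let BLOCK_SIZE be 32, index_t be 32-bit,
// the tail entry's base be 128, and the wanted block's base be 64, i.e. two
// index entries before the tail entry:
//
//   std::uint32_t index = 64, tailBase = 128;
//   std::uint32_t diff = index - tailBase;               // wraps to 0xFFFFFFC0
//   auto offset = static_cast<std::int32_t>(diff) / 32;  // == -2, sign preserved
//   // (tail + static_cast<std::size_t>(offset)) & (capacity - 1) then steps two
//   // slots backwards from the tail slot, modulo the index capacity.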
2990 
2991  bool new_block_index() {
2992  auto prev = blockIndex.load(std::memory_order_relaxed);
2993  size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
2994  auto entryCount = prev == nullptr ? nextBlockIndexCapacity : prevCapacity;
2995  auto raw = static_cast<char *>((Traits::malloc)(
2996  sizeof(BlockIndexHeader) +
2997  std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * entryCount +
2998  std::alignment_of<BlockIndexEntry *>::value - 1 + sizeof(BlockIndexEntry *) * nextBlockIndexCapacity));
2999  if (raw == nullptr) {
3000  return false;
3001  }
3002 
3003  auto header = new(raw) BlockIndexHeader;
3004  auto entries =
3005  reinterpret_cast<BlockIndexEntry *>(details::align_for<BlockIndexEntry>(raw + sizeof(BlockIndexHeader)));
3006  auto index = reinterpret_cast<BlockIndexEntry **>(details::align_for<BlockIndexEntry *>(
3007  reinterpret_cast<char *>(entries) + sizeof(BlockIndexEntry) * entryCount));
3008  if (prev != nullptr) {
3009  auto prevTail = prev->tail.load(std::memory_order_relaxed);
3010  auto prevPos = prevTail;
3011  size_t i = 0;
3012  do {
3013  prevPos = (prevPos + 1) & (prev->capacity - 1);
3014  index[i++] = prev->index[prevPos];
3015  } while (prevPos != prevTail);
3016  assert(i == prevCapacity);
3017  }
3018  for (size_t i = 0; i != entryCount; ++i) {
3019  new(entries + i) BlockIndexEntry;
3020  entries[i].key.store(INVALID_BLOCK_BASE, std::memory_order_relaxed);
3021  index[prevCapacity + i] = entries + i;
3022  }
3023  header->prev = prev;
3024  header->entries = entries;
3025  header->index = index;
3026  header->capacity = nextBlockIndexCapacity;
3027  header->tail.store((prevCapacity - 1) & (nextBlockIndexCapacity - 1), std::memory_order_relaxed);
3028 
3029  blockIndex.store(header, std::memory_order_release);
3030 
3031  nextBlockIndexCapacity <<= 1;
3032 
3033  return true;
3034  }
3035 
3036  private:
3037  size_t nextBlockIndexCapacity;
3038  std::atomic<BlockIndexHeader *> blockIndex;
3039 
3040 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3041  public:
3042  details::ThreadExitListener threadExitListener;
3043  private:
3044 #endif
3045 
3046 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3047  public:
3048  ImplicitProducer* nextImplicitProducer;
3049  private:
3050 #endif
3051 
3052 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3053  mutable debug::DebugMutex mutex;
3054 #endif
3055 #ifdef MCDBGQ_TRACKMEM
3056  friend struct MemStats;
3057 #endif
3058  };
3059 
3060 
3061  //////////////////////////////////
3062  // Block pool manipulation
3063  //////////////////////////////////
3064 
3065  void populate_initial_block_list(size_t blockCount) {
3066  initialBlockPoolSize = blockCount;
3067  if (initialBlockPoolSize == 0) {
3068  initialBlockPool = nullptr;
3069  return;
3070  }
3071 
3072  initialBlockPool = create_array<Block>(blockCount);
3073  if (initialBlockPool == nullptr) {
3074  initialBlockPoolSize = 0;
3075  }
3076  for (size_t i = 0; i < initialBlockPoolSize; ++i) {
3077  initialBlockPool[i].dynamicallyAllocated = false;
3078  }
3079  }
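// Usage sketch: the initial block pool populated above comes from the queue
// constructor's capacity argument, so pre-sizing the queue keeps the hot path
// free of block allocations (assuming the standard public constructor):
//
//   // room for at least 1024 elements before any dynamic block allocation
//   moodycamel::ConcurrentQueue<int> preSized(1024);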
3080 
3081  inline Block *try_get_block_from_initial_pool() {
3082  if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) {
3083  return nullptr;
3084  }
3085 
3086  auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);
3087 
3088  return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;
3089  }
3090 
3091  inline void add_block_to_free_list(Block *block) {
3092 #ifdef MCDBGQ_TRACKMEM
3093  block->owner = nullptr;
3094 #endif
3095  freeList.add(block);
3096  }
3097 
3098  inline void add_blocks_to_free_list(Block *block) {
3099  while (block != nullptr) {
3100  auto next = block->next;
3101  add_block_to_free_list(block);
3102  block = next;
3103  }
3104  }
3105 
3106  inline Block *try_get_block_from_free_list() {
3107  return freeList.try_get();
3108  }
3109 
3110  // Gets a free block from one of the memory pools, or allocates a new one (if applicable)
3111  template<AllocationMode canAlloc>
3112  Block *requisition_block() {
3113  auto block = try_get_block_from_initial_pool();
3114  if (block != nullptr) {
3115  return block;
3116  }
3117 
3118  block = try_get_block_from_free_list();
3119  if (block != nullptr) {
3120  return block;
3121  }
3122 
3123  MOODYCAMEL_CONSTEXPR_IF (canAlloc == CanAlloc) {
3124  return create<Block>();
3125  } else {
3126  return nullptr;
3127  }
3128  }
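// Usage sketch: the CanAlloc/CannotAlloc split in requisition_block() above is
// what separates enqueue() from try_enqueue() in the public API (assuming the
// standard interface): try_enqueue never allocates and simply fails when the
// pre-allocated pools are exhausted.
//
//   moodycamel::ConcurrentQueue<int> q(64);  // small, fixed initial block pool
//   bool ok = q.try_enqueue(1);              // never allocates; may return false
//   q.enqueue(2);                            // may requisition a fresh block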
3129 
3130 #ifdef MCDBGQ_TRACKMEM
3131  public:
3132  struct MemStats {
3133  size_t allocatedBlocks;
3134  size_t usedBlocks;
3135  size_t freeBlocks;
3136  size_t ownedBlocksExplicit;
3137  size_t ownedBlocksImplicit;
3138  size_t implicitProducers;
3139  size_t explicitProducers;
3140  size_t elementsEnqueued;
3141  size_t blockClassBytes;
3142  size_t queueClassBytes;
3143  size_t implicitBlockIndexBytes;
3144  size_t explicitBlockIndexBytes;
3145 
3146  friend class ConcurrentQueue;
3147 
3148  private:
3149  static MemStats getFor(ConcurrentQueue* q)
3150  {
3151  MemStats stats = { 0 };
3152 
3153  stats.elementsEnqueued = q->size_approx();
3154 
3155  auto block = q->freeList.head_unsafe();
3156  while (block != nullptr) {
3157  ++stats.allocatedBlocks;
3158  ++stats.freeBlocks;
3159  block = block->freeListNext.load(std::memory_order_relaxed);
3160  }
3161 
3162  for (auto ptr = q->producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
3163  bool implicit = dynamic_cast<ImplicitProducer*>(ptr) != nullptr;
3164  stats.implicitProducers += implicit ? 1 : 0;
3165  stats.explicitProducers += implicit ? 0 : 1;
3166 
3167  if (implicit) {
3168  auto prod = static_cast<ImplicitProducer*>(ptr);
3169  stats.queueClassBytes += sizeof(ImplicitProducer);
3170  auto head = prod->headIndex.load(std::memory_order_relaxed);
3171  auto tail = prod->tailIndex.load(std::memory_order_relaxed);
3172  auto hash = prod->blockIndex.load(std::memory_order_relaxed);
3173  if (hash != nullptr) {
3174  for (size_t i = 0; i != hash->capacity; ++i) {
3175  if (hash->index[i]->key.load(std::memory_order_relaxed) != ImplicitProducer::INVALID_BLOCK_BASE && hash->index[i]->value.load(std::memory_order_relaxed) != nullptr) {
3176  ++stats.allocatedBlocks;
3177  ++stats.ownedBlocksImplicit;
3178  }
3179  }
3180  stats.implicitBlockIndexBytes += hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry);
3181  for (; hash != nullptr; hash = hash->prev) {
3182  stats.implicitBlockIndexBytes += sizeof(typename ImplicitProducer::BlockIndexHeader) + hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry*);
3183  }
3184  }
3185  for (; details::circular_less_than<index_t>(head, tail); head += BLOCK_SIZE) {
3186  //auto block = prod->get_block_index_entry_for_index(head);
3187  ++stats.usedBlocks;
3188  }
3189  }
3190  else {
3191  auto prod = static_cast<ExplicitProducer*>(ptr);
3192  stats.queueClassBytes += sizeof(ExplicitProducer);
3193  auto tailBlock = prod->tailBlock;
3194  bool wasNonEmpty = false;
3195  if (tailBlock != nullptr) {
3196  auto block = tailBlock;
3197  do {
3198  ++stats.allocatedBlocks;
3199  if (!block->ConcurrentQueue::Block::template is_empty<explicit_context>() || wasNonEmpty) {
3200  ++stats.usedBlocks;
3201  wasNonEmpty = wasNonEmpty || block != tailBlock;
3202  }
3203  ++stats.ownedBlocksExplicit;
3204  block = block->next;
3205  } while (block != tailBlock);
3206  }
3207  auto index = prod->blockIndex.load(std::memory_order_relaxed);
3208  while (index != nullptr) {
3209  stats.explicitBlockIndexBytes += sizeof(typename ExplicitProducer::BlockIndexHeader) + index->size * sizeof(typename ExplicitProducer::BlockIndexEntry);
3210  index = static_cast<typename ExplicitProducer::BlockIndexHeader*>(index->prev);
3211  }
3212  }
3213  }
3214 
3215  auto freeOnInitialPool = q->initialBlockPoolIndex.load(std::memory_order_relaxed) >= q->initialBlockPoolSize ? 0 : q->initialBlockPoolSize - q->initialBlockPoolIndex.load(std::memory_order_relaxed);
3216  stats.allocatedBlocks += freeOnInitialPool;
3217  stats.freeBlocks += freeOnInitialPool;
3218 
3219  stats.blockClassBytes = sizeof(Block) * stats.allocatedBlocks;
3220  stats.queueClassBytes += sizeof(ConcurrentQueue);
3221 
3222  return stats;
3223  }
3224  };
3225 
3226  // For debugging only. Not thread-safe.
3227  MemStats getMemStats()
3228  {
3229  return MemStats::getFor(this);
3230  }
3231  private:
3232  friend struct MemStats;
3233 #endif
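// Usage sketch: with MCDBGQ_TRACKMEM defined before this header is included,
// the MemStats facility above can inspect (non-thread-safely) how many blocks
// a queue currently owns:
//
//   #define MCDBGQ_TRACKMEM
//   #include "concurrentqueue.h"
//   ...
//   auto stats = q.getMemStats();
//   std::printf("allocated=%zu free=%zu\n", stats.allocatedBlocks, stats.freeBlocks);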
3234 
3235 
3236  //////////////////////////////////
3237  // Producer list manipulation
3238  //////////////////////////////////
3239 
3240  ProducerBase *recycle_or_create_producer(bool isExplicit) {
3241  bool recycled;
3242  return recycle_or_create_producer(isExplicit, recycled);
3243  }
3244 
3245  ProducerBase *recycle_or_create_producer(bool isExplicit, bool &recycled) {
3246 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3247  debug::DebugLock lock(implicitProdMutex);
3248 #endif
3249  // Try to re-use one first
3250  for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
3251  if (ptr->inactive.load(std::memory_order_relaxed) && ptr->isExplicit == isExplicit) {
3252  bool expected = true;
3253  if (ptr->inactive.compare_exchange_strong(expected, /* desired */
3254  false,
3255  std::memory_order_acquire,
3256  std::memory_order_relaxed)) {
3257  // We caught one! It's been marked as activated, the caller can have it
3258  recycled = true;
3259  return ptr;
3260  }
3261  }
3262  }
3263 
3264  recycled = false;
3265  return add_producer(isExplicit ? static_cast<ProducerBase *>(create<ExplicitProducer>(this)) : create<
3266  ImplicitProducer>(this));
3267  }
3268 
3269  ProducerBase *add_producer(ProducerBase *producer) {
3270  // Handle failed memory allocation
3271  if (producer == nullptr) {
3272  return nullptr;
3273  }
3274 
3275  producerCount.fetch_add(1, std::memory_order_relaxed);
3276 
3277  // Add it to the lock-free list
3278  auto prevTail = producerListTail.load(std::memory_order_relaxed);
3279  do {
3280  producer->next = prevTail;
3281  } while (!producerListTail.compare_exchange_weak(prevTail,
3282  producer,
3283  std::memory_order_release,
3284  std::memory_order_relaxed));
3285 
3286 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3287  if (producer->isExplicit) {
3288  auto prevTailExplicit = explicitProducers.load(std::memory_order_relaxed);
3289  do {
3290  static_cast<ExplicitProducer*>(producer)->nextExplicitProducer = prevTailExplicit;
3291  } while (!explicitProducers.compare_exchange_weak(prevTailExplicit, static_cast<ExplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3292  }
3293  else {
3294  auto prevTailImplicit = implicitProducers.load(std::memory_order_relaxed);
3295  do {
3296  static_cast<ImplicitProducer*>(producer)->nextImplicitProducer = prevTailImplicit;
3297  } while (!implicitProducers.compare_exchange_weak(prevTailImplicit, static_cast<ImplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3298  }
3299 #endif
3300 
3301  return producer;
3302  }
3303 
3304  void reown_producers() {
3305  // After another instance is moved-into/swapped-with this one, all the
3306  // producers we stole still think their parents are the other queue.
3307  // So fix them up!
3308  for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) {
3309  ptr->parent = this;
3310  }
3311  }
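// Usage sketch: reown_producers() above is what keeps each producer's parent
// pointer valid after a move or swap, so swapping two queues that are not being
// accessed concurrently (using the swap() overload defined near the end of this
// header) leaves both fully functional:
//
//   moodycamel::ConcurrentQueue<int> a, b;
//   a.enqueue(1);
//   swap(a, b);                    // b now owns a's producers, reparented to b
//   int v;
//   bool got = b.try_dequeue(v);   // got == true, v == 1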
3312 
3313 
3314  //////////////////////////////////
3315  // Implicit producer hash
3316  //////////////////////////////////
3317 
3318  struct ImplicitProducerKVP {
3319  std::atomic<details::thread_id_t> key;
3320  ImplicitProducer
3321  *value; // No need for atomicity since it's only read by the thread that sets it in the first place
3322 
3323  ImplicitProducerKVP() : value(nullptr) {}
3324 
3325  ImplicitProducerKVP(ImplicitProducerKVP &&other) MOODYCAMEL_NOEXCEPT {
3326  key.store(other.key.load(std::memory_order_relaxed), std::memory_order_relaxed);
3327  value = other.value;
3328  }
3329 
3330  inline ImplicitProducerKVP &operator=(ImplicitProducerKVP &&other) MOODYCAMEL_NOEXCEPT {
3331  swap(other);
3332  return *this;
3333  }
3334 
3335  inline void swap(ImplicitProducerKVP &other) MOODYCAMEL_NOEXCEPT {
3336  if (this != &other) {
3337  details::swap_relaxed(key, other.key);
3338  std::swap(value, other.value);
3339  }
3340  }
3341  };
3342 
3343  template<typename XT, typename XTraits>
3344  friend void moodycamel::swap(typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP &,
3345  typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP &) MOODYCAMEL_NOEXCEPT;
3346 
3347  struct ImplicitProducerHash {
3348  size_t capacity;
3349  ImplicitProducerKVP *entries;
3350  ImplicitProducerHash *prev;
3351  };
3352 
3353  inline void populate_initial_implicit_producer_hash() {
3354  MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) {
3355  return;
3356  } else {
3357  implicitProducerHashCount.store(0, std::memory_order_relaxed);
3358  auto hash = &initialImplicitProducerHash;
3359  hash->capacity = INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
3360  hash->entries = &initialImplicitProducerHashEntries[0];
3361  for (size_t i = 0; i != INITIAL_IMPLICIT_PRODUCER_HASH_SIZE; ++i) {
3362  initialImplicitProducerHashEntries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
3363  }
3364  hash->prev = nullptr;
3365  implicitProducerHash.store(hash, std::memory_order_relaxed);
3366  }
3367  }
3368 
3369  void swap_implicit_producer_hashes(ConcurrentQueue &other) {
3370  MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) {
3371  return;
3372  } else {
3373  // Swap (assumes our implicit producer hash is initialized)
3374  initialImplicitProducerHashEntries.swap(other.initialImplicitProducerHashEntries);
3375  initialImplicitProducerHash.entries = &initialImplicitProducerHashEntries[0];
3376  other.initialImplicitProducerHash.entries = &other.initialImplicitProducerHashEntries[0];
3377 
3378  details::swap_relaxed(implicitProducerHashCount, other.implicitProducerHashCount);
3379 
3380  details::swap_relaxed(implicitProducerHash, other.implicitProducerHash);
3381  if (implicitProducerHash.load(std::memory_order_relaxed) == &other.initialImplicitProducerHash) {
3382  implicitProducerHash.store(&initialImplicitProducerHash, std::memory_order_relaxed);
3383  } else {
3384  ImplicitProducerHash *hash;
3385  for (hash = implicitProducerHash.load(std::memory_order_relaxed);
3386  hash->prev != &other.initialImplicitProducerHash; hash = hash->prev) {
3387  continue;
3388  }
3389  hash->prev = &initialImplicitProducerHash;
3390  }
3391  if (other.implicitProducerHash.load(std::memory_order_relaxed) == &initialImplicitProducerHash) {
3392  other.implicitProducerHash.store(&other.initialImplicitProducerHash, std::memory_order_relaxed);
3393  } else {
3394  ImplicitProducerHash *hash;
3395  for (hash = other.implicitProducerHash.load(std::memory_order_relaxed);
3396  hash->prev != &initialImplicitProducerHash; hash = hash->prev) {
3397  continue;
3398  }
3399  hash->prev = &other.initialImplicitProducerHash;
3400  }
3401  }
3402  }
3403 
3404  // Only fails (returns nullptr) if memory allocation fails
3405  ImplicitProducer *get_or_add_implicit_producer() {
3406  // Note that since the data is essentially thread-local (key is thread ID),
3407  // there's a reduced need for fences (memory ordering is already consistent
3408  // for any individual thread), except for the current table itself.
3409 
3410  // Start by looking for the thread ID in the current and all previous hash tables.
3411  // If it's not found, it must not be in there yet, since this same thread would
3412  // have added it previously to one of the tables that we traversed.
3413 
3414  // Code and algorithm adapted from http://preshing.com/20130605/the-worlds-simplest-lock-free-hash-table
3415 
3416 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3417  debug::DebugLock lock(implicitProdMutex);
3418 #endif
3419 
3420  auto id = details::thread_id();
3421  auto hashedId = details::hash_thread_id(id);
3422 
3423  auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
3424  assert(mainHash != nullptr); // silence clang-tidy and MSVC warnings (hash cannot be null)
3425  for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
3426  // Look for the id in this hash
3427  auto index = hashedId;
3428  while (true) { // Not an infinite loop because at least one slot is free in the hash table
3429  index &= hash->capacity - 1;
3430 
3431  auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3432  if (probedKey == id) {
3433  // Found it! If we had to search several hashes deep, though, we should lazily add it
3434  // to the current main hash table to avoid the extended search next time.
3435  // Note there's guaranteed to be room in the current hash table since every subsequent
3436  // table implicitly reserves space for all previous tables (there's only one
3437  // implicitProducerHashCount).
3438  auto value = hash->entries[index].value;
3439  if (hash != mainHash) {
3440  index = hashedId;
3441  while (true) {
3442  index &= mainHash->capacity - 1;
3443  probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
3444  auto empty = details::invalid_thread_id;
3445 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3446  auto reusable = details::invalid_thread_id2;
3447  if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
3448  (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
3449 #else
3450  if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty,
3451  id,
3452  std::memory_order_relaxed,
3453  std::memory_order_relaxed))) {
3454 #endif
3455  mainHash->entries[index].value = value;
3456  break;
3457  }
3458  ++index;
3459  }
3460  }
3461 
3462  return value;
3463  }
3464  if (probedKey == details::invalid_thread_id) {
3465  break; // Not in this hash table
3466  }
3467  ++index;
3468  }
3469  }
3470 
3471  // Insert!
3472  auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
3473  while (true) {
3474  // NOLINTNEXTLINE(clang-analyzer-core.NullDereference)
3475  if (newCount >= (mainHash->capacity >> 1)
3476  && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) {
3477  // We've acquired the resize lock, try to allocate a bigger hash table.
3478  // Note the acquire fence synchronizes with the release fence at the end of this block, and hence when
3479  // we reload implicitProducerHash it must be the most recent version (it only gets changed within this
3480  // locked block).
3481  mainHash = implicitProducerHash.load(std::memory_order_acquire);
3482  if (newCount >= (mainHash->capacity >> 1)) {
3483  auto newCapacity = mainHash->capacity << 1;
3484  while (newCount >= (newCapacity >> 1)) {
3485  newCapacity <<= 1;
3486  }
3487  auto raw = static_cast<char *>((Traits::malloc)(
3488  sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1
3489  + sizeof(ImplicitProducerKVP) * newCapacity));
3490  if (raw == nullptr) {
3491  // Allocation failed
3492  implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3493  implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
3494  return nullptr;
3495  }
3496 
3497  auto newHash = new(raw) ImplicitProducerHash;
3498  newHash->capacity = static_cast<size_t>(newCapacity);
3499  newHash->entries = reinterpret_cast<ImplicitProducerKVP *>(details::align_for<ImplicitProducerKVP>(
3500  raw + sizeof(ImplicitProducerHash)));
3501  for (size_t i = 0; i != newCapacity; ++i) {
3502  new(newHash->entries + i) ImplicitProducerKVP;
3503  newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
3504  }
3505  newHash->prev = mainHash;
3506  implicitProducerHash.store(newHash, std::memory_order_release);
3507  implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3508  mainHash = newHash;
3509  } else {
3510  implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3511  }
3512  }
3513 
3514  // If it's < three-quarters full, add to the old one anyway so that we don't have to wait for the next table
3515  // to finish being allocated by another thread (and if we just finished allocating above, the condition will
3516  // always be true)
3517  if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
3518  bool recycled;
3519  auto producer = static_cast<ImplicitProducer *>(recycle_or_create_producer(false, recycled));
3520  if (producer == nullptr) {
3521  implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3522  return nullptr;
3523  }
3524  if (recycled) {
3525  implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3526  }
3527 
3528 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3529  producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
3530  producer->threadExitListener.userData = producer;
3531  details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
3532 #endif
3533 
3534  auto index = hashedId;
3535  while (true) {
3536  index &= mainHash->capacity - 1;
3537  auto probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
3538 
3539  auto empty = details::invalid_thread_id;
3540 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3541  auto reusable = details::invalid_thread_id2;
3542  if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
3543  (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
3544 #else
3545  if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty,
3546  id,
3547  std::memory_order_relaxed,
3548  std::memory_order_relaxed))) {
3549 #endif
3550  mainHash->entries[index].value = producer;
3551  break;
3552  }
3553  ++index;
3554  }
3555  return producer;
3556  }
3557 
3558  // Hmm, the old hash is quite full and somebody else is busy allocating a new one.
3559  // We need to wait for the allocating thread to finish (if it succeeds, we add, if not,
3560  // we try to allocate ourselves).
3561  mainHash = implicitProducerHash.load(std::memory_order_acquire);
3562  }
3563  }
3564 
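// Usage sketch: get_or_add_implicit_producer() above is why tokenless enqueues
// scale across threads; each thread that enqueues without a ProducerToken is
// mapped through this hash (keyed by thread id) to its own implicit producer:
//
//   #include <thread>
//   moodycamel::ConcurrentQueue<int> q;
//   std::thread t1([&] { q.enqueue(1); });  // gets its own implicit producer
//   std::thread t2([&] { q.enqueue(2); });  // gets a different one
//   t1.join(); t2.join();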
3565 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3566  void implicit_producer_thread_exited(ImplicitProducer* producer)
3567  {
3568  // Remove from thread exit listeners
3569  details::ThreadExitNotifier::unsubscribe(&producer->threadExitListener);
3570 
3571  // Remove from hash
3572 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3573  debug::DebugLock lock(implicitProdMutex);
3574 #endif
3575  auto hash = implicitProducerHash.load(std::memory_order_acquire);
3576  assert(hash != nullptr); // The thread exit listener is only registered if we were added to a hash in the first place
3577  auto id = details::thread_id();
3578  auto hashedId = details::hash_thread_id(id);
3579  details::thread_id_t probedKey;
3580 
3581  // We need to traverse all the hashes just in case other threads aren't on the current one yet and are
3582  // trying to add an entry thinking there's a free slot (because they reused a producer)
3583  for (; hash != nullptr; hash = hash->prev) {
3584  auto index = hashedId;
3585  do {
3586  index &= hash->capacity - 1;
3587  probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3588  if (probedKey == id) {
3589  hash->entries[index].key.store(details::invalid_thread_id2, std::memory_order_release);
3590  break;
3591  }
3592  ++index;
3593  } while (probedKey != details::invalid_thread_id); // Can happen if the hash has changed but we weren't put back in it yet, or if we weren't added to this hash in the first place
3594  }
3595 
3596  // Mark the queue as being recyclable
3597  producer->inactive.store(true, std::memory_order_release);
3598  }
3599 
3600  static void implicit_producer_thread_exited_callback(void* userData)
3601  {
3602  auto producer = static_cast<ImplicitProducer*>(userData);
3603  auto queue = producer->parent;
3604  queue->implicit_producer_thread_exited(producer);
3605  }
3606 #endif
3607 
3608  //////////////////////////////////
3609  // Utility functions
3610  //////////////////////////////////
3611 
3612  template<typename TAlign>
3613  static inline void *aligned_malloc(size_t size) {
3614  MOODYCAMEL_CONSTEXPR_IF (std::alignment_of<TAlign>::value
3615  <= std::alignment_of<details::max_align_t>::value)return (Traits::malloc)(size);
3616  else {
3617  size_t alignment = std::alignment_of<TAlign>::value;
3618  void *raw = (Traits::malloc)(size + alignment - 1 + sizeof(void *));
3619  if (!raw)
3620  return nullptr;
3621  char *ptr = details::align_for<TAlign>(reinterpret_cast<char *>(raw) + sizeof(void *));
3622  *(reinterpret_cast<void **>(ptr) - 1) = raw;
3623  return ptr;
3624  }
3625  }
3626 
3627  template<typename TAlign>
3628  static inline void aligned_free(void *ptr) {
3629  MOODYCAMEL_CONSTEXPR_IF (std::alignment_of<TAlign>::value
3630  <= std::alignment_of<details::max_align_t>::value)return (Traits::free)(ptr);
3631  else
3632  (Traits::free)(ptr ? *(reinterpret_cast<void **>(ptr) - 1) : nullptr);
3633  }
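// Worked sketch of the over-aligned allocation pattern above, with assumed
// example values and a hypothetical align_up_64() helper (the real code uses
// Traits::malloc and details::align_for<TAlign>): over-allocate, skip past a
// pointer-sized slot, round up to the alignment, and stash the raw pointer just
// before the address handed out so aligned_free() can recover and free it:
//
//   void *raw = std::malloc(size + 63 + sizeof(void *));                // e.g. 0x1000
//   char *ptr = align_up_64(static_cast<char *>(raw) + sizeof(void *)); // -> 0x1040
//   *(reinterpret_cast<void **>(ptr) - 1) = raw;                        // stash at 0x1038
//   // ... later: std::free(*(reinterpret_cast<void **>(ptr) - 1));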
3634 
3635  template<typename U>
3636  static inline U *create_array(size_t count) {
3637  assert(count > 0);
3638  U *p = static_cast<U *>(aligned_malloc<U>(sizeof(U) * count));
3639  if (p == nullptr)
3640  return nullptr;
3641 
3642  for (size_t i = 0; i != count; ++i)
3643  new(p + i) U();
3644  return p;
3645  }
3646 
3647  template<typename U>
3648  static inline void destroy_array(U *p, size_t count) {
3649  if (p != nullptr) {
3650  assert(count > 0);
3651  for (size_t i = count; i != 0;)
3652  (p + --i)->~U();
3653  }
3654  aligned_free<U>(p);
3655  }
3656 
3657  template<typename U>
3658  static inline U *create() {
3659  void *p = aligned_malloc<U>(sizeof(U));
3660  return p != nullptr ? new(p) U : nullptr;
3661  }
3662 
3663  template<typename U, typename A1>
3664  static inline U *create(A1 &&a1) {
3665  void *p = aligned_malloc<U>(sizeof(U));
3666  return p != nullptr ? new(p) U(std::forward<A1>(a1)) : nullptr;
3667  }
3668 
3669  template<typename U>
3670  static inline void destroy(U *p) {
3671  if (p != nullptr)
3672  p->~U();
3673  aligned_free<U>(p);
3674  }
3675 
3676  private:
3677  std::atomic<ProducerBase *> producerListTail;
3678  std::atomic<std::uint32_t> producerCount;
3679 
3680  std::atomic<size_t> initialBlockPoolIndex;
3681  Block *initialBlockPool;
3682  size_t initialBlockPoolSize;
3683 
3684 #ifndef MCDBGQ_USEDEBUGFREELIST
3685  FreeList<Block> freeList;
3686 #else
3687  debug::DebugFreeList<Block> freeList;
3688 #endif
3689 
3690  std::atomic<ImplicitProducerHash *> implicitProducerHash;
3691  std::atomic<size_t> implicitProducerHashCount; // Number of slots logically used
3692  ImplicitProducerHash initialImplicitProducerHash;
3693  std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE> initialImplicitProducerHashEntries;
3694  std::atomic_flag implicitProducerHashResizeInProgress;
3695 
3696  std::atomic<std::uint32_t> nextExplicitConsumerId;
3697  std::atomic<std::uint32_t> globalExplicitConsumerOffset;
3698 
3699 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3700  debug::DebugMutex implicitProdMutex;
3701 #endif
3702 
3703 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3704  std::atomic<ExplicitProducer*> explicitProducers;
3705  std::atomic<ImplicitProducer*> implicitProducers;
3706 #endif
3707 };
3708 
3709 template<typename T, typename Traits>
3710 ProducerToken::ProducerToken(ConcurrentQueue<T, Traits> &queue)
3711  : producer(queue.recycle_or_create_producer(true)) {
3712  if (producer != nullptr) {
3713  producer->token = this;
3714  }
3715 }
3716 
3717 template<typename T, typename Traits>
3718 ProducerToken::ProducerToken(BlockingConcurrentQueue<T, Traits> &queue)
3719  : producer(reinterpret_cast<ConcurrentQueue<T, Traits> *>(&queue)->recycle_or_create_producer(true)) {
3720  if (producer != nullptr) {
3721  producer->token = this;
3722  }
3723 }
3724 
3725 template<typename T, typename Traits>
3726 ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits> &queue)
3727  : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr) {
3728  initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
3729  lastKnownGlobalOffset = static_cast<std::uint32_t>(-1);
3730 }
3731 
3732 template<typename T, typename Traits>
3733 ConsumerToken::ConsumerToken(BlockingConcurrentQueue<T, Traits> &queue)
3734  : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr) {
3735  initialOffset = reinterpret_cast<ConcurrentQueue<T, Traits> *>(&queue)->nextExplicitConsumerId.fetch_add(1,
3736  std::memory_order_release);
3737  lastKnownGlobalOffset = static_cast<std::uint32_t>(-1);
3738 }
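// Usage sketch for the token constructors above, assuming the standard public
// interface. A ProducerToken binds the caller to a dedicated explicit producer
// and a ConsumerToken carries rotation state, which speeds up repeated calls:
//
//   moodycamel::ConcurrentQueue<int> q;
//   moodycamel::ProducerToken ptok(q);
//   moodycamel::ConsumerToken ctok(q);
//   q.enqueue(ptok, 10);        // explicit-producer fast path
//   int v;
//   q.try_dequeue(ctok, v);     // token-guided consumer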
3739 
3740 template<typename T, typename Traits>
3741 inline void swap(ConcurrentQueue<T, Traits> &a, ConcurrentQueue<T, Traits> &b) MOODYCAMEL_NOEXCEPT {
3742  a.swap(b);
3743 }
3744 
3745 inline void swap(ProducerToken &a, ProducerToken &b) MOODYCAMEL_NOEXCEPT {
3746  a.swap(b);
3747 }
3748 
3749 inline void swap(ConsumerToken &a, ConsumerToken &b) MOODYCAMEL_NOEXCEPT {
3750  a.swap(b);
3751 }
3752 
3753 template<typename T, typename Traits>
3754 inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP &a,
3755  typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP &b) MOODYCAMEL_NOEXCEPT {
3756  a.swap(b);
3757 }
3758 
3759 }
3760 
3761 #if defined(_MSC_VER) && (!defined(_HAS_CXX17) || !_HAS_CXX17)
3762 #pragma warning(pop)
3763 #endif
3764 
3765 #if defined(__GNUC__) && !defined(__INTEL_COMPILER)
3766 #pragma GCC diagnostic pop
3767 #endif