#if defined(__GNUC__) && !defined(__INTEL_COMPILER)
// Disable -Wconversion warnings (spuriously triggered when Traits::size_t and
// Traits::index_t are set to < 32 bits, causing integer promotion, causing warnings
// upon assigning any computed values)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wconversion"

#ifdef MCDBGQ_USE_RELACY
#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
#endif
#endif

#if defined(_MSC_VER) && (!defined(_HAS_CXX17) || !_HAS_CXX17)
// VS2019 with /W4 warns about constant conditional expressions but unless /std:c++17
// or higher does not support `if constexpr`, so we have no choice but to simply
// disable the warning
#pragma warning(push)
#pragma warning(disable: 4127)	// conditional expression is constant
#endif

#if defined(__APPLE__)
#include "TargetConditionals.h"
#endif
#ifdef MCDBGQ_USE_RELACY
#include "relacy/relacy_std.hpp"
#include "relacy_shims.h"
// We only use malloc/free anyway, and the delete macro messes up `= delete` method declarations.
// We'll override the default trait malloc ourselves without a macro.
#undef new
#undef delete
#undef malloc
#undef free
#else
#include <atomic>		// Requires C++11
#include <cassert>
#endif
#include <cstddef>		// for max_align_t
#include <cstdint>
#include <cstdlib>
#include <type_traits>
#include <algorithm>
#include <utility>
#include <limits>
#include <climits>		// for CHAR_BIT
#include <array>
#include <thread>		// partly for __WINPTHREADS_VERSION if on MinGW
// Platform-specific definitions of a numeric thread ID type and an invalid value
namespace moodycamel { namespace details {
	template<typename thread_id_t> struct thread_id_converter {
		typedef thread_id_t thread_id_numeric_size_t;
		typedef thread_id_t thread_id_hash_t;
		static thread_id_hash_t prehash(thread_id_t const& x) { return x; }
	};
} }
#if defined(MCDBGQ_USE_RELACY)
namespace moodycamel { namespace details {
	typedef std::uint32_t thread_id_t;
	static const thread_id_t invalid_thread_id  = 0xFFFFFFFFU;
	static const thread_id_t invalid_thread_id2 = 0xFFFFFFFEU;
	static inline thread_id_t thread_id() { return rl::thread_index(); }
} }
#elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
// No sense pulling in windows.h in a header, we'll manually declare the function
// we use and rely on backwards-compatibility for this not to break
extern "C" __declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(void);
namespace moodycamel { namespace details {
	static_assert(sizeof(unsigned long) == sizeof(std::uint32_t), "Expected size of unsigned long to be 32 bits on Windows");
	typedef std::uint32_t thread_id_t;
	static const thread_id_t invalid_thread_id  = 0;			// 0 is never a valid Win32 thread ID
	static const thread_id_t invalid_thread_id2 = 0xFFFFFFFFU;	// Not technically guaranteed to be invalid, but never used in practice (all Win32 thread IDs are presently multiples of 4)
	static inline thread_id_t thread_id() { return static_cast<thread_id_t>(::GetCurrentThreadId()); }
} }
#elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE) || defined(MOODYCAMEL_NO_THREAD_LOCAL)
namespace moodycamel { namespace details {
	static_assert(sizeof(std::thread::id) == 4 || sizeof(std::thread::id) == 8, "std::thread::id is expected to be either 4 or 8 bytes");
	
	typedef std::thread::id thread_id_t;
	static const thread_id_t invalid_thread_id;		// Default ctor creates invalid ID
	
	// Note we don't define an invalid_thread_id2 since std::thread::id doesn't have one; it's
	// only used if MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is defined anyway, which it won't be.
	static inline thread_id_t thread_id() { return std::this_thread::get_id(); }
	
	template<std::size_t> struct thread_id_size { };
	template<> struct thread_id_size<4> { typedef std::uint32_t numeric_t; };
	template<> struct thread_id_size<8> { typedef std::uint64_t numeric_t; };
	
	template<> struct thread_id_converter<thread_id_t> {
		typedef thread_id_size<sizeof(thread_id_t)>::numeric_t thread_id_numeric_size_t;
#ifndef __APPLE__
		typedef std::size_t thread_id_hash_t;
#else
		typedef thread_id_numeric_size_t thread_id_hash_t;
#endif
		
		static thread_id_hash_t prehash(thread_id_t const& x)
		{
#ifndef __APPLE__
			return std::hash<std::thread::id>()(x);
#else
			return *reinterpret_cast<thread_id_hash_t const*>(&x);
#endif
		}
	};
} }
#else
// Use a nice trick from this answer: http://stackoverflow.com/a/8438730/21475
// In order to get a numeric thread ID in a platform-independent way, we use a thread-local
// static variable's address as a thread identifier :-)
#if defined(__GNUC__) || defined(__INTEL_COMPILER)
#define MOODYCAMEL_THREADLOCAL __thread
#elif defined(_MSC_VER)
#define MOODYCAMEL_THREADLOCAL __declspec(thread)
#else
// Assume C++11 compliant compiler
#define MOODYCAMEL_THREADLOCAL thread_local
#endif
namespace moodycamel { namespace details {
	typedef std::uintptr_t thread_id_t;
	static const thread_id_t invalid_thread_id  = 0;		// Address can't be nullptr
	static const thread_id_t invalid_thread_id2 = 1;		// Member accesses off a null pointer are also generally invalid; plus, it's not aligned
	inline thread_id_t thread_id() { static MOODYCAMEL_THREADLOCAL int x; return reinterpret_cast<thread_id_t>(&x); }
} }
#endif
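// Editor's note (illustrative sketch, not from the original header): the fallback above keys
// thread identity off the address of a thread-local variable, so two concurrently live
// threads can never observe the same ID:
//
//     moodycamel::details::thread_id_t a = moodycamel::details::thread_id();
//     // a != invalid_thread_id (0) and a != invalid_thread_id2 (1), since &x is
//     // a valid, aligned object address on every thread that calls thread_id()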
// Constexpr if
#ifndef MOODYCAMEL_CONSTEXPR_IF
#if (defined(_MSC_VER) && defined(_HAS_CXX17) && _HAS_CXX17) || __cplusplus > 201402L
#define MOODYCAMEL_CONSTEXPR_IF if constexpr
#define MOODYCAMEL_MAYBE_UNUSED [[maybe_unused]]
#else
#define MOODYCAMEL_CONSTEXPR_IF if
#define MOODYCAMEL_MAYBE_UNUSED
#endif
#endif
// Exceptions
#ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
#define MOODYCAMEL_EXCEPTIONS_ENABLED
#endif
#endif
#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
#define MOODYCAMEL_TRY try
#define MOODYCAMEL_CATCH(...) catch(__VA_ARGS__)
#define MOODYCAMEL_RETHROW throw
#define MOODYCAMEL_THROW(expr) throw (expr)
#else
#define MOODYCAMEL_TRY MOODYCAMEL_CONSTEXPR_IF (true)
#define MOODYCAMEL_CATCH(...) else MOODYCAMEL_CONSTEXPR_IF (false)
#define MOODYCAMEL_RETHROW
#define MOODYCAMEL_THROW(expr)
#endif
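// Illustrative expansion (editorial sketch): with exceptions disabled,
//
//     MOODYCAMEL_TRY { f(); } MOODYCAMEL_CATCH (...) { g(); }
//
// becomes
//
//     if constexpr (true) { f(); } else if constexpr (false) { g(); }
//
// (or plain `if (true)` / `else if (false)` pre-C++17), so the "catch" arm is still
// compiled but is statically unreachable and requires no EH machinery.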
#ifndef MOODYCAMEL_NOEXCEPT
#if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
#define MOODYCAMEL_NOEXCEPT
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true
#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800
// VS2012's std::is_nothrow_[move_]constructible is broken and returns true when it shouldn't :-(
// We have to assume *all* non-trivial constructors may throw on VS2012!
#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value)
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900
#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value || std::is_nothrow_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value || std::is_nothrow_copy_constructible<type>::value)
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
#else
#define MOODYCAMEL_NOEXCEPT noexcept
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr)
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr)
#endif
#endif
#ifndef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
#ifdef MCDBGQ_USE_RELACY
#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
#else
// VS2013 doesn't support `thread_local`, and MinGW-w64 w/ POSIX threading has a crippling bug: http://sourceforge.net/p/mingw-w64/bugs/445
// g++ <=4.7 doesn't support thread_local either.
// Finally, iOS/ARM doesn't have support for it either, and g++/ARM allows it to compile but it's unconfirmed to actually work
#if (!defined(_MSC_VER) || _MSC_VER >= 1900) && (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__)
// Assume `thread_local` is fully supported in all other C++11 compilers/platforms
#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
#endif
#endif
#endif
#ifndef MOODYCAMEL_DELETE_FUNCTION
#if defined(_MSC_VER) && _MSC_VER < 1800
#define MOODYCAMEL_DELETE_FUNCTION
#else
#define MOODYCAMEL_DELETE_FUNCTION = delete
#endif
#endif
namespace moodycamel { namespace details {
#ifndef MOODYCAMEL_ALIGNAS
// VS2013 doesn't support alignas or alignof, and align() requires a constant literal
#if defined(_MSC_VER) && _MSC_VER <= 1800
#define MOODYCAMEL_ALIGNAS(alignment) __declspec(align(alignment))
#define MOODYCAMEL_ALIGNOF(obj) __alignof(obj)
#define MOODYCAMEL_ALIGNED_TYPE_LIKE(T, obj) typename details::Vs2013Aligned<std::alignment_of<obj>::value, T>::type
	template<int Align, typename T> struct Vs2013Aligned { };	// default, unsupported alignment
	template<typename T> struct Vs2013Aligned<1, T> { typedef __declspec(align(1)) T type; };
	template<typename T> struct Vs2013Aligned<2, T> { typedef __declspec(align(2)) T type; };
	template<typename T> struct Vs2013Aligned<4, T> { typedef __declspec(align(4)) T type; };
	template<typename T> struct Vs2013Aligned<8, T> { typedef __declspec(align(8)) T type; };
	template<typename T> struct Vs2013Aligned<16, T> { typedef __declspec(align(16)) T type; };
	template<typename T> struct Vs2013Aligned<32, T> { typedef __declspec(align(32)) T type; };
	template<typename T> struct Vs2013Aligned<64, T> { typedef __declspec(align(64)) T type; };
	template<typename T> struct Vs2013Aligned<128, T> { typedef __declspec(align(128)) T type; };
	template<typename T> struct Vs2013Aligned<256, T> { typedef __declspec(align(256)) T type; };
#else
	template<typename T> struct identity { typedef T type; };
#define MOODYCAMEL_ALIGNAS(alignment) alignas(alignment)
#define MOODYCAMEL_ALIGNOF(obj) alignof(obj)
#define MOODYCAMEL_ALIGNED_TYPE_LIKE(T, obj) alignas(alignof(obj)) typename details::identity<T>::type
#endif
#endif
} }
// TSAN can false-report races in lock-free code; allow annotating functions to suppress this
#define MOODYCAMEL_NO_TSAN
#if defined(__has_feature)
#if __has_feature(thread_sanitizer)
#undef MOODYCAMEL_NO_TSAN
#define MOODYCAMEL_NO_TSAN __attribute__((no_sanitize("thread")))
#endif
#endif
// Compiler-specific likely/unlikely hints
namespace moodycamel { namespace details {
#if defined(__GNUC__)
	static inline bool (likely)(bool x) { return __builtin_expect((x), true); }
	static inline bool (unlikely)(bool x) { return __builtin_expect((x), false); }
#else
	static inline bool (likely)(bool x) { return x; }
	static inline bool (unlikely)(bool x) { return x; }
#endif
} }
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
#include "internal/concurrentqueue_internal_debug.h"
#endif
namespace moodycamel {
namespace details {
	template<typename T>
	struct const_numeric_max {
		static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");
		static const T value = std::numeric_limits<T>::is_signed
			? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
			: static_cast<T>(-1);
	};
	
#if defined(__GLIBCXX__)
	typedef ::max_align_t std_max_align_t;		// libstdc++ forgot to add it to std:: for a while
#else
	typedef std::max_align_t std_max_align_t;	// Others (e.g. MSVC) insist it can *only* be accessed via std::
#endif
	
	// Some platforms have incorrectly set max_align_t to a type with <8 bytes alignment even while supporting
	// 8-byte aligned scalar values (*cough* 32-bit iOS). Work around this with our own union.
	typedef union {
		std_max_align_t x;
		long long y;
		void* z;
	} max_align_t;
}
// Default traits for the ConcurrentQueue. To change some of the
// traits without re-implementing all of them, inherit from this
// struct and shadow the declarations you wish to be different;
// since the traits are used as a template type parameter, the
// shadowed declarations will be used where defined, and the defaults
// otherwise.
struct ConcurrentQueueDefaultTraits
{
	// General-purpose size type. std::size_t is strongly recommended.
	typedef std::size_t size_t;
	
	// The type used for the enqueue and dequeue indices. Must be at least as
	// large as size_t. Should be significantly larger than the number of elements
	// you expect to hold at once, especially if you have a high turnover rate.
	typedef std::size_t index_t;
	
	// Internally, all elements are enqueued and dequeued from multi-element
	// blocks; this is the smallest controllable unit. Must be a power of 2, and at least 2.
	static const size_t BLOCK_SIZE = 32;
	
	// For explicit producers (i.e. when using a producer token), the block is
	// checked for being empty by iterating through a list of flags, one per element;
	// for block sizes larger than this threshold, a counter is used instead.
	// Must be a power of 2 (and greater than 1).
	static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;
	
	// How many full blocks can be expected for a single explicit producer?
	// Must be a power of 2 (and greater than 1).
	static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;
	
	// How many full blocks can be expected for a single implicit producer?
	// Must be a power of 2 (and greater than 1).
	static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32;
	
	// The initial size of the hash table mapping thread IDs to implicit producers.
	// Note that the hash is resized every time it becomes half full.
	// Must be a power of two, and either 0 or at least 1. If 0, implicit production
	// (using the enqueue methods without an explicit producer token) is disabled.
	static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 32;
	
	// Controls the number of items that an explicit consumer (i.e. one with a token)
	// must consume before it causes all consumers to rotate and move on to the next
	// internal queue.
	static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;
	
	// The maximum number of elements (inclusive) that can be enqueued to a sub-queue.
	// Enqueue operations that would cause this limit to be surpassed will fail.
	// Enforced at the block level (rounded up to the nearest block size).
	static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value;
	
	// The number of times to spin before sleeping when waiting on a semaphore.
	// Only affects instances of the BlockingConcurrentQueue.
	static const int MAX_SEMA_SPINS = 10000;
#ifndef MCDBGQ_USE_RELACY
	// Memory allocation can be customized if needed.
	// malloc should return nullptr on failure, and handle alignment like std::malloc.
#if defined(malloc) || defined(free)
	// Gah, the standard allocation functions may have been defined as macros;
	// wrap the names so the macros don't apply to our declarations
	static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
	static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
	static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
	static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
#else
	static inline void* malloc(size_t size) { return std::malloc(size); }
	static inline void free(void* ptr) { return std::free(ptr); }
#endif
#else
	// Debug versions when running under the Relacy race detector (ignore
	// these in user code)
	static inline void* malloc(size_t size) { return rl::rl_malloc(size, $); }
	static inline void free(void* ptr) { return rl::rl_free(ptr, $); }
#endif
};
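// Example of overriding a single trait without re-implementing the rest (usage sketch):
//
//     struct MyTraits : public moodycamel::ConcurrentQueueDefaultTraits {
//         static const size_t BLOCK_SIZE = 256;	// use bigger blocks
//     };
//     moodycamel::ConcurrentQueue<int, MyTraits> q;
//
// Because the traits are a template parameter, the shadowed BLOCK_SIZE is picked up
// and every other trait keeps its default.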
// When producing or consuming many elements, the most efficient way is to:
//    1) Use one of the bulk-operation methods of the queue with a token
//    2) Failing that, use the bulk-operation methods without a token
//    3) Failing that, create a token and use that with the single-item methods
//    4) Failing that, use the single-parameter methods of the queue
// Having said that, don't create tokens willy-nilly -- ideally there should be
// a maximum of one token per thread (of each kind).
struct ProducerToken;
struct ConsumerToken;

template<typename T, typename Traits> class ConcurrentQueue;
template<typename T, typename Traits> class BlockingConcurrentQueue;
class ConcurrentQueueTests;
namespace details
{
	struct ConcurrentQueueProducerTypelessBase
	{
		ConcurrentQueueProducerTypelessBase* next;
		std::atomic<bool> inactive;
		ProducerToken* token;
		
		ConcurrentQueueProducerTypelessBase()
			: next(nullptr), inactive(false), token(nullptr)
		{
		}
	};
	template<bool use32> struct _hash_32_or_64 {
		static inline std::uint32_t hash(std::uint32_t h)
		{
			// MurmurHash3 finalizer -- see https://code.google.com/p/smhasher/wiki/MurmurHash3
			// Since the thread ID is already unique, all we really want to do is propagate that
			// uniqueness evenly across all the bits, so that we can use a subset of the bits
			// while reducing collisions significantly
			h ^= h >> 16;
			h *= 0x85ebca6b;
			h ^= h >> 13;
			h *= 0xc2b2ae35;
			return h ^ (h >> 16);
		}
	};
	template<> struct _hash_32_or_64<1> {
		static inline std::uint64_t hash(std::uint64_t h)
		{
			h ^= h >> 33;
			h *= 0xff51afd7ed558ccd;
			h ^= h >> 33;
			h *= 0xc4ceb9fe1a85ec53;
			return h ^ (h >> 33);
		}
	};
	template<std::size_t size> struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> {  };
	static inline size_t hash_thread_id(thread_id_t id)
	{
		static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");
		return static_cast<size_t>(hash_32_or_64<sizeof(thread_id_converter<thread_id_t>::thread_id_hash_t)>::hash(
			thread_id_converter<thread_id_t>::prehash(id)));
	}
	template<typename T>
	static inline bool circular_less_than(T a, T b)
	{
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4554)
#endif
		static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "circular_less_than is intended to be used only with unsigned integer types");
		return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << static_cast<T>(sizeof(T) * CHAR_BIT - 1));
#ifdef _MSC_VER
#pragma warning(pop)
#endif
	}
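	// Worked example (editorial, assuming T = std::uint8_t): once indices wrap around,
	// circular_less_than(250, 2) == true, because static_cast<uint8_t>(250 - 2) == 248,
	// which is > 128 (the distance exceeds half the index space, so 2 is "ahead of" 250),
	// whereas the plain comparison 250 < 2 would get this wrong.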
	template<typename U>
	static inline char* align_for(char* ptr)
	{
		const std::size_t alignment = std::alignment_of<U>::value;
		return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
	}
	template<typename T>
	static inline T ceil_to_pow_2(T x)
	{
		static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "ceil_to_pow_2 is intended to be used only with unsigned integer types");
		
		// Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
		--x;
		x |= x >> 1;
		x |= x >> 2;
		x |= x >> 4;
		for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
			x |= x >> (i << 3);
		}
		++x;
		return x;
	}
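	// Worked example (editorial): ceil_to_pow_2<std::uint32_t>(17) decrements to 16,
	// the shift-or cascade smears the top set bit into every lower bit (16|8|4|2|1 = 31),
	// and the final increment yields 32. Powers of two map to themselves (e.g. 32 -> 32).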
	template<typename T>
	static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
	{
		T temp = std::move(left.load(std::memory_order_relaxed));
		left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
		right.store(std::move(temp), std::memory_order_relaxed);
	}
	template<typename T>
	static inline T const& nomove(T const& x)
	{
		return x;
	}
	
	template<bool Enable>
	struct nomove_if
	{
		template<typename T>
		static inline T const& eval(T const& x)
		{
			return x;
		}
	};
	
	template<>
	struct nomove_if<false>
	{
		template<typename U>
		static inline auto eval(U&& x)
			-> decltype(std::forward<U>(x))
		{
			return std::forward<U>(x);
		}
	};
	template<typename It>
	static inline auto deref_noexcept(It& it) MOODYCAMEL_NOEXCEPT -> decltype(*it)
	{
		return *it;
	}
	
#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
	template<typename T> struct is_trivially_destructible : std::is_trivially_destructible<T> { };
#else
	template<typename T> struct is_trivially_destructible : std::has_trivial_destructor<T> { };
#endif
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
#ifdef MCDBGQ_USE_RELACY
	typedef RelacyThreadExitListener ThreadExitListener;
	typedef RelacyThreadExitNotifier ThreadExitNotifier;
#else
	struct ThreadExitListener
	{
		typedef void (*callback_t)(void*);
		callback_t callback;
		void* userData;
		
		ThreadExitListener* next;		// reserved for use by the ThreadExitNotifier
	};
	
	class ThreadExitNotifier
	{
	public:
		static void subscribe(ThreadExitListener* listener)
		{
			auto& tlsInst = instance();
			listener->next = tlsInst.tail;
			tlsInst.tail = listener;
		}
		
		static void unsubscribe(ThreadExitListener* listener)
		{
			auto& tlsInst = instance();
			ThreadExitListener** prev = &tlsInst.tail;
			for (auto ptr = tlsInst.tail; ptr != nullptr; ptr = ptr->next) {
				if (ptr == listener) {
					*prev = ptr->next;
					break;
				}
				prev = &ptr->next;
			}
		}
		
	private:
		ThreadExitNotifier() : tail(nullptr) { }
		ThreadExitNotifier(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
		ThreadExitNotifier& operator=(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
		
		~ThreadExitNotifier()
		{
			// This thread is about to exit, let everyone know!
			assert(this == &instance() && "If this assert fails, you likely have a buggy compiler! Change the preprocessor conditions such that MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
			for (auto ptr = tail; ptr != nullptr; ptr = ptr->next) {
				ptr->callback(ptr->userData);
			}
		}
		
		// Thread-local
		static inline ThreadExitNotifier& instance()
		{
			static thread_local ThreadExitNotifier notifier;
			return notifier;
		}
		
	private:
		ThreadExitListener* tail;
	};
#endif
#endif
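	// Editorial note: the queue uses this notifier (when MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
	// is defined) to learn when a thread that owns an implicit producer exits, so that the
	// producer's blocks can be recycled. Listener lifetime sketch (illustrative; onThreadExit
	// and someContext are hypothetical names):
	//
	//     details::ThreadExitListener listener;
	//     listener.callback = &onThreadExit;	// a void(void*) function
	//     listener.userData = someContext;
	//     details::ThreadExitNotifier::subscribe(&listener);
	//     // ... and before the listener itself is destroyed:
	//     details::ThreadExitNotifier::unsubscribe(&listener);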
	template<typename T> struct static_is_lock_free_num { enum { value = 0 }; };
	template<> struct static_is_lock_free_num<signed char> { enum { value = ATOMIC_CHAR_LOCK_FREE }; };
	template<> struct static_is_lock_free_num<short> { enum { value = ATOMIC_SHORT_LOCK_FREE }; };
	template<> struct static_is_lock_free_num<int> { enum { value = ATOMIC_INT_LOCK_FREE }; };
	template<> struct static_is_lock_free_num<long> { enum { value = ATOMIC_LONG_LOCK_FREE }; };
	template<> struct static_is_lock_free_num<long long> { enum { value = ATOMIC_LLONG_LOCK_FREE }; };
	template<typename T> struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> {  };
	template<> struct static_is_lock_free<bool> { enum { value = ATOMIC_BOOL_LOCK_FREE }; };
	template<typename U> struct static_is_lock_free<U*> { enum { value = ATOMIC_POINTER_LOCK_FREE }; };
}	// namespace details


struct ProducerToken
{
	template<typename T, typename Traits>
	explicit ProducerToken(ConcurrentQueue<T, Traits>& queue);
	
	template<typename T, typename Traits>
	explicit ProducerToken(BlockingConcurrentQueue<T, Traits>& queue);
	
	ProducerToken(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
		: producer(other.producer)
	{
		other.producer = nullptr;
		if (producer != nullptr) {
			producer->token = this;
		}
	}
	inline ProducerToken& operator=(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
	{
		swap(other);
		return *this;
	}
	
	void swap(ProducerToken& other) MOODYCAMEL_NOEXCEPT
	{
		std::swap(producer, other.producer);
		if (producer != nullptr) {
			producer->token = this;
		}
		if (other.producer != nullptr) {
			other.producer->token = &other;
		}
	}
	// A token is always valid unless:
	//     1) Memory allocation failed during construction
	//     2) It was moved via the move constructor
	//        (Note: assignment does a swap, leaving both potentially valid)
	//     3) The associated queue was destroyed
	// Note that if valid() returns true, that only indicates
	// that the token is valid for use with a specific queue,
	// but not which one; that's up to the user to track.
	inline bool valid() const { return producer != nullptr; }
	
	~ProducerToken()
	{
		if (producer != nullptr) {
			producer->token = nullptr;
			producer->inactive.store(true, std::memory_order_release);
		}
	}
	// Disable copying and assignment
	ProducerToken(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
	ProducerToken& operator=(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
	
private:
	template<typename T, typename Traits> friend class ConcurrentQueue;
	friend class ConcurrentQueueTests;
	
protected:
	details::ConcurrentQueueProducerTypelessBase* producer;
};
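// Usage sketch (editorial): a producer token pins a dedicated inner queue to the caller,
// which typically speeds up repeated enqueues from the same thread:
//
//     moodycamel::ConcurrentQueue<int> q;
//     moodycamel::ProducerToken ptok(q);
//     q.enqueue(ptok, 17);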
struct ConsumerToken
{
	template<typename T, typename Traits>
	explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);
	
	template<typename T, typename Traits>
	explicit ConsumerToken(BlockingConcurrentQueue<T, Traits>& q);
	
	ConsumerToken(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
		: initialOffset(other.initialOffset),
		lastKnownGlobalOffset(other.lastKnownGlobalOffset),
		itemsConsumedFromCurrent(other.itemsConsumedFromCurrent),
		currentProducer(other.currentProducer),
		desiredProducer(other.desiredProducer)
	{
	}
	inline ConsumerToken& operator=(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
	{
		swap(other);
		return *this;
	}
	
	void swap(ConsumerToken& other) MOODYCAMEL_NOEXCEPT
	{
		std::swap(initialOffset, other.initialOffset);
		std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
		std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
		std::swap(currentProducer, other.currentProducer);
		std::swap(desiredProducer, other.desiredProducer);
	}
	// Disable copying and assignment
	ConsumerToken(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
	ConsumerToken& operator=(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
	
private:
	template<typename T, typename Traits> friend class ConcurrentQueue;
	friend class ConcurrentQueueTests;
	
private: // but shared with ConcurrentQueue
	std::uint32_t initialOffset;
	std::uint32_t lastKnownGlobalOffset;
	std::uint32_t itemsConsumedFromCurrent;
	details::ConcurrentQueueProducerTypelessBase* currentProducer;
	details::ConcurrentQueueProducerTypelessBase* desiredProducer;
};
// Need to forward-declare this swap because it's in a namespace.
// See http://stackoverflow.com/questions/4492062/why-does-a-c-friend-class-need-a-forward-declaration-only-in-other-namespaces
template<typename T, typename Traits>
inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT;


template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
class ConcurrentQueue
{
public:
	typedef ::moodycamel::ProducerToken producer_token_t;
	typedef ::moodycamel::ConsumerToken consumer_token_t;
	typedef typename Traits::index_t index_t;
	typedef typename Traits::size_t size_t;
	
	static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);
	static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
	static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
	static const size_t IMPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::IMPLICIT_INITIAL_INDEX_SIZE);
	static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = static_cast<size_t>(Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE);
	static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4307)		// + integral constant overflow (that's what the ternary expression is for!)
#pragma warning(disable: 4309)		// static_cast: Truncation of constant value
#endif
	static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max<size_t>::value - static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE) ? details::const_numeric_max<size_t>::value : ((static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE);
#ifdef _MSC_VER
#pragma warning(pop)
#endif
	static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value, "Traits::size_t must be an unsigned integral type");
	static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value, "Traits::index_t must be an unsigned integral type");
	static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t");
	static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)), "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
	static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) && !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD & (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)), "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)");
	static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) && !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
	static_assert((IMPLICIT_INITIAL_INDEX_SIZE > 1) && !(IMPLICIT_INITIAL_INDEX_SIZE & (IMPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::IMPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
	static_assert((INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) || !(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE & (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE - 1)), "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be a power of 2");
	static_assert(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0 || INITIAL_IMPLICIT_PRODUCER_HASH_SIZE >= 1, "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be at least 1 (or 0 to disable implicit enqueueing)");

public:
	// Creates a queue with at least `capacity` element slots; note that the
	// actual number of elements that can be inserted without additional memory
	// allocation depends on the number of producers and the block size.
	// This method is not thread safe -- it is up to the user to ensure that the
	// queue is fully constructed before it starts being used by other threads.
	explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
		: producerListTail(nullptr),
		producerCount(0),
		initialBlockPoolIndex(0),
		nextExplicitConsumerId(0),
		globalExplicitConsumerOffset(0)
	{
		implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
		populate_initial_implicit_producer_hash();
		populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
		
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
		// Track all the producers using a fully-resolved typed list for
		// each kind; this makes it possible to debug them starting from
		// the root queue object.
		explicitProducers.store(nullptr, std::memory_order_relaxed);
		implicitProducers.store(nullptr, std::memory_order_relaxed);
#endif
	}
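	// Usage sketch (editorial): the capacity is rounded up to whole blocks, so with the
	// default BLOCK_SIZE of 32 the following pre-allocates ceil(100/32) = 4 blocks:
	//
	//     moodycamel::ConcurrentQueue<int> q(100);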
	// Computes the correct amount of pre-allocated blocks for you based
	// on the minimum number of elements you want available at any given
	// time, and the maximum concurrent number of each type of producer.
	ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
		: producerListTail(nullptr),
		producerCount(0),
		initialBlockPoolIndex(0),
		nextExplicitConsumerId(0),
		globalExplicitConsumerOffset(0)
	{
		implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
		populate_initial_implicit_producer_hash();
		size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1)
			+ 2 * (maxExplicitProducers + maxImplicitProducers);
		populate_initial_block_list(blocks);
		
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
		explicitProducers.store(nullptr, std::memory_order_relaxed);
		implicitProducers.store(nullptr, std::memory_order_relaxed);
#endif
	}
	// Note: The queue should not be accessed concurrently while it's
	// being deleted. It's up to the user to synchronize this.
	// This method is not thread safe.
	~ConcurrentQueue()
	{
		// Destroy producers
		auto ptr = producerListTail.load(std::memory_order_relaxed);
		while (ptr != nullptr) {
			auto next = ptr->next_prod();
			if (ptr->token != nullptr) {
				ptr->token->producer = nullptr;
			}
			destroy(ptr);
			ptr = next;
		}
		
		// Destroy implicit producer hash tables
		MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE != 0) {
			auto hash = implicitProducerHash.load(std::memory_order_relaxed);
			while (hash != nullptr) {
				auto prev = hash->prev;
				if (prev != nullptr) {		// The last hash is part of this object and was not allocated dynamically
					for (size_t i = 0; i != hash->capacity; ++i) {
						hash->entries[i].~ImplicitProducerKVP();
					}
					hash->~ImplicitProducerHash();
					(Traits::free)(hash);
				}
				hash = prev;
			}
		}
		
		// Destroy global free list
		auto block = freeList.head_unsafe();
		while (block != nullptr) {
			auto next = block->freeListNext.load(std::memory_order_relaxed);
			if (block->dynamicallyAllocated) {
				destroy(block);
			}
			block = next;
		}
		
		// Destroy initial free list
		destroy_array(initialBlockPool, initialBlockPoolSize);
	}
	// Disable copying and copy assignment
	ConcurrentQueue(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
	ConcurrentQueue& operator=(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
	
	// Moving is supported, but note that it is *not* a thread-safe operation.
	// Nobody can use the queue while it's being moved, and the memory effects
	// of that move must be propagated to other threads before they can use it.
	ConcurrentQueue(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
		: producerListTail(other.producerListTail.load(std::memory_order_relaxed)),
		producerCount(other.producerCount.load(std::memory_order_relaxed)),
		initialBlockPoolIndex(other.initialBlockPoolIndex.load(std::memory_order_relaxed)),
		initialBlockPool(other.initialBlockPool),
		initialBlockPoolSize(other.initialBlockPoolSize),
		freeList(std::move(other.freeList)),
		nextExplicitConsumerId(other.nextExplicitConsumerId.load(std::memory_order_relaxed)),
		globalExplicitConsumerOffset(other.globalExplicitConsumerOffset.load(std::memory_order_relaxed))
	{
		// Move the other one into this, and leave the other one as an empty queue
		implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
		populate_initial_implicit_producer_hash();
		swap_implicit_producer_hashes(other);
		
		other.producerListTail.store(nullptr, std::memory_order_relaxed);
		other.producerCount.store(0, std::memory_order_relaxed);
		other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
		other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);
		
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
		explicitProducers.store(other.explicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
		other.explicitProducers.store(nullptr, std::memory_order_relaxed);
		implicitProducers.store(other.implicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
		other.implicitProducers.store(nullptr, std::memory_order_relaxed);
#endif
		
		other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
		other.initialBlockPoolSize = 0;
		other.initialBlockPool = nullptr;
		
		reown_producers();
	}
	inline ConcurrentQueue& operator=(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
	{
		return swap_internal(other);
	}
	
	// Swaps this queue's state with the other's. Not thread-safe.
	// Swapping two queues concurrently with themselves or each other
	// is not safe.
	inline void swap(ConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
	{
		swap_internal(other);
	}
	
private:
	ConcurrentQueue& swap_internal(ConcurrentQueue& other)
	{
		if (this == &other) {
			return *this;
		}
		
		details::swap_relaxed(producerListTail, other.producerListTail);
		details::swap_relaxed(producerCount, other.producerCount);
		details::swap_relaxed(initialBlockPoolIndex, other.initialBlockPoolIndex);
		std::swap(initialBlockPool, other.initialBlockPool);
		std::swap(initialBlockPoolSize, other.initialBlockPoolSize);
		freeList.swap(other.freeList);
		details::swap_relaxed(nextExplicitConsumerId, other.nextExplicitConsumerId);
		details::swap_relaxed(globalExplicitConsumerOffset, other.globalExplicitConsumerOffset);
		
		swap_implicit_producer_hashes(other);
		
		reown_producers();
		other.reown_producers();
		
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
		details::swap_relaxed(explicitProducers, other.explicitProducers);
		details::swap_relaxed(implicitProducers, other.implicitProducers);
#endif
		
		return *this;
	}
	
public:
	// Enqueues a single item (by copying it).
	// Allocates memory if required. Only fails if memory allocation fails (or implicit
	// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
	// or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
	// Thread-safe.
	inline bool enqueue(T const& item)
	{
		MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
		else return inner_enqueue<CanAlloc>(item);
	}
	
	// Enqueues a single item (by moving it, if possible).
	// (Same failure conditions as above.) Thread-safe.
	inline bool enqueue(T&& item)
	{
		MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
		else return inner_enqueue<CanAlloc>(std::move(item));
	}
	
	// Enqueues a single item (by copying it) using an explicit producer token.
	// Allocates memory if required. Only fails if memory allocation fails (or
	// Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
	// Thread-safe.
	inline bool enqueue(producer_token_t const& token, T const& item)
	{
		return inner_enqueue<CanAlloc>(token, item);
	}
	
	// Enqueues a single item (by moving it, if possible) using an explicit producer token.
	// (Same failure conditions as above.) Thread-safe.
	inline bool enqueue(producer_token_t const& token, T&& item)
	{
		return inner_enqueue<CanAlloc>(token, std::move(item));
	}
	// Enqueues several items.
	// Allocates memory if required. Only fails if memory allocation fails (or implicit
	// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
	// or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
	// Note: Use std::make_move_iterator if the elements should be moved instead of copied.
	// Thread-safe.
	template<typename It>
	bool enqueue_bulk(It itemFirst, size_t count)
	{
		MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
		else return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
	}
	
	// Enqueues several items using an explicit producer token.
	// Allocates memory if required. Only fails if memory allocation fails
	// (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
	// Thread-safe.
	template<typename It>
	bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
	{
		return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
	}
	// Enqueues a single item (by copying it).
	// Does not allocate memory. Fails if not enough room to enqueue (or implicit
	// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
	// Thread-safe.
	inline bool try_enqueue(T const& item)
	{
		MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
		else return inner_enqueue<CannotAlloc>(item);
	}
	
	// Enqueues a single item (by moving it, if possible).
	// (Same failure conditions as above.) Thread-safe.
	inline bool try_enqueue(T&& item)
	{
		MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
		else return inner_enqueue<CannotAlloc>(std::move(item));
	}
	
	// Enqueues a single item (by copying it) using an explicit producer token.
	// Does not allocate memory. Fails if not enough room to enqueue.
	// Thread-safe.
	inline bool try_enqueue(producer_token_t const& token, T const& item)
	{
		return inner_enqueue<CannotAlloc>(token, item);
	}
	
	// Enqueues a single item (by moving it, if possible) using an explicit producer token.
	// (Same failure conditions as above.) Thread-safe.
	inline bool try_enqueue(producer_token_t const& token, T&& item)
	{
		return inner_enqueue<CannotAlloc>(token, std::move(item));
	}
	// Enqueues several items.
	// Does not allocate memory. Fails if not enough room to enqueue (or implicit
	// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
	// Note: Use std::make_move_iterator if the elements should be moved instead of copied.
	// Thread-safe.
	template<typename It>
	bool try_enqueue_bulk(It itemFirst, size_t count)
	{
		MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
		else return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
	}
	
	// Enqueues several items using an explicit producer token.
	// Does not allocate memory. Fails if not enough room to enqueue.
	// Thread-safe.
	template<typename It>
	bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
	{
		return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
	}
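	// Usage sketch (editorial): bulk operations amortize the queue's internal
	// synchronization across many items:
	//
	//     int items[5] = { 1, 2, 3, 4, 5 };
	//     bool ok = q.try_enqueue_bulk(items, 5);	// fails rather than allocating
	//     // or, when allocation is acceptable: q.enqueue_bulk(items, 5);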
	// Attempts to dequeue from the queue.
	// Returns false if all producer streams appeared empty at the time they
	// were checked (so, the queue is likely but not guaranteed to be empty).
	// Never allocates. Thread-safe.
	template<typename U>
	bool try_dequeue(U& item)
	{
		// Instead of simply trying each producer in turn (which could cause needless
		// contention on the first producer), score them heuristically first.
		size_t nonEmptyCount = 0;
		ProducerBase* best = nullptr;
		size_t bestSize = 0;
		for (auto ptr = producerListTail.load(std::memory_order_acquire); nonEmptyCount < 3 && ptr != nullptr; ptr = ptr->next_prod()) {
			auto size = ptr->size_approx();
			if (size > 0) {
				if (size > bestSize) {
					bestSize = size;
					best = ptr;
				}
				++nonEmptyCount;
			}
		}
		
		// If there was at least one non-empty queue but it appears empty at the time
		// we try to dequeue from it, we need to make sure every queue's been tried
		if (nonEmptyCount > 0) {
			if ((details::likely)(best->dequeue(item))) {
				return true;
			}
			for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
				if (ptr != best && ptr->dequeue(item)) {
					return true;
				}
			}
		}
		return false;
	}
	// Attempts to dequeue from the queue.
	// Returns false if all producer streams appeared empty at the time they
	// were checked (so, the queue is likely but not guaranteed to be empty).
	// Unlike try_dequeue(item), this does not interleave the order that producer
	// streams are dequeued from, which gives more predictable results in
	// single-threaded consumer scenarios (mostly useful for internal unit tests).
	// Never allocates. Thread-safe.
	template<typename U>
	bool try_dequeue_non_interleaved(U& item)
	{
		for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
			if (ptr->dequeue(item)) {
				return true;
			}
		}
		return false;
	}
	// Attempts to dequeue from the queue using an explicit consumer token.
	// Returns false if all producer streams appeared empty at the time they
	// were checked (so, the queue is likely but not guaranteed to be empty).
	// Never allocates. Thread-safe.
	template<typename U>
	bool try_dequeue(consumer_token_t& token, U& item)
	{
		// The idea is roughly: every EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE
		// items from one producer, everyone rotates (the global offset increases); if
		// the global offset changed, reset the consumption counter and move to the
		// designated producer; if a producer has no items, keep moving until one does.
		if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
			if (!update_current_producer_after_rotation(token)) {
				return false;
			}
		}
		
		// If there was at least one non-empty queue but it appears empty at the time
		// we try to dequeue from it, we need to make sure every queue's been tried
		if (static_cast<ProducerBase*>(token.currentProducer)->dequeue(item)) {
			if (++token.itemsConsumedFromCurrent == EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
				globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
			}
			return true;
		}
		
		auto tail = producerListTail.load(std::memory_order_acquire);
		auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
		if (ptr == nullptr) {
			ptr = tail;
		}
		while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
			if (ptr->dequeue(item)) {
				token.currentProducer = ptr;
				token.itemsConsumedFromCurrent = 1;
				return true;
			}
			ptr = ptr->next_prod();
			if (ptr == nullptr) {
				ptr = tail;
			}
		}
		return false;
	}
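	// Usage sketch (editorial): a consumer token caches which producer's sub-queue the
	// consumer is currently draining, rotating every
	// EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE (default 256) items so that all
	// sub-queues make progress:
	//
	//     moodycamel::ConsumerToken ctok(q);
	//     int item;
	//     while (q.try_dequeue(ctok, item)) {
	//         process(item);	// `process` is a hypothetical consumer function
	//     }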
	// Attempts to dequeue several elements from the queue.
	// Returns the number of items actually dequeued.
	// Returns 0 if all producer streams appeared empty at the time they
	// were checked (so, the queue is likely but not guaranteed to be empty).
	// Never allocates. Thread-safe.
	template<typename It>
	size_t try_dequeue_bulk(It itemFirst, size_t max)
	{
		size_t count = 0;
		for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
			count += ptr->dequeue_bulk(itemFirst, max - count);
			if (count == max) {
				break;
			}
		}
		return count;
	}
	// Attempts to dequeue several elements from the queue using an explicit consumer token.
	// Returns the number of items actually dequeued.
	// Returns 0 if all producer streams appeared empty at the time they
	// were checked (so, the queue is likely but not guaranteed to be empty).
	// Never allocates. Thread-safe.
	template<typename It>
	size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
	{
		if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
			if (!update_current_producer_after_rotation(token)) {
				return 0;
			}
		}
		
		size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(itemFirst, max);
		if (count == max) {
			if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >= EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
				globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
			}
			return max;
		}
		token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
		max -= count;
		
		auto tail = producerListTail.load(std::memory_order_acquire);
		auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
		if (ptr == nullptr) {
			ptr = tail;
		}
		while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
			auto dequeued = ptr->dequeue_bulk(itemFirst, max);
			count += dequeued;
			if (dequeued != 0) {
				token.currentProducer = ptr;
				token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
			}
			if (dequeued == max) {
				break;
			}
			max -= dequeued;
			ptr = ptr->next_prod();
			if (ptr == nullptr) {
				ptr = tail;
			}
		}
		return count;
	}
	// Attempts to dequeue from a specific producer's inner queue.
	// If you happen to know which producer you want to dequeue from, this
	// is significantly faster than using the general-case try_dequeue methods.
	// Returns false if the producer's queue appeared empty at the time it
	// was checked (so, the queue is likely but not guaranteed to be empty).
	// Never allocates. Thread-safe.
	template<typename U>
	inline bool try_dequeue_from_producer(producer_token_t const& producer, U& item)
	{
		return static_cast<ExplicitProducer*>(producer.producer)->dequeue(item);
	}
	
	// Attempts to dequeue several elements from a specific producer's inner queue.
	// Returns the number of items actually dequeued.
	// Returns 0 if the producer's queue appeared empty at the time it
	// was checked (so, the queue is likely but not guaranteed to be empty).
	// Never allocates. Thread-safe.
	template<typename It>
	inline size_t try_dequeue_bulk_from_producer(producer_token_t const& producer, It itemFirst, size_t max)
	{
		return static_cast<ExplicitProducer*>(producer.producer)->dequeue_bulk(itemFirst, max);
	}
	// Returns an estimate of the total number of elements currently in the queue.
	// This estimate is only accurate if the queue has completely stabilized before
	// it is called (i.e. all enqueue and dequeue operations have completed and their
	// memory effects are visible on the calling thread, and no further operations
	// start while this method is being called).
	// Thread-safe.
	size_t size_approx() const
	{
		size_t size = 0;
		for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
			size += ptr->size_approx();
		}
		return size;
	}
	
	// Returns true if the underlying atomic variables used by
	// the queue are lock-free (they should be on most platforms).
	// Thread-safe.
	static constexpr bool is_lock_free()
	{
		return
			details::static_is_lock_free<bool>::value == 2 &&
			details::static_is_lock_free<size_t>::value == 2 &&
			details::static_is_lock_free<std::uint32_t>::value == 2 &&
			details::static_is_lock_free<index_t>::value == 2 &&
			details::static_is_lock_free<void*>::value == 2 &&
			details::static_is_lock_free<typename details::thread_id_converter<details::thread_id_t>::thread_id_numeric_size_t>::value == 2;
	}
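	// Editorial note: this is a compile-time approximation -- it reports whether the
	// atomics the queue relies on are guaranteed lock-free on this platform (i.e. the
	// corresponding ATOMIC_..._LOCK_FREE macros equal 2), not a property of any
	// particular queue instance. A platform where, say, std::atomic<void*> may fall
	// back to locks will report false here even though the algorithm is unchanged.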
private:
	friend struct ProducerToken;
	friend struct ConsumerToken;
	struct ExplicitProducer;
	friend struct ExplicitProducer;
	struct ImplicitProducer;
	friend struct ImplicitProducer;
	friend class ConcurrentQueueTests;
	
	enum AllocationMode { CanAlloc, CannotAlloc };
	///////////////////////////////
	// Queue methods
	///////////////////////////////
	
	template<AllocationMode canAlloc, typename U>
	inline bool inner_enqueue(producer_token_t const& token, U&& element)
	{
		return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
	}
	
	template<AllocationMode canAlloc, typename U>
	inline bool inner_enqueue(U&& element)
	{
		auto producer = get_or_add_implicit_producer();
		return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
	}
	
	template<AllocationMode canAlloc, typename It>
	inline bool inner_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
	{
		return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
	}
	
	template<AllocationMode canAlloc, typename It>
	inline bool inner_enqueue_bulk(It itemFirst, size_t count)
	{
		auto producer = get_or_add_implicit_producer();
		return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
	}
	inline bool update_current_producer_after_rotation(consumer_token_t& token)
	{
		// Ah, there's been a rotation, figure out where we should be!
		auto tail = producerListTail.load(std::memory_order_acquire);
		if (token.desiredProducer == nullptr && tail == nullptr) {
			return false;
		}
		auto prodCount = producerCount.load(std::memory_order_relaxed);
		auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
		if ((details::unlikely)(token.desiredProducer == nullptr)) {
			// Aha, first time we're dequeueing anything.
			// Figure out our local position.
			// Note: offset is from start, not end, but we're traversing from end -- subtract from count first
			std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
			token.desiredProducer = tail;
			for (std::uint32_t i = 0; i != offset; ++i) {
				token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
				if (token.desiredProducer == nullptr) {
					token.desiredProducer = tail;
				}
			}
		}
		
		std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
		if (delta >= prodCount) {
			delta = delta % prodCount;
		}
		for (std::uint32_t i = 0; i != delta; ++i) {
			token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
			if (token.desiredProducer == nullptr) {
				token.desiredProducer = tail;
			}
		}
		
		token.lastKnownGlobalOffset = globalOffset;
		token.currentProducer = token.desiredProducer;
		token.itemsConsumedFromCurrent = 0;
		return true;
	}
	///////////////////////////
	// Free list
	///////////////////////////
	
	template<typename N>
	struct FreeListNode
	{
		FreeListNode() : freeListRefs(0), freeListNext(nullptr) { }
		
		std::atomic<std::uint32_t> freeListRefs;
		std::atomic<N*> freeListNext;
	};
	// A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but
	// simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly
	// speedy under low contention.
	template<typename N>		// N must inherit FreeListNode or have the same fields (and initialization of them)
	struct FreeList
	{
		FreeList() : freeListHead(nullptr) { }
		FreeList(FreeList&& other)
			: freeListHead(other.freeListHead.load(std::memory_order_relaxed))
		{
			other.freeListHead.store(nullptr, std::memory_order_relaxed);
		}
		void swap(FreeList& other) { details::swap_relaxed(freeListHead, other.freeListHead); }
		
		FreeList(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
		FreeList& operator=(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
		inline void add(N* node)
		{
#ifdef MCDBGQ_NOLOCKFREE_FREELIST
			debug::DebugLock lock(mutex);
#endif
			// We know that the should-be-on-freelist bit is 0 at this point, so it's safe to
			// set it using a fetch_add
			if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
				// Oh look! We were the last ones referencing this node, and we know
				// we want to add it to the free list, so let's do it!
				add_knowing_refcount_is_zero(node);
			}
		}
		inline N* try_get()
		{
#ifdef MCDBGQ_NOLOCKFREE_FREELIST
			debug::DebugLock lock(mutex);
#endif
			auto head = freeListHead.load(std::memory_order_acquire);
			while (head != nullptr) {
				auto prevHead = head;
				auto refs = head->freeListRefs.load(std::memory_order_relaxed);
				if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) {
					head = freeListHead.load(std::memory_order_acquire);
					continue;
				}
				
				// Good, reference count has been incremented (it wasn't at zero), which means we can read the
				// next and not worry about it changing between now and the time we do the CAS
				auto next = head->freeListNext.load(std::memory_order_relaxed);
				if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
					// Yay, got the node. This means it was on the list, which means shouldBeOnFreeList must
					// be false no matter the refcount (because nobody else knows it's been taken off yet,
					// it can't have been put back on).
					assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);
					
					// Decrease refcount twice, once for our ref, and once for the list's ref
					head->freeListRefs.fetch_sub(2, std::memory_order_release);
					return head;
				}
				
				// OK, the head must have changed on us, but we still need to decrease the refcount we increased.
				// Note that we don't need to release any memory effects, but we do need to ensure that the
				// reference count decrement happens-after the CAS on the head.
				refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
				if (refs == SHOULD_BE_ON_FREELIST + 1) {
					add_knowing_refcount_is_zero(prevHead);
				}
			}
			
			return nullptr;
		}
		// Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
		N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }
		
	private:
		inline void add_knowing_refcount_is_zero(N* node)
		{
			// Since the refcount is zero, and nobody can increase it once it's zero (except us, and we
			// run only one copy of this method per node at a time), we can safely change the next
			// pointer of the node; however, once the refcount is back above zero, other threads could
			// increase it -- so if the CAS to add the node to the actual list fails, decrease the
			// refcount and leave the add operation to the next thread who puts the refcount back at
			// zero (which could be us, hence the loop).
			auto head = freeListHead.load(std::memory_order_relaxed);
			while (true) {
				node->freeListNext.store(head, std::memory_order_relaxed);
				node->freeListRefs.store(1, std::memory_order_release);
				if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) {
					// Hmm, the add failed, but we can only try again when the refcount goes back to zero
					if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
						continue;
					}
				}
				return;
			}
		}
		
	private:
		// Implemented like a stack, but where node order doesn't matter (nodes are
		// inserted out of order under contention)
		std::atomic<N*> freeListHead;
		
		static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
		static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
		
#ifdef MCDBGQ_NOLOCKFREE_FREELIST
		debug::DebugMutex mutex;
#endif
	};
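	// Editorial note: freeListRefs packs two things into one 32-bit atomic -- the low
	// 31 bits (REFS_MASK) count in-flight references from concurrent try_get() calls,
	// and the top bit (SHOULD_BE_ON_FREELIST) records a pending add(). A node is only
	// actually linked into the list by whichever thread observes the count drop to zero
	// with the flag set, which is why add() and try_get() both funnel into
	// add_knowing_refcount_is_zero().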
	///////////////////////////
	// Block
	///////////////////////////
	
	enum InnerQueueContext { implicit_context = 0, explicit_context = 1 };
	
	struct Block
	{
		Block()
			: next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr), shouldBeOnFreeList(false), dynamicallyAllocated(true)
		{
#ifdef MCDBGQ_TRACKMEM
			owner = nullptr;
#endif
		}
		template<InnerQueueContext context>
		inline bool is_empty() const
		{
			MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
				// Check flags
				for (size_t i = 0; i < BLOCK_SIZE; ++i) {
					if (!emptyFlags[i].load(std::memory_order_relaxed)) {
						return false;
					}
				}
				
				// Aha, empty; make sure we have all other memory effects that happened before the empty flags were set
				std::atomic_thread_fence(std::memory_order_acquire);
				return true;
			}
			else {
				// Check counter
				if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
					std::atomic_thread_fence(std::memory_order_acquire);
					return true;
				}
				assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
				return false;
			}
		}
		// Returns true if the block is now empty (does not apply in explicit context)
		template<InnerQueueContext context>
		inline bool set_empty(MOODYCAMEL_MAYBE_UNUSED index_t i)
		{
			MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
				// Set flag
				assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(std::memory_order_relaxed));
				emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true, std::memory_order_release);
				return false;
			}
			else {
				// Increment counter
				auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
				assert(prevVal < BLOCK_SIZE);
				return prevVal == BLOCK_SIZE - 1;
			}
		}
		// Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0).
		// Returns true if the block is now empty (does not apply in explicit context).
		template<InnerQueueContext context>
		inline bool set_many_empty(MOODYCAMEL_MAYBE_UNUSED index_t i, size_t count)
		{
			MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
				// Set flags
				std::atomic_thread_fence(std::memory_order_release);
				i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1;
				for (size_t j = 0; j != count; ++j) {
					assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
					emptyFlags[i + j].store(true, std::memory_order_relaxed);
				}
				return false;
			}
			else {
				// Increment counter
				auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
				assert(prevVal + count <= BLOCK_SIZE);
				return prevVal + count == BLOCK_SIZE;
			}
		}
		template<InnerQueueContext context>
		inline void set_all_empty()
		{
			MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
				// Set all flags
				for (size_t i = 0; i != BLOCK_SIZE; ++i) {
					emptyFlags[i].store(true, std::memory_order_relaxed);
				}
			}
			else {
				// Reset counter
				elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
			}
		}
		template<InnerQueueContext context>
		inline void reset_empty()
		{
			MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
				// Reset flags
				for (size_t i = 0; i != BLOCK_SIZE; ++i) {
					emptyFlags[i].store(false, std::memory_order_relaxed);
				}
			}
			else {
				// Reset counter
				elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
			}
		}
		inline T* operator[](index_t idx) MOODYCAMEL_NOEXCEPT
		{
			return static_cast<T*>(static_cast<void*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1));
		}
		
		inline T const* operator[](index_t idx) const MOODYCAMEL_NOEXCEPT
		{
			return static_cast<T const*>(static_cast<void const*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1));
		}
	private:
		static_assert(std::alignment_of<T>::value <= sizeof(T), "The queue does not support types with an alignment greater than their size at this time");
		MOODYCAMEL_ALIGNED_TYPE_LIKE(char[sizeof(T) * BLOCK_SIZE], T) elements;
	public:
		Block* next;
		std::atomic<size_t> elementsCompletelyDequeued;
		std::atomic<bool> emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];
	public:
		std::atomic<std::uint32_t> freeListRefs;
		std::atomic<Block*> freeListNext;
		std::atomic<bool> shouldBeOnFreeList;
		bool dynamicallyAllocated;		// Perhaps a better name for this would be 'isNotPartOfInitialBlockPool'
		
#ifdef MCDBGQ_TRACKMEM
		void* owner;
#endif
	};
	static_assert(std::alignment_of<Block>::value >= std::alignment_of<T>::value, "Internal error: Blocks must be at least as aligned as the type they are wrapping");
	
#ifdef MCDBGQ_TRACKMEM
public:
	struct MemStats;
private:
#endif
	///////////////////////////
	// Producer base
	///////////////////////////
	
	struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase
	{
		ProducerBase(ConcurrentQueue* parent_, bool isExplicit_) :
			tailIndex(0),
			headIndex(0),
			dequeueOptimisticCount(0),
			dequeueOvercommit(0),
			tailBlock(nullptr),
			isExplicit(isExplicit_),
			parent(parent_)
		{
		}
		
		virtual ~ProducerBase() { }
		
		template<typename U>
		inline bool dequeue(U& element)
		{
			if (isExplicit) {
				return static_cast<ExplicitProducer*>(this)->dequeue(element);
			}
			else {
				return static_cast<ImplicitProducer*>(this)->dequeue(element);
			}
		}
		
		template<typename It>
		inline size_t dequeue_bulk(It& itemFirst, size_t max)
		{
			if (isExplicit) {
				return static_cast<ExplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
			}
			else {
				return static_cast<ImplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
			}
		}
		
		inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); }
		
		inline size_t size_approx() const
		{
			auto tail = tailIndex.load(std::memory_order_relaxed);
			auto head = headIndex.load(std::memory_order_relaxed);
			return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
		}
		
		inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }
	protected:
		std::atomic<index_t> tailIndex;		// Where to enqueue to next
		std::atomic<index_t> headIndex;		// Where to dequeue from next
		
		std::atomic<index_t> dequeueOptimisticCount;
		std::atomic<index_t> dequeueOvercommit;
		
		Block* tailBlock;
		
	public:
		bool isExplicit;
		ConcurrentQueue* parent;
		
	protected:
#ifdef MCDBGQ_TRACKMEM
		friend struct MemStats;
#endif
	};
	///////////////////////////
	// Explicit queue
	///////////////////////////
	
	struct ExplicitProducer : public ProducerBase
	{
		explicit ExplicitProducer(ConcurrentQueue* parent_) :
			ProducerBase(parent_, true),
			blockIndex(nullptr),
			pr_blockIndexSlotsUsed(0),
			pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1),
			pr_blockIndexFront(0),
			pr_blockIndexEntries(nullptr),
			pr_blockIndexRaw(nullptr)
		{
			size_t poolBasedIndexSize = details::ceil_to_pow_2(parent_->initialBlockPoolSize) >> 1;
			if (poolBasedIndexSize > pr_blockIndexSize) {
				pr_blockIndexSize = poolBasedIndexSize;
			}
			
			new_block_index(0);		// This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE
		}
		~ExplicitProducer()
		{
			// Destruct any elements not yet dequeued.
			// Since we're in the destructor, we can assume all elements
			// are either completely dequeued or completely not (no halfways).
			if (this->tailBlock != nullptr) {		// Note this means there must be a block index too
				// First find the block that's partially dequeued, if any
				Block* halfDequeuedBlock = nullptr;
				if ((this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
					// The head's not on a block boundary, meaning a block somewhere is partially dequeued
					// (or the head block is the tail block and was fully dequeued, but the head/tail are still not on a boundary)
					size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1);
					while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE, this->headIndex.load(std::memory_order_relaxed))) {
						i = (i + 1) & (pr_blockIndexSize - 1);
					}
					assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base, this->headIndex.load(std::memory_order_relaxed)));
					halfDequeuedBlock = pr_blockIndexEntries[i].block;
				}
				
				// Start at the head block (note the first line in the loop gives us the head from the tail on the first iteration)
				auto block = this->tailBlock;
				do {
					block = block->next;
					if (block->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
						continue;
					}
					
					size_t i = 0;	// Offset into block
					if (block == halfDequeuedBlock) {
						i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
					}
					
					// Walk through all the items in the block; if this is the tail block, we need to stop when we reach the tail index
					auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE
						: static_cast<size_t>(this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
					while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
						(*block)[i++]->~T();
					}
				} while (block != this->tailBlock);
			}
			
			// Destroy all blocks that we own
			if (this->tailBlock != nullptr) {
				auto block = this->tailBlock;
				do {
					auto nextBlock = block->next;
					if (block->dynamicallyAllocated) {
						destroy(block);
					}
					else {
						this->parent->add_block_to_free_list(block);
					}
					block = nextBlock;
				} while (block != this->tailBlock);
			}
			
			// Destroy the block indices
			auto header = static_cast<BlockIndexHeader*>(pr_blockIndexRaw);
			while (header != nullptr) {
				auto prev = static_cast<BlockIndexHeader*>(header->prev);
				header->~BlockIndexHeader();
				(Traits::free)(header);
				header = prev;
			}
		}
		template<AllocationMode allocMode, typename U>
		inline bool enqueue(U&& element)
		{
			index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
			index_t newTailIndex = 1 + currentTailIndex;
			if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
				// We reached the end of a block, start a new one
				auto startBlock = this->tailBlock;
				auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
				if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
					// We can re-use the block ahead of us, it's empty!
					this->tailBlock = this->tailBlock->next;
					this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
					
					// We'll put the block on the block index (guaranteed to be room since we're conceptually removing the
					// last block from it first -- except instead of removing then adding, we can just overwrite).
				}
				else {
					// Whatever head value we see here is >= the last value we saw here (relatively),
					// and <= its current value. Since we have the most recent tail, the head must be <= to it.
					auto head = this->headIndex.load(std::memory_order_relaxed);
					assert(!details::circular_less_than<index_t>(currentTailIndex, head));
					if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
						|| (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
						// We can't enqueue in another block because there's not enough leeway -- the
						// tail could surpass the head by the time the block fills up! (Or we'll exceed
						// the size limit, if the second part of the condition was true.)
						return false;
					}
					// We're going to need a new block; check that the block index has room
					if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
						// Hmm, the circular block index is already full -- we'll need
						// to allocate a new index. Note pr_blockIndexRaw can only be nullptr if
						// the initial allocation failed in the constructor.
						MOODYCAMEL_CONSTEXPR_IF (allocMode == CannotAlloc) {
							return false;
						}
						else if (!new_block_index(pr_blockIndexSlotsUsed)) {
							return false;
						}
					}
					
					// Insert a new block in the circular linked list
					auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
					if (newBlock == nullptr) {
						return false;
					}
#ifdef MCDBGQ_TRACKMEM
					newBlock->owner = this;
#endif
					newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
					if (this->tailBlock == nullptr) {
						newBlock->next = newBlock;
					}
					else {
						newBlock->next = this->tailBlock->next;
						this->tailBlock->next = newBlock;
					}
					this->tailBlock = newBlock;
					++pr_blockIndexSlotsUsed;
				}
				
				MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element)))) {
					// The constructor may throw. We want the element not to appear in the queue in
					// that case (without corrupting the queue):
					MOODYCAMEL_TRY {
						new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
					}
					MOODYCAMEL_CATCH (...) {
						// Revert change to the current block, but leave the new block available for next time
						pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
						this->tailBlock = startBlock == nullptr ? this->tailBlock : startBlock;
						MOODYCAMEL_RETHROW;
					}
				}
				else {
					(void)startBlock;
					(void)originalBlockIndexSlotsUsed;
				}
				
				// Add block to block index
				auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
				entry.base = currentTailIndex;
				entry.block = this->tailBlock;
				blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
				pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
				
				MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element)))) {
					this->tailIndex.store(newTailIndex, std::memory_order_release);
					return true;
				}
			}
			
			// Enqueue
			new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
			
			this->tailIndex.store(newTailIndex, std::memory_order_release);
			return true;
		}
		template<typename U>
		bool dequeue(U& element)
		{
			auto tail = this->tailIndex.load(std::memory_order_relaxed);
			auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
			if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
				// Might be something to dequeue, let's give it a try
				std::atomic_thread_fence(std::memory_order_acquire);
				
				// Increment optimistic counter, then check if it went over the boundary
				auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
				
				tail = this->tailIndex.load(std::memory_order_acquire);
				if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
					// Guaranteed to be at least one element to dequeue!
					auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
					
					// Determine which block the element is in
					auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
					auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
					
					// Careful here about the index math, to preserve the sign of the offset
					// in the face of index wrap-around
					auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
					auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
					auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase) / static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
					auto block = localBlockIndex->entries[(localBlockIndexHead + offset) & (localBlockIndex->size - 1)].block;
					
					// Dequeue
					auto& el = *((*block)[index]);
					if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
						// Make sure the element is still fully dequeued and destroyed even if the assignment throws
						struct Guard {
							Block* block;
							index_t index;
							
							~Guard()
							{
								(*block)[index]->~T();
								block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
							}
						} guard = { block, index };
						
						element = std::move(el);
					}
					else {
						element = std::move(el);
						el.~T();
						block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
					}
					
					return true;
				}
				else {
					// Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
					this->dequeueOvercommit.fetch_add(1, std::memory_order_release);		// Release so that the fetch_add on dequeueOptimisticCount is guaranteed to happen before this write
				}
			}
			
			return false;
		}
		template<AllocationMode allocMode, typename It>
		bool MOODYCAMEL_NO_TSAN enqueue_bulk(It itemFirst, size_t count)
		{
			// First, we need to make sure we have enough room to enqueue all of the elements;
			// this means pre-allocating blocks and putting them in the block index (but only if
			// all the allocations succeeded).
			index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
			auto startBlock = this->tailBlock;
			auto originalBlockIndexFront = pr_blockIndexFront;
			auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
			
			Block* firstAllocatedBlock = nullptr;
			
			// Figure out how many blocks we'll need to allocate, and do so
			size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1))
				- ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
			index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
			if (blockBaseDiff > 0) {
				// Allocate as many blocks as possible from ahead
				while (blockBaseDiff > 0 && this->tailBlock != nullptr && this->tailBlock->next != firstAllocatedBlock
					&& this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
					blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
					currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
					
					this->tailBlock = this->tailBlock->next;
					firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
					
					auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
					entry.base = currentTailIndex;
					entry.block = this->tailBlock;
					pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
				}
				
				// Now allocate as many blocks as necessary from the block pool
				while (blockBaseDiff > 0) {
					blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
					currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
					
					auto head = this->headIndex.load(std::memory_order_relaxed);
					assert(!details::circular_less_than<index_t>(currentTailIndex, head));
					bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
						|| (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
					if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
						MOODYCAMEL_CONSTEXPR_IF (allocMode == CannotAlloc) {
							// Failed to allocate, undo changes (but keep injected blocks)
							pr_blockIndexFront = originalBlockIndexFront;
							pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
							this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
							return false;
						}
						else if (full || !new_block_index(originalBlockIndexSlotsUsed)) {
							// Failed to allocate, undo changes (but keep injected blocks)
							pr_blockIndexFront = originalBlockIndexFront;
							pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
							this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
							return false;
						}
						
						// pr_blockIndexFront is updated inside new_block_index, so we need to
						// update our fallback value too (since we keep the new index even if we
						// later fail)
						originalBlockIndexFront = originalBlockIndexSlotsUsed;
					}
					
					// Insert a new block in the circular linked list
					auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
					if (newBlock == nullptr) {
						pr_blockIndexFront = originalBlockIndexFront;
						pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
						this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
						return false;
					}
					
#ifdef MCDBGQ_TRACKMEM
					newBlock->owner = this;
#endif
					newBlock->ConcurrentQueue::Block::template set_all_empty<explicit_context>();
					if (this->tailBlock == nullptr) {
						newBlock->next = newBlock;
					}
					else {
						newBlock->next = this->tailBlock->next;
						this->tailBlock->next = newBlock;
					}
					this->tailBlock = newBlock;
					firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
					
					++pr_blockIndexSlotsUsed;
					
					auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
					entry.base = currentTailIndex;
					entry.block = this->tailBlock;
					pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
				}
				
				// Excellent, all allocations succeeded. Reset each block's emptiness before we fill them up, and
				// publish the new block index front
				auto block = firstAllocatedBlock;
				while (true) {
					block->ConcurrentQueue::Block::template reset_empty<explicit_context>();
					if (block == this->tailBlock) {
						break;
					}
					block = block->next;
				}
				
				MOODYCAMEL_CONSTEXPR_IF (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst)))) {
					blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1),
						std::memory_order_release);
				}
			}
			
			// Enqueue, one block at a time
			index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
			currentTailIndex = startTailIndex;
			auto endBlock = this->tailBlock;
			this->tailBlock = startBlock;
			assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
			if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
				this->tailBlock = firstAllocatedBlock;
			}
			while (true) {
				index_t stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
				if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
					stopIndex = newTailIndex;
				}
				MOODYCAMEL_CONSTEXPR_IF (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst)))) {
					while (currentTailIndex != stopIndex) {
						new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
					}
				}
				else {
					MOODYCAMEL_TRY {
						while (currentTailIndex != stopIndex) {
							// Must use copy constructor even if move constructor is available
							// because we may have to revert if there's an exception.
							// Sorry about the horrible templated next line, but it was the only way
							// to disable moving *at compile time*, which is important because a type
							// may only define a (noexcept) move constructor, and so calls to the
							// cctor will not compile, even if they are in an if branch that will never
							// be executed
							new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<!MOODYCAMEL_NOEXCEPT_CTOR(T,
								decltype(*itemFirst),
								new (static_cast<T*>(nullptr)) T(
									details::deref_noexcept(
										itemFirst)))>::eval(*itemFirst));
							++currentTailIndex;
							++itemFirst;
						}
					}
					MOODYCAMEL_CATCH (...) {
						// Oh dear, an exception's been thrown -- destroy the elements that
						// were enqueued so far and revert the entire bulk operation (we'll keep
						// any allocated blocks in our linked list for later, though).
						auto constructedStopIndex = currentTailIndex;
						auto lastBlockEnqueued = this->tailBlock;
						
						pr_blockIndexFront = originalBlockIndexFront;
						pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
						this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
						
						if (!details::is_trivially_destructible<T>::value) {
							auto block = startBlock;
							if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
								block = firstAllocatedBlock;
							}
							currentTailIndex = startTailIndex;
							while (true) {
								stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
								if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
									stopIndex = constructedStopIndex;
								}
								while (currentTailIndex != stopIndex) {
									(*block)[currentTailIndex++]->~T();
								}
								if (block == lastBlockEnqueued) {
									break;
								}
								block = block->next;
							}
						}
						MOODYCAMEL_RETHROW;
					}
				}
				
				if (this->tailBlock == endBlock) {
					assert(currentTailIndex == newTailIndex);
					break;
				}
				this->tailBlock = this->tailBlock->next;
			}
			
			MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst)))) {
				if (firstAllocatedBlock != nullptr)
					blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1),
						std::memory_order_release);
			}
			
			this->tailIndex.store(newTailIndex, std::memory_order_release);
			return true;
		}
		template<typename It>
		size_t dequeue_bulk(It& itemFirst, size_t max)
		{
			auto tail = this->tailIndex.load(std::memory_order_relaxed);
			auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
			auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
			if (details::circular_less_than<size_t>(0, desiredCount)) {
				desiredCount = desiredCount < max ? desiredCount : max;
				std::atomic_thread_fence(std::memory_order_acquire);
				
				auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
				
				tail = this->tailIndex.load(std::memory_order_acquire);
				auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
				if (details::circular_less_than<size_t>(0, actualCount)) {
					actualCount = desiredCount < actualCount ? desiredCount : actualCount;
					if (actualCount < desiredCount) {
						this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
					}
					
					// Get the first index. Note that since there's guaranteed to be at least actualCount elements,
					// this will never exceed tail.
					auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
					
					// Determine which block the first element is in
					auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
					auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
					
					auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
					auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
					auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) / static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
					auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);
					
					// Iterate the blocks and dequeue
					auto index = firstIndex;
					do {
						auto firstIndexInBlock = index;
						index_t endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
						endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex)
							? firstIndex + static_cast<index_t>(actualCount) : endIndex;
						auto block = localBlockIndex->entries[indexIndex].block;
						if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
							while (index != endIndex) {
								auto& el = *((*block)[index]);
								*itemFirst++ = std::move(el);
								el.~T();
								++index;
							}
						}
						else {
							MOODYCAMEL_TRY {
								while (index != endIndex) {
									auto& el = *((*block)[index]);
									*itemFirst = std::move(el);
									++itemFirst;
									el.~T();
									++index;
								}
							}
							MOODYCAMEL_CATCH (...) {
								// It's too late to revert the dequeue, but we can make sure that all
								// the dequeued objects are properly destroyed and the block index
								// (and empty count) are properly updated before we propagate the exception
								do {
									block = localBlockIndex->entries[indexIndex].block;
									while (index != endIndex) {
										(*block)[index++]->~T();
									}
									block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock,
										static_cast<size_t>(endIndex - firstIndexInBlock));
									indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
									
									firstIndexInBlock = index;
									endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
									endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex)
										? firstIndex + static_cast<index_t>(actualCount) : endIndex;
								} while (index != firstIndex + actualCount);
								
								MOODYCAMEL_RETHROW;
							}
						}
						block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock,
							static_cast<size_t>(endIndex - firstIndexInBlock));
						indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
					} while (index != firstIndex + actualCount);
					
					return actualCount;
				}
				else {
					// Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
					this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
				}
			}
			
			return 0;
		}
	private:
		struct BlockIndexEntry
		{
			index_t base;
			Block* block;
		};
		
		struct BlockIndexHeader
		{
			size_t size;
			std::atomic<size_t> front;		// Current slot (not next, like pr_blockIndexFront)
			BlockIndexEntry* entries;
			void* prev;
		};
2360 bool new_block_index(
size_t numberOfFilledSlotsToExpose) {
2361 auto prevBlockSizeMask = pr_blockIndexSize - 1;
2364 pr_blockIndexSize <<= 1;
2365 auto newRawPtr =
static_cast<char *
>((Traits::malloc)(
2366 sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1
2367 +
sizeof(BlockIndexEntry) * pr_blockIndexSize));
2368 if (newRawPtr ==
nullptr) {
2369 pr_blockIndexSize >>= 1;
2373 auto newBlockIndexEntries =
reinterpret_cast<BlockIndexEntry *
>(details::align_for<BlockIndexEntry>(
2374 newRawPtr +
sizeof(BlockIndexHeader)));
2378 if (pr_blockIndexSlotsUsed != 0) {
2379 auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
2381 newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
2382 i = (i + 1) & prevBlockSizeMask;
2383 }
while (i != pr_blockIndexFront);
2387 auto header =
new(newRawPtr) BlockIndexHeader;
2388 header->size = pr_blockIndexSize;
2389 header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
2390 header->entries = newBlockIndexEntries;
2391 header->prev = pr_blockIndexRaw;
2393 pr_blockIndexFront = j;
2394 pr_blockIndexEntries = newBlockIndexEntries;
2395 pr_blockIndexRaw = newRawPtr;
2396 blockIndex.store(header, std::memory_order_release);
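    // Illustrative note (assumed sizes, not additional queue code): the index is a
    // power-of-two circular buffer that doubles on overflow. With pr_blockIndexSize == 4,
    // two used slots and pr_blockIndexFront == 2, the copy loop above starts at
    // i = (2 - 2) & 3 = 0 and copies entries 0 and 1, in order, into slots j = 0, 1 of
    // the new 8-entry array; the new front is then exposed at numberOfFilledSlotsToExpose - 1.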
    std::atomic<BlockIndexHeader*> blockIndex;

    // To be used by the producer only -- consumers must use the ones in the referenced block index
    size_t pr_blockIndexSlotsUsed;
    size_t pr_blockIndexSize;
    size_t pr_blockIndexFront;  // Next slot (not the current one)
    BlockIndexEntry* pr_blockIndexEntries;
    void* pr_blockIndexRaw;

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
public:
    ExplicitProducer* nextExplicitProducer;
private:
#endif

#ifdef MCDBGQ_TRACKMEM
    friend struct MemStats;
#endif
};
//////////////////////////////////
// Implicit queue
//////////////////////////////////

struct ImplicitProducer : public ProducerBase {
    ImplicitProducer(ConcurrentQueue* parent_) :
        ProducerBase(parent_, false),
        nextBlockIndexCapacity(IMPLICIT_INITIAL_INDEX_SIZE),
        blockIndex(nullptr)
    {
        new_block_index();
    }

    ~ImplicitProducer() {
        // Since we're in the destructor, we can assume all enqueue/dequeue operations
        // completed already; this means all undequeued elements are placed contiguously
        // across contiguous blocks, and that only the first and last remaining blocks
        // can be only partially empty (all other remaining blocks must be completely full).

#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
        // Unregister ourselves for thread termination notification
        if (!this->inactive.load(std::memory_order_relaxed)) {
            details::ThreadExitNotifier::unsubscribe(&threadExitListener);
        }
#endif

        // Destroy all remaining elements!
        auto tail = this->tailIndex.load(std::memory_order_relaxed);
        auto index = this->headIndex.load(std::memory_order_relaxed);
        Block* block = nullptr;
        assert(index == tail || details::circular_less_than(index, tail));
        bool forceFreeLastBlock = index != tail;  // If we enter the loop, then the last (tail) block will not be freed
        while (index != tail) {
            if ((index & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 || block == nullptr) {
                if (block != nullptr) {
                    // Free the old block
                    this->parent->add_block_to_free_list(block);
                }

                block = get_block_index_entry_for_index(index)->value.load(std::memory_order_relaxed);
            }

            ((*block)[index])->~T();
            ++index;
        }
        // Even if the queue is empty, there's still one block that's not on the free list
        // (unless the head index reached the end of it, in which case the tail will be
        // poised to create a new block)
        if (this->tailBlock != nullptr && (forceFreeLastBlock || (tail & static_cast<index_t>(BLOCK_SIZE - 1)) != 0)) {
            this->parent->add_block_to_free_list(this->tailBlock);
        }

        // Destroy block index
        auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
        if (localBlockIndex != nullptr) {
            for (size_t i = 0; i != localBlockIndex->capacity; ++i) {
                localBlockIndex->index[i]->~BlockIndexEntry();
            }
            do {
                auto prev = localBlockIndex->prev;
                localBlockIndex->~BlockIndexHeader();
                (Traits::free)(localBlockIndex);
                localBlockIndex = prev;
            } while (localBlockIndex != nullptr);
        }
    }
    template<AllocationMode allocMode, typename U>
    inline bool enqueue(U&& element) {
        index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
        index_t newTailIndex = 1 + currentTailIndex;
        if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
            // We reached the end of a block, start a new one
            auto head = this->headIndex.load(std::memory_order_relaxed);
            assert(!details::circular_less_than<index_t>(currentTailIndex, head));
            if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
                return false;
            }
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
            debug::DebugLock lock(mutex);
#endif
            // Find out where we'll be inserting this block in the block index
            BlockIndexEntry* idxEntry;
            if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) {
                return false;
            }

            // Get ahold of a new block
            auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
            if (newBlock == nullptr) {
                rewind_block_index_tail();
                idxEntry->value.store(nullptr, std::memory_order_relaxed);
                return false;
            }
#ifdef MCDBGQ_TRACKMEM
            newBlock->owner = this;
#endif
            newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();

            MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element)))) {
                // May throw; try to insert now before we publish the fact that we have this new block
                MOODYCAMEL_TRY {
                    new ((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
                }
                MOODYCAMEL_CATCH (...) {
                    rewind_block_index_tail();
                    idxEntry->value.store(nullptr, std::memory_order_relaxed);
                    this->parent->add_block_to_free_list(newBlock);
                    MOODYCAMEL_RETHROW;
                }
            }

            // Insert the new block into the index
            idxEntry->value.store(newBlock, std::memory_order_relaxed);

            this->tailBlock = newBlock;

            MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element)))) {
                this->tailIndex.store(newTailIndex, std::memory_order_release);
                return true;
            }
        }

        // Enqueue
        new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));

        this->tailIndex.store(newTailIndex, std::memory_order_release);
        return true;
    }
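    // For context, a minimal sketch of the public-API path that reaches this enqueue
    // (implicit producers are used whenever no ProducerToken is supplied):
    //
    //   moodycamel::ConcurrentQueue<int> q;
    //   q.enqueue(42);       // routed to this thread's implicit producer; may allocate a block
    //   int v;
    //   q.try_dequeue(v);    // v == 42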
    template<typename U>
    bool dequeue(U& element) {
        // See ExplicitProducer::dequeue for rationale and explanation
        index_t tail = this->tailIndex.load(std::memory_order_relaxed);
        index_t overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
        if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
            std::atomic_thread_fence(std::memory_order_acquire);

            index_t myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
            tail = this->tailIndex.load(std::memory_order_acquire);
            if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
                index_t index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);

                // Determine which block the element is in, then dequeue
                auto entry = get_block_index_entry_for_index(index);
                auto block = entry->value.load(std::memory_order_relaxed);
                auto& el = *((*block)[index]);

                if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
                    debug::DebugLock lock(producer->mutex);
#endif
                    struct Guard {
                        Block* block;
                        index_t index;
                        BlockIndexEntry* entry;
                        ConcurrentQueue* parent;

                        ~Guard() {
                            (*block)[index]->~T();
                            if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
                                entry->value.store(nullptr, std::memory_order_relaxed);
                                parent->add_block_to_free_list(block);
                            }
                        }
                    } guard = {block, index, entry, this->parent};

                    element = std::move(el);
                }
                else {
                    element = std::move(el);
                    el.~T();

                    if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
                        {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
                            debug::DebugLock lock(mutex);
#endif
                            // Add the block back into the global free pool (and remove from the block index)
                            entry->value.store(nullptr, std::memory_order_relaxed);
                        }
                        this->parent->add_block_to_free_list(block);  // releases the above store
                    }
                }

                return true;
            }
            else {
                this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
            }
        }

        return false;
    }
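    // Design note on the Guard above: `element = std::move(el)` may throw for types whose
    // move/copy assignment is not noexcept, yet the element's destructor and the
    // set_empty/free-list bookkeeping must still run. Putting that work in a local object's
    // destructor makes it fire even during stack unwinding. A minimal sketch of the same
    // idiom (assumed names, not from this file):
    //
    //   struct Cleanup { ~Cleanup() { /* destroy element, release slot */ } } cleanup;
    //   out = std::move(src);  // if this throws, ~Cleanup still runs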
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4706)  // assignment within conditional expression
#endif
    template<AllocationMode allocMode, typename It>
    bool enqueue_bulk(It itemFirst, size_t count) {
        // First, we need to make sure we have enough room to enqueue all of the elements;
        // this means pre-allocating blocks and putting them in the block index (but only
        // if all the allocations succeed).

        // Note that the tailBlock we start off with may not be owned by us any more;
        // this happens if it was filled up exactly to the top (setting tailIndex to
        // the first index of the next block which is not yet allocated), then dequeued
        // completely (putting it on the free list) before we enqueue again.

        index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
        auto startBlock = this->tailBlock;
        Block* firstAllocatedBlock = nullptr;
        auto endBlock = this->tailBlock;

        // Figure out how many blocks we'll need to allocate, and do so
        size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
        index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
        if (blockBaseDiff > 0) {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
            debug::DebugLock lock(mutex);
#endif
            do {
                blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
                currentTailIndex += static_cast<index_t>(BLOCK_SIZE);

                // Find out where we'll be inserting this block in the block index
                BlockIndexEntry* idxEntry = nullptr;  // initialization here unnecessary, but the compiler can't always tell
                Block* newBlock;
                bool indexInserted = false;
                auto head = this->headIndex.load(std::memory_order_relaxed);
                assert(!details::circular_less_than<index_t>(currentTailIndex, head));
                bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));

                if (full || !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) || (newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>()) == nullptr) {
                    // Index allocation or block allocation failed; revert any other allocations
                    // and index insertions done so far for this operation
                    if (indexInserted) {
                        rewind_block_index_tail();
                        idxEntry->value.store(nullptr, std::memory_order_relaxed);
                    }
                    currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
                    for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
                        currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
                        idxEntry = get_block_index_entry_for_index(currentTailIndex);
                        idxEntry->value.store(nullptr, std::memory_order_relaxed);
                        rewind_block_index_tail();
                    }
                    this->parent->add_blocks_to_free_list(firstAllocatedBlock);
                    this->tailBlock = startBlock;

                    return false;
                }

#ifdef MCDBGQ_TRACKMEM
                newBlock->owner = this;
#endif
                newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
                newBlock->next = nullptr;

                // Insert the new block into the index
                idxEntry->value.store(newBlock, std::memory_order_relaxed);

                // Store the chain of blocks so that we can undo if later allocations fail,
                // and so that we can find the blocks when we do the actual enqueueing
                if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr) {
                    assert(this->tailBlock != nullptr);
                    this->tailBlock->next = newBlock;
                }
                this->tailBlock = newBlock;
                endBlock = newBlock;
                firstAllocatedBlock = firstAllocatedBlock == nullptr ? newBlock : firstAllocatedBlock;
            } while (blockBaseDiff > 0);
        }

        // Enqueue, one block at a time
        index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
        currentTailIndex = startTailIndex;
        this->tailBlock = startBlock;
        assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
        if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
            this->tailBlock = firstAllocatedBlock;
        }
        while (true) {
            index_t stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
            if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
                stopIndex = newTailIndex;
            }
            MOODYCAMEL_CONSTEXPR_IF (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst)))) {
                while (currentTailIndex != stopIndex) {
                    new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
                }
            }
            else {
                MOODYCAMEL_TRY {
                    while (currentTailIndex != stopIndex) {
                        new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
                        ++currentTailIndex;
                        ++itemFirst;
                    }
                }
                MOODYCAMEL_CATCH (...) {
                    auto constructedStopIndex = currentTailIndex;
                    auto lastBlockEnqueued = this->tailBlock;

                    if (!details::is_trivially_destructible<T>::value) {
                        auto block = startBlock;
                        if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
                            block = firstAllocatedBlock;
                        }
                        currentTailIndex = startTailIndex;
                        while (true) {
                            stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
                            if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
                                stopIndex = constructedStopIndex;
                            }
                            while (currentTailIndex != stopIndex) {
                                (*block)[currentTailIndex++]->~T();
                            }
                            if (block == lastBlockEnqueued) {
                                break;
                            }
                            block = block->next;
                        }
                    }

                    currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
                    for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
                        currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
                        auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
                        idxEntry->value.store(nullptr, std::memory_order_relaxed);
                        rewind_block_index_tail();
                    }
                    this->parent->add_blocks_to_free_list(firstAllocatedBlock);
                    this->tailBlock = startBlock;
                    MOODYCAMEL_RETHROW;
                }
            }

            if (this->tailBlock == endBlock) {
                assert(currentTailIndex == newTailIndex);
                break;
            }
            this->tailBlock = this->tailBlock->next;
        }
        this->tailIndex.store(newTailIndex, std::memory_order_release);
        return true;
    }
#ifdef _MSC_VER
#pragma warning(pop)
#endif
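    // Worked example (illustrative only) of the block-count arithmetic at the top of
    // enqueue_bulk, assuming BLOCK_SIZE == 32:
    //
    //   startTailIndex = 60, count = 10
    //   blockBaseDiff = ((60 + 10 - 1) & ~31) - ((60 - 1) & ~31) = 64 - 32 = 32
    //
    // i.e. exactly one new block must be requisitioned; the allocation loop runs once,
    // advancing currentTailIndex from 32 to 64 (the base index of the new block).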
    template<typename It>
    size_t dequeue_bulk(It& itemFirst, size_t max) {
        auto tail = this->tailIndex.load(std::memory_order_relaxed);
        auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
        auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
        if (details::circular_less_than<size_t>(0, desiredCount)) {
            desiredCount = desiredCount < max ? desiredCount : max;
            std::atomic_thread_fence(std::memory_order_acquire);

            auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);

            tail = this->tailIndex.load(std::memory_order_acquire);
            auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
            if (details::circular_less_than<size_t>(0, actualCount)) {
                actualCount = desiredCount < actualCount ? desiredCount : actualCount;
                if (actualCount < desiredCount) {
                    this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
                }

                // Get the first index. Since there are guaranteed to be at least
                // actualCount elements, this will never exceed tail.
                auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);

                // Iterate the blocks and dequeue
                auto index = firstIndex;
                BlockIndexHeader* localBlockIndex;
                auto indexIndex = get_block_index_index_for_index(index, localBlockIndex);
                do {
                    auto blockStartIndex = index;
                    index_t endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
                    endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;

                    auto entry = localBlockIndex->index[indexIndex];
                    auto block = entry->value.load(std::memory_order_relaxed);
                    if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
                        while (index != endIndex) {
                            auto& el = *((*block)[index]);
                            *itemFirst++ = std::move(el);
                            el.~T();
                            ++index;
                        }
                    }
                    else {
                        MOODYCAMEL_TRY {
                            while (index != endIndex) {
                                auto& el = *((*block)[index]);
                                *itemFirst = std::move(el);
                                ++itemFirst;
                                el.~T();
                                ++index;
                            }
                        }
                        MOODYCAMEL_CATCH (...) {
                            do {
                                entry = localBlockIndex->index[indexIndex];
                                block = entry->value.load(std::memory_order_relaxed);
                                while (index != endIndex) {
                                    (*block)[index++]->~T();
                                }

                                if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
                                    debug::DebugLock lock(mutex);
#endif
                                    entry->value.store(nullptr, std::memory_order_relaxed);
                                    this->parent->add_block_to_free_list(block);
                                }
                                indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);

                                blockStartIndex = index;
                                endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
                                endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
                            } while (index != firstIndex + actualCount);

                            MOODYCAMEL_RETHROW;
                        }
                    }
                    if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
                        {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
                            debug::DebugLock lock(mutex);
#endif
                            // The set_many_empty above did a release, meaning that anybody who acquires
                            // the block we're about to free can use it safely, since our writes (and
                            // reads!) will have happened-before then
                            entry->value.store(nullptr, std::memory_order_relaxed);
                        }
                        this->parent->add_block_to_free_list(block);  // releases the above store
                    }
                    indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
                } while (index != firstIndex + actualCount);

                return actualCount;
            }
            else {
                this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
            }
        }

        return 0;
    }
private:
    // The block size must be > 1, so any number with the low bit set is an invalid block base index
    static const index_t INVALID_BLOCK_BASE = 1;

    struct BlockIndexEntry {
        std::atomic<index_t> key;
        std::atomic<Block*> value;
    };

    struct BlockIndexHeader {
        size_t capacity;
        std::atomic<size_t> tail;
        BlockIndexEntry* entries;
        BlockIndexEntry** index;
        BlockIndexHeader* prev;
    };

    template<AllocationMode allocMode>
    inline bool insert_block_index_entry(BlockIndexEntry*& idxEntry, index_t blockStartIndex) {
        auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);  // We're the only writer thread, relaxed is OK
        if (localBlockIndex == nullptr) {
            return false;  // this can happen if new_block_index failed in the constructor
        }
        size_t newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
        idxEntry = localBlockIndex->index[newTail];
        if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE ||
            idxEntry->value.load(std::memory_order_relaxed) == nullptr) {

            idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
            localBlockIndex->tail.store(newTail, std::memory_order_release);
            return true;
        }

        // No room in the old block index, try to allocate another one!
        MOODYCAMEL_CONSTEXPR_IF (allocMode == CannotAlloc) {
            return false;
        }
        else if (!new_block_index()) {
            return false;
        }
        else {
            localBlockIndex = blockIndex.load(std::memory_order_relaxed);
            newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
            idxEntry = localBlockIndex->index[newTail];
            assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE);
            idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
            localBlockIndex->tail.store(newTail, std::memory_order_release);
            return true;
        }
    }

    inline void rewind_block_index_tail() {
        auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
        localBlockIndex->tail.store((localBlockIndex->tail.load(std::memory_order_relaxed) - 1) & (localBlockIndex->capacity - 1), std::memory_order_relaxed);
    }

    inline BlockIndexEntry* get_block_index_entry_for_index(index_t index) const {
        BlockIndexHeader* localBlockIndex;
        auto idx = get_block_index_index_for_index(index, localBlockIndex);
        return localBlockIndex->index[idx];
    }

    inline size_t get_block_index_index_for_index(index_t index, BlockIndexHeader*& localBlockIndex) const {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
        debug::DebugLock lock(mutex);
#endif
        index &= ~static_cast<index_t>(BLOCK_SIZE - 1);
        localBlockIndex = blockIndex.load(std::memory_order_acquire);
        auto tail = localBlockIndex->tail.load(std::memory_order_acquire);
        auto tailBase = localBlockIndex->index[tail]->key.load(std::memory_order_relaxed);
        assert(tailBase != INVALID_BLOCK_BASE);
        // Division must be used here instead of a shift, because the index may wrap
        // around, producing a negative offset whose sign we need to preserve
        auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(index - tailBase) / BLOCK_SIZE);
        size_t idx = (tail + offset) & (localBlockIndex->capacity - 1);
        assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index && localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) != nullptr);
        return idx;
    }
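    // Worked example (illustrative, assuming BLOCK_SIZE == 32 and a capacity-8 index):
    //
    //   index = 96 (already masked to a block base), tailBase = 160, tail slot = 5
    //   offset = (96 - 160) / 32 = -2 (as a signed value)
    //   idx = (5 + static_cast<size_t>(-2)) & 7 = 3
    //
    // The signed division is what lets a lookup reach *backwards* from the tail entry;
    // an unsigned shift would lose the sign once the subtraction wraps.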
    bool new_block_index() {
        auto prev = blockIndex.load(std::memory_order_relaxed);
        size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
        auto entryCount = prev == nullptr ? nextBlockIndexCapacity : prevCapacity;
        auto raw = static_cast<char*>((Traits::malloc)(
            sizeof(BlockIndexHeader) +
            std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * entryCount +
            std::alignment_of<BlockIndexEntry*>::value - 1 + sizeof(BlockIndexEntry*) * nextBlockIndexCapacity));
        if (raw == nullptr) {
            return false;
        }

        auto header = new (raw) BlockIndexHeader;
        auto entries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(raw + sizeof(BlockIndexHeader)));
        auto index = reinterpret_cast<BlockIndexEntry**>(details::align_for<BlockIndexEntry*>(reinterpret_cast<char*>(entries) + sizeof(BlockIndexEntry) * entryCount));
        if (prev != nullptr) {
            auto prevTail = prev->tail.load(std::memory_order_relaxed);
            auto prevPos = prevTail;
            size_t i = 0;
            do {
                prevPos = (prevPos + 1) & (prev->capacity - 1);
                index[i++] = prev->index[prevPos];
            } while (prevPos != prevTail);
            assert(i == prevCapacity);
        }
        for (size_t i = 0; i != entryCount; ++i) {
            new (entries + i) BlockIndexEntry;
            entries[i].key.store(INVALID_BLOCK_BASE, std::memory_order_relaxed);
            index[prevCapacity + i] = entries + i;
        }
        header->prev = prev;
        header->entries = entries;
        header->index = index;
        header->capacity = nextBlockIndexCapacity;
        header->tail.store((prevCapacity - 1) & (nextBlockIndexCapacity - 1), std::memory_order_relaxed);

        blockIndex.store(header, std::memory_order_release);

        nextBlockIndexCapacity <<= 1;

        return true;
    }
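    // Sketch of the growth policy (an observation, not additional queue code): each
    // successful call doubles nextBlockIndexCapacity, and the new table is sized for all
    // previous entries plus entryCount fresh ones, so the usable capacity grows
    // geometrically from IMPLICIT_INITIAL_INDEX_SIZE while old headers stay reachable
    // through `prev` (they are only freed in the destructor, since concurrent readers
    // may still hold them).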
    size_t nextBlockIndexCapacity;
    std::atomic<BlockIndexHeader*> blockIndex;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
public:
    details::ThreadExitListener threadExitListener;
private:
#endif
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
public:
    ImplicitProducer* nextImplicitProducer;
private:
#endif
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
    mutable debug::DebugMutex mutex;
#endif
#ifdef MCDBGQ_TRACKMEM
    friend struct MemStats;
#endif
};
//////////////////////////////////
// Block pool manipulation
//////////////////////////////////

void populate_initial_block_list(size_t blockCount) {
    initialBlockPoolSize = blockCount;
    if (initialBlockPoolSize == 0) {
        initialBlockPool = nullptr;
        return;
    }

    initialBlockPool = create_array<Block>(blockCount);
    if (initialBlockPool == nullptr) {
        initialBlockPoolSize = 0;
    }
    for (size_t i = 0; i < initialBlockPoolSize; ++i) {
        initialBlockPool[i].dynamicallyAllocated = false;
    }
}

inline Block* try_get_block_from_initial_pool() {
    if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) {
        return nullptr;
    }

    auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);

    return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;
}
inline void add_block_to_free_list(Block* block) {
#ifdef MCDBGQ_TRACKMEM
    block->owner = nullptr;
#endif
    freeList.add(block);
}

inline void add_blocks_to_free_list(Block* block) {
    while (block != nullptr) {
        auto next = block->next;
        add_block_to_free_list(block);
        block = next;
    }
}

inline Block* try_get_block_from_free_list() {
    return freeList.try_get();
}

// Gets a free block from one of the memory pools, or allocates a new one (if applicable)
template<AllocationMode canAlloc>
Block* requisition_block() {
    auto block = try_get_block_from_initial_pool();
    if (block != nullptr) {
        return block;
    }

    block = try_get_block_from_free_list();
    if (block != nullptr) {
        return block;
    }

    MOODYCAMEL_CONSTEXPR_IF (canAlloc == CanAlloc) {
        return create<Block>();
    }
    else {
        return nullptr;
    }
}
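// The acquisition order above is tiered: (1) the pre-allocated initial pool, then
// (2) the global free list of recycled blocks, then (3) the heap, and (3) only when
// canAlloc == CanAlloc. A hedged usage sketch (sizes are illustrative):
//
//   moodycamel::ConcurrentQueue<int> q(1024);  // pre-allocates roughly 1024/BLOCK_SIZE blocks
//
// With a large enough initial pool, steady-state enqueues can usually be satisfied
// from tiers (1) and (2) without touching the allocator.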
#ifdef MCDBGQ_TRACKMEM
public:
    struct MemStats {
        size_t allocatedBlocks;
        size_t usedBlocks;
        size_t freeBlocks;
        size_t ownedBlocksExplicit;
        size_t ownedBlocksImplicit;
        size_t implicitProducers;
        size_t explicitProducers;
        size_t elementsEnqueued;
        size_t blockClassBytes;
        size_t queueClassBytes;
        size_t implicitBlockIndexBytes;
        size_t explicitBlockIndexBytes;

        friend class ConcurrentQueue;

    private:
        static MemStats getFor(ConcurrentQueue* q) {
            MemStats stats = { 0 };

            stats.elementsEnqueued = q->size_approx();

            auto block = q->freeList.head_unsafe();
            while (block != nullptr) {
                ++stats.allocatedBlocks;
                ++stats.freeBlocks;
                block = block->freeListNext.load(std::memory_order_relaxed);
            }

            for (auto ptr = q->producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
                bool implicit = dynamic_cast<ImplicitProducer*>(ptr) != nullptr;
                stats.implicitProducers += implicit ? 1 : 0;
                stats.explicitProducers += implicit ? 0 : 1;

                if (implicit) {
                    auto prod = static_cast<ImplicitProducer*>(ptr);
                    stats.queueClassBytes += sizeof(ImplicitProducer);
                    auto head = prod->headIndex.load(std::memory_order_relaxed);
                    auto tail = prod->tailIndex.load(std::memory_order_relaxed);
                    auto hash = prod->blockIndex.load(std::memory_order_relaxed);
                    if (hash != nullptr) {
                        for (size_t i = 0; i != hash->capacity; ++i) {
                            if (hash->index[i]->key.load(std::memory_order_relaxed) != ImplicitProducer::INVALID_BLOCK_BASE && hash->index[i]->value.load(std::memory_order_relaxed) != nullptr) {
                                ++stats.allocatedBlocks;
                                ++stats.ownedBlocksImplicit;
                            }
                        }
                        stats.implicitBlockIndexBytes += hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry);
                        for (; hash != nullptr; hash = hash->prev) {
                            stats.implicitBlockIndexBytes += sizeof(typename ImplicitProducer::BlockIndexHeader) + hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry*);
                        }
                    }
                    for (; details::circular_less_than<index_t>(head, tail); head += BLOCK_SIZE) {
                        ++stats.usedBlocks;
                    }
                }
                else {
                    auto prod = static_cast<ExplicitProducer*>(ptr);
                    stats.queueClassBytes += sizeof(ExplicitProducer);
                    auto tailBlock = prod->tailBlock;
                    bool wasNonEmpty = false;
                    if (tailBlock != nullptr) {
                        auto block = tailBlock;
                        do {
                            ++stats.allocatedBlocks;
                            if (!block->ConcurrentQueue::Block::template is_empty<explicit_context>() || wasNonEmpty) {
                                ++stats.usedBlocks;
                                wasNonEmpty = wasNonEmpty || block != tailBlock;
                            }
                            ++stats.ownedBlocksExplicit;
                            block = block->next;
                        } while (block != tailBlock);
                    }
                    auto index = prod->blockIndex.load(std::memory_order_relaxed);
                    while (index != nullptr) {
                        stats.explicitBlockIndexBytes += sizeof(typename ExplicitProducer::BlockIndexHeader) + index->size * sizeof(typename ExplicitProducer::BlockIndexEntry);
                        index = static_cast<typename ExplicitProducer::BlockIndexHeader*>(index->prev);
                    }
                }
            }

            auto freeOnInitialPool = q->initialBlockPoolIndex.load(std::memory_order_relaxed) >= q->initialBlockPoolSize ? 0 : q->initialBlockPoolSize - q->initialBlockPoolIndex.load(std::memory_order_relaxed);
            stats.allocatedBlocks += freeOnInitialPool;
            stats.freeBlocks += freeOnInitialPool;

            stats.blockClassBytes = sizeof(Block) * stats.allocatedBlocks;
            stats.queueClassBytes += sizeof(ConcurrentQueue);

            return stats;
        }
    };

    // For debugging only. Not thread-safe.
    MemStats getMemStats() {
        return MemStats::getFor(this);
    }
private:
    friend struct MemStats;
#endif
//////////////////////////////////
// Producer list manipulation
//////////////////////////////////

ProducerBase* recycle_or_create_producer(bool isExplicit) {
    bool recycled;
    return recycle_or_create_producer(isExplicit, recycled);
}

ProducerBase* recycle_or_create_producer(bool isExplicit, bool& recycled) {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
    debug::DebugLock lock(implicitProdMutex);
#endif
    // Try to re-use one first
    for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
        if (ptr->inactive.load(std::memory_order_relaxed) && ptr->isExplicit == isExplicit) {
            bool expected = true;
            if (ptr->inactive.compare_exchange_strong(expected, false, std::memory_order_acquire, std::memory_order_relaxed)) {
                // We caught one! It's been marked as activated, the caller can have it
                recycled = true;
                return ptr;
            }
        }
    }

    recycled = false;
    return add_producer(isExplicit ? static_cast<ProducerBase*>(create<ExplicitProducer>(this)) : create<ImplicitProducer>(this));
}
ProducerBase* add_producer(ProducerBase* producer) {
    // Handle failed memory allocation
    if (producer == nullptr) {
        return nullptr;
    }

    producerCount.fetch_add(1, std::memory_order_relaxed);

    // Add it to the lock-free list
    auto prevTail = producerListTail.load(std::memory_order_relaxed);
    do {
        producer->next = prevTail;
    } while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release, std::memory_order_relaxed));

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
    if (producer->isExplicit) {
        auto prevTailExplicit = explicitProducers.load(std::memory_order_relaxed);
        do {
            static_cast<ExplicitProducer*>(producer)->nextExplicitProducer = prevTailExplicit;
        } while (!explicitProducers.compare_exchange_weak(prevTailExplicit, static_cast<ExplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
    }
    else {
        auto prevTailImplicit = implicitProducers.load(std::memory_order_relaxed);
        do {
            static_cast<ImplicitProducer*>(producer)->nextImplicitProducer = prevTailImplicit;
        } while (!implicitProducers.compare_exchange_weak(prevTailImplicit, static_cast<ImplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
    }
#endif

    return producer;
}
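// The loop above is the classic lock-free singly-linked-list push. A standalone
// sketch of the same pattern (assumed Node type, not part of this file):
//
//   std::atomic<Node*> head;
//   void push(Node* n) {
//       Node* old = head.load(std::memory_order_relaxed);
//       do {
//           n->next = old;  // link before publishing
//       } while (!head.compare_exchange_weak(old, n,
//                    std::memory_order_release, std::memory_order_relaxed));
//   }
//
// compare_exchange_weak reloads `old` on failure, so each retry simply re-links and
// tries again; nodes are never removed from this list, which sidesteps the ABA problem.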
void reown_producers() {
    // After another instance is moved-in/swapped-in, all producers must
    // be updated to point to the new parent
    for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) {
        ptr->parent = this;
    }
}
//////////////////////////////////
// Implicit producer hash
//////////////////////////////////

struct ImplicitProducerKVP {
    std::atomic<details::thread_id_t> key;
    ImplicitProducer* value;  // No need for atomicity, since it's only read by the thread that sets it in the first place

    ImplicitProducerKVP() : value(nullptr) {}

    ImplicitProducerKVP(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT {
        key.store(other.key.load(std::memory_order_relaxed), std::memory_order_relaxed);
        value = other.value;
    }

    inline ImplicitProducerKVP& operator=(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT {
        swap(other);
        return *this;
    }

    inline void swap(ImplicitProducerKVP& other) MOODYCAMEL_NOEXCEPT {
        if (this != &other) {
            details::swap_relaxed(key, other.key);
            std::swap(value, other.value);
        }
    }
};

template<typename XT, typename XTraits>
friend void moodycamel::swap(typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&, typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&) MOODYCAMEL_NOEXCEPT;

struct ImplicitProducerHash {
    size_t capacity;
    ImplicitProducerKVP* entries;
    ImplicitProducerHash* prev;
};
inline void populate_initial_implicit_producer_hash() {
    MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) {
        return;
    }
    else {
        implicitProducerHashCount.store(0, std::memory_order_relaxed);
        auto hash = &initialImplicitProducerHash;
        hash->capacity = INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
        hash->entries = &initialImplicitProducerHashEntries[0];
        for (size_t i = 0; i != INITIAL_IMPLICIT_PRODUCER_HASH_SIZE; ++i) {
            initialImplicitProducerHashEntries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
        }
        hash->prev = nullptr;
        implicitProducerHash.store(hash, std::memory_order_relaxed);
    }
}
void swap_implicit_producer_hashes(ConcurrentQueue& other) {
    MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) {
        return;
    }
    else {
        // Swap (assumes our implicit producer hash is initialized)
        initialImplicitProducerHashEntries.swap(other.initialImplicitProducerHashEntries);
        initialImplicitProducerHash.entries = &initialImplicitProducerHashEntries[0];
        other.initialImplicitProducerHash.entries = &other.initialImplicitProducerHashEntries[0];

        details::swap_relaxed(implicitProducerHashCount, other.implicitProducerHashCount);

        details::swap_relaxed(implicitProducerHash, other.implicitProducerHash);
        if (implicitProducerHash.load(std::memory_order_relaxed) == &other.initialImplicitProducerHash) {
            implicitProducerHash.store(&initialImplicitProducerHash, std::memory_order_relaxed);
        }
        else {
            ImplicitProducerHash* hash;
            for (hash = implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &other.initialImplicitProducerHash; hash = hash->prev) {
                continue;
            }
            hash->prev = &initialImplicitProducerHash;
        }
        if (other.implicitProducerHash.load(std::memory_order_relaxed) == &initialImplicitProducerHash) {
            other.implicitProducerHash.store(&other.initialImplicitProducerHash, std::memory_order_relaxed);
        }
        else {
            ImplicitProducerHash* hash;
            for (hash = other.implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &initialImplicitProducerHash; hash = hash->prev) {
                continue;
            }
            hash->prev = &other.initialImplicitProducerHash;
        }
    }
}
// Only fails (returns nullptr) if memory allocation fails
ImplicitProducer* get_or_add_implicit_producer() {
    // Note that since the data is essentially thread-local (the key is the thread ID),
    // there's a reduced need for fences (memory ordering is already consistent for any
    // individual thread), except for the current table itself.

    // Start by looking for the thread ID in the current and all previous hash tables.
    // If it's not found, it must not be in there yet, since this same thread would
    // have added it previously to one of the tables that it iterated through.

    // Code and algorithm adapted from http://preshing.com/20130605/the-worlds-simplest-lock-free-hash-table

#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
    debug::DebugLock lock(implicitProdMutex);
#endif

    auto id = details::thread_id();
    auto hashedId = details::hash_thread_id(id);

    auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
    assert(mainHash != nullptr);  // silences compiler warnings; the hash is never null
    for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
        // Look for the id in this hash
        auto index = hashedId;
        while (true) {  // Not an infinite loop because at least one slot is free in the hash table
            index &= hash->capacity - 1;

            auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
            if (probedKey == id) {
                // Found it! If we had to search several hashes deep, though, we should
                // lazily add it to the current main hash table to avoid the extended
                // search next time. There's guaranteed to be room in the current hash
                // table, since every subsequent table implicitly reserves space for all
                // previous tables (there's only one implicitProducerHashCount).
                auto value = hash->entries[index].value;
                if (hash != mainHash) {
                    index = hashedId;
                    while (true) {
                        index &= mainHash->capacity - 1;
                        probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
                        auto empty = details::invalid_thread_id;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
                        auto reusable = details::invalid_thread_id2;
                        if ((probedKey == empty    && mainHash->entries[index].key.compare_exchange_strong(empty,    id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
                            (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
#else
                        if ((probedKey == empty    && mainHash->entries[index].key.compare_exchange_strong(empty,    id, std::memory_order_relaxed, std::memory_order_relaxed))) {
#endif
                            mainHash->entries[index].value = value;
                            break;
                        }
                        ++index;
                    }
                }

                return value;
            }
            if (probedKey == details::invalid_thread_id) {
                break;  // Not in this hash table
            }
            ++index;
        }
    }

    // Insert!
    auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
    while (true) {
        if (newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) {
            // We've acquired the resize lock, try to allocate a bigger hash table.
            // Note the acquire fence synchronizes with the release fence at the end of
            // this block, and hence when we reload implicitProducerHash it must be the
            // most recent version (it only gets changed within this locked block).
            mainHash = implicitProducerHash.load(std::memory_order_acquire);
            if (newCount >= (mainHash->capacity >> 1)) {
                auto newCapacity = mainHash->capacity << 1;
                while (newCount >= (newCapacity >> 1)) {
                    newCapacity <<= 1;
                }
                auto raw = static_cast<char*>((Traits::malloc)(sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1 + sizeof(ImplicitProducerKVP) * newCapacity));
                if (raw == nullptr) {
                    // Allocation failed
                    implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
                    implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
                    return nullptr;
                }

                auto newHash = new (raw) ImplicitProducerHash;
                newHash->capacity = static_cast<size_t>(newCapacity);
                newHash->entries = reinterpret_cast<ImplicitProducerKVP*>(details::align_for<ImplicitProducerKVP>(raw + sizeof(ImplicitProducerHash)));
                for (size_t i = 0; i != newCapacity; ++i) {
                    new (newHash->entries + i) ImplicitProducerKVP;
                    newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
                }
                newHash->prev = mainHash;
                implicitProducerHash.store(newHash, std::memory_order_release);
                implicitProducerHashResizeInProgress.clear(std::memory_order_release);
                mainHash = newHash;
            }
            else {
                implicitProducerHashResizeInProgress.clear(std::memory_order_release);
            }
        }

        // If it's < three-quarters full, add to the old one anyway so that we don't have
        // to wait for the next table to finish being allocated by another thread (and if
        // we just finished allocating above, the condition will always be true)
        if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
            bool recycled;
            auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false, recycled));
            if (producer == nullptr) {
                implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
                return nullptr;
            }
            if (recycled) {
                implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
            }

#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
            producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
            producer->threadExitListener.userData = producer;
            details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
#endif

            auto index = hashedId;
            while (true) {
                index &= mainHash->capacity - 1;
                auto probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);

                auto empty = details::invalid_thread_id;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
                auto reusable = details::invalid_thread_id2;
                if ((probedKey == empty    && mainHash->entries[index].key.compare_exchange_strong(empty,    id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
                    (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
#else
                if ((probedKey == empty    && mainHash->entries[index].key.compare_exchange_strong(empty,    id, std::memory_order_relaxed, std::memory_order_relaxed))) {
#endif
                    mainHash->entries[index].value = producer;
                    break;
                }
                ++index;
            }
            return producer;
        }

        // The old hash is quite full and somebody else is busy allocating a new one.
        // We need to wait for the allocating thread to finish (if it succeeds, we add;
        // if not, we try to allocate ourselves).
        mainHash = implicitProducerHash.load(std::memory_order_acquire);
    }
}
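// The lookup/insert above is open addressing with linear probing (adapted, per the
// upstream comment, from Preshing's lock-free hash table). A minimal sketch of the
// probe loop under the same assumptions (power-of-two capacity, insert-only keys,
// EMPTY as the never-used sentinel):
//
//   size_t index = hashedId;
//   for (;;) {
//       index &= capacity - 1;  // wrap via mask; capacity is a power of two
//       auto key = entries[index].key.load(std::memory_order_relaxed);
//       if (key == id) break;   // found our slot
//       if (key == EMPTY &&
//           entries[index].key.compare_exchange_strong(key, id)) break;  // claimed it
//       ++index;                // probe the next slot
//   }
//
// Thread exit is handled with a `reusable` tombstone rather than by compaction.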
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
void implicit_producer_thread_exited(ImplicitProducer* producer) {
    // Remove from thread exit listeners
    details::ThreadExitNotifier::unsubscribe(&producer->threadExitListener);

    // Remove from hash
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
    debug::DebugLock lock(implicitProdMutex);
#endif
    auto hash = implicitProducerHash.load(std::memory_order_acquire);
    assert(hash != nullptr);  // The thread exit listener is only registered if we were added to a hash in the first place
    auto id = details::thread_id();
    auto hashedId = details::hash_thread_id(id);
    details::thread_id_t probedKey;

    // We need to traverse all the hashes just in case other threads aren't on the
    // current one yet and are trying to add an entry thinking there's a free slot
    // (because they reused a producer)
    for (; hash != nullptr; hash = hash->prev) {
        auto index = hashedId;
        do {
            index &= hash->capacity - 1;
            probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
            if (probedKey == id) {
                hash->entries[index].key.store(details::invalid_thread_id2, std::memory_order_release);
                break;
            }
            ++index;
        } while (probedKey != details::invalid_thread_id);  // Can happen if the hash has changed but we weren't put back in it yet, or if we weren't added to this hash in the first place
    }

    // Mark the queue as being recyclable
    producer->inactive.store(true, std::memory_order_release);
}

static void implicit_producer_thread_exited_callback(void* userData) {
    auto producer = static_cast<ImplicitProducer*>(userData);
    auto queue = producer->parent;
    queue->implicit_producer_thread_exited(producer);
}
#endif
//////////////////////////////////
// Utility functions
//////////////////////////////////

template<typename TAlign>
static inline void* aligned_malloc(size_t size) {
    MOODYCAMEL_CONSTEXPR_IF (std::alignment_of<TAlign>::value <= std::alignment_of<details::max_align_t>::value)
        return (Traits::malloc)(size);
    else {
        size_t alignment = std::alignment_of<TAlign>::value;
        void* raw = (Traits::malloc)(size + alignment - 1 + sizeof(void*));
        if (!raw)
            return nullptr;
        char* ptr = details::align_for<TAlign>(reinterpret_cast<char*>(raw) + sizeof(void*));
        *(reinterpret_cast<void**>(ptr) - 1) = raw;
        return ptr;
    }
}

template<typename TAlign>
static inline void aligned_free(void* ptr) {
    MOODYCAMEL_CONSTEXPR_IF (std::alignment_of<TAlign>::value <= std::alignment_of<details::max_align_t>::value)
        return (Traits::free)(ptr);
    else
        (Traits::free)(ptr ? *(reinterpret_cast<void**>(ptr) - 1) : nullptr);
}
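// Layout sketch for the over-aligned branch (illustrative numbers, 64-bit target):
// for TAlign with alignment 64 and size == 256, the raw allocation is
// 256 + 63 + sizeof(void*) bytes. `ptr` is the first 64-byte-aligned address past the
// room reserved for the back-pointer, and the void* word just below ptr stores `raw`
// so aligned_free can hand back exactly what malloc returned:
//
//   raw: [...padding...][void* raw][ aligned payload ... ]
//                        ^ ptr - 8  ^ ptr (64-byte aligned)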
template<typename U>
static inline U* create_array(size_t count) {
    assert(count > 0);
    U* p = static_cast<U*>(aligned_malloc<U>(sizeof(U) * count));
    if (p == nullptr)
        return nullptr;

    for (size_t i = 0; i != count; ++i)
        new (p + i) U();
    return p;
}

template<typename U>
static inline void destroy_array(U* p, size_t count) {
    if (p != nullptr) {
        assert(count > 0);
        for (size_t i = count; i != 0; )
            (p + --i)->~U();
    }
    aligned_free<U>(p);
}

template<typename U>
static inline U* create() {
    void* p = aligned_malloc<U>(sizeof(U));
    return p != nullptr ? new (p) U : nullptr;
}

template<typename U, typename A1>
static inline U* create(A1&& a1) {
    void* p = aligned_malloc<U>(sizeof(U));
    return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
}

template<typename U>
static inline void destroy(U* p) {
    if (p != nullptr)
        p->~U();
    aligned_free<U>(p);
}
private:
    std::atomic<ProducerBase*> producerListTail;
    std::atomic<std::uint32_t> producerCount;

    std::atomic<size_t> initialBlockPoolIndex;
    Block* initialBlockPool;
    size_t initialBlockPoolSize;

#ifndef MCDBGQ_USEDEBUGFREELIST
    FreeList<Block> freeList;
#else
    debug::DebugFreeList<Block> freeList;
#endif

    std::atomic<ImplicitProducerHash*> implicitProducerHash;
    std::atomic<size_t> implicitProducerHashCount;  // Number of slots logically used
    ImplicitProducerHash initialImplicitProducerHash;
    std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE> initialImplicitProducerHashEntries;
    std::atomic_flag implicitProducerHashResizeInProgress;

    std::atomic<std::uint32_t> nextExplicitConsumerId;
    std::atomic<std::uint32_t> globalExplicitConsumerOffset;

#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
    debug::DebugMutex implicitProdMutex;
#endif

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
    std::atomic<ExplicitProducer*> explicitProducers;
    std::atomic<ImplicitProducer*> implicitProducers;
#endif
};
template<typename T, typename Traits>
ProducerToken::ProducerToken(ConcurrentQueue<T, Traits>& queue)
    : producer(queue.recycle_or_create_producer(true))
{
    if (producer != nullptr) {
        producer->token = this;
    }
}

template<typename T, typename Traits>
ProducerToken::ProducerToken(BlockingConcurrentQueue<T, Traits>& queue)
    : producer(reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->recycle_or_create_producer(true))
{
    if (producer != nullptr) {
        producer->token = this;
    }
}

template<typename T, typename Traits>
ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits>& queue)
    : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
{
    initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
    lastKnownGlobalOffset = static_cast<std::uint32_t>(-1);
}

template<typename T, typename Traits>
ConsumerToken::ConsumerToken(BlockingConcurrentQueue<T, Traits>& queue)
    : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
{
    initialOffset = reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
    lastKnownGlobalOffset = static_cast<std::uint32_t>(-1);
}

template<typename T, typename Traits>
inline void swap(ConcurrentQueue<T, Traits>& a, ConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
{
    a.swap(b);
}

inline void swap(ProducerToken& a, ProducerToken& b) MOODYCAMEL_NOEXCEPT
{
    a.swap(b);
}

inline void swap(ConsumerToken& a, ConsumerToken& b) MOODYCAMEL_NOEXCEPT
{
    a.swap(b);
}

template<typename T, typename Traits>
inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT
{
    a.swap(b);
}

}
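// A short usage sketch of the tokens defined above (public API, independent of the
// internal details in this file):
//
//   moodycamel::ConcurrentQueue<int> q;
//   moodycamel::ProducerToken ptok(q);  // binds a dedicated explicit producer
//   moodycamel::ConsumerToken ctok(q);
//   q.enqueue(ptok, 1);                 // token overloads skip the implicit-producer hash
//   int v;
//   q.try_dequeue(ctok, v);             // consumer tokens rotate across producers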
#if defined(_MSC_VER) && (!defined(_HAS_CXX17) || !_HAS_CXX17)
#pragma warning(pop)
#endif

#if defined(__GNUC__) && !defined(__INTEL_COMPILER)
#pragma GCC diagnostic pop
#endif