#include <type_traits>
#include <cstdint> // SIZE_MAX
#include <utility> // std::declval
#include <condition_variable>
#include <mutex> // std::mutex, std::unique_lock
// A single-producer single-consumer lock-free ring-buffer queue
template <typename T, typename Allocator = std::allocator<T>> class SPSCQueue {
#if defined(__cpp_if_constexpr) && defined(__cpp_lib_void_t)
  // Detect whether Allocator provides the C++23 allocate_at_least() API
  template <typename Alloc2, typename = void>
  struct has_allocate_at_least : std::false_type {};

  template <typename Alloc2>
  struct has_allocate_at_least<
      Alloc2, std::void_t<typename Alloc2::value_type,
                          decltype(std::declval<Alloc2 &>().allocate_at_least(
                              size_t{}))>> : std::true_type {};
#endif
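  // A sketch of what the trait reports, assuming a C++23 standard library
  // where std::allocator provides allocate_at_least (illustration only):
  //
  //   static_assert(has_allocate_at_least<std::allocator<int>>::value, "");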
public:
  explicit SPSCQueue(const size_t capacity,
                     const Allocator &allocator = Allocator())
      : capacity_(capacity), allocator_(allocator) {
    // One extra slack slot is needed to distinguish an empty queue from a
    // full one, so the usable capacity is capacity_ - 1
    if (capacity_ < 1) {
      capacity_ = 1;
    }
    capacity_++;
    // Prevent overflowing size_t when the padding slots are added
    if (capacity_ > SIZE_MAX - 2 * kPadding) {
      capacity_ = SIZE_MAX - 2 * kPadding;
    }
#if defined(__cpp_if_constexpr) && defined(__cpp_lib_void_t)
    if constexpr (has_allocate_at_least<Allocator>::value) {
      auto res = allocator_.allocate_at_least(capacity_ + 2 * kPadding);
      slots_ = res.ptr;
      capacity_ = res.count - 2 * kPadding;
    } else {
      slots_ = std::allocator_traits<Allocator>::allocate(
          allocator_, capacity_ + 2 * kPadding);
    }
#else
    slots_ = std::allocator_traits<Allocator>::allocate(
        allocator_, capacity_ + 2 * kPadding);
#endif
    // Sanity-check that the producer and consumer indices land on distinct
    // cache lines
    static_assert(alignof(SPSCQueue<T>) == kCacheLineSize, "");
    static_assert(sizeof(SPSCQueue<T>) >= 3 * kCacheLineSize, "");
    assert(reinterpret_cast<char *>(&readIdx_) -
               reinterpret_cast<char *>(&writeIdx_) >=
           static_cast<std::ptrdiff_t>(kCacheLineSize));
  }
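  // Construction sketch: with an allocate_at_least-aware allocator the queue
  // may end up with more usable capacity than requested, never less:
  //
  //   SPSCQueue<int> q(1000);
  //   assert(q.capacity() >= 1000);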
  ~SPSCQueue() {
    // Destroy any remaining elements before releasing the storage
    while (front()) {
      pop();
    }
    std::allocator_traits<Allocator>::deallocate(allocator_, slots_,
                                                 capacity_ + 2 * kPadding);
  }

  // non-copyable and non-movable
  SPSCQueue(const SPSCQueue &) = delete;
  SPSCQueue &operator=(const SPSCQueue &) = delete;
  // Blocking support: lets a consumer sleep on an empty queue instead of
  // spinning. The producer calls wakeUpSink() after a push; taking the lock
  // there pairs with waitForSource() so a wakeup cannot be lost.
  std::mutex g_mutex;
  std::condition_variable g_con;

  void wakeUpSink() {
    std::lock_guard<std::mutex> lock(g_mutex);
    g_con.notify_one();
  }

  void waitForSource() {
    std::unique_lock<std::mutex> lock(g_mutex);
    g_con.wait(lock, [this] { return front() != nullptr; });
  }
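  // Consumer sketch built on these helpers (process() is a hypothetical
  // handler; the producer side calls wakeUpSink() after each push):
  //
  //   queue.waitForSource();    // sleep until an element is visible
  //   process(*queue.front());  // front() is non-null after waitForSource()
  //   queue.pop();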
  // Construct an element in place; blocks (spins) while the queue is full
  template <typename... Args>
  void emplace(Args &&...args) noexcept(
      std::is_nothrow_constructible<T, Args &&...>::value) {
    static_assert(std::is_constructible<T, Args &&...>::value,
                  "T must be constructible with Args&&...");
    auto const writeIdx = writeIdx_.load(std::memory_order_relaxed);
    auto nextWriteIdx = writeIdx + 1;
    if (nextWriteIdx == capacity_) {
      nextWriteIdx = 0; // wrap around the ring buffer
    }
    // Spin until the consumer frees a slot; the cached read index avoids an
    // acquire load on most iterations
    while (nextWriteIdx == readIdxCache_) {
      readIdxCache_ = readIdx_.load(std::memory_order_acquire);
    }
    new (&slots_[writeIdx + kPadding]) T(std::forward<Args>(args)...);
    writeIdx_.store(nextWriteIdx, std::memory_order_release);
  }
  // Construct an element in place; returns false instead of blocking when full
  template <typename... Args>
  bool try_emplace(Args &&...args) noexcept(
      std::is_nothrow_constructible<T, Args &&...>::value) {
    static_assert(std::is_constructible<T, Args &&...>::value,
                  "T must be constructible with Args&&...");
    auto const writeIdx = writeIdx_.load(std::memory_order_relaxed);
    auto nextWriteIdx = writeIdx + 1;
    if (nextWriteIdx == capacity_) {
      nextWriteIdx = 0;
    }
    if (nextWriteIdx == readIdxCache_) {
      readIdxCache_ = readIdx_.load(std::memory_order_acquire);
      if (nextWriteIdx == readIdxCache_) {
        return false; // queue is full
      }
    }
    new (&slots_[writeIdx + kPadding]) T(std::forward<Args>(args)...);
    writeIdx_.store(nextWriteIdx, std::memory_order_release);
    return true;
  }
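  // Producer sketch: do useful work instead of spinning when the queue is
  // full (make_item() and busy_work() are hypothetical placeholders; on a
  // failed try_emplace the argument is left untouched):
  //
  //   auto item = make_item();
  //   while (!queue.try_emplace(std::move(item))) {
  //     busy_work();
  //   }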
  void push(const T &v) noexcept(
      std::is_nothrow_copy_constructible<T>::value) {
    static_assert(std::is_copy_constructible<T>::value,
                  "T must be copy constructible");
    emplace(v);
  }
  template <typename P, typename = typename std::enable_if<
                            std::is_constructible<T, P &&>::value>::type>
  void push(P &&v) noexcept(std::is_nothrow_constructible<T, P &&>::value) {
    emplace(std::forward<P>(v));
  }
  bool try_push(const T &v) noexcept(
      std::is_nothrow_copy_constructible<T>::value) {
    static_assert(std::is_copy_constructible<T>::value,
                  "T must be copy constructible");
    return try_emplace(v);
  }
  template <typename P, typename = typename std::enable_if<
                            std::is_constructible<T, P &&>::value>::type>
  bool try_push(P &&v) noexcept(
      std::is_nothrow_constructible<T, P &&>::value) {
    return try_emplace(std::forward<P>(v));
  }
  // Return a pointer to the front element, or nullptr if the queue is empty
  T *front() noexcept {
    auto const readIdx = readIdx_.load(std::memory_order_relaxed);
    if (readIdx == writeIdxCache_) {
      writeIdxCache_ = writeIdx_.load(std::memory_order_acquire);
      if (writeIdxCache_ == readIdx) {
        return nullptr;
      }
    }
    return &slots_[readIdx + kPadding];
  }
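  // Non-blocking consumer poll sketch (use() is a hypothetical handler):
  //
  //   if (T *item = queue.front()) {
  //     use(*item);
  //     queue.pop();
  //   }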
  // Destroy the front element; the queue must not be empty
  void pop() noexcept {
    static_assert(std::is_nothrow_destructible<T>::value,
                  "T must be nothrow destructible");
    auto const readIdx = readIdx_.load(std::memory_order_relaxed);
    assert(writeIdx_.load(std::memory_order_acquire) != readIdx);
    slots_[readIdx + kPadding].~T();
    auto nextReadIdx = readIdx + 1;
    if (nextReadIdx == capacity_) {
      nextReadIdx = 0;
    }
    readIdx_.store(nextReadIdx, std::memory_order_release);
  }
  size_t size() const noexcept {
    std::ptrdiff_t diff = writeIdx_.load(std::memory_order_acquire) -
                          readIdx_.load(std::memory_order_acquire);
    if (diff < 0) {
      diff += capacity_; // the write index has wrapped around
    }
    return static_cast<size_t>(diff);
  }
  // Usable capacity; one slot is reserved to distinguish empty from full
  size_t capacity() const noexcept { return capacity_ - 1; }

private:
#ifdef __cpp_lib_hardware_interference_size
  static constexpr size_t kCacheLineSize =
      std::hardware_destructive_interference_size;
#else
  static constexpr size_t kCacheLineSize = 64;
#endif

  // Padding to avoid false sharing between slots_ and adjacent allocations
  static constexpr size_t kPadding = (kCacheLineSize - 1) / sizeof(T) + 1;
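  // Worked example: with a 64-byte cache line and sizeof(T) == 4, kPadding is
  // (64 - 1) / 4 + 1 == 16 slots, i.e. a full cache line of guard slots on
  // each side of the ring buffer.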
  size_t capacity_;
  T *slots_;
#if defined(__has_cpp_attribute) && __has_cpp_attribute(no_unique_address)
  Allocator allocator_ [[no_unique_address]];
#else
  Allocator allocator_;
#endif
  // Align to the cache line size to avoid false sharing between the producer
  // and consumer indices; the cached copies reduce cache coherency traffic
  alignas(kCacheLineSize) std::atomic<size_t> writeIdx_ = {0};
  alignas(kCacheLineSize) size_t readIdxCache_ = 0;
  alignas(kCacheLineSize) std::atomic<size_t> readIdx_ = {0};
  alignas(kCacheLineSize) size_t writeIdxCache_ = 0;

  // Padding so an adjacent allocation cannot share a cache line with
  // writeIdxCache_
  char padding_[kCacheLineSize - sizeof(writeIdxCache_)];
};
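// Example usage (a sketch, not part of the original header): a single
// producer thread pushes integers while a single consumer pops them.
//
//   #include <thread>
//
//   SPSCQueue<int> q(1024);
//   std::thread producer([&q] {
//     for (int i = 0; i < 100000; ++i) {
//       q.push(i); // spins while the queue is full
//     }
//   });
//   std::thread consumer([&q] {
//     for (int i = 0; i < 100000; ++i) {
//       while (!q.front()) {} // spin until an element arrives
//       assert(*q.front() == i);
//       q.pop();
//     }
//   });
//   producer.join();
//   consumer.join();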