AllianceDB  0.0.1
AllianceDB is an open-source suite of benchmarks and libraries for evaluating and improving stream operation algorithms on modern hardware.
SPSCQueue.hpp
// Copyright (C) 2021 by the IntelliStream team (https://github.com/intellistream)

#pragma once

#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>      // std::allocator
#include <new>         // std::hardware_destructive_interference_size
#include <stdexcept>
#include <type_traits> // std::enable_if, std::is_*_constructible
#include <thread>
#include <mutex>
#include <iostream>
#include <condition_variable>
#include <pthread.h>
#include <sys/time.h>

using namespace std::literals::chrono_literals;
using namespace std;

namespace INTELLI {
template<typename T, typename Allocator = std::allocator<T>>
class SPSCQueue {

#if defined(__cpp_if_constexpr) && defined(__cpp_lib_void_t)
  // Detect whether the allocator provides allocate_at_least() (C++23)
  template<typename Alloc2, typename = void>
  struct has_allocate_at_least : std::false_type {};

  template<typename Alloc2>
  struct has_allocate_at_least<
      Alloc2, std::void_t<typename Alloc2::value_type,
                          decltype(std::declval<Alloc2 &>().allocate_at_least(
                              size_t{}))>> : std::true_type {};
#endif

 public:
  pthread_cond_t cond;
  pthread_mutex_t mutex;

  explicit SPSCQueue(const size_t capacity,
                     const Allocator &allocator = Allocator())
      : capacity_(capacity), allocator_(allocator) {

    // The queue needs at least one element
    if (capacity_ < 1) {
      capacity_ = 1;
    }
    capacity_++; // Needs one slack element
    // Prevent overflowing size_t
    if (capacity_ > SIZE_MAX - 2 * kPadding) {
      capacity_ = SIZE_MAX - 2 * kPadding;
    }

#if defined(__cpp_if_constexpr) && defined(__cpp_lib_void_t)
    if constexpr (has_allocate_at_least<Allocator>::value) {
      auto res = allocator_.allocate_at_least(capacity_ + 2 * kPadding);
      slots_ = res.ptr;
      capacity_ = res.count - 2 * kPadding;
    } else {
      slots_ = std::allocator_traits<Allocator>::allocate(
          allocator_, capacity_ + 2 * kPadding);
    }
#else
    slots_ = std::allocator_traits<Allocator>::allocate(
        allocator_, capacity_ + 2 * kPadding);
#endif

    static_assert(alignof(SPSCQueue<T>) == kCacheLineSize, "");
    static_assert(sizeof(SPSCQueue<T>) >= 3 * kCacheLineSize, "");
    assert(reinterpret_cast<char *>(&readIdx_) -
               reinterpret_cast<char *>(&writeIdx_) >=
           static_cast<std::ptrdiff_t>(kCacheLineSize));
  }

  ~SPSCQueue() {
    while (front()) {
      pop();
    }
    std::allocator_traits<Allocator>::deallocate(allocator_, slots_,
                                                 capacity_ + 2 * kPadding);
  }

  // non-copyable and non-movable
  SPSCQueue(const SPSCQueue &) = delete;
  SPSCQueue &operator=(const SPSCQueue &) = delete;

  std::mutex g_mutex;
  std::condition_variable g_con;

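  // --- Blocking handshake between producer (source) and consumer (sink) ---
  // These helpers sit beside the lock-free ring buffer: a consumer (sink) can
  // block in waitForSource() while the queue is empty, and a producer (source)
  // can call wakeUpSink() after pushing to wake it. Note that waitForSource()
  // waits without a predicate, so callers should re-check front() after
  // waking (the wait can also return spuriously).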
  void wakeUpSink(void) {
    g_con.notify_one();
  }

  void waitForSource(void) {
    std::unique_lock<std::mutex> lock(g_mutex);
    g_con.wait(lock);
  }

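  // emplace() blocks (busy-waits) while the queue is full; try_emplace()
  // returns false instead. Both construct the element in place and publish it
  // with a release store to writeIdx_, which pairs with the acquire load in
  // front()/size() on the consumer side. readIdxCache_ is a producer-local
  // copy of readIdx_ that avoids re-reading the consumer's index on every call.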
  template<typename... Args>
  void emplace(Args &&...args) noexcept(
      std::is_nothrow_constructible<T, Args &&...>::value) {
    static_assert(std::is_constructible<T, Args &&...>::value,
                  "T must be constructible with Args&&...");
    auto const writeIdx = writeIdx_.load(std::memory_order_relaxed);
    auto nextWriteIdx = writeIdx + 1;
    if (nextWriteIdx == capacity_) {
      nextWriteIdx = 0;
    }
    while (nextWriteIdx == readIdxCache_) {
      readIdxCache_ = readIdx_.load(std::memory_order_acquire);
    }
    new (&slots_[writeIdx + kPadding]) T(std::forward<Args>(args)...);
    writeIdx_.store(nextWriteIdx, std::memory_order_release);
  }

  template<typename... Args>
  bool try_emplace(Args &&...args) noexcept(
      std::is_nothrow_constructible<T, Args &&...>::value) {
    static_assert(std::is_constructible<T, Args &&...>::value,
                  "T must be constructible with Args&&...");
    auto const writeIdx = writeIdx_.load(std::memory_order_relaxed);
    auto nextWriteIdx = writeIdx + 1;
    if (nextWriteIdx == capacity_) {
      nextWriteIdx = 0;
    }
    if (nextWriteIdx == readIdxCache_) {
      readIdxCache_ = readIdx_.load(std::memory_order_acquire);
      if (nextWriteIdx == readIdxCache_) {
        return false;
      }
    }
    new (&slots_[writeIdx + kPadding]) T(std::forward<Args>(args)...);
    writeIdx_.store(nextWriteIdx, std::memory_order_release);
    return true;
  }

  void push(const T &v)
  noexcept(std::is_nothrow_copy_constructible<T>::value) {
    static_assert(std::is_copy_constructible<T>::value,
                  "T must be copy constructible");
    emplace(v);
  }

  template<typename P, typename = typename std::enable_if<
                           std::is_constructible<T, P &&>::value>::type>
  void push(P &&v)
  noexcept(std::is_nothrow_constructible<T, P &&>::value) {
    emplace(std::forward<P>(v));
  }

  bool try_push(const T &v)
  noexcept(std::is_nothrow_copy_constructible<T>::value) {
    static_assert(std::is_copy_constructible<T>::value,
                  "T must be copy constructible");
    return try_emplace(v);
  }

  template<typename P, typename = typename std::enable_if<
                           std::is_constructible<T, P &&>::value>::type>
  bool try_push(P &&v)
  noexcept(std::is_nothrow_constructible<T, P &&>::value) {
    return try_emplace(std::forward<P>(v));
  }

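  // Consumer-side interface: front() returns a pointer to the oldest element,
  // or nullptr if the queue is empty; pop() destroys that element and frees
  // its slot. Only the single consumer thread may call these, typically as
  // "while (!q.front()) { /* wait */ } ... use *q.front() ... q.pop();".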
  T *front() noexcept {
    auto const readIdx = readIdx_.load(std::memory_order_relaxed);
    if (readIdx == writeIdxCache_) {
      writeIdxCache_ = writeIdx_.load(std::memory_order_acquire);
      if (writeIdxCache_ == readIdx) {
        return nullptr;
      }
    }
    return &slots_[readIdx + kPadding];
  }

  void pop() noexcept {
    static_assert(std::is_nothrow_destructible<T>::value,
                  "T must be nothrow destructible");
    auto const readIdx = readIdx_.load(std::memory_order_relaxed);
    assert(writeIdx_.load(std::memory_order_acquire) != readIdx);
    slots_[readIdx + kPadding].~T();
    auto nextReadIdx = readIdx + 1;
    if (nextReadIdx == capacity_) {
      nextReadIdx = 0;
    }
    readIdx_.store(nextReadIdx, std::memory_order_release);
  }

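  // size() and empty() may be called from either thread; when the other
  // thread is pushing or popping concurrently, the returned value is only a
  // snapshot and may lag the true occupancy by the operations in flight.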
  size_t size() const noexcept {
    std::ptrdiff_t diff = writeIdx_.load(std::memory_order_acquire) -
                          readIdx_.load(std::memory_order_acquire);
    if (diff < 0) {
      diff += capacity_;
    }
    return static_cast<size_t>(diff);
  }

  bool empty() const noexcept {
    return size() == 0;
  }

  size_t capacity() const noexcept { return capacity_ - 1; }

 private:
#ifdef __cpp_lib_hardware_interference_size
  static constexpr size_t kCacheLineSize =
      std::hardware_destructive_interference_size;
#else
  static constexpr size_t kCacheLineSize = 64;
#endif

  // Padding to avoid false sharing between slots_ and adjacent allocations:
  // kPadding is the number of T slots needed to cover at least one cache
  // line, i.e. ceil(kCacheLineSize / sizeof(T)).
  static constexpr size_t kPadding = (kCacheLineSize - 1) / sizeof(T) + 1;

 private:
  size_t capacity_;
  T *slots_;
#if defined(__has_cpp_attribute) && __has_cpp_attribute(no_unique_address)
  Allocator allocator_ [[no_unique_address]];
#else
  Allocator allocator_;
#endif

  // Align to the cache line size in order to avoid false sharing.
  // readIdxCache_ and writeIdxCache_ are used to reduce the amount of cache
  // coherency traffic.
  alignas(kCacheLineSize) std::atomic<size_t> writeIdx_ = {0};
  alignas(kCacheLineSize) size_t readIdxCache_ = 0;
  alignas(kCacheLineSize) std::atomic<size_t> readIdx_ = {0};
  alignas(kCacheLineSize) size_t writeIdxCache_ = 0;

  // Padding to keep adjacent allocations from sharing a cache line with
  // writeIdxCache_
  char padding_[kCacheLineSize - sizeof(writeIdxCache_)];
};
} // namespace INTELLI
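A minimal usage sketch with one producer and one consumer thread. This example
is not part of the header; the element type, capacity, and item count are
arbitrary, and it assumes the file is included as "SPSCQueue.hpp" and built
with a C++17 compiler and pthreads:

#include <iostream>
#include <thread>
#include "SPSCQueue.hpp"

int main() {
  INTELLI::SPSCQueue<int> q(1024); // ring buffer holding up to 1024 ints

  std::thread producer([&q] {
    for (int i = 0; i < 100; ++i) {
      q.push(i);      // blocks (busy-waits) if the queue is full
      q.wakeUpSink(); // wake the consumer in case it went to sleep
    }
  });

  std::thread consumer([&q] {
    for (int i = 0; i < 100; ++i) {
      while (q.front() == nullptr) {
        // nothing available yet; spin (or call q.waitForSource() to sleep,
        // accepting the lost-wakeup caveat noted in the header)
      }
      std::cout << *q.front() << "\n";
      q.pop();
    }
  });

  producer.join();
  consumer.join();
  return 0;
}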