AllianceDB  0.0.1
AllianceDB is an open-source suite, including benchmarks and libraries, for evaluating and improving stream operation algorithms on modern hardware.
SafeQueue.hpp
1 #pragma once
2 #ifndef _SAFEQUEUE_H_
3 #define _SAFEQUEUE_H_
4 #include <iostream>
5 #include <thread>
6 #include <condition_variable>
7 #include <mutex>
8 #include <memory>
9 #include <algorithm>
10 #include <vector>
11 #include <string>
12 #include <queue>
13 #include <iostream>
14 #include <thread>
15 #include <condition_variable>
16 #include <mutex>
17 #include <memory>
18 #include <algorithm>
19 #include <vector>
20 #include <string>
21 #include <queue>
22 #include <barrier>
23 
24 using namespace std;
25 namespace std {
26 typedef std::shared_ptr<std::barrier<>> BarrierPtr;
/**
 * @brief FIFO queue whose member functions serialize access through a mutex.
 *
 * Every member function acquires m_mut before touching m_queue. Producers wake
 * blocked consumers through m_cond. The three data members are public, so code
 * that accesses m_queue directly bypasses the locking done here.
 *
 * @tparam T Element type stored in the queue.
 */
template<typename T>
class SafeQueue {
 public:
  /// Guards m_queue. Declared mutable so const operations (copying, size(), ...) can lock it.
  mutable std::mutex m_mut;
  /// Signalled whenever elements are added, to wake consumers blocked in a waiting pop.
  std::condition_variable m_cond;
  /// Underlying storage.
  std::queue<T> m_queue;

 public:
  /// Creates an empty queue.
  SafeQueue() = default;

  /**
   * @brief Copies the elements of @p rhs while holding its mutex.
   * @param rhs Queue to copy from. Its mutex and condition variable are not copied.
   */
  SafeQueue(const SafeQueue &rhs) {
    std::lock_guard<std::mutex> lk(rhs.m_mut);
    m_queue = rhs.m_queue;
  }

  /**
   * @brief Appends @p data by moving it into the queue and wakes one waiting consumer.
   * @param data Element to enqueue; taken by value so that move-only types can be pushed.
   */
  void push(T data) {
    std::lock_guard<std::mutex> lk(m_mut);
    m_queue.push(std::move(data));
    m_cond.notify_one();
  }

  /**
   * @brief Appends a copy of @p data and wakes one waiting consumer.
   * @param data Element to enqueue; copied (not moved) into the queue.
   */
  void pushNoMove(T data) {
    std::lock_guard<std::mutex> lk(m_mut);
    m_queue.push(data);
    m_cond.notify_one();
  }

  /**
   * @brief Blocks until the queue is non-empty, then moves the front element into @p res.
   * @param res Receives the removed element.
   */
  void waitAndPop(T &res) {
    std::unique_lock<std::mutex> lk(m_mut);
    m_cond.wait(lk, [this] { return !m_queue.empty(); });
    res = std::move(m_queue.front());
    m_queue.pop();
  }

  /**
   * @brief Blocks until the queue is non-empty, then removes and returns the front element.
   * @return The element that was at the front of the queue.
   */
  T pop() {
    std::unique_lock<std::mutex> lk(m_mut);
    m_cond.wait(lk, [this] { return !m_queue.empty(); });
    // Take the value out before pop() destroys the slot; std::queue::pop() itself returns void.
    T res = std::move(m_queue.front());
    m_queue.pop();
    return res;
  }

  /// @return Number of elements currently stored.
  typename std::queue<T>::size_type size() const {
    std::lock_guard<std::mutex> lk(m_mut);
    return m_queue.size();
  }

  /// @return true when the queue holds no elements.
  bool empty() const {
    std::lock_guard<std::mutex> lk(m_mut);
    return m_queue.empty();
  }

  /**
   * @brief Returns a copy of the front element without removing it.
   * @pre The queue is not empty; std::queue::front() on an empty queue is undefined behaviour.
   */
  T front() const {
    std::lock_guard<std::mutex> lk(m_mut);
    return m_queue.front();
  }

  /**
   * @brief Replaces the contents with a copy of the elements of @p D.
   *
   * Both mutexes are held during the copy, and waiting consumers are woken
   * because the assignment may have made the queue non-empty.
   *
   * @param D Queue to copy from.
   * @return Reference to this queue.
   */
  SafeQueue &operator=(const SafeQueue &D) {
    if (this != &D) {  // locking the same mutex twice would be undefined behaviour
      std::scoped_lock<std::mutex, std::mutex> lk(m_mut, D.m_mut);
      m_queue = D.m_queue;
      m_cond.notify_all();
    }
    return *this;
  }

  /**
   * @brief Replaces the contents with a copy of the plain queue @p D.
   * @param D Source queue; the caller must ensure it is not modified concurrently.
   * @return Reference to this queue.
   */
  SafeQueue &operator=(const std::queue<T> &D) {
    std::lock_guard<std::mutex> lk(m_mut);
    m_queue = D;
    m_cond.notify_all();  // the new contents may satisfy blocked consumers
    return *this;
  }

  /**
   * @brief Blocks until the queue is non-empty, then copies the front element into @p res and removes it.
   * @param res Receives a copy of the removed element.
   */
  void waitAndPopNoMOve(T &res) {
    std::unique_lock<std::mutex> lk(m_mut);
    m_cond.wait(lk, [this] { return !m_queue.empty(); });
    res = m_queue.front();
    m_queue.pop();
  }

  /**
   * @brief Non-blocking pop into an out-parameter.
   * @param res Receives the removed element when one is available; untouched otherwise.
   * @return true if an element was removed, false if the queue was empty.
   */
  bool tryPop(T &res) {
    std::lock_guard<std::mutex> lk(m_mut);
    if (m_queue.empty())
      return false;
    res = std::move(m_queue.front());
    m_queue.pop();  // drop the moved-from slot so it is not handed out again
    return true;
  }

  /*
   * The overloads below hand the element back through the return value.
   */

  /**
   * @brief Blocks until the queue is non-empty, then removes the front element.
   * @return shared_ptr owning the removed element.
   */
  std::shared_ptr<T> waitAndPop() {
    std::unique_lock<std::mutex> lk(m_mut);
    m_cond.wait(lk, [this] { return !m_queue.empty(); });
    std::shared_ptr<T> res(std::make_shared<T>(std::move(m_queue.front())));
    m_queue.pop();
    return res;
  }

  /**
   * @brief Non-blocking pop.
   * @return shared_ptr owning the removed element, or a null pointer if the queue was empty.
   */
  std::shared_ptr<T> tryPop() {
    std::lock_guard<std::mutex> lk(m_mut);
    if (m_queue.empty())
      return nullptr;
    std::shared_ptr<T> res(std::make_shared<T>(std::move(m_queue.front())));
    m_queue.pop();
    return res;
  }
};
120 
121 typedef std::shared_ptr<SafeQueue<int>> SafeQueuePtr;
122 }
123 
124 #endif
Definition: SafeQueue.hpp:28