AllianceDB  0.0.1
AllianceDB is an open-source suite that includes benchmarks and libraries for evaluating and improving stream operation algorithms on modern hardware.
Types.h
Go to the documentation of this file.
1 
2 //
3 // Created by Wang Chenyu on 1/9/21.
4 //
5 #ifndef UNUSED // define UNUSED only if it has not been defined already
6 #define UNUSED(x) (void)(x) // evaluates x and casts the result to void, i.e. discards it
7 #endif
8 
9 #ifndef _INTELLISTREAM_TYPES_H
10 #define _INTELLISTREAM_TYPES_H
11 
12 #ifndef ALGO_NAME // fallback definition, used only when ALGO_NAME is not already defined
13 
14 //#define ALGO_NAME "OneWayHashJoin"
15 #define ALGO_NAME "CellJoin" // active default; the commented-out lines list the alternatives
16 //#define ALGO_NAME "HandShakeJoin"
17 
18 #endif
19 
20 #ifndef ALGO_CLASS // fallback definition, used only when ALGO_CLASS is not already defined
21 
22 //#define ALGO_CLASS OneWayHashJoin
23 #define ALGO_CLASS CellJoin // active default; presumably must name the class matching ALGO_NAME - TODO confirm
24 //#define ALGO_CLASS HandShakeJoin
25 
26 #endif
27 
28 //Constants: each default below applies only when the macro is not already defined
29 #ifndef WINDOW_SIZE
30 #define WINDOW_SIZE 500 // NOTE(review): unit (tuple count or time) is not visible here - confirm
31 #endif
32 
33 #ifndef THREAD_NUMBER
34 #define THREAD_NUMBER 2
35 #endif
36 
37 #ifndef TIME_STEP
38 #define TIME_STEP 20// US - presumably microseconds; TODO confirm
39 #endif
40 #ifndef DATASET_NAME
41 #define DATASET_NAME "Test1" //the dataset files should be named DATASET_NAME + "-R.txt" and DATASET_NAME + "-S.txt"
42 //in the Test2 dataset, duplicated keys were assigned manually
43 #endif
44 
45 #include <cstdint>
46 #include <vector>
47 #include <memory>
48 #include <mutex>
49 #include <unordered_map>
50 #include <queue>
51 #include "Utils/concurrentqueue.h"
52 #include "Utils/DupicatedHashTable.hpp"
53 #include "Utils/SafeQueue.hpp"
54 #include "Utils/SPSCQueue.hpp"
55 
93 namespace INTELLI {
103 //Pointers
104 
105 
106 //Alias
107 typedef uint64_t keyType; //type of a tuple's key
108 typedef uint64_t valueType; //type of a tuple's payload
109 typedef int numberType; //for counting the number of data items (e.g. tuples) in a struct
110 typedef std::mutex mutex; //INTELLI::mutex is an alias of std::mutex
113 typedef std::queue<numberType> tupleKeyQueue; //FIFO queue of numberType values
114 
119 class Tuple { // describes a tuple; the key (line 121) and payload (line 122) members are elided from this listing
120  public:
123  // The subKey can be either a time-stamp or an arrival count; it is assigned by the join system
124  // The subKey is used for tuple expiration, which covers both count-based and time-based windows
125  size_t subKey = 0; // defaults to 0 until assigned
143  Tuple(keyType k, valueType v, size_t sk); // construct with key, value and subKey
144  ~Tuple();
145 };
150 typedef std::shared_ptr<class Tuple> TuplePtr; // shared pointer to a Tuple
151 typedef std::shared_ptr<class RelationCouple> RelationCouplePtr; // shared pointer to a RelationCouple
152 typedef std::shared_ptr<std::barrier<>> BarrierPtr; // NOTE(review): std::barrier is C++20 <barrier>, which is not among the includes listed here - presumably pulled in transitively; verify
153 //Array Pointers
154 typedef std::vector<TuplePtr> WindowOfTuples; // vector of TuplePtr
155 
156 //typedef std::SafeQueue<TuplePtr> TuplePtrQueueIn;
162 typedef std::queue<TuplePtr> TuplePtrQueueIn; // local std::queue of TuplePtr
165  public:
166  TuplePtrQueueIn relationS;
167  TuplePtrQueueIn relationR;
168  RelationCouple();
169  ~RelationCouple();
170 };
171 
173  public:
174  //We use queue to implement a window, it stores the sequence of tuples.
175  TuplePtrQueueIn windowS;
176  TuplePtrQueueIn windowR;
177  hashtable hashtableS;
178  hashtable hashtableR;
179 
180  numberType windowSize;
181  numberType windowSizeR;
182  numberType windowSizeS;
183 
184  mutex windowLock;
185 
186  WindowCouple(numberType windowSize);
187  WindowCouple(numberType windowSizeR, numberType windowSizeS);
188 };
189 
191  public:
192  //We use queue to implement a window, it stores the sequence of tuples.
193  //Concurrent Q but not concurrent hashtable
194  //A sub-window that can be obtained from other threads
195  concurrentTupleQueue windowS;
196  concurrentTupleQueue windowR;
197  hashtable hashtableS;
198  hashtable hashtableR;
199 
200  numberType windowSize;
201  numberType windowSizeR;
202  numberType windowSizeS;
203 
204  mutex windowLock;
205 
206  ConcurrentQWindowCouple(numberType windowSize);
207  ConcurrentQWindowCouple(numberType windowSizeR, numberType windowSizeS);
208 };
209 
210 class Result { // NOTE(review): presumably holds the statistics of one join run - confirm against the implementation
211  public:
212  numberType joinNumber; // presumably the count of join matches - TODO confirm
213  numberType streamSize; // presumably the number of input tuples - TODO confirm
214  string algoName; // NOTE(review): unqualified string; relies on a using-declaration/directive not visible in this listing
215  string dataSetName;
216  double timeTaken; // unit is not visible here - TODO confirm
217  struct timeval timeBegin; // NOTE(review): struct timeval comes from <sys/time.h>, which is not among the includes listed here - verify
218  explicit Result();
219  Result operator++(int); // postfix increment; returns a Result by value
220  void statPrinter(); // presumably prints the fields above - TODO confirm
221 };
228 typedef std::shared_ptr<INTELLI::SPSCQueue<INTELLI::TuplePtr>> TuplePtrQueue; // shared pointer to an SPSCQueue of TuplePtr
229 typedef std::shared_ptr<std::queue<INTELLI::TuplePtr>> TupleQueueSelfPtr; // shared pointer to a std::queue of TuplePtr
230 typedef std::shared_ptr<INTELLI::SPSCQueue<vector<INTELLI::TuplePtr>>> WindowQueue; // shared pointer to an SPSCQueue of TuplePtr vectors; NOTE(review): vector is unqualified here
236 #define newTuplePtrQueue(n) make_shared<INTELLI::SPSCQueue<INTELLI::TuplePtr>>(n) // builds a TuplePtrQueue, passing n to the SPSCQueue constructor; NOTE(review): make_shared is unqualified, so std::make_shared must be in scope where the macro expands
237 #define newWindowQueue(n) make_shared<INTELLI::SPSCQueue<WindowOfTuples>>(n) // builds a WindowQueue the same way; WindowOfTuples is also unqualified, so it resolves only where INTELLI names are visible
238 typedef enum { // NOTE(review): presumably selects how the join window is defined - confirm against users of join_type_t
239  CNT_BASED = 1, // presumably a count-based window - TODO confirm
240  TIME_STAMP_BASED = 2, // presumably a time-stamp-based window - TODO confirm
241 } join_type_t;
242 typedef enum { // command codes; enumerators without an explicit value continue from the previous one
243  CMD_ACK = 1,
244  CMD_STOP = 2,
245  //S is a window, R is a Tuple (presumably the naming scheme of the CMD_NEXT_* codes: W = window, T = tuple - TODO confirm)
246  CMD_NEXT_WSTR, // implicit value 3
247  CMD_NEXT_WSTS, // implicit value 4
248  CMD_NEXT_TSWR, // implicit value 5
249  CMD_NEXT_TSTR, // implicit value 6
250  CMD_NEXT_TS_ONLY, // implicit value 7
251  CMD_NEXT_TR_ONLY, // implicit value 8
252 } join_cmd_t;
253 typedef std::shared_ptr<INTELLI::SPSCQueue<INTELLI::join_cmd_t>> CmdQueuePtr; // shared pointer to an SPSCQueue of join_cmd_t
254 #define newCmdQueue(n) make_shared<INTELLI::SPSCQueue<INTELLI::join_cmd_t>>(n) // builds a CmdQueuePtr, passing n to the SPSCQueue constructor; NOTE(review): make_shared is unqualified here
255 
256 }
260 #endif //_INTELLISTREAM_TYPES_H
Definition: Types.h:190
Definition: Types.h:164
Definition: Types.h:210
The class to describe a tuple.
Definition: Types.h:119
Definition: Types.h:172
Tuple(keyType k, valueType v, size_t sk)
construct with key, value and subkey
Tuple(keyType k, valueType v)
construct with key and value
size_t subKey
Definition: Types.h:125
uint64_t valueType
Definition: Types.h:108
valueType payload
Definition: Types.h:122
keyType key
Definition: Types.h:121
INTELLI::DupicatedHashTable< keyType, keyType > hashtable
Definition: Types.h:111
std::shared_ptr< class Tuple > TuplePtr
The class to describe a shared pointer to Tuple.
Definition: Types.h:150
std::queue< TuplePtr > TuplePtrQueueIn
To describe a local queue of TuplePtr.
Definition: Types.h:162
Tuple(keyType k)
construct with key
uint64_t keyType
Definition: Types.h:107
std::shared_ptr< INTELLI::SPSCQueue< INTELLI::TuplePtr > > TuplePtrQueue
To describe a queue of TuplePtr under SPSCQueue.
Definition: Types.h:228
Definition: DatasetTool.h:10