AllianceDB  0.0.1
AllianceDB is an open-source suite that includes benchmarks and libraries for evaluating and improving stream operation algorithms on modern hardware.
AbstractWS.h
Go to the documentation of this file.
1 
2 #ifndef _WINDOWSLIDER_ABSTRACTWS_H_
3 #define _WINDOWSLIDER_ABSTRACTWS_H_
4 #include <cstdint>
5 #include <vector>
6 #include <Common/Types.h>
7 #include <Utils/SPSCQueue.hpp>
9 #include <time.h>
10 #include <numeric>
11 using namespace INTELLI;
12 using namespace std;
13 namespace INTELLI {
// Base class for window sliders. The listing's own summary describes it as an
// abstraction of the window slider that is inherited by both the eager and the
// lazy implementations. It holds the window/slide configuration, two counters,
// a reference time (timeSys) and the input queues for streams S and R.
// NOTE(review): the class declares virtual member functions, but the destructor
// is not declared virtual in this listing. Deleting a derived slider through an
// AbstractWS pointer would be undefined behavior; confirm and consider `virtual`.
41 class AbstractWS {
42 
43  protected:
44 
    // Join/window type tag; defaults to INTELLI::CNT_BASED.
45  INTELLI::join_type_t myType = INTELLI::CNT_BASED;
    // Counters zeroed by reset(); presumably the number of S / R tuples seen so
    // far (they are not otherwise touched in this header; confirm in subclasses).
46  size_t countS, countR;
    // Window length and slide length consumed by oldestWindowBelong(); they are
    // compared directly against the time stamp passed to that function.
47  size_t windowLen = 0;
48  size_t slideLen = 1;
49  //partition
    // partitionWeight gets one entry per thread (all 1) from setParallelSMP();
    // partitionSizeFinal is not written anywhere in this header.
50  std::vector<size_t> partitionWeight;
51  std::vector<size_t> partitionSizeFinal;
    // NOTE(review): has no initializer here, and setParallelSMP()'s parameter of
    // the same name shadows this member without assigning it.
52  size_t threads;
    // No initializer in this header; presumably set by the (sLen, rLen)
    // constructor that is only declared here. Confirm in the .cpp file.
53  size_t sLen, rLen;
    // Configuration/state flags. isRunning is not modified in this header.
54  bool runTimeScheduling = false;
55  bool timeBased = false;
56  bool isRunning = false;
    // Reference time: captured by reset() and replaceable through setSysTime().
    // getTimeStamp() is derived from it.
57  struct timeval timeSys;
    // Input queues for streams S and R. TuplePtrQueue is a std::shared_ptr to an
    // SPSCQueue of TuplePtr, so both are null until something allocates them,
    // presumably the (sLen, rLen) constructor. TODO confirm.
58  TuplePtrQueue TuplePtrQueueInS;
59  TuplePtrQueue TuplePtrQueueInR;
    // Name reported by getName(); never assigned in this header.
60  string nameTag;
    // Returns the index of the oldest window a time stamp can belong to:
    // 0 when ts < windowLen, otherwise ((ts - windowLen) / slideLen) + 1 using
    // unsigned integer division.
    // NOTE(review): setSlideLen() accepts 0, which would make this divide by zero.
66  size_t oldestWindowBelong(size_t ts) {
67  if (ts < windowLen) {
68  return 0;
69  }
70  return ((ts - windowLen) / slideLen) + 1;
71  }
72  public:
73 
    // Sets the timeBased flag (window type); pass true for a time-based window.
78  void setTimeBased(bool ts) {
79  timeBased = ts;
80  }
    // Returns the current value of the timeBased flag.
85  bool isTimeBased() {
86  return timeBased;
87  }
88 
    // Sets the runTimeScheduling flag.
93  void setRunTimeScheduling(bool r) {
94  runTimeScheduling = r;
95  }
    // NOTE(review): source line 100 is missing from this extract. It holds the
    // signature of this getter, which the definition index of this listing gives
    // as `bool isRunTimeScheduling()`. The two lines below are that getter's
    // body, which returns the runTimeScheduling flag.
101  return runTimeScheduling;
102  }
    // Sets the window length; no validation is performed.
107  void setWindowLen(size_t wl) {
108  windowLen = wl;
109  }
    // Sets the slide length; no validation is performed (0 is accepted, see the
    // note on oldestWindowBelong()).
114  void setSlideLen(size_t sli) {
115  slideLen = sli;
116  }
    // Returns the slide length.
121  size_t getSlideLen() {
122  return slideLen;
123  }
    // Replaces partitionWeight with a vector of `threads` entries, each set to 1.
    // The parameter shadows the `threads` member, which is NOT updated here.
128  void setParallelSMP(size_t threads) {
129  partitionWeight = std::vector<size_t>(threads);
130  for (size_t i = 0; i < threads; i++) {
131  partitionWeight[i] = 1;
132  }
133  }
    // Zeroes countS and countR and stores the current time of day in timeSys.
    // No other member is reset by this function.
137  void reset() {
138  countS = 0;
139  countR = 0;
140 
141  gettimeofday(&timeSys, NULL);
142  }
143 
    // Default constructor: only calls reset(). It does not allocate the input
    // queues or assign threads, sLen or rLen.
144  AbstractWS() {
145  reset();
146  }
    // Returns UtilityFunctions::timeLastUs(timeSys) divided by TIME_STEP.
    // Presumably the microseconds elapsed since timeSys, expressed in TIME_STEP
    // units. TODO confirm against UtilityFunctions.
151  size_t getTimeStamp() {
152 
153  return UtilityFunctions::timeLastUs(timeSys) / TIME_STEP;
154  }
155  //init with length of queue
    // Declared only; the definition is outside this header.
161  AbstractWS(size_t sLen, size_t rLen);
162  //feed the tuple S
    // Pushes ts onto the S input queue. The queue pointer is dereferenced without
    // a null check, and the check on ts itself is commented out.
168  virtual void feedTupleS(TuplePtr ts) {
169  //assert(ts);
170  TuplePtrQueueInS->push(ts);
171  }
172  //feed the tuple R
    // Pushes tr onto the R input queue. The queue pointer is dereferenced without
    // a null check, and the check on tr itself is commented out.
178  virtual void feedTupleR(TuplePtr tr) {
179  //assert(tr);
180  TuplePtrQueueInR->push(tr);
181  }
    // Declared only, and not virtual (see the note at the top of the class).
182  ~AbstractWS();
183 
184  //init the join processors
    // Hook for subclasses; the base implementation does nothing.
189  virtual void initJoinProcessors() {
190 
191  }
    // Hook for subclasses to terminate the join processors; no-op in the base.
195  virtual void terminateJoinProcessors() {
196 
197  }
    // Hook for subclasses to wait for the join processors' response; no-op in
    // the base.
201  virtual void waitAckFromJoinProcessors() {
202 
203  }
204  //get the join result
    // The base implementation always returns 0; subclasses are expected to
    // override it to report the actual join result.
211  virtual size_t getJoinResult() {
212  return 0;
213  }
    // Returns a copy of the reference time timeSys.
218  struct timeval getSysTime() {
219  return timeSys;
220  }
    // Overwrites the reference time timeSys with tv.
225  void setSysTime(struct timeval tv) {
226  timeSys = tv;
227  }
    // Returns a copy of nameTag.
232  string getName() {
233  return nameTag;
234  }
235 };
236 }
237 #endif
An abstraction of the window slider, inherited by both the eager and the lazy variants.
Definition: AbstractWS.h:41
AbstractWS(size_t sLen, size_t rLen)
to init the slider with specific queue lengths
virtual void waitAckFromJoinProcessors()
to wait the response of join processors
Definition: AbstractWS.h:201
void reset()
reset everything needed
Definition: AbstractWS.h:137
bool isTimeBased()
to read the window type
Definition: AbstractWS.h:85
void setParallelSMP(size_t threads)
to set the parallel level under SMP model
Definition: AbstractWS.h:128
bool isRunTimeScheduling()
to read the scheduling type
Definition: AbstractWS.h:100
size_t getTimeStamp()
to get the time stamp
Definition: AbstractWS.h:151
virtual void feedTupleR(TuplePtr tr)
to feed a tuple R
Definition: AbstractWS.h:178
void setRunTimeScheduling(bool r)
to configure the scheduling place
Definition: AbstractWS.h:93
string getName()
get the name of the slider
Definition: AbstractWS.h:232
virtual void feedTupleS(TuplePtr ts)
to feed a tuple S
Definition: AbstractWS.h:168
size_t oldestWindowBelong(size_t ts)
To get the oldest possible window that a time stamp belongs to.
Definition: AbstractWS.h:66
virtual void initJoinProcessors()
to init the join processors
Definition: AbstractWS.h:189
virtual size_t getJoinResult()
to get the result of join
Definition: AbstractWS.h:211
size_t getSlideLen()
get the length of slide
Definition: AbstractWS.h:121
void setWindowLen(size_t wl)
to set the length of window
Definition: AbstractWS.h:107
void setTimeBased(bool ts)
to configure the window type
Definition: AbstractWS.h:78
virtual void terminateJoinProcessors()
to terminate the join processors
Definition: AbstractWS.h:195
void setSysTime(struct timeval tv)
Set the time structure value, i.e., the timeSys member.
Definition: AbstractWS.h:225
void setSlideLen(size_t sli)
set the length of slide
Definition: AbstractWS.h:114
std::shared_ptr< class Tuple > TuplePtr
The class to describe a shared pointer to Tuple.
Definition: Types.h:150
std::shared_ptr< INTELLI::SPSCQueue< INTELLI::TuplePtr > > TuplePtrQueue
To describe a queue of TuplePtr under SPSCQueue.
Definition: Types.h:228
Definition: DatasetTool.h:10