AllianceDB  0.0.1
AllianceDB is an open-source suite, including benchmarks and libs for evaluating and improving stream operation algorithms on modern hardwares.
AbstractJP.h
1 
2 //
3 // Created by tony on 12/03/22.
4 //
5 
6 #ifndef _JOINPROCESSOR_ABSTRACTJP_H_
7 #define _JOINPROCESSOR_ABSTRACTJP_H_
9 #include <Common/Types.h>
10 #include <barrier>
12 #include <Utils/C20Buffers.hpp>
13 #include <JoinAlgo/JoinAlgoTable.h>
14 #include <memory>
15 namespace INTELLI {
16 class AbstractLazyJP;
38 class AbstractJP : public AbstractC20Thread {
39  protected:
40  TuplePtrQueue TuplePtrQueueInS;
41  TuplePtrQueue TuplePtrQueueInR;
42  // cmd linked to window slider
43  CmdQueuePtr cmdQueueIn;
44  CmdQueuePtr cmdQueueOut;
45  size_t sysId;
46  struct timeval timeSys;
47  bool timeBased = false;
48  size_t windowLen = 0;
49  size_t windowLenGlobal = 0;
50  size_t slideLenGlobal = 0;
51  size_t joinedResult = 0;
52  JoinAlgoTablePtr myAlgo;
53  //response of my self
54  void sendResponseCmd(join_cmd_t cmd) {
55  cmdQueueOut->push(cmd);
56  }
57  void sendAck() {
58  sendResponseCmd(CMD_ACK);
59  }
65  bool testCmd(join_cmd_t cmd) {
66  if (!cmdQueueIn->empty()) {
67  join_cmd_t cmdIn = *cmdQueueIn->front();
68  cmdQueueIn->pop();
69  if (cmdIn == cmd) {
70  return true;
71  }
72  }
73  return false;
74  }
79  virtual void inlineMain() {
80 
81  }
87  size_t oldestWindowBelong(size_t ts) {
88  if (ts < windowLenGlobal) {
89  return 0;
90  }
91  return ((ts - windowLenGlobal) / slideLenGlobal) + 1;
92  }
93  public:
94  AbstractJP() {}
95  ~ AbstractJP() {}
100  void inputCmd(join_cmd_t cmd) {
101  cmdQueueIn->push(cmd);
102  }
107  join_cmd_t waitResponse() {
108  while (cmdQueueOut->empty()) {}
109  join_cmd_t ru = *cmdQueueOut->front();
110  cmdQueueOut->pop();
111  return ru;
112  }
119  virtual void init(size_t sLen, size_t rLen, size_t _sysId) {
120  TuplePtrQueueInS = newTuplePtrQueue(sLen);
121  TuplePtrQueueInR = newTuplePtrQueue(rLen);
122  cmdQueueIn = newCmdQueue(1);
123  cmdQueueOut = newCmdQueue(1);
124  sysId = _sysId;
125  myAlgo = newJoinAlgoTable();
126  }
131  void setTimeBased(bool ts) {
132  timeBased = ts;
133  }
138  bool isTimeBased() {
139  return timeBased;
140  }
145  void setWindowLen(size_t wl) {
146  windowLen = wl;
147  }
153  void setGlobalWindow(size_t wlen, size_t sli) {
154  windowLenGlobal = wlen;
155  slideLenGlobal = sli;
156  }
161  virtual void feedTupleS(TuplePtr ts) {
162  TuplePtrQueueInS->push(ts);
163  }
168  virtual void feedTupleR(TuplePtr tr) {
169  TuplePtrQueueInR->push(tr);
170  }
175  void setTimeVal(struct timeval tv) {
176  timeSys = tv;
177  }
182  size_t getTimeStamp() {
183  return UtilityFunctions::timeLastUs(timeSys) / TIME_STEP;
184  }
189  size_t getJoinedResult() {
190  return joinedResult;
191  }
192  int cpuBind = -1;
197  void setCore(int id) {
198  cpuBind = id;
199  }
200 };
201 
202 }
208 #endif //ALIANCEDB_SRC_JOINPROCESSOR_ABSTRACTJP_H_
The base class and abstraction of C++20 thread, and it can be derived into other threads.
Definition: AbstractC20Thread.h:28
The basic class of join processor.
Definition: AbstractJP.h:38
void setCore(int id)
bind to specific core
Definition: AbstractJP.h:197
void inputCmd(join_cmd_t cmd)
input an outside command
Definition: AbstractJP.h:100
virtual void feedTupleR(TuplePtr tr)
feed a tuple r into the r input queue
Definition: AbstractJP.h:168
size_t getJoinedResult()
get the join results
Definition: AbstractJP.h:189
void setTimeVal(struct timeval tv)
set the timeval struct
Definition: AbstractJP.h:175
virtual void feedTupleS(TuplePtr ts)
feed a tuple s into the s input queue
Definition: AbstractJP.h:161
bool isTimeBased()
to read the window type
Definition: AbstractJP.h:138
virtual void init(size_t sLen, size_t rLen, size_t _sysId)
init the join processor with buffer/queue length and id
Definition: AbstractJP.h:119
void setWindowLen(size_t wl)
to set the length of window
Definition: AbstractJP.h:145
bool testCmd(join_cmd_t cmd)
To test if a cmd is remained in queue and in desired cmd.
Definition: AbstractJP.h:65
size_t getTimeStamp()
Get the time stamp.
Definition: AbstractJP.h:182
void setTimeBased(bool ts)
to configure the window type
Definition: AbstractJP.h:131
void setGlobalWindow(size_t wlen, size_t sli)
set the window parameters of global window
Definition: AbstractJP.h:153
join_cmd_t waitResponse()
wait and return the response of this join processor
Definition: AbstractJP.h:107
size_t oldestWindowBelong(size_t ts)
To get the possible oldest window a time stamp belongs to.
Definition: AbstractJP.h:87
virtual void inlineMain()
The 'main' function of AbstractP.
Definition: AbstractJP.h:79
struct timeval timeSys
Definition: AbstractJP.h:46
#define newTuplePtrQueue(n)
To create a new TuplePtrQueue.
Definition: Types.h:236
std::shared_ptr< class Tuple > TuplePtr
The class to describe a shared pointer to Tuple.
Definition: Types.h:150
std::shared_ptr< INTELLI::SPSCQueue< INTELLI::TuplePtr > > TuplePtrQueue
To describe a queue of TuplePtr under SPSCQueue.
Definition: Types.h:228
Definition: DatasetTool.h:10