AllianceDB  0.0.1
AllianceDB is an open-source suite of benchmarks and libraries for evaluating and improving stream operation algorithms on modern hardware.
SplitJoinJP.h
Go to the documentation of this file.
1 
2 //
3 // Created by tony on 18/03/22.
4 //
5 
6 #ifndef _JOINPROCESSOR_SPLITJOINJP_H_
7 #define _JOINPROCESSOR_SPLITJOINJP_H_
8 #include <thread>
9 #include <Common/Types.h>
11 #include <barrier>
12 
13 #include <JoinProcessor/AbstractJP.h>
14 using namespace INTELLI;
15 using namespace std;
16 namespace INTELLI {
22 class SplitJoinJP : public AbstractJP {
23  protected:
24  virtual void inlineMain();
33  /*
34  * @brief A 'current window' copy of TuplePtrQueueLocalS when join a tuple
35  */
36  //C20Buffer<TuplePtr> windowS;
37  /*
38  * @brief A 'current window' copy of TuplePtrQueueLocalR when join a tuple
39  */
40  // C20Buffer<TuplePtr> windowR;
41  size_t sCnt = 0;
42  size_t sMax = 0;
43  void expireS(size_t cond);
44  void expireR(size_t cond);
45  void joinS(TuplePtr ts);
46  void joinR(TuplePtr tr);
47  public:
48  SplitJoinJP() {
49 
50  }
51  ~SplitJoinJP() {
52 
53  }
60  void init(size_t sLen, size_t rLen, size_t _sysId) {
61  AbstractJP::init(sLen, rLen, _sysId);
62  TuplePtrQueueLocalS = newTuplePtrQueue(sLen);
63  TuplePtrQueueLocalR = newTuplePtrQueue(rLen);
64  //windowS = C20Buffer<TuplePtr>(sLen);
65  // windowR = C20Buffer<TuplePtr>(rLen);
66  sCnt = 0;
67  }
72  void setMaxSCnt(size_t ms) {
73  sMax = ms;
74  }
75  /*
76  * @brief feed a tuple s into the s input queue
77  * @param ts The tuple
78  *//*
79  virtual void feedTupleS(TuplePtr ts) {
80  sCnt++;
81  if (sCnt == sysId + 1) //should process this S
82  {
83  TuplePtrQueueInS->push(ts);
84  }
85  if (sCnt == sMax) {
86  sCnt = 0;
87  }
88  }*/
89 };
90 typedef std::shared_ptr<SplitJoinJP> SplitJoinJPPtr;
91 }
92 #endif // _JOINPROCESSOR_SPLITJOINJP_H_
The basic class of join processor.
Definition: AbstractJP.h:38
virtual void init(size_t sLen, size_t rLen, size_t _sysId)
init the join processor with buffer/queue length and id
Definition: AbstractJP.h:119
The join processor class for the split join algorithm.
Definition: SplitJoinJP.h:22
void init(size_t sLen, size_t rLen, size_t _sysId)
init the join processor with buffer/queue length and id
Definition: SplitJoinJP.h:60
virtual void inlineMain()
The 'main' function of AbstractJP.
TuplePtrQueue TuplePtrQueueLocalS
Local queue storage of S, used to manage the S window.
Definition: SplitJoinJP.h:28
TuplePtrQueue TuplePtrQueueLocalR
Local queue storage of R, used to manage the R window.
Definition: SplitJoinJP.h:32
void setMaxSCnt(size_t ms)
Set the max value of sCnt variable.
Definition: SplitJoinJP.h:72
#define newTuplePtrQueue(n)
To create a new TuplePtrQueue.
Definition: Types.h:236
std::shared_ptr< class Tuple > TuplePtr
The class to describe a shared pointer to Tuple.
Definition: Types.h:150
std::shared_ptr< INTELLI::SPSCQueue< INTELLI::TuplePtr > > TuplePtrQueue
To describe a queue of TuplePtr under SPSCQueue.
Definition: Types.h:228
Definition: DatasetTool.h:10