AllianceDB  0.0.1
AllianceDB is an open-source suite of benchmarks and libraries for evaluating and improving stream operation algorithms on modern hardware.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Modules Pages
SplitJoinJP.h
Go to the documentation of this file.
1 
2 //
3 // Created by tony on 18/03/22.
4 //
5 
6 #ifndef _JOINPROCESSOR_SPLITJOINJP_H_
7 #define _JOINPROCESSOR_SPLITJOINJP_H_
8 #include <thread>
9 #include <Common/Types.h>
11 #include <barrier>
12 
13 #include <JoinProcessor/AbstractJP.h>
14 using namespace INTELLI;
15 using namespace std;
16 namespace INTELLI {
22 class SplitJoinJP : public AbstractJP {
23  protected:
24  virtual void inlineMain();
33  /*
34  * @brief A 'current window' copy of TuplePtrQueueLocalS when join a tuple
35  */
36  //C20Buffer<TuplePtr> windowS;
37  /*
38  * @brief A 'current window' copy of TuplePtrQueueLocalR when join a tuple
39  */
40  // C20Buffer<TuplePtr> windowR;
41  size_t sCnt = 0;
42  size_t sMax = 0;
43  void expireS(size_t cond);
44  void expireR(size_t cond);
45  void joinS(TuplePtr ts);
46  void joinR(TuplePtr tr);
47  public:
48  SplitJoinJP() {
49 
50  }
51  ~SplitJoinJP() {
52 
53  }
60  void init(size_t sLen, size_t rLen, size_t _sysId) {
61  AbstractJP::init(sLen, rLen, _sysId);
62  TuplePtrQueueLocalS = newTuplePtrQueue(sLen);
63  TuplePtrQueueLocalR = newTuplePtrQueue(rLen);
64  //windowS = C20Buffer<TuplePtr>(sLen);
65  // windowR = C20Buffer<TuplePtr>(rLen);
66  sCnt = 0;
67  }
72  void setMaxSCnt(size_t ms) {
73  sMax = ms;
74  }
75  /*
76  * @brief feed a tuple s into the s input queue
77  * @param ts The tuple
78  *//*
79  virtual void feedTupleS(TuplePtr ts) {
80  sCnt++;
81  if (sCnt == sysId + 1) //should process this S
82  {
83  TuplePtrQueueInS->push(ts);
84  }
85  if (sCnt == sMax) {
86  sCnt = 0;
87  }
88  }*/
89 };
90 typedef std::shared_ptr<SplitJoinJP> SplitJoinJPPtr;
91 }
92 #endif //_JOINPROCESSOR_SPLITJOINJP_H_
The base class of join processors.
Definition: AbstractJP.h:38
virtual void init(size_t sLen, size_t rLen, size_t _sysId)
Initialize the join processor with the buffer/queue lengths and its ID.
Definition: AbstractJP.h:119
The split-join join processor class.
Definition: SplitJoinJP.h:22
void init(size_t sLen, size_t rLen, size_t _sysId)
Initialize the join processor with the buffer/queue lengths and its ID.
Definition: SplitJoinJP.h:60
virtual void inlineMain()
The 'main' function of AbstractP.
TuplePtrQueue TuplePtrQueueLocalS
Local queue storage of S, used to manage the S window.
Definition: SplitJoinJP.h:28
TuplePtrQueue TuplePtrQueueLocalR
Local queue storage of R, used to manage the R window.
Definition: SplitJoinJP.h:32
void setMaxSCnt(size_t ms)
Set the max value of sCnt variable.
Definition: SplitJoinJP.h:72
#define newTuplePtrQueue(n)
To create a new TuplePtrQueue.
Definition: Types.h:236
std::shared_ptr< class Tuple > TuplePtr
A shared pointer to a Tuple.
Definition: Types.h:150
std::shared_ptr< INTELLI::SPSCQueue< INTELLI::TuplePtr > > TuplePtrQueue
A shared pointer to an SPSCQueue of TuplePtr.
Definition: Types.h:228
Definition: DatasetTool.h:10