AllianceDB  0.0.1
AllianceDB is an open-source suite, including benchmarks and libs for evaluating and improving stream operation algorithms on modern hardwares.
HandShakeJP.h
1 //
2 // Created by tony on 2022/2/9.
3 //
4 #pragma once
5 #ifndef JOINPROCESSOR_HANDSHAKJP_H_
6 #define JOINPROCESSOR_HANDSHAKEJP_H_
8 #include <memory>
9 
10 using namespace std;
11 using namespace INTELLI;
12 namespace INTELLI {
13 class HandShakeJP;
14 typedef std::shared_ptr<HandShakeJP> HandShakeJPPtr;
15 /*class:HandShakeHashJP
16 description: join processor for handshake hash join
17 note: S->, R<-
18 date:20220228
19 */
20 class HandShakeJP : public AbstractJP {
21  protected:
22  /* data */
23  HandShakeJPPtr leftJP = nullptr;
24  HandShakeJPPtr rightJP = nullptr;
33 
37  CmdQueuePtr sRecvAck;
42 
43  BarrierPtr initBar = nullptr;
44  // TuplePtrQueue selfWindowS, selfWindowR;
45  size_t countR = 0, countS = 0;
46  size_t timeOffsetS, timeOffsetR;
47  size_t rQueue = 0, sQueue = 0;
48  void setupQueue();
49  void expireS(size_t cond);
50  void expireR(size_t cond);
51  virtual void inlineMain();
52  public:
53  void setLeft(HandShakeJPPtr l) {
54  leftJP = l;
55  }
56  void setRight(HandShakeJPPtr r) {
57  rightJP = r;
58  }
59 
60  HandShakeJP(/* args */) {
61 
62  }
63  ~HandShakeJP() {
64 
65  }
66  void setTimeOffset(size_t ts, size_t tr) {
67  timeOffsetS = ts;
68  timeOffsetR = tr;
69  }
70  void paraseTupleS();
71  void paraseTupleR();
72  void setNeighborJP(HandShakeJPPtr l, HandShakeJPPtr r) {
73  leftJP = l;
74  rightJP = r;
75  }
76  void init(size_t sLen, size_t rLen, size_t _sysId) {
77  rQueue = rLen;
78  sQueue = sLen;
79  joinedResult = 0;
80  sysId = _sysId;
81  }
82 
83  // co
88  void setInitBar(BarrierPtr barPrev) {
89  initBar = barPrev;
90  }
94  void waitInitBar(void) {
95  if (initBar) {
96  initBar->arrive_and_wait();
97  }
98  }
102  void indicateSRecv(void) {
103  sRecvAck->push(CMD_ACK);
104  }
105 };
106 
107 }
108 #endif //HYBRID_JOIN_INCLUDE_JOINPROCESSOR_HANDSHAKEHASHJP_H_
The basic class of join processor.
Definition: AbstractJP.h:38
Definition: HandShakeJP.h:20
TuplePtrQueue TuplePtrQueueForwardS
forward queue of S
Definition: HandShakeJP.h:32
void indicateSRecv(void)
To indicate that a tuple S has been received.
Definition: HandShakeJP.h:102
void init(size_t sLen, size_t rLen, size_t _sysId)
init the join processor with buffer/queue length and id
Definition: HandShakeJP.h:76
TuplePtrQueue TuplePtrQueueLocalR
local queue storage of R, used for manage R window
Definition: HandShakeJP.h:41
CmdQueuePtr sRecvAck
The command queue for ack of S.
Definition: HandShakeJP.h:37
void setInitBar(BarrierPtr barPrev)
Set up the init barrier.
Definition: HandShakeJP.h:88
void waitInitBar(void)
Wait for the init barrier done and then contine.
Definition: HandShakeJP.h:94
TuplePtrQueue TuplePtrQueueLocalS
local queue storage of S, used for manage S window
Definition: HandShakeJP.h:28
virtual void inlineMain()
The 'main' function of AbstractP.
std::shared_ptr< INTELLI::SPSCQueue< INTELLI::TuplePtr > > TuplePtrQueue
To describe a queue of TuplePtr under SPSCQueue.
Definition: Types.h:228
Definition: DatasetTool.h:10