AllianceDB  0.0.1
AllianceDB is an open-source suite, including benchmarks and libs for evaluating and improving stream operation algorithms on modern hardwares.
AbstractJoinMethod.h
1 
2 #ifndef JOINMETHODS_ABSTRACTJOINMETHOD_H_
3 #define JOINMETHODS_ABSTRACTJOINMETHOD_H_
4 
5 #include "Common/Types.h"
7 using namespace INTELLI;
8 namespace INTELLI {
9 template<class wsType=AbstractEagerWS>
11  public:
12  static void test(Result &joinResult, RelationCouple &relationCouple) {
13  //wsType windowSlider;
14  size_t sLen = relationCouple.relationS.size();
15  size_t rLen = relationCouple.relationR.size();
16  wsType windowSlider(sLen, rLen);
17  joinResult.algoName = windowSlider.getName();
18  windowSlider.setParallelSMP(THREAD_NUMBER);
19  // windowSlider.setStopCondition(0, sLen, rLen);
20  windowSlider.setTimeBased(true);
21  windowSlider.setWindowLen(WINDOW_SIZE);
22  windowSlider.setSlideLen(WINDOW_SIZE);
23  windowSlider.setRunTimeScheduling(true);
24  windowSlider.initJoinProcessors();
25  //size_t timeBase=windowSlider.getStartTime();
26  //cout<<"system start at "<<timeBase<<endl;
27  struct timeval timeSys = windowSlider.getSysTime();
28  size_t cnt = 0;
29  INTELLI::UtilityFunctions::timerStart(joinResult);
30  while (!relationCouple.relationR.empty() || !relationCouple.relationS.empty()) {
31  size_t timeFeed = UtilityFunctions::timeLastUs(timeSys) / TIME_STEP;
32  if (!relationCouple.relationR.empty()) {
33  INTELLI::TuplePtr tr = relationCouple.relationR.front();
34  if (timeFeed >= tr->subKey) {
35  // cout<<"feed r:"<<timeSys<<endl;
36  relationCouple.relationR.pop();
37  windowSlider.feedTupleR(tr);
38  }
39  }
40  if (!relationCouple.relationS.empty()) {
41  INTELLI::TuplePtr ts = relationCouple.relationS.front();
42  if (timeFeed >= ts->subKey) {
43  relationCouple.relationS.pop();
44  windowSlider.feedTupleS(ts);
45  }
46 
47  }
48  // cout<<"process tuple"<<cnt<<endl;
49  cnt++;
50  usleep(TIME_STEP / 2);
51  }
52  cout << "end of tuple feeding" << endl;
53  joinResult.joinNumber = 0;
54  windowSlider.terminateJoinProcessors();
55  joinResult.joinNumber = windowSlider.getJoinResult();
56  INTELLI::UtilityFunctions::timerEnd(joinResult);
57  }
58 };
59 }
60 
61 #endif //HYBRID_JOIN_SRC_JOINMETHODS_ABSTRACTJOINMETHOD_H_
Definition: AbstractJoinMethod.h:10
Definition: Types.h:164
Definition: Types.h:210
std::shared_ptr< class Tuple > TuplePtr
The class to describe a shared pointer to Tuple.
Definition: Types.h:150
Definition: DatasetTool.h:10