AllianceDB  0.0.1
AllianceDB is an open-source suite, including benchmarks and libraries for evaluating and improving stream operation algorithms on modern hardware.
VerifyBench.h
1 
2 #ifndef _VERIFYBENCH_H_
3 #define _VERIFYBENCH_H_
4 
5 #include <Common/Types.h>
8 using namespace INTELLI;
9 namespace INTELLI {
23 template<class wsType=AbstractEagerWS>
24 class VerifyBench {
25  public:
35  static bool test(Result &joinResult,
36  RelationCouple &relationCouple,
37  size_t threads = THREAD_NUMBER,
38  size_t windowLen = WINDOW_SIZE,
39  size_t slideLen = WINDOW_SIZE) {
40  //wsType windowSlider to be verified
41  size_t sLen = relationCouple.relationS.size();
42  size_t rLen = relationCouple.relationR.size();
43  wsType windowSlider(sLen, rLen);
44  joinResult.algoName = windowSlider.getName();
45  cout << "verify algo:" + joinResult.algoName + ", threads=" + to_string(threads) + " windowLen="
46  + to_string(windowLen) + ", slideLen=" + to_string(slideLen) << endl;
47  windowSlider.setParallelSMP(threads);
48  // windowSlider.setStopCondition(0, sLen, rLen);
49  windowSlider.setTimeBased(true);
50  windowSlider.setWindowLen(windowLen);
51  windowSlider.setSlideLen(slideLen);
52  windowSlider.setRunTimeScheduling(true);
53  windowSlider.initJoinProcessors();
54 
55  // The verifyws as the reference
56 
57  VerifyWS vfSlider(sLen, rLen);
58  vfSlider.setParallelSMP(threads);
59  vfSlider.setTimeBased(true);
60  vfSlider.setWindowLen(windowLen);
61  vfSlider.setSlideLen(slideLen);
62  vfSlider.setRunTimeScheduling(true);
63  struct timeval timeSys = windowSlider.getSysTime();
64  vfSlider.setSysTime(timeSys);
65  vfSlider.initJoinProcessors();
66  //size_t timeBase=windowSlider.getStartTime();
67  //cout<<"system start at "<<timeBase<<endl;
68 
69  size_t cnt = 0;
70  INTELLI::UtilityFunctions::timerStart(joinResult);
71  while (!relationCouple.relationR.empty() || !relationCouple.relationS.empty()) {
72  size_t timeFeed = UtilityFunctions::timeLastUs(timeSys) / TIME_STEP;
73  if (!relationCouple.relationR.empty()) {
74  INTELLI::TuplePtr tr = relationCouple.relationR.front();
75  //
76  if (timeFeed >= tr->subKey) {
77  // cout<<to_string(timeFeed)+","+ to_string(tr->subKey)<<endl;
78  relationCouple.relationR.pop();
79  windowSlider.feedTupleR(tr);
80  vfSlider.feedTupleR(tr);
81  }
82  }
83  if (!relationCouple.relationS.empty()) {
84  INTELLI::TuplePtr ts = relationCouple.relationS.front();
85  if (timeFeed >= ts->subKey) {
86  relationCouple.relationS.pop();
87  windowSlider.feedTupleS(ts);
88  vfSlider.feedTupleS(ts);
89  }
90 
91  }
92  // cout<<"process tuple"<<cnt<<endl;
93  cnt++;
94  usleep(TIME_STEP / 2);
95  }
96 
97  cout << "end of tuple feeding" << endl;
98  joinResult.joinNumber = 0;
99  windowSlider.terminateJoinProcessors();
100 
101  vfSlider.terminateJoinProcessors();
102  joinResult.joinNumber = windowSlider.getJoinResult();
103  INTELLI::UtilityFunctions::timerEnd(joinResult);
104  if (vfSlider.getJoinResult() == windowSlider.getJoinResult()) {
105  cout << "Congratulations, the result " + to_string(joinResult.joinNumber) + " is correct!" << endl;
106  return true;
107  } else {
108  cout << "Ops, ot matched, expecting " + to_string(vfSlider.getJoinResult()) + "but return "
109  + to_string(windowSlider.getJoinResult()) << endl;
110  return false;
111  }
112  }
113 };
114 }
115 
116 #endif //_VERIFYBENCH_H_
void setParallelSMP(size_t threads)
to set the parallel level under SMP model
Definition: AbstractWS.h:128
virtual void feedTupleR(TuplePtr tr)
to feed a tuple R
Definition: AbstractWS.h:178
void setRunTimeScheduling(bool r)
to configure the scheduling place
Definition: AbstractWS.h:93
virtual void feedTupleS(TuplePtr ts)
to feed a tuple s
Definition: AbstractWS.h:168
void setWindowLen(size_t wl)
to set the length of window
Definition: AbstractWS.h:107
void setTimeBased(bool ts)
to configure the window type
Definition: AbstractWS.h:78
void setSysTime(struct timeval tv)
Set the time structure value, i.e. the timeSys member.
Definition: AbstractWS.h:225
void setSlideLen(size_t sli)
set the length of slide
Definition: AbstractWS.h:114
Definition: Types.h:164
Definition: Types.h:210
Definition: VerifyBench.h:24
static bool test(Result &joinResult, RelationCouple &relationCouple, size_t threads=THREAD_NUMBER, size_t windowLen=WINDOW_SIZE, size_t slideLen=WINDOW_SIZE)
Definition: VerifyBench.h:35
The single-thread window slider used to verify the results of other WS.
Definition: VerifyWS.h:29
size_t getJoinResult()
to get the result of join
void terminateJoinProcessors()
to terminate the join processors
void initJoinProcessors()
to init the join processors
std::shared_ptr< class Tuple > TuplePtr
The class to describe a shared pointer to Tuple.
Definition: Types.h:150
Definition: DatasetTool.h:10