6 #ifndef _JOINPROCESSOR_ABSTRACTJP_H_
7 #define _JOINPROCESSOR_ABSTRACTJP_H_
13 #include <JoinAlgo/JoinAlgoTable.h>
43 CmdQueuePtr cmdQueueIn;
44 CmdQueuePtr cmdQueueOut;
47 bool timeBased =
false;
49 size_t windowLenGlobal = 0;
50 size_t slideLenGlobal = 0;
51 size_t joinedResult = 0;
52 JoinAlgoTablePtr myAlgo;
54 void sendResponseCmd(join_cmd_t cmd) {
55 cmdQueueOut->push(cmd);
58 sendResponseCmd(CMD_ACK);
66 if (!cmdQueueIn->empty()) {
67 join_cmd_t cmdIn = *cmdQueueIn->front();
88 if (ts < windowLenGlobal) {
91 return ((ts - windowLenGlobal) / slideLenGlobal) + 1;
101 cmdQueueIn->push(cmd);
108 while (cmdQueueOut->empty()) {}
109 join_cmd_t ru = *cmdQueueOut->front();
119 virtual void init(
size_t sLen,
size_t rLen,
size_t _sysId) {
122 cmdQueueIn = newCmdQueue(1);
123 cmdQueueOut = newCmdQueue(1);
125 myAlgo = newJoinAlgoTable();
154 windowLenGlobal = wlen;
155 slideLenGlobal = sli;
162 TuplePtrQueueInS->push(ts);
169 TuplePtrQueueInR->push(tr);
183 return UtilityFunctions::timeLastUs(
timeSys) / TIME_STEP;
The base class that abstracts a C++20 thread; other thread classes can be derived from it.
Definition: AbstractC20Thread.h:28
The base class for join processors.
Definition: AbstractJP.h:38
void setCore(int id)
bind to specific core
Definition: AbstractJP.h:197
void inputCmd(join_cmd_t cmd)
input an outside command
Definition: AbstractJP.h:100
virtual void feedTupleR(TuplePtr tr)
feed a tuple r into the r input queue
Definition: AbstractJP.h:168
size_t getJoinedResult()
get the join results
Definition: AbstractJP.h:189
void setTimeVal(struct timeval tv)
set the timeval struct
Definition: AbstractJP.h:175
virtual void feedTupleS(TuplePtr ts)
feed a tuple s into the s input queue
Definition: AbstractJP.h:161
bool isTimeBased()
to read the window type
Definition: AbstractJP.h:138
virtual void init(size_t sLen, size_t rLen, size_t _sysId)
init the join processor with buffer/queue length and id
Definition: AbstractJP.h:119
void setWindowLen(size_t wl)
to set the length of window
Definition: AbstractJP.h:145
bool testCmd(join_cmd_t cmd)
Test whether a command remains in the queue and is the desired command.
Definition: AbstractJP.h:65
size_t getTimeStamp()
Get the time stamp.
Definition: AbstractJP.h:182
void setTimeBased(bool ts)
to configure the window type
Definition: AbstractJP.h:131
void setGlobalWindow(size_t wlen, size_t sli)
set the window parameters of global window
Definition: AbstractJP.h:153
join_cmd_t waitResponse()
wait and return the response of this join processor
Definition: AbstractJP.h:107
size_t oldestWindowBelong(size_t ts)
Get the oldest window that a time stamp can belong to.
Definition: AbstractJP.h:87
virtual void inlineMain()
The 'main' function of AbstractJP.
Definition: AbstractJP.h:79
struct timeval timeSys
Definition: AbstractJP.h:46
#define newTuplePtrQueue(n)
To create a new TuplePtrQueue.
Definition: Types.h:236
std::shared_ptr< class Tuple > TuplePtr
The class to describe a shared pointer to Tuple.
Definition: Types.h:150
std::shared_ptr< INTELLI::SPSCQueue< INTELLI::TuplePtr > > TuplePtrQueue
To describe a queue of TuplePtr under SPSCQueue.
Definition: Types.h:228
Definition: DatasetTool.h:10