00001
#ifndef SJOIN_QBOX_H
00002
#define SJOIN_QBOX_H
00003
00004
#include "JoinQBox.h"
#include "../ConsistencyMngr.h"
#include <fstream>
#include <map>
#include <vector>
00007
00008 BOREALIS_NAMESPACE_BEGIN;
00009
00016 class SJoinQBox :
public JoinQBox
00017 {
00018
00019
public:
00020
~SJoinQBox();
00021
00022
protected:
00023
void setupImpl()
throw (
AuroraException);
00024
void initImpl()
throw (
AuroraException);
00025
void runImpl(
QBoxInvocation&)
throw (
AuroraException);
00026
00027
private:
00028
00029
00030
00031
struct StateBound {
00032 int32 _left;
00033 int32 _right;
00034 StateBound(int32 left = -1, int32 right = -1)
00035 : _left(left), _right(right) {}
00036 operator const void *()
const {
00037
return ((_left == -1) || (_right == -1)) ? 0 :
this;
00038 }
00039 };
00040
00041
static const int MAX_HISTORY = 101001000;
00042
static const int PAGE_SIZE = 16384;
00043
00044
static const int TWO_INPUTS = 2;
00045
00046
void handleInputTuple(
const Tuple& tuple,
int stream_id,
bool& outputted);
00047
00048
00049
void handlePunctuation(
const Tuple& left_tuple,
const Tuple& right_tuple);
00050
void passThrough(
const Tuple& tuple);
00051
Timestamp findMinTimestamp(
int stream_id);
00052 int32 findOldestTupleId(
int stream_id);
00053
void emitPunctuationTuple(
Timestamp min_timestamp);
00054
00055
00056
void handleUndo();
00057
void scanForUndo(
int stream_id,
bool& has_undo, int32& undo_id);
00058
void dequeueUntilUndo(
int stream_id, int32 undo_tuple_id);
00059
void checkIterator(
int stream_id);
00060
void undoState(
bool has_undo[], int32 undo_tuple_id[]);
00061 StateBound findBound(
TupleQueue::RSeekIterator ri[],
bool has_undo[], int32 undo_tuple_ids[]);
00062
int rollback(
TupleQueue::RSeekIterator ri[], int32 bound_ids[]);
00063
void clearState();
00064
void rollforward(
TupleQueue::RSeekIterator ri[],
int counter);
00065
00067
void emitTuple(
const Tuple& undo_tuple, TupleType type);
00068
00070
void emitUndoRedoTuples(
const Tuple& tuple);
00071
00072
00073
void storeTupleForUndo(
const Tuple& tuple,
int stream_id);
00074
void saveStateBound(
const Tuple& tuple,
int stream_id);
00075
void cutHistory(
int stream_id);
00076
void processTuple(
const Tuple& tuple,
int stream_id,
bool& outputted);
00077
00078
int _output_tuple_size;
00079
int _input_tuple_size[TWO_INPUTS];
00080 ptr<TupleQueue> _input_history[TWO_INPUTS];
00081
int _max_history;
00082 ptr<PagePool> _pool;
00083
00084
bool _punctuation[TWO_INPUTS];
00085
Timestamp _punctuation_time[TWO_INPUTS];
00086
00087
00088 size_t _buffer_size[TWO_INPUTS];
00089
00090
typedef map< int32, StateBound > TupleStateBound;
00091 TupleStateBound _state_bounds;
00092
00093
00094
int _state_bound_interval;
00095
Timestamp _last_state_bound_time;
00096
MicroBenchmarkType _benchmark;
00097
00098
AURORA_DECLARE_QBOX(
SJoinQBox,
"sjoin");
00099
00100
00101 ofstream _log_file;
00102 vector<long long> _measurements;
00103 };
00104
00105
00106 BOREALIS_NAMESPACE_END;
00107
00108
#endif // SJOIN_QBOX_H