Main Page | Namespace List | Class Hierarchy | Class List | File List | Class Members | File Members

SJoinQBox.h

Go to the documentation of this file.
00001 #ifndef SJOIN_QBOX_H 00002 #define SJOIN_QBOX_H 00003 00004 #include "JoinQBox.h" 00005 #include "../ConsistencyMngr.h" // For benchmark types 00006 #include <fstream> 00007 00008 BOREALIS_NAMESPACE_BEGIN; 00009 00016 class SJoinQBox : public JoinQBox 00017 { 00018 00019 public: 00020 ~SJoinQBox(); // So we can log the stats in a file 00021 00022 protected: 00023 void setupImpl() throw (AuroraException); 00024 void initImpl() throw (AuroraException); 00025 void runImpl(QBoxInvocation&) throw (AuroraException); 00026 00027 private: 00028 00029 // Using a struct instead of a pair so we can do tests like 00030 // if ( _state_bounds[tuple_id] ) 00031 struct StateBound { 00032 int32 _left; 00033 int32 _right; 00034 StateBound(int32 left = -1, int32 right = -1) 00035 : _left(left), _right(right) {} 00036 operator const void *() const { 00037 return ((_left == -1) || (_right == -1)) ? 0 : this; 00038 } 00039 }; 00040 00041 static const int MAX_HISTORY = 101001000; // In tuples 00042 static const int PAGE_SIZE = 16384; // Arbitrary small value 00043 00044 static const int TWO_INPUTS = 2; // There are 2 input streams JoinQBox::LEFT && RIGHT 00045 00046 void handleInputTuple(const Tuple& tuple, int stream_id, bool& outputted); 00047 00048 // Punctuation 00049 void handlePunctuation(const Tuple& left_tuple, const Tuple& right_tuple); 00050 void passThrough(const Tuple& tuple); 00051 Timestamp findMinTimestamp(int stream_id); 00052 int32 findOldestTupleId(int stream_id); 00053 void emitPunctuationTuple(Timestamp min_timestamp); 00054 00055 // Undo-redo recovery 00056 void handleUndo(); 00057 void scanForUndo(int stream_id, bool& has_undo, int32& undo_id); 00058 void dequeueUntilUndo(int stream_id, int32 undo_tuple_id); 00059 void checkIterator(int stream_id); 00060 void undoState(bool has_undo[], int32 undo_tuple_id[]); 00061 StateBound findBound(TupleQueue::RSeekIterator ri[], bool has_undo[], int32 undo_tuple_ids[]); 00062 int rollback(TupleQueue::RSeekIterator ri[], int32 bound_ids[]); 00063 void clearState(); 00064 void rollforward(TupleQueue::RSeekIterator ri[], int counter); 00065 00067 void emitTuple(const Tuple& undo_tuple, TupleType type); 00068 00070 void emitUndoRedoTuples(const Tuple& tuple); 00071 00072 // Normal operation 00073 void storeTupleForUndo(const Tuple& tuple, int stream_id); 00074 void saveStateBound(const Tuple& tuple, int stream_id); 00075 void cutHistory(int stream_id); 00076 void processTuple(const Tuple& tuple, int stream_id, bool& outputted); 00077 00078 int _output_tuple_size; 00079 int _input_tuple_size[TWO_INPUTS]; 00080 ptr<TupleQueue> _input_history[TWO_INPUTS]; 00081 int _max_history; // Maximum undo/redo size 00082 ptr<PagePool> _pool; // Small page pool because TupleQueues use page pools 00083 00084 bool _punctuation[TWO_INPUTS]; // We wait to receive both punctuations before producing one ourselves 00085 Timestamp _punctuation_time[TWO_INPUTS]; // Timestamp received as punctuation 00086 00087 // Information about the state of the join 00088 size_t _buffer_size[TWO_INPUTS]; 00089 00090 typedef map< int32, StateBound > TupleStateBound; 00091 TupleStateBound _state_bounds; 00092 //int32 _oldest_id[TWO_INPUTS]; 00093 00094 int _state_bound_interval; 00095 Timestamp _last_state_bound_time; 00096 MicroBenchmarkType _benchmark; 00097 00098 AURORA_DECLARE_QBOX(SJoinQBox, "sjoin"); 00099 00100 // Temporary for experiments 00101 ofstream _log_file; 00102 vector<long long> _measurements; //Sorted list of measurements 00103 }; 00104 00105 00106 BOREALIS_NAMESPACE_END; 00107 00108 #endif // SJOIN_QBOX_H

Generated on Fri Nov 12 15:15:22 2004 for Borealis by doxygen 1.3.8