Main Page | Namespace List | Class Hierarchy | Class List | File List | Class Members | File Members

SUnionQBox.h

Go to the documentation of this file.
00001 #ifndef SUNION_QBOX_H 00002 #define SUNION_QBOX_H 00003 00004 #include "UnionQBox.h" 00005 #include "JoinQBox.h" 00006 #include <vector> 00007 #include "SControlQBox.h" 00008 00009 BOREALIS_NAMESPACE_BEGIN; 00010 00011 // -------------------------------------------------- 00012 // Just a simple buffer for tuples. Grows automatically. XXX should add iterators. 00013 class TuplesBuffer { 00014 public: 00015 static const int DEFAULT_SIZE = 1000; 00016 TuplesBuffer(size_t tuple_size); 00017 void insertTuple(const Tuple& data); 00018 int nbTuples() { return _cur_nb_tuples; } 00019 const Tuple tupleAt(int i); 00020 // Deletes all tuples from this buffer 00021 void clear(); 00022 // Deletes all tuples following the one with given tuple id 00023 void clearAfter(int tuple_id); 00024 // Buffer is unstable if earliest tuple is unstable. It is stable otherwise 00025 bool isStableBuffer(); 00026 private: 00027 size_t _tuple_size; 00028 int _cur_nb_tuples; 00029 int _max_nb_tuples; 00030 ptr<dynbuf> _tuples; 00031 }; 00032 00033 00034 00035 // -------------------------------------------------- 00042 class SUnionQBox : public SControlQBox 00043 { 00044 00045 friend class SUninBoxState; 00046 00047 public: 00048 00050 static TupleDescription createControlOutputDescription(); 00051 00052 protected: 00053 virtual void setupImpl() throw (AuroraException); 00054 virtual void initImpl() throw (AuroraException); 00055 virtual void runImpl(QBoxInvocation&) throw (AuroraException); 00056 void pureUnionRunImpl(QBoxInvocation&) throw (AuroraException); 00057 00058 // packing and unpacking states 00059 void setPendingBoxState(ptr<AbstractBoxState> packed_box); 00060 bool isStableForCheckpoint(); 00061 ptr<AbstractBoxState> packState(); 00062 void unpackState(ptr<AbstractBoxState> box_state); 00063 00064 private: 00065 00066 // XXX No good reason why this specific value, we just want to avoid hogging the CPU too long 00067 static const int MAX_BUCKETS_PER_ITERATION = 5; 00068 00069 // We block for _max_delay = ALPHA_BLOCK * max-delay 00070 static const double MAX_ALPHA = 0.9; // For regular blocking 00071 //static const double MAX_ALPHA = 1.0; // For benefits1 benchmark 00072 00073 // Once we are in Unstable state, then we only block for a very short time 00074 static const double MIN_ALPHA = 0.1; 00075 //static const double MIN_ALPHA = 1.0; // For benefits1b benchmark 00076 00077 // There is a race condition between checkpoints and info about unstable state, so we 00078 // erase buckets only after some slack 00079 static const int SLACK = 10; 00080 00081 class Bucket; 00082 00084 void setupParameters(); 00085 00087 void handleInputTuple(const Tuple& tuple, int stream_id); 00088 00090 void applyUndo(const Tuple& tuple, int stream_id, long long interval_start); 00091 00093 void undoRedo(long long interval_start); 00094 00096 //void produceRedoTuples(long long interval_start); 00097 00099 //void completedRecovery(); 00100 00102 void insertTupleIntoBucket(const Tuple& tuple, int stream_id, long long interval_start); 00103 00105 bool forwardWithoutWaiting(); 00106 00108 void produceOutput(); 00109 bool processOutputStableBucket(Bucket& bucket, long long interval_nb); 00110 bool processOutputUnStableBucket(Bucket& bucket, long long interval_nb); 00111 00113 void producePunctuation(Timestamp punctuation_time); 00114 00116 void checkpoint(); 00117 00119 //void notifyOutputs(); 00120 00122 void emitTuples(Bucket& bucket, TupleType tuple_type); 00123 00125 //void emitTuple(const Tuple& tuple); 00126 00128 //void emitControlTuple(int stream_id, TupleType type, Timestamp data); 00129 00131 void emitUndoTuple(const Tuple& tuple); 00132 00133 00135 long long timestampToIntervalNb(Timestamp timestamp); 00136 00138 Timestamp intervalNbToTimestamp(long long interval_nb); 00139 00140 static const long MILLION = 1000000; 00141 static const long THOUSAND = 1000; 00142 00144 unsigned int _numberOfInputs; 00145 00147 int _interval; // In msecs (size of buckets) 00148 00150 int _max_delay; // In msecs (max time we can wait for input tuples 00151 double _actual_delay; // In msecs (time we are actually going to wait for) 00152 bool _stagger; // Should we accelerate 00153 00154 // The operator is a state machine 00155 struct State { 00156 public: 00158 State () : _state_value(STABLE_STATE), _unstable_start(Timestamp::now()), 00159 _unstable_interval_nb(0) {} 00160 00162 bool isStable() { return _state_value == STABLE_STATE; } 00163 00165 bool isUnstable() { return _state_value == UNSTABLE_STATE; } 00166 00168 bool canProcessUnstable(); 00169 00171 void processingUnstableBucket(Timestamp unstable_start, long long interval_nb); 00172 00173 void undoing(); 00174 00176 void newStableBucket(); 00177 00179 bool somethingToCorrect(); 00180 Timestamp correctSinceWhen(); // Time of beginning of unstability 00181 long long firstUnstableInterval(); // First unstable bucket 00182 00184 void reconciling(); 00185 bool isReconciling(); 00186 void reconcilingRollforward(); 00187 bool isReconcilingForward(); 00188 00190 void reconciled(); 00191 00193 void noReconciliation(); 00194 00195 private: 00196 static const int STABLE_STATE = 0; // Produce tuples in their final form 00197 static const int UNSTABLE_STATE = 1; // Produce tuples to avoid delaying 00198 static const int UNDONE_STATE = 2; // In the process of undoing/redoing buffer content 00199 static const int CONVERGED_STATE = 3; // Produce correct tuple but left-over incorrect tuples remain 00200 static const int RECONCILING_STATE_ROLLBACK = 4; // Correcting previous state (rolling back) 00201 static const int RECONCILING_STATE_ROLLFORWARD = 5; // Correcting previous state (rolling forward) 00202 00203 int _state_value; 00204 Timestamp _unstable_start; 00205 long long _unstable_interval_nb; 00206 00207 }; 00208 State _state; 00209 00210 // A bucket associates windows of tuples received with their input stream (identified by port nb) 00211 typedef map<int, ptr<TuplesBuffer> > TuplesBufferSet; 00212 struct Bucket { 00213 00214 Bucket(int nb_buffers, int tuple_size); 00215 00216 TuplesBufferSet _buffers; 00217 00218 bool _contains_unstable_tuples; 00219 map<int, bool> _punctuations; 00220 Timestamp _creation_time; 00221 00222 Timestamp getCreationTime(); 00223 void clear(); 00224 bool punctuated(); 00225 // Exhaustively checks all buffers for unstable tuples 00226 void resetStability(); 00227 }; 00228 00229 // There is one bucket per time period. Periods (or intervals) are in msecs, same unit as _interval 00230 typedef map<long long, ptr<Bucket> > Buckets; 00231 00232 // Set of buckets 00233 Buckets _buckets; 00234 long long _current_interval_nb; 00235 00237 int32 _last_stable_tuple_id; 00238 00239 // Checkpointed state 00240 long long _checkpointed_interval_nb; 00241 00242 // Last time operator emmitted punctuation tuple 00243 Timestamp _last_punctuation_time; 00244 00245 // The benchmark that we are running 00246 MicroBenchmarkType _benchmark; 00247 00248 bool _one_iteration_after_queues_cleared; 00249 00250 // If there's a single input stream, should we sort or not? 00251 bool _sort; 00252 00254 bool _should_reconcile; 00255 00256 AURORA_DECLARE_QBOX(SUnionQBox, "sunion"); 00257 }; 00258 00264 class SUnionBoxState : public AbstractBoxState 00265 { 00266 public: 00267 SUnionBoxState() : _interval_nb(INIT) {} 00268 SUnionBoxState(long long interval_nb) : _interval_nb(interval_nb) {} 00269 ~SUnionBoxState() {} 00270 00271 long long getIntervalNb() { return _interval_nb; } 00272 NMSTL_SERIAL_SUBCLASS(SUnionBoxState, AbstractBoxState, << _interval_nb); 00273 00274 private: 00275 long long _interval_nb; 00276 }; 00277 00278 BOREALIS_NAMESPACE_END; 00279 00280 #endif

Generated on Fri Nov 12 15:15:22 2004 for Borealis by doxygen 1.3.8