Main Page | Namespace List | Class Hierarchy | Class List | File List | Class Members | File Members

ConsistencyMngr.h

Go to the documentation of this file.
00001 #ifndef BOREALIS_CONSISTENCYMNGR_H 00002 #define BOREALIS_CONSISTENCYMNGR_H 00003 00004 #include "common.h" 00005 #include "Query.h" 00006 #include "StreamEvent.h" 00007 #include "Timestamp.h" 00008 #include "Tuple.h" 00009 00010 #include <NMSTL/tqueuerpc> 00011 #include <NMSTL/tcprpc> 00012 #include <NMSTL/xmlrpc> 00013 #include <fstream> 00014 00015 BOREALIS_NAMESPACE_BEGIN; 00016 00017 class QueryProcessor; 00018 00019 // Type of microbenchmark that we are running 00020 class MicroBenchmarkType { 00021 00022 public: 00023 00024 enum Type { 00025 // Micro benchmarking the operator (one item at the time) 00026 NONE = 0, // SJoin without anything else (like an ordinary join) 00027 TUPLE_TYPES, // SJoin that looks at the message types before processing them 00028 UNDO_LOG, // SJoin that looks keeps an undo log of variable size 00029 CHECKPOINT, // SJoin interrupted by checkpoints 00030 00031 // Micro benchmarking the sunion/soutput operators 00032 SUNION, // Just add an sunion in front of an sjoin 00033 SUNION_PUNCTUATION, // Just sunion and punctuation 00034 SOUTPUT, // Just add an soutput at the end 00035 00036 BLOCK, // Never process any unstable bucket 00037 00038 // Micro benchmarking the approach as a whole but don't move subscriptions 00039 ALL_TOGETHER_UNDO_REDO_NO_MOVE, 00040 ALL_TOGETHER_CHECKPOINT_REDO_NO_MOVE, 00041 00042 // Micro benchmarking the approach as a whole 00043 ALL_TOGETHER_UNDO_REDO, 00044 ALL_TOGETHER_CHECKPOINT_REDO, 00045 00046 // Finally, the complete adaptive algorithm 00047 ADAPTIVE 00048 00049 00050 }; 00051 00052 bool doKeepUndoLog() 00053 { return ((_type == UNDO_LOG) 00054 || (_type == ALL_TOGETHER_UNDO_REDO_NO_MOVE) 00055 || (_type == ALL_TOGETHER_UNDO_REDO) 00056 || (_type == ADAPTIVE)); 00057 } 00058 00059 bool doSerialize() 00060 { return ((_type == SUNION) || (_type == SUNION_PUNCTUATION) || (_type == BLOCK) 00061 || (_type == ALL_TOGETHER_UNDO_REDO_NO_MOVE) || (_type == ALL_TOGETHER_CHECKPOINT_REDO_NO_MOVE) 00062 || (_type == ALL_TOGETHER_UNDO_REDO) || (_type == ALL_TOGETHER_CHECKPOINT_REDO) 00063 || (_type == ADAPTIVE)); 00064 } 00065 00066 bool doPunctuation() 00067 { return ((_type == SUNION_PUNCTUATION) || (_type == BLOCK) 00068 || (_type == ALL_TOGETHER_UNDO_REDO_NO_MOVE) || (_type == ALL_TOGETHER_CHECKPOINT_REDO_NO_MOVE) 00069 || (_type == ALL_TOGETHER_UNDO_REDO) || (_type == ALL_TOGETHER_CHECKPOINT_REDO) 00070 || (_type == ADAPTIVE)); 00071 00072 } 00073 00074 bool doCheckpoint() 00075 { return ((_type == CHECKPOINT) 00076 || (_type == ALL_TOGETHER_CHECKPOINT_REDO_NO_MOVE) 00077 || (_type == ALL_TOGETHER_CHECKPOINT_REDO) 00078 || (_type == ADAPTIVE) 00079 ); 00080 } 00081 00082 bool doBlock() 00083 { return (_type == BLOCK); 00084 } 00085 00086 bool doForceMoveSubscriptions() 00087 { return ( (_type == ALL_TOGETHER_UNDO_REDO) || (_type == ALL_TOGETHER_CHECKPOINT_REDO)); 00088 } 00089 00090 bool doUndoReconciliation() 00091 { return ((_type == ALL_TOGETHER_UNDO_REDO_NO_MOVE) || (_type == ALL_TOGETHER_UNDO_REDO)); 00092 } 00093 00094 bool doAdaptive() 00095 { return ( _type == ADAPTIVE); 00096 } 00097 00098 MicroBenchmarkType(Type t = ALL_TOGETHER_UNDO_REDO) 00099 : _type(t) 00100 { 00101 } 00102 00103 MicroBenchmarkType(string type) 00104 { 00105 if ( type == "none" || type == "NONE" ) 00106 _type = NONE; 00107 00108 else if ( type == "tuple_types" || type == "TUPLE_TYPES" ) 00109 _type = TUPLE_TYPES; 00110 00111 else if ( type == "undo_log" || type == "UNDO_LOG" ) 00112 _type = UNDO_LOG; 00113 00114 else if ( type == "checkpoint" || type == "CHECKPOINT" ) 00115 _type = CHECKPOINT; 00116 00117 else if ( type == "sunion" || type == "SUNION" ) 00118 _type = SUNION; 00119 00120 else if ( type == "sunion_punctuation" || type == "SUNION_PUNCTUATION" ) 00121 _type = SUNION_PUNCTUATION; 00122 00123 else if ( type == "soutput" || type == "SOUTPUT" ) 00124 _type = SOUTPUT; 00125 00126 else if ( type == "block" || type == "BLOCK" ) 00127 _type = BLOCK; 00128 00129 else if ( type == "all_together_checkpoint_redo" || type == "ALL_TOGETHER_CHECKPOINT_REDO" ) 00130 _type = ALL_TOGETHER_CHECKPOINT_REDO; 00131 00132 else if ( type == "all_together_checkpoint_redo_no_move" || type == "ALL_TOGETHER_CHECKPOINT_REDO_NO_MOVE" ) 00133 _type = ALL_TOGETHER_CHECKPOINT_REDO_NO_MOVE; 00134 00135 else if ( type == "all_together_undo_redo" || type == "ALL_TOGETHER_UNDO_REDO" ) 00136 _type = ALL_TOGETHER_UNDO_REDO; 00137 00138 else if ( type == "all_together_undo_redo_no_move" || type == "ALL_TOGETHER_UNDO_REDO_NO_MOVE" ) 00139 _type = ALL_TOGETHER_UNDO_REDO_NO_MOVE; 00140 00141 else _type = ADAPTIVE; 00142 00143 } 00144 00145 string as_string( ) const { 00146 switch (_type) { 00147 case NONE: return string("none"); 00148 case TUPLE_TYPES : return string("tuple_types"); 00149 case UNDO_LOG: return string("undo_log"); 00150 case CHECKPOINT: return string("checkpoint"); 00151 case SUNION: return string("sunion"); 00152 case SUNION_PUNCTUATION: return string("sunion_punctuation"); 00153 case SOUTPUT: return string("soutput"); 00154 case BLOCK: return string("block"); 00155 case ALL_TOGETHER_CHECKPOINT_REDO_NO_MOVE: return string("all_together_checkpoint_redo_no_move"); 00156 case ALL_TOGETHER_CHECKPOINT_REDO: return string("all_together_checkpoint_redo"); 00157 case ALL_TOGETHER_UNDO_REDO_NO_MOVE : return string("all_together_undo_redo_no_move"); 00158 case ALL_TOGETHER_UNDO_REDO : return string("all_together_undo_redo"); 00159 case ADAPTIVE : return string("adaptive"); 00160 default: return string("adaptive"); 00161 } 00162 } 00163 00164 Type _type; 00165 00166 }; 00167 00168 NMSTL_TO_STRING(MicroBenchmarkType); 00169 00170 00177 class ConsistencyMngr { 00178 public: 00179 00185 ConsistencyMngr(QueryProcessor& qp); 00186 00190 ~ConsistencyMngr(); 00191 00196 void checkForControlStreams(Query& q); 00197 00198 void set_replicas(vector<MedusaID> replicas); 00199 void add_subscription(Subscription sub); 00200 void remove_subscription(Subscription sub); 00201 00202 private: 00203 00205 Name findControlStreamName(const Box::StreamBindings& outputs); 00206 00208 void monitorControlStream(Name control_stream_name); 00209 00211 Status handleControlTuples(ptr<StreamEvent> event); 00212 00214 void sendControlTuple(Name output_stream_name, TupleType type, Timestamp since); 00215 00216 void startCheckpointing(); 00217 00219 void checkpoint(); 00220 00221 void launchReconciliation(Name control_stream, Timestamp time); 00222 void reconcile(Name control_stream, Timestamp time); 00223 void finishReconciliation(); 00224 00226 void updateSubHistory(); 00227 void moveSubscriptionsIfNeeded(); 00228 void releaseDataPathIfNeeded(); 00229 void moveSubscriptionsAway(); 00230 void moveSubscriptionsAway2(MedusaID replica, RPC< vector<Subscription> > result); 00231 00232 void moveSubscriptionsBack(); 00233 void moveSubscriptionsBack2(RPC< vector<Subscription> > result); 00234 00236 QueryProcessor& _qp; 00237 00239 vector<MedusaID> _replicas; 00240 MedusaID _sub_temp_location; 00241 00243 vector<Subscription> _subs; 00244 00250 typedef map< Name, pair<Name, int> > IOControl; 00251 IOControl _io_control; 00252 00254 bool _reconciliation_in_progress; 00255 00256 static const int CHECKPOINT_INTERVAL = 1000; 00257 int _checkpoint_interval; 00258 Timestamp _last_checkpoint_time; 00259 bool _checkpointing_started; 00260 bool _suspend_checkpoints; 00261 00262 00263 static const int MAX_FAILURE = 30000; 00264 static const int MAX_NO_SEND_DELAY = 10000; 00265 static const int MAX_DELAY_DEFAULT = 1000; // In case we make the parameter optional 00266 static const double ALPHA_MAX = 0.6; // We never block for max_delay but a fraction of it 00267 int _max_delay; // How much we can delay processing for 00268 double _actual_delay; 00269 Timestamp _failure_start_time; 00270 00271 // All the stuff below is used to evaluate the approach 00272 MicroBenchmarkType _benchmark; 00273 ofstream _log_file; 00274 vector<long long> _measurements; 00275 int _state_size; 00276 00277 }; 00278 00279 BOREALIS_NAMESPACE_END; 00280 00281 #endif

Generated on Fri Nov 12 15:15:20 2004 for Borealis by doxygen 1.3.8