Main Page | Namespace List | Class Hierarchy | Class List | File List | Class Members | File Members

RevisionAggregateQBox.h

Go to the documentation of this file.
00001 #ifndef REVISION_AGG_QBOX_H 00002 #define REVISION_AGG_QBOX_H 00003 00004 #include <ext/hash_map> 00005 #include <vector> 00006 #include <list> 00007 00008 #include "QBox.h" 00009 #include "Aggregate.h" 00010 #include "Timestamp.h" 00011 #include "CP.h" 00012 00013 #include <NMSTL/serial> 00014 #include <NMSTL/iserial> 00015 #include <NMSTL/oserial> 00016 00017 BOREALIS_NAMESPACE_BEGIN; 00018 00023 using __gnu_cxx::hash_map; 00024 using NMSTL::to_string; 00025 00026 #define BEFORE_INDEX 0 00027 #define AFTER_INDEX 1 00028 00029 class RevisionAggregateQBox : public QBox 00030 { 00031 friend class AggregateBoxState; 00032 00033 private: 00034 00035 struct HashString 00036 { 00037 size_t operator()(const string s) const { 00038 // TODO: Find a better hash function or something. 00039 uint32 h = 0; 00040 for (string::const_iterator i = s.begin() ; i != s.end(); ++i) 00041 h = (5 * h) + *i; 00042 return size_t(h); 00043 } 00044 00045 NMSTL_TO_STRING(HashString); 00046 NMSTL_SIMPLY_SERIALIZABLE(HashString, ); 00047 }; 00048 00049 00050 typedef vector<ptr<Aggregate> > AFVector; 00051 AFVector _afs; 00052 00053 // Size of the input tuples 00054 size_t _in_tuple_size; 00055 00056 // Size of the group_by values. 00057 size_t _group_by_value_size; 00058 00059 // Window will contain the last window_size values of 00060 // window_field, advancing by advance_size values 00061 // when the window fills. 00062 enum WindowMethod { 00063 WINDOW_BY_TUPLES, 00064 WINDOW_BY_VALUES 00065 }; 00066 WindowMethod _window_method; 00067 int _window_size; 00068 int _advance_size; 00069 // If WINDOW_BY_VALUES 00070 ptr<Expression> _window_field; 00071 00072 // Keep track of when the next window starts for each group by 00073 // value. 00074 // TODO: Consider tossing this in with the WindowList. 00075 struct WindowStart { 00076 bool initialized; 00077 int next; 00078 WindowStart() : initialized(false) { } 00079 00080 NMSTL_TO_STRING(WindowStart); 00081 NMSTL_SIMPLY_SERIALIZABLE(WindowStart, << initialized << next); 00082 00083 }; 00084 hash_map<string, WindowStart, HashString> _window_starts; 00085 00086 enum OrderByMethod { 00087 ORDER_BY_FIELD, 00088 ORDER_BY_TUPLENUM 00089 }; 00090 OrderByMethod _order_by_method; 00091 00092 EvalContext _ctxt; 00093 00094 // Group by a set of attributes. 00095 vector<ptr<Expression> > _group_by_fields; 00096 00097 class Window; 00098 typedef list<ptr<Window> > WindowList; 00099 00100 // Abstract class for structure that stores a Window 00101 // Manages the state of the Aggregate Functions. 00102 class Window { 00103 public: 00104 Window(const AFVector &af, const string &group_by) : 00105 _windows(), 00106 _group_by(group_by), _initialized(false) 00107 { 00108 // Open up windows. 00109 AFVector::const_iterator i; 00110 for (i = af.begin(); i != af.end(); ++i) { 00111 ptr<Aggregate::Window>w((*i)->openWindow()); 00112 ptr<Aggregate>agg_ptr(*i); 00113 //w->setAggregate(**i); 00114 w->setAggregate(agg_ptr); 00115 w->init(); 00116 _windows.push_back(w); 00117 } 00118 }; 00119 00120 Window() {} 00121 virtual ~Window() {}; 00122 00123 // Return true if this window should emit now. 00124 virtual bool insert_tuple(const EvalContext& ctxt); 00125 // returns true if the tuple is to be put in this window 00126 virtual bool window_affected(const EvalContext& ctxt) { return false; } 00127 // removes the tuple from the window 00128 void remove_tuple(const EvalContext& ctxt); 00129 00130 virtual const int getStartValue() const { return -999; }; 00131 00132 const Timestamp t0() const { return _t0; } 00133 const Timestamp lt0() const { return _lt0; } 00134 const int tid() const { return _tid; } 00135 00136 const string &group_by_value() const { return _group_by; } 00137 00138 size_t final_value(char *pos) const; 00139 string final_value_string() const; 00140 00141 virtual string as_string() const { return NULL; }; 00142 00143 void set_timeout_i(WindowList::iterator i) {_timeout_i = i; } 00144 void set_open_windows_i(WindowList::iterator i) {_ow_i = i; } 00145 WindowList::iterator get_timeout_i() { return _timeout_i; } 00146 WindowList::iterator get_open_windows_i() { return _ow_i; } 00147 00148 // set _window_field for ValueWindow 00149 // here because of serialization problems with Expression 00150 virtual void set_window_field(ptr<Expression> window_field) {} 00151 00152 private: 00153 vector<ptr<Aggregate::Window> > _windows; 00154 string _group_by; 00155 bool _initialized; 00156 Timestamp _t0; 00157 Timestamp _lt0; 00158 int _tid; 00159 WindowList::iterator _timeout_i, _ow_i; 00160 00161 public: 00162 NMSTL_TO_STRING(Window); 00163 NMSTL_SERIAL_BASE(Window, int32, 00164 << _windows << _group_by 00165 << _initialized << _t0 00166 << _lt0 << _tid); 00167 00168 00169 }; 00170 00171 // Structure that stores a Window by values 00172 class ValueWindow : public Window { 00173 public: 00174 ValueWindow(const AFVector &af, const string &group_by, 00175 ptr<Expression> window_field, 00176 int window_start, int window_end, 00177 int slack) : 00178 Window(af, group_by), _window_field(window_field), 00179 _first_val(window_start), _last_val(window_end), 00180 _slack_remaining(slack) 00181 {} 00182 00183 ValueWindow() {}; 00184 00185 virtual bool insert_tuple(const EvalContext& ctxt); 00186 virtual bool window_affected(const EvalContext& ctxt); 00187 00188 virtual const int getStartValue() const { 00189 return _first_val; 00190 } 00191 00192 void set_window_field(ptr<Expression> window_field) { 00193 _window_field = ptr<Expression>(window_field); 00194 } 00195 00196 virtual string as_string() const { 00197 return ("ValueWindow{[" + to_string(_first_val) + ", " + 00198 to_string(_last_val) + ") }"); 00199 } 00200 00201 private: 00202 ptr<Expression> _window_field; 00203 // Window runs from _first_val (inclusive) to _last_val (exclusive). 00204 int _first_val, _last_val; 00205 int _slack_remaining; 00206 00207 public: 00208 NMSTL_TO_STRING(ValueWindow); 00209 NMSTL_SERIAL_SUBCLASS(ValueWindow, Window, 00210 << _first_val << _last_val << _slack_remaining); 00211 }; 00212 00213 // Structure that stores a Window by tuples 00214 class TupleWindow : public Window { 00215 public: 00216 TupleWindow(const AFVector &af, const string group_by, 00217 int window_size, int start_value) : 00218 Window(af, group_by), _window_size(window_size), 00219 _count(0), _start_value(start_value) {}; 00220 00221 TupleWindow() {}; 00222 00223 virtual bool insert_tuple(const EvalContext& ctxt); 00224 virtual bool window_affected(const EvalContext& ctxt); 00225 00226 virtual const int getStartValue() const { return _start_value; } 00227 00228 int size() const { return _count; }; 00229 00230 virtual string as_string() const { 00231 return ("TupleWindow{[" + to_string(_count) + " out of " 00232 + to_string(_window_size) + " tuples] }"); 00233 } 00234 00235 private: 00236 int _window_size; 00237 int _count; 00238 int _start_value; 00239 00240 public: 00241 NMSTL_TO_STRING(TupleWindow); 00242 NMSTL_SERIAL_SUBCLASS(TupleWindow, Window, 00243 << _window_size << _count << _start_value); 00244 00245 }; 00246 00247 // Map of group-by values to lists of open windows. 00248 typedef hash_map<string, WindowList, HashString> WindowHash; 00249 WindowHash _open_windows; 00250 00251 // Slack stuff 00252 int _slack; 00253 00254 // Timeout stuff. 00255 bool _timing_out; // True if we are doing timeout. 00256 Timestamp _timeout; // Timeout in seconds and useconds. 00257 WindowList _timeout_list; // List of windows, in the order they 00258 // will be timed-out. 00259 00260 // Private functions. 00261 void setupParameters(); 00262 TupleDescription createOutputDescription(); 00263 void parseGroupBy(const string atts, const ExprContext& ctxt); 00264 void emitForWindow(const ptr<Window> w); 00265 string groupByForTuple(const EvalContext& ctxt) const; 00266 void insert_new_window(WindowList &wl, ptr<Window> w); 00267 00268 00269 00270 /****** Added on this box *******/ 00271 int _revision_id_counter; 00272 map<int, const char*> _revision_tuple_store; 00273 // the very first window value, since this value determines values for all windows 00274 int _first_window_start; 00275 00276 struct RevisionStates { 00277 bool initialized; 00278 int revision_id; 00279 bool revision_processed; 00280 WindowHash before_states; 00281 WindowHash after_states; 00282 hash_map<string, WindowStart, HashString> before_wstarts; 00283 hash_map<string, WindowStart, HashString> after_wstarts; 00284 hash_map<string, WindowStart, HashString> wstarts; 00285 00286 RevisionStates() : initialized(false), revision_processed(false) {} 00287 00288 string as_string() const { 00289 return string() + "RevisionStates{" + 00290 "revision-id=" + revision_id 00291 + "," + before_states.size() + " before_states" + 00292 + "," + after_states.size() + " after_states" + 00293 + "," + before_wstarts.size() + " before_wstarts" + 00294 + "," + after_wstarts.size() + " after_wstarts" + 00295 "}"; 00296 } 00297 }; 00298 00299 map<int, RevisionStates> _revision_states; 00300 00301 // process revision tuple 00302 // returns true when revision tuple is outputted 00303 bool processRevision(Tuple revision_tuple, Tuple historical_tuple, int revision_id); 00304 // returns a pair tid->concatenated group-by, order-by, agg values 00305 pair<int, string> produceCoreTuple(const ptr<Window> w); 00306 00307 bool checkAndEmitRevision(bool before_emitp, bool after_emitp, ptr<Window> before_w, ptr<Window> after_w); 00308 void emitRevision(const ptr<Window> w, TupleType revision_type); 00309 /*******************************/ 00310 00311 protected: 00312 void setupImpl() throw (AuroraException); 00313 void initImpl() throw (AuroraException); 00314 void runImpl(QBoxInvocation&) throw (AuroraException); 00315 00316 AURORA_DECLARE_QBOX(RevisionAggregateQBox, "revisionagg"); 00317 00318 friend string to_string(const WindowHash& m); 00319 }; 00320 00321 inline string to_string(const RevisionAggregateQBox::WindowHash& m) { 00322 string out = "WindowHash{"; 00323 00324 for (RevisionAggregateQBox::WindowHash::const_iterator i = m.begin(); i != m.end(); ++i) { 00325 if (i != m.begin()) out.append("; "); 00326 out.append(to_escaped_string(i->first)).append("->").append(to_string(i->second)); 00327 } 00328 00329 out.append("}"); 00330 return out; 00331 } 00332 00333 NMSTL_SERIAL_DEFINE(RevisionAggregateQBox::Window, 1100); 00334 NMSTL_SERIAL_DEFINE(RevisionAggregateQBox::ValueWindow, 1200); 00335 NMSTL_SERIAL_DEFINE(RevisionAggregateQBox::TupleWindow, 1300); 00336 00337 00338 BOREALIS_NAMESPACE_END; 00339 00340 #endif

Generated on Fri Nov 12 15:15:21 2004 for Borealis by doxygen 1.3.8