Main Page | Namespace List | Class Hierarchy | Class List | File List | Class Members | File Members

AggregateQBox.h

Go to the documentation of this file.
00001 #ifndef SIMPLEAGG_QBOX_H 00002 #define SIMPLEAGG_QBOX_H 00003 00004 #include <ext/hash_map> 00005 #include <vector> 00006 #include <list> 00007 00008 #include "QBox.h" 00009 #include "Aggregate.h" 00010 #include "Timestamp.h" 00011 00012 #include <NMSTL/serial> 00013 #include <NMSTL/iserial> 00014 #include <NMSTL/oserial> 00015 00016 BOREALIS_NAMESPACE_BEGIN; 00017 00022 using __gnu_cxx::hash_map; 00023 using NMSTL::to_string; 00024 00025 class AggregateQBox : public QBox 00026 { 00027 friend class AggregateBoxState; 00028 00029 private: 00030 00031 struct HashString 00032 { 00033 size_t operator()(const string s) const { 00034 // TODO: Find a better hash function or something. 00035 uint32 h = 0; 00036 for (string::const_iterator i = s.begin() ; i != s.end(); ++i) 00037 h = (5 * h) + *i; 00038 return size_t(h); 00039 } 00040 00041 NMSTL_TO_STRING(HashString); 00042 NMSTL_SIMPLY_SERIALIZABLE(HashString, ); 00043 }; 00044 00045 00046 typedef vector<ptr<Aggregate> > AFVector; 00047 AFVector _afs; 00048 00049 // Size of the input tuples 00050 size_t _in_tuple_size; 00051 00052 // Size of the group_by values. 00053 size_t _group_by_value_size; 00054 00055 // Window will contain the last window_size values of 00056 // window_field, advancing by advance_size values 00057 // when the window fills. 00058 enum WindowMethod { 00059 WINDOW_BY_TUPLES, 00060 WINDOW_BY_VALUES 00061 }; 00062 WindowMethod _window_method; 00063 int _window_size; 00064 int _advance_size; 00065 // If WINDOW_BY_VALUES 00066 ptr<Expression> _window_field; 00067 00068 // Keep track of when the next window starts for each group by 00069 // value. 00070 // TODO: Consider tossing this in with the WindowList. 00071 struct WindowStart { 00072 bool initialized; 00073 int next; 00074 WindowStart() : initialized(false) { } 00075 00076 NMSTL_TO_STRING(WindowStart); 00077 NMSTL_SIMPLY_SERIALIZABLE(WindowStart, << initialized << next); 00078 00079 }; 00080 hash_map<string, WindowStart, HashString> _window_starts; 00081 00082 enum OrderByMethod { 00083 ORDER_BY_FIELD, 00084 ORDER_BY_TUPLENUM 00085 }; 00086 OrderByMethod _order_by_method; 00087 00088 EvalContext _ctxt; 00089 00090 // Group by a set of attributes. 00091 vector<ptr<Expression> > _group_by_fields; 00092 00093 class Window; 00094 typedef list<ptr<Window> > WindowList; 00095 00096 // Abstract class for structure that stores a Window 00097 // Manages the state of the Aggregate Functions. 00098 class Window { 00099 public: 00100 Window(const AFVector &af, const string &group_by) : 00101 _windows(), 00102 _group_by(group_by), _initialized(false) 00103 { 00104 // Open up windows. 00105 AFVector::const_iterator i; 00106 for (i = af.begin(); i != af.end(); ++i) { 00107 ptr<Aggregate::Window>w((*i)->openWindow()); 00108 ptr<Aggregate>agg_ptr(*i); 00109 //w->setAggregate(**i); 00110 w->setAggregate(agg_ptr); 00111 w->init(); 00112 _windows.push_back(w); 00113 } 00114 }; 00115 00116 Window() {} 00117 virtual ~Window() {}; 00118 00119 // Return true if this window should emit now. 00120 virtual bool insert_tuple(const EvalContext& ctxt); 00121 00122 virtual const int getStartValue() const { return -999; }; 00123 00124 const Timestamp t0() const { return _t0; } 00125 const Timestamp lt0() const { return _lt0; } 00126 const int tid() const { return _tid; } 00127 00128 const string &group_by_value() const { return _group_by; } 00129 00130 size_t final_value(char *pos) const; 00131 00132 virtual string as_string() const { return NULL; }; 00133 00134 void set_timeout_i(WindowList::iterator i) {_timeout_i = i; } 00135 void set_open_windows_i(WindowList::iterator i) {_ow_i = i; } 00136 WindowList::iterator get_timeout_i() { return _timeout_i; } 00137 WindowList::iterator get_open_windows_i() { return _ow_i; } 00138 00139 // set _window_field for ValueWindow 00140 // here because of serialization problems with Expression 00141 virtual void set_window_field(ptr<Expression> window_field) {} 00142 00143 private: 00144 vector<ptr<Aggregate::Window> > _windows; 00145 string _group_by; 00146 bool _initialized; 00147 Timestamp _t0; 00148 Timestamp _lt0; 00149 int _tid; 00150 WindowList::iterator _timeout_i, _ow_i; 00151 00152 public: 00153 NMSTL_TO_STRING(Window); 00154 NMSTL_SERIAL_BASE(Window, int32, 00155 << _windows << _group_by 00156 << _initialized << _t0 00157 << _lt0 << _tid); 00158 00159 00160 }; 00161 00162 // Structure that stores a Window by values 00163 class ValueWindow : public Window { 00164 public: 00165 ValueWindow(const AFVector &af, const string &group_by, 00166 ptr<Expression> window_field, 00167 int window_start, int window_end, 00168 int slack) : 00169 Window(af, group_by), _window_field(window_field), 00170 _first_val(window_start), _last_val(window_end), 00171 _slack_remaining(slack) 00172 {} 00173 00174 ValueWindow() {}; 00175 00176 virtual bool insert_tuple(const EvalContext& ctxt); 00177 00178 virtual const int getStartValue() const { 00179 return _first_val; 00180 } 00181 00182 void set_window_field(ptr<Expression> window_field) { 00183 _window_field = ptr<Expression>(window_field); 00184 } 00185 00186 virtual string as_string() const { 00187 return ("ValueWindow{[" + to_string(_first_val) + ", " + 00188 to_string(_last_val) + ") }"); 00189 } 00190 00191 private: 00192 ptr<Expression> _window_field; 00193 // Window runs from _first_val (inclusive) to _last_val (exclusive). 00194 int _first_val, _last_val; 00195 int _slack_remaining; 00196 00197 public: 00198 NMSTL_TO_STRING(ValueWindow); 00199 NMSTL_SERIAL_SUBCLASS(ValueWindow, Window, 00200 << _first_val << _last_val << _slack_remaining); 00201 }; 00202 00203 // Structure that stores a Window by tuples 00204 class TupleWindow : public Window { 00205 public: 00206 TupleWindow(const AFVector &af, const string group_by, 00207 int window_size, int start_value) : 00208 Window(af, group_by), _window_size(window_size), 00209 _count(0), _start_value(start_value) {}; 00210 00211 TupleWindow() {}; 00212 00213 virtual bool insert_tuple(const EvalContext& ctxt); 00214 virtual const int getStartValue() const { return _start_value; } 00215 00216 int size() const { return _count; }; 00217 00218 virtual string as_string() const { 00219 return ("TupleWindow{[" + to_string(_count) + " out of " 00220 + to_string(_window_size) + " tuples] }"); 00221 } 00222 00223 private: 00224 int _window_size; 00225 int _count; 00226 int _start_value; 00227 00228 public: 00229 NMSTL_TO_STRING(TupleWindow); 00230 NMSTL_SERIAL_SUBCLASS(TupleWindow, Window, 00231 << _window_size << _count << _start_value); 00232 00233 }; 00234 00235 // Map of group-by values to lists of open windows. 00236 typedef hash_map<string, WindowList, HashString> WindowHash; 00237 WindowHash _open_windows; 00238 00239 // Slack stuff 00240 int _slack; 00241 00242 // Timeout stuff. 00243 bool _timing_out; // True if we are doing timeout. 00244 Timestamp _timeout; // Timeout in seconds and useconds. 00245 WindowList _timeout_list; // List of windows, in the order they 00246 // will be timed-out. 00247 00248 // Private functions. 00249 void setupParameters(); 00250 TupleDescription createOutputDescription(); 00251 void parseGroupBy(const string atts, const ExprContext& ctxt); 00252 void emitForWindow(const ptr<Window> w); 00253 string groupByForTuple(const EvalContext& ctxt) const; 00254 void insert_new_window(WindowList &wl, ptr<Window> w); 00255 00256 00257 protected: 00258 void setupImpl() throw (AuroraException); 00259 void initImpl() throw (AuroraException); 00260 void runImpl(QBoxInvocation&) throw (AuroraException); 00261 00262 // packing and unpacking states 00263 void setPendingBoxState(ptr<AbstractBoxState> packed_box); 00264 ptr<AbstractBoxState> packState(); 00265 void unpackState(ptr<AbstractBoxState> box_state); 00266 00267 AURORA_DECLARE_QBOX(AggregateQBox, "simpleagg"); 00268 00269 friend string to_string(const WindowHash& m); 00270 }; 00271 00272 inline string to_string(const AggregateQBox::WindowHash& m) { 00273 string out = "WindowHash{"; 00274 00275 for (AggregateQBox::WindowHash::const_iterator i = m.begin(); i != m.end(); ++i) { 00276 if (i != m.begin()) out.append("; "); 00277 out.append(to_escaped_string(i->first)).append("->").append(to_string(i->second)); 00278 } 00279 00280 out.append("}"); 00281 return out; 00282 } 00283 00284 00285 class AggregateBoxState : public AbstractBoxState 00286 { 00287 00288 public: 00289 // Keep track of when the next window starts for each group by value. 00290 typedef hash_map<string, AggregateQBox::WindowStart, AggregateQBox::HashString> WindowStarts; 00291 // List of windows, in the order they will be timed-out. 00292 typedef AggregateQBox::WindowList WindowList; 00293 // hash_map of group-by values to lists of open windows 00294 typedef AggregateQBox::WindowHash WindowHash; 00295 00296 00297 AggregateBoxState(WindowStarts window_starts, WindowList timeout_list, WindowHash open_windows) 00298 { 00299 /* 00300 NMSTL OSerial can easily serialize vector, map, set 00301 serializing and deserializing hash_maps and lists are difficult. 00302 - need to write to_string methods to serialize 00303 - DON'T KNOW WHAT METHODS TO WRITE TO DESERIALIZE 00304 So the solution I (Anurag) came up with 00305 - convert hash_map to map to serialize 00306 - convert list to vector 00307 - vice-versa to deserialize 00308 IF SOMEONE KNOWS A BETTER WAY - LEMME KNOW 00309 */ 00310 00311 // convert hash_map window_starts to map 00312 map<string, AggregateQBox::WindowStart> window_starts_map; 00313 for (WindowStarts::iterator iter = window_starts.begin(); iter != window_starts.end(); ++iter) { 00314 window_starts_map[iter->first] = iter->second; 00315 } 00316 DEBUG << "Original WindowStarts hash_map size = " << window_starts.size() << " || " 00317 << "Copied WindowStarts map size = " << window_starts_map.size(); 00318 00319 // convert list timeout_list to vector 00320 vector<ptr<AggregateQBox::Window> > timeout_list_vector; 00321 for (WindowList::iterator iter = timeout_list.begin(); iter != timeout_list.end(); ++iter) { 00322 timeout_list_vector.push_back(*iter); 00323 } 00324 DEBUG << "Original WindowList list size = " << timeout_list.size() << " || " 00325 << "Copied WindowList vector size = " << timeout_list_vector.size(); 00326 00327 // convert hash_map open_windows to map 00328 map<string, vector<ptr<AggregateQBox::Window> > > open_windows_map; 00329 for (WindowHash::iterator iter = open_windows.begin(); iter != open_windows.end(); ++iter) { 00330 WindowList temp_list = iter->second; 00331 vector<ptr<AggregateQBox::Window> > temp_vector; 00332 // change the list to vector 00333 for (WindowList::iterator in_iter = temp_list.begin(); in_iter != temp_list.end(); ++in_iter) { 00334 temp_vector.push_back(*in_iter); 00335 } 00336 open_windows_map[iter->first] = temp_vector; 00337 } 00338 DEBUG << "Original WindowHash hash_map size = " << open_windows.size() << " || " 00339 << "Copied WindowHash map size = " << open_windows_map.size(); 00340 00341 OSerialString serial_box; 00342 serial_box << window_starts_map 00343 << timeout_list_vector 00344 << open_windows_map; 00345 _serialized_state = serial_box.str(); 00346 00347 DEBUG << _serialized_state; 00348 } 00349 00350 AggregateBoxState() {} 00351 ~AggregateBoxState() {} 00352 00353 string as_string() const { 00354 string out("AggregateQBoxState{"); 00355 out += _serialized_state; 00356 out += "}"; 00357 return out; 00358 } 00359 00360 NMSTL_SERIAL_SUBCLASS(AggregateBoxState, AbstractBoxState, ); 00361 00362 00363 }; 00364 00365 //NMSTL_TO_STRING(AggregateBoxState); 00366 00367 NMSTL_SERIAL_DEFINE(AggregateQBox::Window, 1100); 00368 NMSTL_SERIAL_DEFINE(AggregateQBox::ValueWindow, 1200); 00369 NMSTL_SERIAL_DEFINE(AggregateQBox::TupleWindow, 1300); 00370 NMSTL_SERIAL_DEFINE(AggregateBoxState, 1000); 00371 00372 BOREALIS_NAMESPACE_END; 00373 00374 #endif

Generated on Fri Nov 12 15:15:20 2004 for Borealis by doxygen 1.3.8