00001
#ifndef SIMPLEAGG_QBOX_H
00002
#define SIMPLEAGG_QBOX_H
00003
00004
#include <ext/hash_map>
00005
#include <vector>
00006
#include <list>
00007
00008
#include "QBox.h"
00009
#include "Aggregate.h"
00010
#include "Timestamp.h"
00011
00012
#include <NMSTL/serial>
00013
#include <NMSTL/iserial>
00014
#include <NMSTL/oserial>
00015
00016 BOREALIS_NAMESPACE_BEGIN;
00017
00022
using __gnu_cxx::hash_map;
00023
using NMSTL::to_string;
00024
00025 class AggregateQBox :
public QBox
00026 {
00027
friend class AggregateBoxState;
00028
00029
private:
00030
00031
struct HashString
00032 {
00033 size_t operator()(
const string s)
const {
00034
00035 uint32 h = 0;
00036
for (string::const_iterator i = s.begin() ; i != s.end(); ++i)
00037 h = (5 * h) + *i;
00038
return size_t(h);
00039 }
00040
00041
NMSTL_TO_STRING(HashString);
00042 NMSTL_SIMPLY_SERIALIZABLE(HashString, );
00043 };
00044
00045
00046
typedef vector<ptr<Aggregate> > AFVector;
00047 AFVector _afs;
00048
00049
00050 size_t _in_tuple_size;
00051
00052
00053 size_t _group_by_value_size;
00054
00055
00056
00057
00058
enum WindowMethod {
00059 WINDOW_BY_TUPLES,
00060 WINDOW_BY_VALUES
00061 };
00062 WindowMethod _window_method;
00063
int _window_size;
00064
int _advance_size;
00065
00066 ptr<Expression> _window_field;
00067
00068
00069
00070
00071
struct WindowStart {
00072
bool initialized;
00073
int next;
00074 WindowStart() : initialized(
false) { }
00075
00076
NMSTL_TO_STRING(WindowStart);
00077 NMSTL_SIMPLY_SERIALIZABLE(WindowStart, << initialized << next);
00078
00079 };
00080 hash_map<string, WindowStart, HashString> _window_starts;
00081
00082
enum OrderByMethod {
00083 ORDER_BY_FIELD,
00084 ORDER_BY_TUPLENUM
00085 };
00086 OrderByMethod _order_by_method;
00087
00088
EvalContext _ctxt;
00089
00090
00091 vector<ptr<Expression> > _group_by_fields;
00092
00093
class Window;
00094
typedef list<ptr<Window> > WindowList;
00095
00096
00097
00098
class Window {
00099
public:
00100 Window(
const AFVector &af,
const string &group_by) :
00101 _windows(),
00102 _group_by(group_by), _initialized(
false)
00103 {
00104
00105 AFVector::const_iterator i;
00106
for (i = af.begin(); i != af.end(); ++i) {
00107 ptr<Aggregate::Window>w((*i)->openWindow());
00108 ptr<Aggregate>agg_ptr(*i);
00109
00110 w->setAggregate(agg_ptr);
00111 w->init();
00112 _windows.push_back(w);
00113 }
00114 };
00115
00116 Window() {}
00117
virtual ~Window() {};
00118
00119
00120
virtual bool insert_tuple(
const EvalContext& ctxt);
00121
00122
virtual const int getStartValue()
const {
return -999; };
00123
00124
const Timestamp t0()
const {
return _t0; }
00125
const Timestamp lt0()
const {
return _lt0; }
00126
const int tid()
const {
return _tid; }
00127
00128
const string &group_by_value()
const {
return _group_by; }
00129
00130 size_t final_value(
char *pos)
const;
00131
00132
virtual string
as_string()
const {
return NULL; };
00133
00134
void set_timeout_i(WindowList::iterator i) {_timeout_i = i; }
00135
void set_open_windows_i(WindowList::iterator i) {_ow_i = i; }
00136 WindowList::iterator get_timeout_i() {
return _timeout_i; }
00137 WindowList::iterator get_open_windows_i() {
return _ow_i; }
00138
00139
00140
00141
virtual void set_window_field(ptr<Expression> window_field) {}
00142
00143
private:
00144 vector<ptr<Aggregate::Window> > _windows;
00145 string _group_by;
00146
bool _initialized;
00147
Timestamp _t0;
00148
Timestamp _lt0;
00149
int _tid;
00150 WindowList::iterator _timeout_i, _ow_i;
00151
00152
public:
00153
NMSTL_TO_STRING(Window);
00154 NMSTL_SERIAL_BASE(Window, int32,
00155 << _windows << _group_by
00156 << _initialized << _t0
00157 << _lt0 << _tid);
00158
00159
00160 };
00161
00162
00163
class ValueWindow :
public Window {
00164
public:
00165 ValueWindow(
const AFVector &af,
const string &group_by,
00166 ptr<Expression> window_field,
00167
int window_start,
int window_end,
00168
int slack) :
00169 Window(af, group_by), _window_field(window_field),
00170 _first_val(window_start), _last_val(window_end),
00171 _slack_remaining(slack)
00172 {}
00173
00174 ValueWindow() {};
00175
00176
virtual bool insert_tuple(
const EvalContext& ctxt);
00177
00178
virtual const int getStartValue()
const {
00179
return _first_val;
00180 }
00181
00182
void set_window_field(ptr<Expression> window_field) {
00183 _window_field = ptr<Expression>(window_field);
00184 }
00185
00186
virtual string
as_string()
const {
00187
return (
"ValueWindow{[" +
to_string(_first_val) +
", " +
00188
to_string(_last_val) +
") }");
00189 }
00190
00191
private:
00192 ptr<Expression> _window_field;
00193
00194
int _first_val, _last_val;
00195
int _slack_remaining;
00196
00197
public:
00198
NMSTL_TO_STRING(ValueWindow);
00199 NMSTL_SERIAL_SUBCLASS(ValueWindow, Window,
00200 << _first_val << _last_val << _slack_remaining);
00201 };
00202
00203
00204
class TupleWindow :
public Window {
00205
public:
00206 TupleWindow(
const AFVector &af,
const string group_by,
00207
int window_size,
int start_value) :
00208 Window(af, group_by), _window_size(window_size),
00209 _count(0), _start_value(start_value) {};
00210
00211 TupleWindow() {};
00212
00213
virtual bool insert_tuple(
const EvalContext& ctxt);
00214
virtual const int getStartValue()
const {
return _start_value; }
00215
00216
int size()
const {
return _count; };
00217
00218
virtual string
as_string()
const {
00219
return (
"TupleWindow{[" +
to_string(_count) +
" out of "
00220 +
to_string(_window_size) +
" tuples] }");
00221 }
00222
00223
private:
00224
int _window_size;
00225
int _count;
00226
int _start_value;
00227
00228
public:
00229
NMSTL_TO_STRING(TupleWindow);
00230 NMSTL_SERIAL_SUBCLASS(TupleWindow, Window,
00231 << _window_size << _count << _start_value);
00232
00233 };
00234
00235
00236
typedef hash_map<string, WindowList, HashString> WindowHash;
00237 WindowHash _open_windows;
00238
00239
00240
int _slack;
00241
00242
00243
bool _timing_out;
00244
Timestamp _timeout;
00245 WindowList _timeout_list;
00246
00247
00248
00249
void setupParameters();
00250 TupleDescription createOutputDescription();
00251
void parseGroupBy(
const string atts,
const ExprContext& ctxt);
00252
void emitForWindow(
const ptr<Window> w);
00253 string groupByForTuple(
const EvalContext& ctxt)
const;
00254
void insert_new_window(WindowList &wl, ptr<Window> w);
00255
00256
00257
protected:
00258
void setupImpl()
throw (
AuroraException);
00259
void initImpl()
throw (
AuroraException);
00260
void runImpl(
QBoxInvocation&)
throw (
AuroraException);
00261
00262
00263
void setPendingBoxState(ptr<AbstractBoxState> packed_box);
00264 ptr<AbstractBoxState>
packState();
00265
void unpackState(ptr<AbstractBoxState> box_state);
00266
00267
AURORA_DECLARE_QBOX(
AggregateQBox,
"simpleagg");
00268
00269
friend string
to_string(
const WindowHash& m);
00270 };
00271
00272 inline string
to_string(
const AggregateQBox::WindowHash& m) {
00273 string out =
"WindowHash{";
00274
00275
for (AggregateQBox::WindowHash::const_iterator i = m.begin(); i != m.end(); ++i) {
00276
if (i != m.begin()) out.append(
"; ");
00277 out.append(to_escaped_string(i->first)).append(
"->").append(
to_string(i->second));
00278 }
00279
00280 out.append(
"}");
00281
return out;
00282 }
00283
00284
00285 class AggregateBoxState :
public AbstractBoxState
00286 {
00287
00288
public:
00289
00290 typedef hash_map<string, AggregateQBox::WindowStart, AggregateQBox::HashString>
WindowStarts;
00291
00292 typedef AggregateQBox::WindowList
WindowList;
00293
00294 typedef AggregateQBox::WindowHash
WindowHash;
00295
00296
00297 AggregateBoxState(
WindowStarts window_starts,
WindowList timeout_list,
WindowHash open_windows)
00298 {
00299
00300
00301
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311
00312 map<string, AggregateQBox::WindowStart> window_starts_map;
00313
for (WindowStarts::iterator iter = window_starts.begin(); iter != window_starts.end(); ++iter) {
00314 window_starts_map[iter->first] = iter->second;
00315 }
00316 DEBUG <<
"Original WindowStarts hash_map size = " << window_starts.size() <<
" || "
00317 <<
"Copied WindowStarts map size = " << window_starts_map.size();
00318
00319
00320 vector<ptr<AggregateQBox::Window> > timeout_list_vector;
00321
for (WindowList::iterator iter = timeout_list.begin(); iter != timeout_list.end(); ++iter) {
00322 timeout_list_vector.push_back(*iter);
00323 }
00324 DEBUG <<
"Original WindowList list size = " << timeout_list.size() <<
" || "
00325 <<
"Copied WindowList vector size = " << timeout_list_vector.size();
00326
00327
00328 map<string, vector<ptr<AggregateQBox::Window> > > open_windows_map;
00329
for (WindowHash::iterator iter = open_windows.begin(); iter != open_windows.end(); ++iter) {
00330
WindowList temp_list = iter->second;
00331 vector<ptr<AggregateQBox::Window> > temp_vector;
00332
00333
for (WindowList::iterator in_iter = temp_list.begin(); in_iter != temp_list.end(); ++in_iter) {
00334 temp_vector.push_back(*in_iter);
00335 }
00336 open_windows_map[iter->first] = temp_vector;
00337 }
00338 DEBUG <<
"Original WindowHash hash_map size = " << open_windows.size() <<
" || "
00339 <<
"Copied WindowHash map size = " << open_windows_map.size();
00340
00341 OSerialString serial_box;
00342 serial_box << window_starts_map
00343 << timeout_list_vector
00344 << open_windows_map;
00345 _serialized_state = serial_box.str();
00346
00347 DEBUG << _serialized_state;
00348 }
00349
00350 AggregateBoxState() {}
00351 ~AggregateBoxState() {}
00352
00353 string
as_string()
const {
00354 string out(
"AggregateQBoxState{");
00355 out += _serialized_state;
00356 out +=
"}";
00357
return out;
00358 }
00359
00360
NMSTL_SERIAL_SUBCLASS(
AggregateBoxState, AbstractBoxState, );
00361
00362
00363 };
00364
00365
00366
00367
// Serialization registrations: each line pairs a serializable class with a
// numeric tag.
// NOTE(review): the tags are presumably required to be unique across the
// project - confirm against the NMSTL serialization documentation.
NMSTL_SERIAL_DEFINE(AggregateQBox::Window, 1100);
NMSTL_SERIAL_DEFINE(AggregateQBox::ValueWindow, 1200);
NMSTL_SERIAL_DEFINE(AggregateQBox::TupleWindow, 1300);
NMSTL_SERIAL_DEFINE(AggregateBoxState, 1000);
00371
00372 BOREALIS_NAMESPACE_END;
00373
00374
#endif