00001
#ifndef REVISION_AGG_QBOX_H
00002
#define REVISION_AGG_QBOX_H
00003
00004
#include <ext/hash_map>
00005
#include <vector>
00006
#include <list>
00007
00008
#include "QBox.h"
00009
#include "Aggregate.h"
00010
#include "Timestamp.h"
00011
#include "CP.h"
00012
00013
#include <NMSTL/serial>
00014
#include <NMSTL/iserial>
00015
#include <NMSTL/oserial>
00016
00017 BOREALIS_NAMESPACE_BEGIN;
00018
00023
using __gnu_cxx::hash_map;
00024
using NMSTL::to_string;
00025
/// Slot indices for a before/after pair.
/// NOTE(review): neither macro is used in this header; presumably they index
/// the "before" and "after" halves of revision state in the implementation
/// file -- confirm against the .cc.
#define BEFORE_INDEX 0
#define AFTER_INDEX 1
00028
00029 class RevisionAggregateQBox :
public QBox
00030 {
00031
friend class AggregateBoxState;
00032
00033
private:
00034
00035
struct HashString
00036 {
00037 size_t operator()(
const string s)
const {
00038
00039 uint32 h = 0;
00040
for (string::const_iterator i = s.begin() ; i != s.end(); ++i)
00041 h = (5 * h) + *i;
00042
return size_t(h);
00043 }
00044
00045
NMSTL_TO_STRING(HashString);
00046 NMSTL_SIMPLY_SERIALIZABLE(HashString, );
00047 };
00048
00049
00050
typedef vector<ptr<Aggregate> > AFVector;
00051 AFVector _afs;
00052
00053
00054 size_t _in_tuple_size;
00055
00056
00057 size_t _group_by_value_size;
00058
00059
00060
00061
00062
enum WindowMethod {
00063 WINDOW_BY_TUPLES,
00064 WINDOW_BY_VALUES
00065 };
00066 WindowMethod _window_method;
00067
int _window_size;
00068
int _advance_size;
00069
00070 ptr<Expression> _window_field;
00071
00072
00073
00074
00075
struct WindowStart {
00076
bool initialized;
00077
int next;
00078 WindowStart() : initialized(
false) { }
00079
00080
NMSTL_TO_STRING(WindowStart);
00081 NMSTL_SIMPLY_SERIALIZABLE(WindowStart, << initialized << next);
00082
00083 };
00084 hash_map<string, WindowStart, HashString> _window_starts;
00085
00086
enum OrderByMethod {
00087 ORDER_BY_FIELD,
00088 ORDER_BY_TUPLENUM
00089 };
00090 OrderByMethod _order_by_method;
00091
00092
EvalContext _ctxt;
00093
00094
00095 vector<ptr<Expression> > _group_by_fields;
00096
00097
class Window;
00098
typedef list<ptr<Window> > WindowList;
00099
00100
00101
00102
class Window {
00103
public:
00104 Window(
const AFVector &af,
const string &group_by) :
00105 _windows(),
00106 _group_by(group_by), _initialized(
false)
00107 {
00108
00109 AFVector::const_iterator i;
00110
for (i = af.begin(); i != af.end(); ++i) {
00111 ptr<Aggregate::Window>w((*i)->openWindow());
00112 ptr<Aggregate>agg_ptr(*i);
00113
00114 w->setAggregate(agg_ptr);
00115 w->init();
00116 _windows.push_back(w);
00117 }
00118 };
00119
00120 Window() {}
00121
virtual ~Window() {};
00122
00123
00124
virtual bool insert_tuple(
const EvalContext& ctxt);
00125
00126
virtual bool window_affected(
const EvalContext& ctxt) {
return false; }
00127
00128
void remove_tuple(
const EvalContext& ctxt);
00129
00130
virtual const int getStartValue()
const {
return -999; };
00131
00132
const Timestamp t0()
const {
return _t0; }
00133
const Timestamp lt0()
const {
return _lt0; }
00134
const int tid()
const {
return _tid; }
00135
00136
const string &group_by_value()
const {
return _group_by; }
00137
00138 size_t final_value(
char *pos)
const;
00139 string final_value_string()
const;
00140
00141
virtual string
as_string()
const {
return NULL; };
00142
00143
void set_timeout_i(WindowList::iterator i) {_timeout_i = i; }
00144
void set_open_windows_i(WindowList::iterator i) {_ow_i = i; }
00145 WindowList::iterator get_timeout_i() {
return _timeout_i; }
00146 WindowList::iterator get_open_windows_i() {
return _ow_i; }
00147
00148
00149
00150
virtual void set_window_field(ptr<Expression> window_field) {}
00151
00152
private:
00153 vector<ptr<Aggregate::Window> > _windows;
00154 string _group_by;
00155
bool _initialized;
00156
Timestamp _t0;
00157
Timestamp _lt0;
00158
int _tid;
00159 WindowList::iterator _timeout_i, _ow_i;
00160
00161
public:
00162
NMSTL_TO_STRING(Window);
00163 NMSTL_SERIAL_BASE(Window, int32,
00164 << _windows << _group_by
00165 << _initialized << _t0
00166 << _lt0 << _tid);
00167
00168
00169 };
00170
00171
00172
class ValueWindow :
public Window {
00173
public:
00174 ValueWindow(
const AFVector &af,
const string &group_by,
00175 ptr<Expression> window_field,
00176
int window_start,
int window_end,
00177
int slack) :
00178 Window(af, group_by), _window_field(window_field),
00179 _first_val(window_start), _last_val(window_end),
00180 _slack_remaining(slack)
00181 {}
00182
00183 ValueWindow() {};
00184
00185
virtual bool insert_tuple(
const EvalContext& ctxt);
00186
virtual bool window_affected(
const EvalContext& ctxt);
00187
00188
virtual const int getStartValue()
const {
00189
return _first_val;
00190 }
00191
00192
void set_window_field(ptr<Expression> window_field) {
00193 _window_field = ptr<Expression>(window_field);
00194 }
00195
00196
virtual string
as_string()
const {
00197
return (
"ValueWindow{[" +
to_string(_first_val) +
", " +
00198
to_string(_last_val) +
") }");
00199 }
00200
00201
private:
00202 ptr<Expression> _window_field;
00203
00204
int _first_val, _last_val;
00205
int _slack_remaining;
00206
00207
public:
00208
NMSTL_TO_STRING(ValueWindow);
00209 NMSTL_SERIAL_SUBCLASS(ValueWindow, Window,
00210 << _first_val << _last_val << _slack_remaining);
00211 };
00212
00213
00214
class TupleWindow :
public Window {
00215
public:
00216 TupleWindow(
const AFVector &af,
const string group_by,
00217
int window_size,
int start_value) :
00218 Window(af, group_by), _window_size(window_size),
00219 _count(0), _start_value(start_value) {};
00220
00221 TupleWindow() {};
00222
00223
virtual bool insert_tuple(
const EvalContext& ctxt);
00224
virtual bool window_affected(
const EvalContext& ctxt);
00225
00226
virtual const int getStartValue()
const {
return _start_value; }
00227
00228
int size()
const {
return _count; };
00229
00230
virtual string
as_string()
const {
00231
return (
"TupleWindow{[" +
to_string(_count) +
" out of "
00232 +
to_string(_window_size) +
" tuples] }");
00233 }
00234
00235
private:
00236
int _window_size;
00237
int _count;
00238
int _start_value;
00239
00240
public:
00241
NMSTL_TO_STRING(TupleWindow);
00242 NMSTL_SERIAL_SUBCLASS(TupleWindow, Window,
00243 << _window_size << _count << _start_value);
00244
00245 };
00246
00247
00248
typedef hash_map<string, WindowList, HashString> WindowHash;
00249 WindowHash _open_windows;
00250
00251
00252
int _slack;
00253
00254
00255
bool _timing_out;
00256
Timestamp _timeout;
00257 WindowList _timeout_list;
00258
00259
00260
00261
void setupParameters();
00262 TupleDescription createOutputDescription();
00263
void parseGroupBy(
const string atts,
const ExprContext& ctxt);
00264
void emitForWindow(
const ptr<Window> w);
00265 string groupByForTuple(
const EvalContext& ctxt)
const;
00266
void insert_new_window(WindowList &wl, ptr<Window> w);
00267
00268
00269
00270
00271
int _revision_id_counter;
00272 map<int, const char*> _revision_tuple_store;
00273
00274
int _first_window_start;
00275
00276
struct RevisionStates {
00277
bool initialized;
00278
int revision_id;
00279
bool revision_processed;
00280 WindowHash before_states;
00281 WindowHash after_states;
00282 hash_map<string, WindowStart, HashString> before_wstarts;
00283 hash_map<string, WindowStart, HashString> after_wstarts;
00284 hash_map<string, WindowStart, HashString> wstarts;
00285
00286 RevisionStates() : initialized(
false), revision_processed(
false) {}
00287
00288 string
as_string()
const {
00289
return string() +
"RevisionStates{" +
00290
"revision-id=" + revision_id
00291 +
"," + before_states.size() +
" before_states" +
00292 +
"," + after_states.size() +
" after_states" +
00293 +
"," + before_wstarts.size() +
" before_wstarts" +
00294 +
"," + after_wstarts.size() +
" after_wstarts" +
00295
"}";
00296 }
00297 };
00298
00299 map<int, RevisionStates> _revision_states;
00300
00301
00302
00303
bool processRevision(Tuple revision_tuple, Tuple historical_tuple,
int revision_id);
00304
00305 pair<int, string> produceCoreTuple(
const ptr<Window> w);
00306
00307
bool checkAndEmitRevision(
bool before_emitp,
bool after_emitp, ptr<Window> before_w, ptr<Window> after_w);
00308
void emitRevision(
const ptr<Window> w, TupleType revision_type);
00309
00310
00311
protected:
00312
void setupImpl()
throw (
AuroraException);
00313
void initImpl()
throw (
AuroraException);
00314
void runImpl(
QBoxInvocation&)
throw (
AuroraException);
00315
00316
AURORA_DECLARE_QBOX(
RevisionAggregateQBox,
"revisionagg");
00317
00318
friend string
to_string(
const WindowHash& m);
00319 };
00320
00321 inline string
to_string(
const RevisionAggregateQBox::WindowHash& m) {
00322 string out =
"WindowHash{";
00323
00324
for (RevisionAggregateQBox::WindowHash::const_iterator i = m.begin(); i != m.end(); ++i) {
00325
if (i != m.begin()) out.append(
"; ");
00326 out.append(to_escaped_string(i->first)).append(
"->").append(
to_string(i->second));
00327 }
00328
00329 out.append(
"}");
00330
return out;
00331 }
00332
00333
// Serialization registrations for the window class hierarchy, one numeric
// id per class.
// NOTE(review): the ids (1100/1200/1300) are presumably required to be
// unique across all NMSTL_SERIAL_DEFINE uses in the project -- confirm
// before adding new ones.
NMSTL_SERIAL_DEFINE(RevisionAggregateQBox::Window, 1100);
NMSTL_SERIAL_DEFINE(RevisionAggregateQBox::ValueWindow, 1200);
NMSTL_SERIAL_DEFINE(RevisionAggregateQBox::TupleWindow, 1300);
00336
00337
00338 BOREALIS_NAMESPACE_END;
00339
00340
#endif