#ifndef RESAMPLE_QBOX_H
#define RESAMPLE_QBOX_H

#include "QBox.h"
#include "Predicate.h"
#include "Parse.h"
#include "FieldExt.h"
#include "BufferList.h"

#include "HashForNewState.h"
#include <vector>

BOREALIS_NAMESPACE_BEGIN;

00015 class ResampleQBox :
public QBox
00016 {
00017
00018
private:
00019
00020 EnqIterator _my_enq;
00021
00022 AggregateFunction* _resample_function;
00023
00024
00025
BufferList *_left_buffer;
00026
BufferList *_right_buffer;
00027
00028 HashForNewState *_state_hash;
00029
00030
int _window_size;
00031
00032
int _left_slack;
00033
int _right_slack;
00034
00035
char *_left_order_att_value;
00036
char *_left_order_att_str;
00037
char _left_order_att_type;
00038
int _left_order_att_size;
00039
00040
char *_right_order_att_value;
00041
00042
char _right_order_att_type;
00043
int _right_order_att_size;
00044
00045
FieldExt *_left_field_att;
00046
FieldExt *_right_field_att;
00047
00048
00049
bool _LEFT_ORDER_BY_TUPLENUM;
00050
bool _RIGHT_ORDER_BY_TUPLENUM;
00051
char* _left_tuple_counter;
00052
char* _right_tuple_counter;
00053
unsigned int* _left_tuple_counter_state;
00054
unsigned int* _right_tuple_counter_state;
00055
00056
char *_left_curr_tuple;
00057
char *_right_curr_tuple;
00058
Timestamp _curr_ts;
00059 int32 _curr_seconds;
00060
00061
int _left_tuple_size;
00062
int _right_tuple_size;
00063
00064
char *_output_tuple;
00065
int _output_tuple_size;
00066
int _num_tuples_emitted;
00067
00068
void emitTuple(NewState *s);
00069
00070
00071
Timestamp getTs(
char *tuple)
00072 {
00073 int32 sec = ((
Timestamp*) tuple)->tv_sec;
00074 int32 micro = ((
Timestamp*) tuple)->tv_usec;
00075
return Timestamp(sec,micro);
00076
00077
00078 }
00079
int getSidSize()
00080 {
00081
return sizeof(
int);
00082 }
00083
00084
int getTsSize()
00085 {
00086
return sizeof(timeval);
00087 }
00088
00089
protected:
00090
00091
void setupImpl()
throw (
AuroraException);
00092
void initImpl()
throw (
AuroraException);
00093
void runImpl(
QBoxInvocation&)
throw (
AuroraException);
00094
00095
AURORA_DECLARE_QBOX(
ResampleQBox,
"resample");
00096 };

BOREALIS_NAMESPACE_END;

#endif