00001
#ifndef JOIN_QBOX_H
00002
#define JOIN_QBOX_H
00003
00004
#include "QBox.h"
00005
#include "Tuple.h"
00006
#include <list>
00007
00008 BOREALIS_NAMESPACE_BEGIN;
00009
00015 class JoinQBox :
public QBox
00016 {
00017
00018
friend class JoinBoxState;
00019
00020
00021
friend class SJoinQBox;
00022
00023
protected:
00024
void setupImpl()
throw (
AuroraException);
00025
void initImpl()
throw (
AuroraException);
00026
void runImpl(
QBoxInvocation&)
throw (
AuroraException);
00027
00028
00029
void setPendingBoxState(ptr<AbstractBoxState> packed_box);
00030 ptr<AbstractBoxState>
packState();
00031
void unpackState(ptr<AbstractBoxState> box_state);
00032
00033
private:
00035
00036
static const int LEFT;
00037
static const int RIGHT;
00038
00040 string tupleToString(
int stream_id,
const void *data)
00041 {
00042
return getInputDescription( stream_id ).tupleToString( data );
00043 }
00044
00046
void runStream(
QBoxInvocation &inv,
int stream_id,
bool &outputted );
00047
00049
void emitTuple(
EvalContext &ctxt );
00050
00052
void produceDontEmitTuple(
EvalContext &ctxt );
00053
00055
bool _timing_out;
00056
Timestamp _timeout;
00057
00058
00059
class BufferElement
00060 {
00061
public:
00062
const void *getData()
const
00063
{
00064
00065
return _data.data();
00066 }
00067
00068
const Tuple &getTuple()
const
00069
{
00070
if (_tuple.getData() != _data.data())
00071 { FATAL <<
"Tuple points to wrong data";
00072 }
00073
return _tuple;
00074 }
00075
00076
Timestamp getTimestamp()
const
00077
{
return _tuple.getTimestamp();
00078 }
00079
00080
int getFieldValue()
const
00081
{
return _field_value;
00082 }
00083
00084 BufferElement(
const void *data,
00085 size_t length,
00086
int field_value = 0 ) :
00087
00088 _data( (
const char *)data, length ),
00089
00090 _tuple(Tuple::factory( _data)),
00091 _field_value( field_value ) {}
00092
00093
private:
00094
const string _data;
00095
const Tuple _tuple;
00096
const int _field_value;
00097 };
00098
00099
00100
class JoinBuffer
00101 {
00102
public:
00103 JoinBuffer(
int stream_id,
JoinQBox *qbox ) :
00104 _stream_id( stream_id ),
00105 _qbox( qbox ),
00106 _tuple_size( _qbox->getInputDescription( _stream_id ).getSizeInBytes()),
00107 _ctxt( 2 )
00108 {}
00109
00110
virtual ~JoinBuffer() {}
00111
00112
virtual void insertTuple(
const void *tuple_data );
00113
00114
void join(
const void *other_tuple,
00115
Expression *predicate,
00116
bool &outputted );
00117
00119
void joinNoEmit(
const void *other_tuple,
00120
Expression *predicate,
00121
bool &outputted );
00122
00123
void clear();
00124
00125
int size() {
return _buffer.size(); }
00126
00127
int dump(dynbuf& dst);
00128
00129
void init(
const dynbuf& src,
int nb_tuples);
00130
00131
protected:
00132
typedef list<BufferElement>
BufferList;
00133
BufferList _buffer;
00134
int _stream_id;
00135
JoinQBox *_qbox;
00136
00137
private:
00138 size_t _tuple_size;
00139
EvalContext _ctxt;
00140 };
00141
00142
friend class JoinBuffer;
00143
00144
00145
class TupleCountJoinBuffer :
public JoinBuffer
00146 {
00147
public:
00148 TupleCountJoinBuffer(
int stream_id,
JoinQBox *qbox,
00149 size_t buffer_size) :
00150 JoinBuffer(stream_id, qbox),
00151 _buffer_size(buffer_size)
00152 { }
00153
virtual void insertTuple(
const void *tuple_data);
00154
private:
00155 size_t _buffer_size;
00156 };
00157
00158
00159
class FieldJoinBuffer :
public JoinBuffer
00160 {
00161
public:
00162 FieldJoinBuffer(
int stream_id,
JoinQBox *qbox,
00163 ptr<Expression> field,
00164 size_t buffer_size) :
00165 JoinBuffer(stream_id, qbox), _field(field),
00166 _buffer_size(buffer_size)
00167 {
00168
if (!_field->is<int32>())
00169
Throw (AuroraTypingException,
00170
"Order by field value must be of type int.");
00171 }
00172
00173
virtual void insertTuple(
const void *tuple_data);
00174
private:
00175
int fieldValue(
const void *tuple_data);
00176
00177 ptr<Expression> _field;
00178 size_t _buffer_size;
00179
EvalContext _ctxt;
00180 };
00181
00182
00183 ptr<JoinBuffer> createBufferByParameter(
int stream_id);
00184
00185
00186 ptr<JoinBuffer> _buffer[2];
00187
00188 ptr<Expression> _predicate;
00189
00190
00191
bool _just_concat;
00192 vector<ptr<Expression> > _output_expressions;
00193
00194
00195
unsigned int _left_body_size, _right_body_size;
00196
00197
AURORA_DECLARE_QBOX(
JoinQBox,
"join");
00198 };
00199
00200
00201
00205 class JoinBoxState :
public AbstractBoxState
00206 {
00207
public:
00208 JoinBoxState() : _left_nb_tuples(0), _right_nb_tuples(0) {}
00209 JoinBoxState(
const dynbuf& left,
int nb_left,
const dynbuf& right,
int nb_right) :
00210 _left_buffer(left), _left_nb_tuples(nb_left),
00211 _right_buffer(right), _right_nb_tuples(nb_right)
00212 {
00213
00214
00215
00216
00217
00218 }
00219 ~JoinBoxState() {}
00220
00221
NMSTL_SERIAL_SUBCLASS(
JoinBoxState, AbstractBoxState, << _left_buffer << _left_nb_tuples
00222 << _right_buffer << _right_nb_tuples);
00223
00224 const dynbuf&
getLeft()
const {
return _left_buffer; }
00225 int getNbLeftTuples()
const {
return _left_nb_tuples; }
00226
00227 const dynbuf&
getRight()
const {
return _right_buffer; }
00228 int getNbRightTuples()
const {
return _right_nb_tuples; }
00229
00230
00231
private:
00232 dynbuf _left_buffer;
00233
int _left_nb_tuples;
00234
00235 dynbuf _right_buffer;
00236
int _right_nb_tuples;
00237
00238 };
00239
00240
00241 BOREALIS_NAMESPACE_END;
00242
00243
#endif // JOIN_QBOX_H