Main Page | Namespace List | Class Hierarchy | Class List | File List | Class Members | File Members

JoinQBox.h

Go to the documentation of this file.
00001 #ifndef JOIN_QBOX_H 00002 #define JOIN_QBOX_H 00003 00004 #include "QBox.h" 00005 #include "Tuple.h" 00006 #include <list> 00007 00008 BOREALIS_NAMESPACE_BEGIN; 00009 00015 class JoinQBox : public QBox 00016 { 00017 00018 friend class JoinBoxState; 00019 00020 // SJoinQBox is a kind of JoinBox. It manipulates the _buffer directly 00021 friend class SJoinQBox; 00022 00023 protected: 00024 void setupImpl() throw (AuroraException); 00025 void initImpl() throw (AuroraException); 00026 void runImpl(QBoxInvocation&) throw (AuroraException); 00027 00028 // packing and unpacking states 00029 void setPendingBoxState(ptr<AbstractBoxState> packed_box); 00030 ptr<AbstractBoxState> packState(); 00031 void unpackState(ptr<AbstractBoxState> box_state); 00032 00033 private: 00035 // TODO: Consider tpeseafe enumeration. 00036 static const int LEFT; 00037 static const int RIGHT; 00038 00040 string tupleToString(int stream_id, const void *data) 00041 { 00042 return getInputDescription( stream_id ).tupleToString( data ); 00043 } 00044 00046 void runStream( QBoxInvocation &inv, int stream_id, bool &outputted ); 00047 00049 void emitTuple( EvalContext &ctxt ); 00050 00052 void produceDontEmitTuple( EvalContext &ctxt ); 00053 00055 bool _timing_out; 00056 Timestamp _timeout; 00057 00058 // Structure for storing a tuple in the buffer. 00059 class BufferElement 00060 { 00061 public: 00062 const void *getData() const 00063 { 00064 //return _tuple.getData(); 00065 return _data.data(); 00066 } 00067 00068 const Tuple &getTuple() const 00069 { 00070 if (_tuple.getData() != _data.data()) 00071 { FATAL << "Tuple points to wrong data"; 00072 } 00073 return _tuple; 00074 } 00075 00076 Timestamp getTimestamp() const 00077 { return _tuple.getTimestamp(); 00078 } 00079 00080 int getFieldValue() const 00081 { return _field_value; 00082 } 00083 00084 BufferElement( const void *data, 00085 size_t length, 00086 int field_value = 0 ) : 00087 00088 _data( (const char *)data, length ), 00089 //_tuple(Tuple::factory( data )), --> not data will go away!!! 00090 _tuple(Tuple::factory( _data)), 00091 _field_value( field_value ) {} 00092 00093 private: 00094 const string _data; 00095 const Tuple _tuple; 00096 const int _field_value; 00097 }; 00098 00099 00100 class JoinBuffer 00101 { 00102 public: 00103 JoinBuffer( int stream_id, JoinQBox *qbox ) : 00104 _stream_id( stream_id ), 00105 _qbox( qbox ), 00106 _tuple_size( _qbox->getInputDescription( _stream_id ).getSizeInBytes()), 00107 _ctxt( 2 ) 00108 {} 00109 00110 virtual ~JoinBuffer() {} 00111 00112 virtual void insertTuple( const void *tuple_data ); 00113 00114 void join( const void *other_tuple, 00115 Expression *predicate, 00116 bool &outputted ); 00117 00119 void joinNoEmit( const void *other_tuple, 00120 Expression *predicate, 00121 bool &outputted ); 00122 00123 void clear(); 00124 00125 int size() { return _buffer.size(); } 00126 00127 int dump(dynbuf& dst); // Copy tuples to buffer and return nb tuples copied 00128 00129 void init(const dynbuf& src, int nb_tuples); // Insert the tuples from this buffer into self 00130 00131 protected: 00132 typedef list<BufferElement> BufferList; 00133 BufferList _buffer; 00134 int _stream_id; 00135 JoinQBox *_qbox; 00136 00137 private: 00138 size_t _tuple_size; 00139 EvalContext _ctxt; 00140 }; 00141 00142 friend class JoinBuffer; 00143 00144 // Subclass for ORDER_BY_VALUES 00145 class TupleCountJoinBuffer : public JoinBuffer 00146 { 00147 public: 00148 TupleCountJoinBuffer(int stream_id, JoinQBox *qbox, 00149 size_t buffer_size) : 00150 JoinBuffer(stream_id, qbox), 00151 _buffer_size(buffer_size) 00152 { } 00153 virtual void insertTuple(const void *tuple_data); 00154 private: 00155 size_t _buffer_size; 00156 }; 00157 00158 // Subclass for ORDER_BY_FIELD 00159 class FieldJoinBuffer : public JoinBuffer 00160 { 00161 public: 00162 FieldJoinBuffer(int stream_id, JoinQBox *qbox, 00163 ptr<Expression> field, 00164 size_t buffer_size) : 00165 JoinBuffer(stream_id, qbox), _field(field), 00166 _buffer_size(buffer_size) 00167 { 00168 if (!_field->is<int32>()) 00169 Throw (AuroraTypingException, 00170 "Order by field value must be of type int."); 00171 } 00172 00173 virtual void insertTuple(const void *tuple_data); 00174 private: 00175 int fieldValue(const void *tuple_data); 00176 00177 ptr<Expression> _field; 00178 size_t _buffer_size; 00179 EvalContext _ctxt; 00180 }; 00181 00182 // Create "left" and "right" buffers. 00183 ptr<JoinBuffer> createBufferByParameter(int stream_id); 00184 00185 // Left and right join buffers. 00186 ptr<JoinBuffer> _buffer[2]; 00187 00188 ptr<Expression> _predicate; 00189 00190 // What kind of output. 00191 bool _just_concat; 00192 vector<ptr<Expression> > _output_expressions; 00193 00194 // Cached information about tuples 00195 unsigned int _left_body_size, _right_body_size; 00196 00197 AURORA_DECLARE_QBOX(JoinQBox, "join"); 00198 }; 00199 00200 00201 00205 class JoinBoxState : public AbstractBoxState 00206 { 00207 public: 00208 JoinBoxState() : _left_nb_tuples(0), _right_nb_tuples(0) {} 00209 JoinBoxState(const dynbuf& left, int nb_left, const dynbuf& right, int nb_right) : 00210 _left_buffer(left), _left_nb_tuples(nb_left), 00211 _right_buffer(right), _right_nb_tuples(nb_right) 00212 { 00213 // The copy constructor of a dynbuff makes another reference to 00214 // the other one's underlying storage. The basic dynbuf constructor 00215 // uses malloc so the dynbufs are always on the heap 00216 //memcpy(_left_buffer.data(),left->data(),left->max_length()); 00217 //memcpy(_right_buffer.data(),right->data(),right->max_length()); 00218 } 00219 ~JoinBoxState() {} 00220 00221 NMSTL_SERIAL_SUBCLASS(JoinBoxState, AbstractBoxState, << _left_buffer << _left_nb_tuples 00222 << _right_buffer << _right_nb_tuples); 00223 00224 const dynbuf& getLeft() const { return _left_buffer; } 00225 int getNbLeftTuples() const { return _left_nb_tuples; } 00226 00227 const dynbuf& getRight() const { return _right_buffer; } 00228 int getNbRightTuples() const { return _right_nb_tuples; } 00229 00230 00231 private: 00232 dynbuf _left_buffer; 00233 int _left_nb_tuples; 00234 00235 dynbuf _right_buffer; 00236 int _right_nb_tuples; 00237 00238 }; 00239 00240 00241 BOREALIS_NAMESPACE_END; 00242 00243 #endif // JOIN_QBOX_H

Generated on Fri Nov 12 15:15:21 2004 for Borealis by doxygen 1.3.8