00001 00002 #ifndef TUPLEQUEUE_H 00003 #define TUPLEQUEUE_H 00004 00005 #include "PagePool.h" 00006 #include "TuplesAvailableMailbox.h" 00007 00008 BOREALIS_NAMESPACE_BEGIN; 00009 00010 class Stream; 00011 class QBox; 00012 00015 class TupleQueueListener { 00016 public: 00018 virtual ~TupleQueueListener(); 00019 00021 virtual void notify(const TupleQueue& stream) = 0; 00022 00023 QBox* getBox() { return _box_to_run; } 00024 void setBox(QBox* box) { _box_to_run = box; } 00025 00026 void suspend() { _suspended = true; } 00027 void resume () { _suspended = false; } 00028 00029 protected: 00030 TupleQueueListener(); 00031 QBox* _box_to_run; 00032 bool _suspended; 00033 }; 00034 00035 00036 00040 class TupleQueue { 00041 public: 00044 TupleQueue(PagePool& pool, unsigned int tuple_size); 00045 00047 ~TupleQueue(); 00048 00050 void setStream(Stream *stream) { _stream = stream; } 00051 Stream *getStream() { return _stream; } 00052 00053 00054 /* 00055 Added for Connection point support 00056 */ 00057 // empties the tuple queue by freeing all pages and resetting pointers 00058 void empty(); 00059 00060 // puts the vector of tuples in the tuple queue 00061 void enqueueMultiple(vector<char*> tuples); 00062 00063 // returns all the tuples in the tuple queue as a vector of tuples 00064 vector<char*> dequeueAll(); 00065 00066 00068 class EnqIterator { 00069 public: 00071 EnqIterator() : _q(0) {} 00072 00075 void *tuple() const { 00076 assert(_q); 00077 return _q->_enq_location; 00078 } 00079 00082 EnqIterator& operator ++ (); 00083 00084 private: 00085 explicit EnqIterator(TupleQueue &q) : _q(&q) {} 00086 00089 EnqIterator operator ++ (int); 00090 00091 TupleQueue *_q; 00092 00093 friend class TupleQueue; 00094 }; 00095 friend class EnqIterator; 00096 00098 class DeqIterator { 00099 public: 00101 DeqIterator() : _q(0) {} 00102 00104 bool avail() const { 00105 assert(_q); 00106 00107 return _q->_deq_location != _q->_enq_location; 00108 } 00109 00112 const void *tuple() const { 00113 assert(avail()); 00114 return _q->_deq_location; 00115 } 00116 00118 DeqIterator& operator ++ (); 00119 00120 private: 00121 explicit DeqIterator(TupleQueue &q) : _q(&q) {} 00122 00124 DeqIterator operator ++ (int); 00125 00126 TupleQueue *_q; 00127 00128 friend class TupleQueue; 00129 }; 00130 friend class DeqIterator; 00131 00132 class SeekIterator { 00133 public: 00135 SeekIterator() : _q(0) {} 00136 00137 bool avail() const { 00138 assert(_q); 00139 return _seek_location != _q->_enq_location; 00140 } 00141 00143 // in SeekIterator this would not work right. Not sure if I can remove 00144 // this function. 00145 //bool avail() const { 00146 // assert(_q); 00147 00148 //return true; //_q->_deq_location != _q->_enq_location; 00149 //} 00150 00153 const void *tuple() const { 00154 return _seek_location; 00155 } 00156 00158 SeekIterator& operator ++ (); 00159 SeekIterator& operator -- (); 00160 00161 private: 00162 explicit SeekIterator(TupleQueue &q) : _q(&q) 00163 { 00164 _seek_location = _q->_deq_location; 00165 _seek_page = _q->_deq_page; 00166 00167 size_t page_size = _q->_pool.page_size(); 00168 size_t tuple_size = _q->_tuple_size; 00169 int tuples_per_page = (page_size - sizeof(PageHdr)) / tuple_size; 00170 _offset_last_tuple_on_page = sizeof(PageHdr) + tuple_size*(tuples_per_page-1); 00171 00172 } 00173 00175 SeekIterator operator ++ (int); 00176 00178 SeekIterator operator --(int); 00179 00180 TupleQueue *_q; 00181 00182 // this is where our seek iterator is pointing 00183 void *_seek_location; 00185 void *_seek_page; 00186 00187 size_t _offset_last_tuple_on_page; 00188 friend class TupleQueue; 00189 }; 00190 00194 class RSeekIterator { 00195 public: 00197 RSeekIterator() : _q(0) {} 00198 00199 bool avail() const { 00200 assert(_q); 00201 return _seek_location != _q->_deq_location; 00202 } 00203 00206 const void *tuple() const { 00207 return _seek_location; 00208 } 00209 00211 RSeekIterator& operator ++ (); 00212 RSeekIterator& operator -- (); 00213 00214 private: 00215 explicit RSeekIterator(TupleQueue &q) : _q(&q) { 00216 00217 size_t page_size = _q->_pool.page_size(); 00218 size_t tuple_size = _q->_tuple_size; 00219 int tuples_per_page = (page_size - sizeof(PageHdr)) / tuple_size; 00220 _offset_last_tuple_on_page = sizeof(PageHdr) + tuple_size*(tuples_per_page-1); 00221 00222 _seek_location = (char*)_q->_enq_location - tuple_size; 00223 _seek_page = _q->_enq_page; 00224 00225 if ( (char*)_seek_location < (char*)_seek_page + sizeof(PageHdr)) { 00226 if (_seek_page != _q->_deq_page) { 00227 _seek_page = ((PageHdr*)_seek_page)->prev; 00228 _seek_location = (char*)_seek_page + _offset_last_tuple_on_page; 00229 } else { // Nothing there 00230 _seek_page = _q->_deq_page; 00231 _seek_location = _q->_deq_location; 00232 } 00233 } 00234 } 00235 00237 RSeekIterator operator ++ (int); 00238 00240 RSeekIterator operator -- (int); 00241 00242 TupleQueue *_q; 00243 00244 // this is where our seek iterator is pointing 00245 void *_seek_location; 00247 void *_seek_page; 00248 00249 size_t _offset_last_tuple_on_page; 00250 friend class TupleQueue; 00251 }; 00252 00253 RSeekIterator rseek_iterator() { return RSeekIterator(*this); } 00254 00255 SeekIterator seek_iterator() { return SeekIterator(*this); } 00256 00260 EnqIterator enq_iterator() { return EnqIterator(*this); } 00261 00265 DeqIterator deq_iterator() { return DeqIterator(*this); } 00266 00268 void dump() const; 00269 00271 int size() const { return _enq_count - _deq_count; } 00272 00278 size_t getNumTuplesDequeued() const { return _deq_count; } 00279 00286 size_t getNumTuplesEnqueued() const { return _enq_count; } 00287 00290 void addListener(TupleQueueListener* listener) { 00291 _listeners.push_back(listener); 00292 } 00293 00295 void removeListener(TupleQueueListener* listener) { 00296 erase_in(_listeners, listener); 00297 } 00298 00300 unsigned int getTupleSize() const { 00301 return _tuple_size; 00302 } 00303 00309 void setEnqNotifyTarget(TuplesAvailableMailbox * pMailbox); 00310 00313 void notifyEnq(); 00314 00315 private: 00316 TuplesAvailableMailbox * _pMailbox; 00317 00318 struct PageHdr { 00319 PageHdr *next; 00320 PageHdr *prev; 00321 }; 00322 00323 PagePool &_pool; 00324 unsigned int _tuple_size; 00325 unsigned int _tuples_per_page; 00326 00327 unsigned int _enq_count; 00328 unsigned int _deq_count; 00329 00331 PageHdr *_enq_page; 00332 00335 void *_enq_location; 00336 00338 PageHdr *_deq_page; 00339 00342 void *_deq_location; 00343 00345 Stream *_stream; 00346 00347 typedef vector<TupleQueueListener*> Listeners; 00348 Listeners _listeners; 00349 }; 00350 00351 inline TupleQueue::EnqIterator& TupleQueue::EnqIterator::operator ++ () 00352 { 00353 assert(_q); 00354 00355 void *next_enq_location = (char*)_q->_enq_location + _q->_tuple_size; 00356 if ((char*)next_enq_location + _q->_tuple_size > (char*)_q->_enq_page + _q->_pool.page_size()) { 00357 // Allocate new page. Note that the DeqIterator 00358 // cannot ever advance to the new page until we set 00359 // _enq_location (at the very end of this method) 00360 00361 PageHdr *new_page = (PageHdr*)_q->_pool.alloc(); 00362 new_page->next = 0; 00363 00364 new_page->prev = _q->_enq_page; 00365 _q->_enq_page->next = new_page; 00366 _q->_enq_page = new_page; 00367 00368 next_enq_location = (char*)new_page + sizeof(PageHdr); 00369 } 00370 00371 _q->_enq_location = next_enq_location; 00372 ++(_q->_enq_count); 00373 return *this; 00374 } 00375 00376 inline TupleQueue::DeqIterator& TupleQueue::DeqIterator::operator ++ () 00377 { 00378 assert(avail()); 00379 00380 void *next_deq_location = (char*)_q->_deq_location + _q->_tuple_size; 00381 if ((char*)next_deq_location + _q->_tuple_size > (char*)_q->_deq_page + _q->_pool.page_size()) { 00382 // Advance to next page; can free this page 00383 PageHdr *next_page = _q->_deq_page->next; 00384 _q->_pool.free(_q->_deq_page); 00385 _q->_deq_page = next_page; 00386 00387 next_deq_location = (char*)next_page + sizeof(PageHdr); 00388 } 00389 00390 _q->_deq_location = next_deq_location; 00391 ++(_q->_deq_count); 00392 return *this; 00393 } 00394 00396 inline TupleQueue::SeekIterator& TupleQueue::SeekIterator::operator ++ () 00397 { 00398 void *next_seek_location = (char*)_seek_location + _q->_tuple_size; 00399 if ((char*)next_seek_location + _q->_tuple_size > (char*)_seek_page + _q->_pool.page_size()) { 00400 _seek_page = ((PageHdr *)_seek_page)->next; 00401 next_seek_location = (char*)_seek_page + sizeof(PageHdr); 00402 } 00403 00404 _seek_location = next_seek_location; 00405 return *this; 00406 } 00407 00409 inline TupleQueue::SeekIterator& TupleQueue::SeekIterator::operator -- () 00410 { 00411 void *next_seek_location = (char*)_seek_location - _q->_tuple_size; 00412 if ((char*)next_seek_location < (char*)_seek_page + sizeof(PageHdr) ) { 00413 _seek_page = ((PageHdr *)_seek_page)->prev; 00414 next_seek_location = (char*)_seek_page + _offset_last_tuple_on_page; 00415 } 00416 00417 _seek_location = next_seek_location; 00418 return *this; 00419 } 00420 00422 inline TupleQueue::RSeekIterator& TupleQueue::RSeekIterator::operator ++ () 00423 { 00424 void *next_seek_location = (char*)_seek_location - _q->_tuple_size; 00425 if ((char*)next_seek_location < (char*)_seek_page + sizeof(PageHdr) ) { 00426 _seek_page = ((PageHdr *)_seek_page)->prev; 00427 next_seek_location = (char*)_seek_page + _offset_last_tuple_on_page; 00428 } 00429 00430 _seek_location = next_seek_location; 00431 return *this; 00432 } 00433 00435 inline TupleQueue::RSeekIterator& TupleQueue::RSeekIterator::operator -- () 00436 { 00437 void *next_seek_location = (char*)_seek_location + _q->_tuple_size; 00438 if ((char*)next_seek_location + _q->_tuple_size > (char*)_seek_page + _q->_pool.page_size()) { 00439 _seek_page = ((PageHdr *)_seek_page)->next; 00440 next_seek_location = (char*)_seek_page + sizeof(PageHdr); 00441 } 00442 00443 _seek_location = next_seek_location; 00444 return *this; 00445 } 00446 00447 BOREALIS_NAMESPACE_END; 00448 00449 #endif