Main Page | Namespace List | Class Hierarchy | Class List | File List | Class Members | File Members

QBox.h

Go to the documentation of this file.
00001 #ifndef QBOX_H 00002 #define QBOX_H 00003 00004 #include <vector> 00005 #include <deque> 00006 #include <sstream> 00007 00008 #include "Exceptions.h" 00009 #include "TupleQueue.h" 00010 #include <nmstl_util.h> 00011 #include "TupleDescription.h" 00012 #include "Stream.h" 00013 #include "Registry.h" 00014 #include "BoxStatsSample.h" 00015 #include "PtMutex.h" 00016 #include "LockHolder.h" 00017 #include "Scheduler.h" 00018 #include "Params.h" 00019 #include "LockSet.h" 00020 #include "Tuple.h" 00021 #include "CPView.h" 00022 00023 #include "xercesDomFwd.h" 00024 #include <sys/time.h> 00025 00026 #include "QBoxState.h" 00027 00028 BOREALIS_NAMESPACE_BEGIN; 00029 00030 class AuroraNode; 00031 class Table; 00032 class QBoxInvocation; 00033 00095 class QBox { 00096 public: 00099 static string getBoxType(const DOMElement*) throw (AuroraException); 00100 00101 // Instantiates a QBox from a DOMElement. Throws an 00102 // AuroraBadEntityException if the entity's type is unknown, or 00103 // throws other exceptions if instantiation fails for some other 00104 // reason. 00105 // 'node' is the AuroraNode whose query netowrk this QBox is part of. 00106 static QBox *instantiate(const DOMElement *) throw (AuroraException); 00107 00109 virtual ~QBox(); 00110 00114 const DOMElement *getCatalogElement() const { 00115 ASSERT(_catalog_element); 00116 return _catalog_element; 00117 } 00118 00120 Table *getTable() const { 00121 return _table; 00122 } 00123 00125 void setTable(Table *table) { 00126 assert(_state == CONSTRUCTED); 00127 _table = table; 00128 } 00129 00133 const TupleDescription& getInputDescription(unsigned int i) const { 00134 assert(i < _in_desc.size()); 00135 return *(_in_desc[i]); 00136 } 00137 00140 void setInputDescription(unsigned int i, ptr<TupleDescription> desc) { 00141 assert(_state == CONSTRUCTED); 00142 assert(i < 1000); 00143 if (_in_desc.size() <= i) 00144 _in_desc.resize(i + 1); 00145 _in_desc[i] = desc; 00146 } 00147 00150 void setInputDescription(unsigned int i, const TupleDescription& desc) { 00151 // copies the tupledescription 00152 setInputDescription(i, ptr<TupleDescription>(new TupleDescription(desc))); 00153 } 00154 00158 const TupleDescription& getOutputDescription(unsigned int i) const { 00159 assert(i < _out_desc.size()); 00160 return *(_out_desc[i]); 00161 } 00162 00165 void setOutputDescription(unsigned int i, ptr<TupleDescription> desc) { 00166 assert(_state == CONSTRUCTED); 00167 assert(i < 1000); 00168 if (_out_desc.size() <= i) 00169 _out_desc.resize(i + 1); 00170 _out_desc[i] = desc; 00171 } 00172 00175 void setOutputDescription(unsigned int i, const TupleDescription& desc) { 00176 // copies the tupledescription 00177 setOutputDescription(i, ptr<TupleDescription>(new TupleDescription(desc))); 00178 } 00179 00182 Stream& getInput(unsigned int n) const { 00183 assert(n < _inputs.size()); 00184 return *(_inputs[n]); 00185 } 00186 00188 void setInput(unsigned int i, Stream *desc) { 00189 assert(_state == CONSTRUCTED || _state == SETUP); 00190 assert(i < 1000); 00191 if (_inputs.size() <= i) 00192 _inputs.resize(i + 1); 00193 _inputs[i] = desc; 00194 } 00195 00198 Stream& getOutput(unsigned int n) const { 00199 assert(n < _outputs.size()); 00200 return *(_outputs[n]); 00201 } 00202 00205 void setCPView(ptr<CPView> cpview) { 00206 _cpview_on_input = cpview; 00207 } 00208 00210 void setOutput(unsigned int i, Stream *desc); 00211 00213 unsigned int getNumInputs() { return _in_desc.size(); } 00214 00216 unsigned int getNumOutputs() { return _out_desc.size(); } 00217 00219 string getName() const { return _name; } 00220 00222 string getType() const { return _type; } 00223 00225 string as_string() const; 00226 00229 void setup(AuroraNode&, const DOMElement* elt) throw (AuroraException); 00230 00233 void init(PagePool& pool) throw (AuroraException); 00234 00235 00236 // Queue packing is common to all boxes. 00237 void setPendingQueueState(ptr<QueueState> packed_queues) { _pending_queue_state = packed_queues; } 00238 void unpackQueue(ptr<QueueState> packed_queue); // Added by Magda 00239 ptr<QueueState> packQueues() { return ptr<QueueState>(new QueueState(_inq)); }; 00240 00241 // State packing is box specific. 00242 virtual void setPendingBoxState(ptr<AbstractBoxState> packed_box) { _pending_box_state = packed_box; } 00243 virtual void unpackState(ptr<AbstractBoxState> box_state) {}; 00244 virtual ptr<AbstractBoxState> packState() { return ptr<AbstractBoxState>(); }; 00245 00246 00247 #ifdef AURORA_RUNTIME_STATS 00248 00252 const deque<BoxStatsSample> & lockBoxStatsHistory() { 00253 _boxStatsHistoryLock.lock(); 00254 return _boxStatsHistory; 00255 } 00256 00257 void unlockBoxStatsHistory() { 00258 _boxStatsHistoryLock.unlock(); 00259 } 00260 00261 static string dumpBoxStatsHistory(const deque<BoxStatsSample> & h); 00262 void run(QBoxInvocation& inv); 00263 00264 #else 00265 00267 void run(QBoxInvocation& inv) { 00268 runImpl(inv); 00269 } 00270 00271 #endif 00272 00275 void scheduleRun() { getInput(0).notifyEnq(); } 00276 00279 TupleQueue *getInputQueue(unsigned int input) { return _inq[input].get(); } 00280 00281 00282 static const Params::Req PARAM_NOT_REQUIRED = Params::NOT_REQUIRED; 00283 static const Params::Req PARAM_REQUIRED = Params::REQUIRED; 00284 static const Params::Req PARAM_NON_EMPTY = Params::NON_EMPTY; 00285 00286 // Return a parameter. See Params::param(). 00287 string param(string name, Params::Req req = PARAM_NOT_REQUIRED) const 00288 throw (AuroraBadEntityException) 00289 { return _params.param(name, req); } 00290 00291 // Return a parameter. See Params::param(). 00292 string param(string name, string def) const 00293 { return _params.param(name, def); } 00294 00295 // Return a parameter. See Params::typedParam(). 00296 template <typename T> 00297 bool typedParam(string name, T& value, Params::Req req = PARAM_NOT_REQUIRED) const 00298 throw (AuroraBadEntityException) 00299 { return _params.typedParam(name, value, req); } 00300 00301 const Params& getParams() { return _params; } 00302 00303 protected: 00304 // We dequeue from TupleQueues, and enqueue to Streams. 00305 typedef TupleQueue::DeqIterator DeqIterator; 00306 typedef Stream::EnqIterator EnqIterator; 00307 typedef TupleQueue::SeekIterator SeekIterator; 00308 00309 QBox(); 00310 00311 // Called by runImpl(); returns a DeqIterator for the ith input port. 00312 DeqIterator deq(unsigned int index) { return _inq[index]->deq_iterator(); } 00313 00314 // Called by runImpl(); returns a DeqIterator for the ith input port. 00315 SeekIterator seek(unsigned int index) { return _inq[index]->seek_iterator(); } 00316 00317 // Called by runImpl(); returns an EnqIterator for the ith input port. 00318 EnqIterator enq(unsigned int index) { return _outputs[index]->enq_iterator(); } 00319 00320 // METHODS TO IMPLEMENT IN SUBCLASSES: 00321 00322 // Do preparing 00323 virtual void setupImpl() = 0; 00324 00325 // Do whatever's necessary to init the box 00326 virtual void initImpl() {} 00327 00328 // Run the thing 00329 virtual void runImpl(QBoxInvocation&) {} 00330 00331 ptr<LockSet> getLockSet(string name); 00332 00333 ptr<AbstractBoxState> _pending_box_state; 00334 00335 AuroraNode *_node; //moved away from private for now ... 00336 00337 // CPView on the input TupleQueue - set when that cpview is created 00338 ptr<CPView> _cpview_on_input; 00339 00340 00341 private: 00342 00343 void unpackQueue(Stream& stream, TupleQueue::EnqIterator enq); 00344 00345 enum State { 00346 CONSTRUCTED, 00347 SETUP, 00348 INITED, 00349 DEAD 00350 }; 00351 State _state; 00352 00353 string _name; 00354 string _type; 00355 00356 vector< ptr<TupleDescription> > _in_desc, _out_desc; 00357 vector<Stream*> _inputs, _outputs; 00358 00359 Params _params; 00360 00361 const DOMElement *_catalog_element; 00362 00363 vector< ptr<TupleQueue> > _inq; 00364 00365 ptr<QueueState> _pending_queue_state; 00366 00367 Table *_table; 00368 00369 ptr<Scheduler::BoxData> _sched_data; 00370 00371 #ifdef AURORA_RUNTIME_STATS 00372 // The only reason we declare this here rather than in the run() method 00373 // is so that we don't have to recreate the vector every time the box 00374 // is executed. 00375 // 00376 // The values in this vector are only meaningful to each individual invocation 00377 // of this box's run() method. 00378 vector<size_t> _tuplesEnqBeforeRun, _tuplesEnqDuringRun; 00379 00380 // Contains the stats sample history for this box. It will have the n-most 00381 // recent sample, where 'n' is _statsHistoryLength. No two elements in this deque 00382 // will have the same _tv_sec value. The deque is sorted by _tv_sec value such that the 00383 // front() of the list has the most recent sample (the sample with the highest _tv_sec 00384 // value). 00385 deque<BoxStatsSample> _boxStatsHistory; 00386 00387 // Any thread that's accessign _boxStatsHistory must hold this lock. 00388 PtMutex _boxStatsHistoryLock; 00389 00390 // This is given by the config file value "server"/"StatsHistoryLength". We cache the 00391 // value here to avoid extra function calls that would otherwise occur during each 00392 // invocation of the run() method. 00393 size_t _statsHistoryLength; 00394 #endif 00395 00396 friend class QBoxInvocation; 00397 friend class Scheduler; // for _sched_data 00398 }; 00399 00400 #define AURORA_DECLARE_QBOX(ClassName, BoxName) AURORA_DECLARE_REG_CLASS(QBox, ClassName) 00401 #define AURORA_DEFINE_QBOX(ClassName, BoxName) \ 00402 AURORA_DEFINE_REG_CLASS_WITH_KEY(QBox, ClassName, BoxName) 00403 00404 typedef Registry<QBox> QBoxRegistry; 00405 00406 00407 00408 00413 class QBoxInvocation { 00414 public: 00415 QBoxInvocation() : _tuples_dequeued( 0 ), _box(0), _end_time(0), _next_tick(1){} 00416 00419 unsigned int getMaxTuplesToDequeue(unsigned int input_port) const { 00420 return numeric_limits<int>::max(); 00421 } 00422 00425 bool continueDequeueOn(QBox::DeqIterator deq, unsigned int input_port) const; 00426 00427 // continueDequeueOn does not seem to do what it claims to do 00428 // i.e. it uses "processedsofar" from TupleQueue which is never reset 00429 // thus setting a _tuple_limit results in stalled execution once the 00430 // limit is reached. This is to be used to compare to _tuple_limit 00431 // instead of the current version. 00432 // Therefore the boxes should maintain this counter as they run. 00433 // It is also techincally wrong since there is a single counter, thus 00434 // single limit. but we migth want another controlling mechanism so 00435 // I am not going to make it too complicated now. 00436 mutable unsigned int _tuples_dequeued; 00437 00438 // Public members for now 00439 QBox* _box; 00440 00441 vector<unsigned int> _tuple_limit; 00442 unsigned long _end_time; 00443 00444 mutable unsigned long _last_ticks; 00445 mutable unsigned int _next_tick; 00446 00447 unsigned long ticks() const { 00448 if (!--_next_tick) { 00449 _next_tick = CALLS_BETWEEN_TICK; 00450 _last_ticks = Scheduler::ticks(); 00451 } 00452 00453 return _last_ticks; 00454 } 00455 00456 static const int CALLS_BETWEEN_TICK = 1; 00457 }; 00458 00459 inline bool QBoxInvocation::continueDequeueOn(QBox::DeqIterator deq, unsigned int input_port) const { 00460 if (!deq.avail()) 00461 return false; 00462 00463 _tuples_dequeued++; 00464 00465 //cout << " Box is here, t_sz " << _tuple_limit.size() << " num dequeued " << _box->getInputQueue(input_port) << " tuple limit " << ( _tuple_limit.size() ? _tuple_limit[input_port] : 0 ) << " already dequeued " << _tuples_dequeued << endl; 00466 00467 if (_box && _tuple_limit.size() && 00468 // the explanation of _tuples_dequeued for this commented line. 00469 //_box->getInputQueue(input_port)->getNumTuplesDequeued() >= _tuple_limit[input_port]) 00470 _tuples_dequeued >= _tuple_limit[input_port] ) 00471 { 00472 _tuples_dequeued = 0; 00473 //cout << endl << endl << endl << " RETURN FALSE " << endl << endl; 00474 return false; 00475 } 00476 if (_end_time && ticks() >= _end_time) 00477 return false; 00478 00479 return true; 00480 } 00481 00482 // Due to circular dependency in QBox, Scheduler (ick) 00483 inline void Scheduler::setBoxData(QBox& box, ptr<BoxData> data) { 00484 box._sched_data = data; 00485 } 00486 00487 inline Scheduler::BoxData* Scheduler::getBoxData(const QBox& box) const { 00488 return box._sched_data.get(); 00489 } 00490 00491 BOREALIS_NAMESPACE_END; 00492 00493 #endif

Generated on Fri Nov 12 15:15:21 2004 for Borealis by doxygen 1.3.8