00001
#ifndef QBOX_H
00002
#define QBOX_H
00003
00004
#include <vector>
00005
#include <deque>
00006
#include <sstream>
00007
00008
#include "Exceptions.h"
00009
#include "TupleQueue.h"
00010
#include <nmstl_util.h>
00011
#include "TupleDescription.h"
00012
#include "Stream.h"
00013
#include "Registry.h"
00014
#include "BoxStatsSample.h"
00015
#include "PtMutex.h"
00016
#include "LockHolder.h"
00017
#include "Scheduler.h"
00018
#include "Params.h"
00019
#include "LockSet.h"
00020
#include "Tuple.h"
00021
#include "CPView.h"
00022
00023
#include "xercesDomFwd.h"
00024
#include <sys/time.h>
00025
00026
#include "QBoxState.h"
00027
00028 BOREALIS_NAMESPACE_BEGIN;
00029
00030
class AuroraNode;
00031
class Table;
00032
class QBoxInvocation;
00033
00095 class QBox {
00096
public:
00099
static string
getBoxType(
const DOMElement*)
throw (
AuroraException);
00100
00101
00102
00103
00104
00105
00106
static QBox *
instantiate(
const DOMElement *)
throw (
AuroraException);
00107
00109
virtual ~QBox();
00110
00114 const DOMElement *
getCatalogElement()
const {
00115 ASSERT(_catalog_element);
00116
return _catalog_element;
00117 }
00118
00120 Table *
getTable()
const {
00121
return _table;
00122 }
00123
00125 void setTable(
Table *table) {
00126 assert(_state == CONSTRUCTED);
00127 _table = table;
00128 }
00129
00133 const TupleDescription&
getInputDescription(
unsigned int i)
const {
00134 assert(i < _in_desc.size());
00135
return *(_in_desc[i]);
00136 }
00137
00140 void setInputDescription(
unsigned int i, ptr<TupleDescription> desc) {
00141 assert(_state == CONSTRUCTED);
00142 assert(i < 1000);
00143
if (_in_desc.size() <= i)
00144 _in_desc.resize(i + 1);
00145 _in_desc[i] = desc;
00146 }
00147
00150 void setInputDescription(
unsigned int i,
const TupleDescription& desc) {
00151
00152
setInputDescription(i, ptr<TupleDescription>(
new TupleDescription(desc)));
00153 }
00154
00158 const TupleDescription&
getOutputDescription(
unsigned int i)
const {
00159 assert(i < _out_desc.size());
00160
return *(_out_desc[i]);
00161 }
00162
00165 void setOutputDescription(
unsigned int i, ptr<TupleDescription> desc) {
00166 assert(_state == CONSTRUCTED);
00167 assert(i < 1000);
00168
if (_out_desc.size() <= i)
00169 _out_desc.resize(i + 1);
00170 _out_desc[i] = desc;
00171 }
00172
00175 void setOutputDescription(
unsigned int i,
const TupleDescription& desc) {
00176
00177
setOutputDescription(i, ptr<TupleDescription>(
new TupleDescription(desc)));
00178 }
00179
00182 Stream&
getInput(
unsigned int n)
const {
00183 assert(n < _inputs.size());
00184
return *(_inputs[n]);
00185 }
00186
00188 void setInput(
unsigned int i,
Stream *desc) {
00189 assert(_state == CONSTRUCTED || _state == SETUP);
00190 assert(i < 1000);
00191
if (_inputs.size() <= i)
00192 _inputs.resize(i + 1);
00193 _inputs[i] = desc;
00194 }
00195
00198 Stream&
getOutput(
unsigned int n)
const {
00199 assert(n < _outputs.size());
00200
return *(_outputs[n]);
00201 }
00202
00205 void setCPView(ptr<CPView> cpview) {
00206
_cpview_on_input = cpview;
00207 }
00208
00210
void setOutput(
unsigned int i,
Stream *desc);
00211
00213 unsigned int getNumInputs() {
return _in_desc.size(); }
00214
00216 unsigned int getNumOutputs() {
return _out_desc.size(); }
00217
00219 string
getName()
const {
return _name; }
00220
00222 string
getType()
const {
return _type; }
00223
00225 string
as_string() const;
00226
00229
void setup(
AuroraNode&, const DOMElement* elt) throw (
AuroraException);
00230
00233
void init(
PagePool& pool) throw (AuroraException);
00234
00235
00236
00237 void setPendingQueueState(ptr<
QueueState> packed_queues) { _pending_queue_state = packed_queues; }
00238
void unpackQueue(ptr<QueueState> packed_queue);
00239 ptr<QueueState>
packQueues() {
return ptr<QueueState>(
new QueueState(_inq)); };
00240
00241
00242 virtual void setPendingBoxState(ptr<AbstractBoxState> packed_box) {
_pending_box_state = packed_box; }
00243 virtual void unpackState(ptr<AbstractBoxState> box_state) {};
00244 virtual ptr<AbstractBoxState>
packState() {
return ptr<AbstractBoxState>(); };
00245
00246
00247
#ifdef AURORA_RUNTIME_STATS
00248
00252
const deque<BoxStatsSample> & lockBoxStatsHistory() {
00253 _boxStatsHistoryLock.lock();
00254
return _boxStatsHistory;
00255 }
00256
00257
void unlockBoxStatsHistory() {
00258 _boxStatsHistoryLock.unlock();
00259 }
00260
00261
static string dumpBoxStatsHistory(
const deque<BoxStatsSample> & h);
00262
void run(
QBoxInvocation& inv);
00263
00264
#else
00265
00267 void run(
QBoxInvocation& inv) {
00268
runImpl(inv);
00269 }
00270
00271
#endif
00272
00275 void scheduleRun() {
getInput(0).
notifyEnq(); }
00276
00279 TupleQueue *
getInputQueue(
unsigned int input) {
return _inq[input].get(); }
00280
00281
00282 static const Params::Req
PARAM_NOT_REQUIRED = Params::NOT_REQUIRED;
00283 static const Params::Req
PARAM_REQUIRED = Params::REQUIRED;
00284 static const Params::Req
PARAM_NON_EMPTY = Params::NON_EMPTY;
00285
00286
00287 string
param(string name, Params::Req req = PARAM_NOT_REQUIRED)
const
00288
throw (AuroraBadEntityException)
00289 {
return _params.
param(name, req); }
00290
00291
00292 string
param(string name, string def)
const
00293
{
return _params.
param(name, def); }
00294
00295
00296
template <
typename T>
00297 bool typedParam(string name, T& value, Params::Req req = PARAM_NOT_REQUIRED)
const
00298
throw (AuroraBadEntityException)
00299 {
return _params.
typedParam(name, value, req); }
00300
00301 const Params&
getParams() {
return _params; }
00302
00303
protected:
00304
00305 typedef TupleQueue::DeqIterator DeqIterator;
00306 typedef Stream::EnqIterator EnqIterator;
00307 typedef TupleQueue::SeekIterator SeekIterator;
00308
00309
QBox();
00310
00311
00312 DeqIterator deq(
unsigned int index) {
return _inq[index]->deq_iterator(); }
00313
00314
00315 SeekIterator seek(
unsigned int index) {
return _inq[index]->seek_iterator(); }
00316
00317
00318 EnqIterator enq(
unsigned int index) {
return _outputs[index]->enq_iterator(); }
00319
00320
00321
00322
00323
virtual void setupImpl() = 0;
00324
00325
00326 virtual void initImpl() {}
00327
00328
00329 virtual void runImpl(
QBoxInvocation&) {}
00330
00331 ptr<LockSet>
getLockSet(string name);
00332
00333 ptr<AbstractBoxState>
_pending_box_state;
00334
00335 AuroraNode *
_node;
00336
00337
00338 ptr<CPView>
_cpview_on_input;
00339
00340
00341
private:
00342
00343
void unpackQueue(
Stream& stream,
TupleQueue::EnqIterator enq);
00344
00345
enum State {
00346 CONSTRUCTED,
00347 SETUP,
00348 INITED,
00349 DEAD
00350 };
00351 State _state;
00352
00353 string _name;
00354 string _type;
00355
00356 vector< ptr<TupleDescription> > _in_desc, _out_desc;
00357 vector<Stream*> _inputs, _outputs;
00358
00359
Params _params;
00360
00361
const DOMElement *_catalog_element;
00362
00363 vector< ptr<TupleQueue> > _inq;
00364
00365 ptr<QueueState> _pending_queue_state;
00366
00367
Table *_table;
00368
00369 ptr<Scheduler::BoxData> _sched_data;
00370
00371
#ifdef AURORA_RUNTIME_STATS
00372
00373
00374
00375
00376
00377
00378 vector<size_t> _tuplesEnqBeforeRun, _tuplesEnqDuringRun;
00379
00380
00381
00382
00383
00384
00385 deque<BoxStatsSample> _boxStatsHistory;
00386
00387
00388
PtMutex _boxStatsHistoryLock;
00389
00390
00391
00392
00393 size_t _statsHistoryLength;
00394
#endif
00395
00396
friend class QBoxInvocation;
00397
friend class Scheduler;
00398 };
00399
00400 #define AURORA_DECLARE_QBOX(ClassName, BoxName) AURORA_DECLARE_REG_CLASS(QBox, ClassName)
00401 #define AURORA_DEFINE_QBOX(ClassName, BoxName) \
00402
AURORA_DEFINE_REG_CLASS_WITH_KEY(QBox, ClassName, BoxName)
00403
00404 typedef Registry<QBox> QBoxRegistry;
00405
00406
00407
00408
00413 class QBoxInvocation {
00414
public:
00415 QBoxInvocation() :
_tuples_dequeued( 0 ),
_box(0),
_end_time(0),
_next_tick(1){}
00416
00419 unsigned int getMaxTuplesToDequeue(
unsigned int input_port)
const {
00420
return numeric_limits<int>::max();
00421 }
00422
00425
bool continueDequeueOn(
QBox::DeqIterator deq,
unsigned int input_port)
const;
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436 mutable unsigned int _tuples_dequeued;
00437
00438
00439 QBox*
_box;
00440
00441 vector<unsigned int>
_tuple_limit;
00442 unsigned long _end_time;
00443
00444 mutable unsigned long _last_ticks;
00445 mutable unsigned int _next_tick;
00446
00447 unsigned long ticks()
const {
00448
if (!--
_next_tick) {
00449
_next_tick =
CALLS_BETWEEN_TICK;
00450
_last_ticks =
Scheduler::ticks();
00451 }
00452
00453
return _last_ticks;
00454 }
00455
00456 static const int CALLS_BETWEEN_TICK = 1;
00457 };
00458
00459 inline bool QBoxInvocation::continueDequeueOn(
QBox::DeqIterator deq,
unsigned int input_port)
const {
00460
if (!deq.
avail())
00461
return false;
00462
00463
_tuples_dequeued++;
00464
00465
00466
00467
if (
_box &&
_tuple_limit.size() &&
00468
00469
00470
_tuples_dequeued >=
_tuple_limit[input_port] )
00471 {
00472
_tuples_dequeued = 0;
00473
00474
return false;
00475 }
00476
if (
_end_time &&
ticks() >=
_end_time)
00477
return false;
00478
00479
return true;
00480 }
00481
00482
00483 inline void Scheduler::setBoxData(
QBox& box, ptr<BoxData> data) {
00484 box.
_sched_data = data;
00485 }
00486
00487 inline Scheduler::BoxData*
Scheduler::getBoxData(
const QBox& box)
const {
00488
return box.
_sched_data.get();
00489 }
00490
00491 BOREALIS_NAMESPACE_END;
00492
00493
#endif