00001
#ifndef STREAM_H
00002
#define STREAM_H
00003
00004
#include <vector>
00005
#include <deque>
00006
#include <sstream>
00007
#include <sys/time.h>
00008
#include "common.h"
00009
00010
using namespace std;
00011
00012
#include "LockHolder.h"
00013
#include "TupleQueue.h"
00014
#include "TupleDescription.h"
00015
#include "TuplesAvailableMailbox.h"
00016
#include "QBoxInputPort.h"
00017
#include "QBoxOutputPort.h"
00018
#include "StreamStatsSample.h"
00019
#include "Params.h"
00020
#include "CpuStats.h"
00021
#include "Expression.h"
00022
#include "common.h"
00023
00024 BOREALIS_NAMESPACE_BEGIN;
00025
00026
class AuroraNode;
00027
class QBox;
00028
00029
class CP;
00030
00037 class Stream
00038 {
00039
public:
00041
Stream(string name,
const TupleDescription& desc,
AuroraNode& node,
00042
const Params& params =
Params())
00043
throw (
AuroraException);
00044
00046
~Stream();
00047
00049 string
getName()
const {
return _name; }
00050
00053 size_t
getNumTuplesEnqueued()
const {
return _numTuplesEnqueued; }
00054
00057
00058
00060 const TupleDescription&
getTupleDescription()
const {
return _desc; }
00061
00065
void addQueue(
TupleQueue* q);
00066
00068
void removeQueue(
TupleQueue*);
00069
00071 const vector<TupleQueue*>&
getQueues() {
return _queues; }
00072
00076
void addDestPort(
QBoxInputPort port);
00077
00079
void removeDestPort(
QBoxInputPort port);
00080
00082 const vector<QBoxInputPort>&
getDestPorts() {
return _dest_ports; }
00083
00088
void setSourcePort(
QBoxOutputPort port);
00089
00094 QBoxOutputPort getSourcePort() {
return _source_port; }
00095
00096
#ifdef AURORA_RUNTIME_STATS
00103
void updateStats()
00104
{
00105
if (_statsUnreportedEnq == 0)
00106 {
00107
return;
00108 }
00109
00110 timeval tm;
00111 gettimeofday(& tm, NULL);
00112
00113
LockHolder lh(_streamStatsHistoryLock);
00114
00115
if ((_streamStatsHistory.size() == 0) || (_streamStatsHistory.front()._tv_sec != tm.tv_sec))
00116 {
00117 _streamStatsHistory.push_front(
StreamStatsSample(tm.tv_sec, _statsUnreportedEnq));
00118
00119
if (_streamStatsHistory.size() > _statsHistoryLength)
00120 {
00121 _streamStatsHistory.pop_back();
00122 }
00123 }
00124
else
00125 {
00126 _streamStatsHistory.front()._tuplesEnqueued += _statsUnreportedEnq;
00127 }
00128
00129 _statsUnreportedEnq = 0;
00130 }
00131
00135
const deque<StreamStatsSample> & lockStreamStatsHistory()
00136 {
00137 _streamStatsHistoryLock.lock();
00138
return _streamStatsHistory;
00139 }
00140
00141
void unlockStreamStatsHistory()
00142 {
00143 _streamStatsHistoryLock.unlock();
00144 }
00145
00146
static string dumpStreamStatsHistory(
const deque<StreamStatsSample> & h)
00147 {
00148 ostringstream os;
00149
for (deque<StreamStatsSample>::const_iterator pos = h.begin();
00150 pos != h.end(); ++pos)
00151 {
00152 os << pos->to_string() << endl;
00153 }
00154 os <<
"---------------------------";
00155
return os.str();
00156 }
00157
00158
#endif
00159
00161 class EnqIterator
00162 {
00163
public:
00165 EnqIterator() : _stream(0) {}
00166
00169 void *
tuple()
const { ASSERT(_stream);
return _stream->
_buf; }
00170
00173 inline EnqIterator&
operator ++ ()
00174 {
00175 ASSERT(_stream);
00176 _stream->
enqueueTuple();
00177
return *
this;
00178 }
00179
00180
private:
00181
EnqIterator(
Stream& stream) : _stream(&stream) {}
00182
Stream* _stream;
00183
00184
friend class Stream;
00185 };
00186
friend class EnqIterator;
00187
00189 EnqIterator enq_iterator() {
return EnqIterator(*
this); }
00190
00192 string
as_string() const;
00193
00196 enum SchemaRepresentation
00197 {
00199
SCHEMA_NAME,
00200
00203
FULL_SCHEMA_IF_ANONYMOUS,
00204
00206
FULL_SCHEMA
00207 };
00208
00210 string
as_xml(SchemaRepresentation rep = FULL_SCHEMA_IF_ANONYMOUS)
const;
00211
00216
void notifyEnq();
00217
00218
void beginBuffering();
00219
00220
void stopBuffering();
00221
00222 const Params&
getParams()
const {
return _params; }
00223
00224
00225
void setCP(ptr<CP> cp);
00226 ptr<CP>
getCP();
00227
00228
private:
00229
void enqueueTuple();
00230
00231
00232
00233
AuroraNode & _node;
00234
const CpuStats& _cpu_stats;
00235
00236
00237
void *_buf;
00238
00239
00240
00241
void pushTuple(
const void *_buf );
00242
00243 string _name;
00244 size_t _numTuplesEnqueued;
00245 TupleDescription _desc;
00246 vector<TupleQueue*> _queues;
00247 vector<QBoxInputPort> _dest_ports;
00248
QBoxOutputPort _source_port;
00249
00250
bool _can_drop;
00251
double _drop_threshold;
00252 ptr<Expression> _drop_expr;
00253 ptr<EvalContext> _drop_ctxt;
00254
00255
00256
00257
bool _buffering;
00258
00259
00260
00261
TupleQueue *_tuple_buffer;
00262
00263
Params _params;
00264
00265
00266
00267 ptr<CP> _cp;
00268
00269
#ifdef AURORA_RUNTIME_STATS
00270
00271
00272 size_t _statsUnreportedEnq;
00273
00274
00275
00276
00277
00278
00279 deque<StreamStatsSample> _streamStatsHistory;
00280
00281
00282
PtMutex _streamStatsHistoryLock;
00283
00284
00285
00286
00287 size_t _statsHistoryLength;
00288
#endif
00289
00290 };
00291
00292 BOREALIS_NAMESPACE_END;
00293
00294
#endif // STREAM_H