Main Page | Namespace List | Class Hierarchy | Class List | File List | Class Members | File Members

Stream.h

Go to the documentation of this file.
00001 #ifndef STREAM_H 00002 #define STREAM_H 00003 00004 #include <vector> 00005 #include <deque> 00006 #include <sstream> 00007 #include <sys/time.h> 00008 #include "common.h" 00009 00010 using namespace std; 00011 00012 #include "LockHolder.h" 00013 #include "TupleQueue.h" 00014 #include "TupleDescription.h" 00015 #include "TuplesAvailableMailbox.h" 00016 #include "QBoxInputPort.h" 00017 #include "QBoxOutputPort.h" 00018 #include "StreamStatsSample.h" 00019 #include "Params.h" 00020 #include "CpuStats.h" 00021 #include "Expression.h" 00022 #include "common.h" 00023 00024 BOREALIS_NAMESPACE_BEGIN; 00025 00026 class AuroraNode; 00027 class QBox; 00028 //class CPoint; 00029 class CP; 00030 00037 class Stream 00038 { 00039 public: 00041 Stream(string name, const TupleDescription& desc, AuroraNode& node, 00042 const Params& params = Params()) 00043 throw (AuroraException); 00044 00046 ~Stream(); 00047 00049 string getName() const { return _name; } 00050 00053 size_t getNumTuplesEnqueued() const { return _numTuplesEnqueued; } 00054 00057 //void setConnectionPoint(CPoint *cpoint) { _cpoint = cpoint; } 00058 00060 const TupleDescription& getTupleDescription() const { return _desc; } 00061 00065 void addQueue(TupleQueue* q); 00066 00068 void removeQueue(TupleQueue*); 00069 00071 const vector<TupleQueue*>& getQueues() { return _queues; } 00072 00076 void addDestPort(QBoxInputPort port); 00077 00079 void removeDestPort(QBoxInputPort port); 00080 00082 const vector<QBoxInputPort>& getDestPorts() { return _dest_ports; } 00083 00088 void setSourcePort(QBoxOutputPort port); 00089 00094 QBoxOutputPort getSourcePort() { return _source_port; } 00095 00096 #ifdef AURORA_RUNTIME_STATS 00103 void updateStats() 00104 { 00105 if (_statsUnreportedEnq == 0) 00106 { 00107 return; 00108 } 00109 00110 timeval tm; 00111 gettimeofday(& tm, NULL); 00112 00113 LockHolder lh(_streamStatsHistoryLock); 00114 00115 if ((_streamStatsHistory.size() == 0) || (_streamStatsHistory.front()._tv_sec != tm.tv_sec)) 00116 { 00117 _streamStatsHistory.push_front(StreamStatsSample(tm.tv_sec, _statsUnreportedEnq)); 00118 00119 if (_streamStatsHistory.size() > _statsHistoryLength) 00120 { 00121 _streamStatsHistory.pop_back(); 00122 } 00123 } 00124 else 00125 { 00126 _streamStatsHistory.front()._tuplesEnqueued += _statsUnreportedEnq; 00127 } 00128 00129 _statsUnreportedEnq = 0; 00130 } 00131 00135 const deque<StreamStatsSample> & lockStreamStatsHistory() 00136 { 00137 _streamStatsHistoryLock.lock(); 00138 return _streamStatsHistory; 00139 } 00140 00141 void unlockStreamStatsHistory() 00142 { 00143 _streamStatsHistoryLock.unlock(); 00144 } 00145 00146 static string dumpStreamStatsHistory(const deque<StreamStatsSample> & h) 00147 { 00148 ostringstream os; 00149 for (deque<StreamStatsSample>::const_iterator pos = h.begin(); 00150 pos != h.end(); ++pos) 00151 { 00152 os << pos->to_string() << endl; 00153 } 00154 os << "---------------------------"; 00155 return os.str(); 00156 } 00157 00158 #endif 00159 00161 class EnqIterator 00162 { 00163 public: 00165 EnqIterator() : _stream(0) {} 00166 00169 void *tuple() const { ASSERT(_stream); return _stream->_buf; } 00170 00173 inline EnqIterator& operator ++ () 00174 { 00175 ASSERT(_stream); 00176 _stream->enqueueTuple(); 00177 return *this; 00178 } 00179 00180 private: 00181 EnqIterator(Stream& stream) : _stream(&stream) {} 00182 Stream* _stream; 00183 00184 friend class Stream; 00185 }; 00186 friend class EnqIterator; 00187 00189 EnqIterator enq_iterator() { return EnqIterator(*this); } 00190 00192 string as_string() const; 00193 00196 enum SchemaRepresentation 00197 { 00199 SCHEMA_NAME, 00200 00203 FULL_SCHEMA_IF_ANONYMOUS, 00204 00206 FULL_SCHEMA 00207 }; 00208 00210 string as_xml(SchemaRepresentation rep = FULL_SCHEMA_IF_ANONYMOUS) const; 00211 00216 void notifyEnq(); 00217 00218 void beginBuffering(); 00219 00220 void stopBuffering(); 00221 00222 const Params& getParams() const { return _params; } 00223 00224 // methods to access CP on this stream 00225 void setCP(ptr<CP> cp); 00226 ptr<CP> getCP(); 00227 00228 private: 00229 void enqueueTuple(); 00230 //void enqFromCPoint(TupleQueue* q); 00231 00232 // The AuroraNode whose query network uses this Stream. 00233 AuroraNode & _node; 00234 const CpuStats& _cpu_stats; 00235 00236 // Buffer returned by the iterator 00237 void *_buf; 00238 00239 // this is just a expression elimination. The function would 00240 // send the _buf to all outputs. 00241 void pushTuple( const void *_buf ); 00242 00243 string _name; 00244 size_t _numTuplesEnqueued; 00245 TupleDescription _desc; 00246 vector<TupleQueue*> _queues; 00247 vector<QBoxInputPort> _dest_ports; 00248 QBoxOutputPort _source_port; 00249 00250 bool _can_drop; 00251 double _drop_threshold; 00252 ptr<Expression> _drop_expr; 00253 ptr<EvalContext> _drop_ctxt; 00254 00255 // this flag tells the stream to start buffering tuples (instead of 00256 // forwarding them to queues) until further notice. 00257 bool _buffering; 00258 00259 // tuple queue to keep buffered tuples. Will use the same page pool 00260 // so that storage manager has no surprises 00261 TupleQueue *_tuple_buffer; 00262 00263 Params _params; 00264 //CPoint *_cpoint; 00265 00266 // pointer to the CP on this stream 00267 ptr<CP> _cp; 00268 00269 #ifdef AURORA_RUNTIME_STATS 00270 // The number of Enqueues that have occured on this stream, but which haven't yet 00271 // been reported by the updateStats() method. 00272 size_t _statsUnreportedEnq; 00273 00274 // Contains the stats sample history for this Stream. It will have the n-most 00275 // recent sample, where 'n' is _statsHistoryLength. No two elements in this deque 00276 // will have the same _tv_sec value. The deque is sorted by _tv_sec value such that the 00277 // front() of the list has the most recent sample (the sample with the highest _tv_sec 00278 // value). 00279 deque<StreamStatsSample> _streamStatsHistory; 00280 00281 // Any thread that's accessign _boxStatsHistory must hold this lock. 00282 PtMutex _streamStatsHistoryLock; 00283 00284 // This is given by the config file value "server"/"StatsHistoryLength". We cache the 00285 // value here to avoid extra function calls that would otherwise occur during each 00286 // invocation of the run() method. 00287 size_t _statsHistoryLength; 00288 #endif 00289 00290 }; 00291 00292 BOREALIS_NAMESPACE_END; 00293 00294 #endif // STREAM_H

Generated on Fri Nov 12 15:15:22 2004 for Borealis by doxygen 1.3.8