Main Page | Namespace List | Class Hierarchy | Class List | File List | Class Members | File Members

StreamState.h

Go to the documentation of this file.
00001 #ifndef MEDUSA_STREAMSTATE_H 00002 #define MEDUSA_STREAMSTATE_H 00003 00004 #include "common.h" 00005 #include "Objects.h" 00006 #include "StreamBuffer.h" 00007 #include "Stats.h" 00008 00009 BOREALIS_NAMESPACE_BEGIN; 00010 00011 class StreamState { 00012 00013 private: 00014 00016 Name m_name; 00017 00019 ptr<StreamDef> m_stream_def; 00020 00021 public: 00022 00023 // Stream classification. 00024 enum Class { 00025 INJECT, 00026 OTHER 00027 }; 00028 00029 int m_class; 00030 bool is_injected_stream() { return m_class == INJECT; } 00031 00032 // List of subscriptions 00033 vector<Subscription> m_fast_subs; // List of clients subscribed to this stream 00034 00035 // If not a local stream, have I asked to receive events on this stream? 00036 bool m_subscribed; 00037 pair<MedusaID,Subscription> m_sub; // Remember our subscription so we can remove it later 00038 00039 // How many local queries are connected to this stream on either end 00040 int m_nb_queries; 00041 00042 StreamBuffer m_buffer; // Buffers tuples on this stream 00043 //bool m_delete; 00044 00045 // For stats purposes 00046 double m_tuples; 00047 double m_bytes; 00048 00049 StreamState(Name stream_name, ptr<StreamDef> stream_def, Class clazz) : 00050 m_name(stream_name), m_stream_def(stream_def), m_class(clazz), 00051 m_subscribed(false), m_nb_queries(0), 00052 m_buffer(stream_name), 00053 //m_delete(false), 00054 m_tuples(0.0), m_bytes(0.0) { 00055 } 00056 00057 string as_string() const { 00058 string out = string("StreamState{") + m_name; 00059 if (m_stream_def) 00060 out << "; def=" << *(m_stream_def); 00061 00062 out << "; class="; 00063 switch (m_class) { 00064 case INJECT: out << "INJECT"; break; 00065 default: out << "OTHER"; 00066 }; 00067 00068 if (m_subscribed) { 00069 out << "; subscribed"; 00070 out << " to " << m_sub.first; 00071 } 00072 out << "; nb_queries is " << m_nb_queries; 00073 out << "; buffer is " << m_buffer; 00074 out << "}"; 00075 00076 return out; 00077 } 00078 00079 string repr() const { return as_string(); } 00080 00081 ptr<StreamDef> get_stream_def() const { return m_stream_def; } 00082 void set_definition(ptr<StreamDef> str) { m_stream_def = str; } 00083 00084 const Name& get_name() const { return m_name; } 00085 Schema get_schema() const { return m_stream_def->get_schema(); } 00086 00087 // This will go away once we have a good StatsManager!!! 00088 // Rates of tuples and bytes on this stream 00089 struct stream_stats { 00090 double m_tuple_rate; 00091 double m_byte_rate; 00092 stream_stats() : m_tuple_rate(0.0), m_byte_rate(0.0) {} 00093 }; 00094 stream_stats m_stream_stats; 00095 void incrStats(double tuples, double bytes) { 00096 m_tuples += tuples; 00097 m_bytes += bytes; 00098 } 00099 void endStatPeriod(double delta_time_msecs) { 00100 m_stream_stats.m_tuple_rate 00101 = Stats::average(m_stream_stats.m_tuple_rate,MSECS*m_tuples/delta_time_msecs); 00102 m_stream_stats.m_byte_rate 00103 = Stats::average(m_stream_stats.m_byte_rate,MSECS*m_bytes/delta_time_msecs); 00104 m_tuples = m_bytes = 0.0; 00105 } 00106 stream_stats getStreamStats() { 00107 return m_stream_stats; 00108 } 00109 00110 double n_tuples() {return m_tuples;}; 00111 00112 00113 }; 00114 00115 BOREALIS_NAMESPACE_END 00116 00117 00118 #endif

Generated on Fri Nov 12 15:15:22 2004 for Borealis by doxygen 1.3.8