00001
#ifndef MEDUSA_STREAMSTATE_H
00002
#define MEDUSA_STREAMSTATE_H
00003
00004
#include "common.h"
00005
#include "Objects.h"
00006
#include "StreamBuffer.h"
00007
#include "Stats.h"
00008
00009 BOREALIS_NAMESPACE_BEGIN;
00010
00011 class StreamState {
00012
00013
private:
00014
00016 Name m_name;
00017
00019 ptr<StreamDef> m_stream_def;
00020
00021
public:
00022
00023
00024 enum Class {
00025
INJECT,
00026
OTHER
00027 };
00028
00029 int m_class;
00030 bool is_injected_stream() {
return m_class ==
INJECT; }
00031
00032
00033 vector<Subscription>
m_fast_subs;
00034
00035
00036 bool m_subscribed;
00037 pair<MedusaID,Subscription>
m_sub;
00038
00039
00040 int m_nb_queries;
00041
00042 StreamBuffer m_buffer;
00043
00044
00045
00046 double m_tuples;
00047 double m_bytes;
00048
00049 StreamState(Name stream_name, ptr<StreamDef> stream_def, Class clazz) :
00050 m_name(stream_name), m_stream_def(stream_def),
m_class(clazz),
00051
m_subscribed(false),
m_nb_queries(0),
00052
m_buffer(stream_name),
00053
00054
m_tuples(0.0),
m_bytes(0.0) {
00055 }
00056
00057 string
as_string()
const {
00058 string out = string(
"StreamState{") + m_name;
00059
if (m_stream_def)
00060 out <<
"; def=" << *(m_stream_def);
00061
00062 out <<
"; class=";
00063
switch (
m_class) {
00064
case INJECT: out <<
"INJECT";
break;
00065
default: out <<
"OTHER";
00066 };
00067
00068
if (
m_subscribed) {
00069 out <<
"; subscribed";
00070 out <<
" to " <<
m_sub.first;
00071 }
00072 out <<
"; nb_queries is " <<
m_nb_queries;
00073 out <<
"; buffer is " <<
m_buffer;
00074 out <<
"}";
00075
00076
return out;
00077 }
00078
00079 string
repr()
const {
return as_string(); }
00080
00081 ptr<StreamDef>
get_stream_def()
const {
return m_stream_def; }
00082 void set_definition(ptr<StreamDef> str) { m_stream_def = str; }
00083
00084 const Name&
get_name()
const {
return m_name; }
00085 Schema
get_schema()
const {
return m_stream_def->get_schema(); }
00086
00087
00088
00089 struct stream_stats {
00090 double m_tuple_rate;
00091 double m_byte_rate;
00092 stream_stats() :
m_tuple_rate(0.0),
m_byte_rate(0.0) {}
00093 };
00094 stream_stats m_stream_stats;
00095 void incrStats(
double tuples,
double bytes) {
00096
m_tuples += tuples;
00097
m_bytes += bytes;
00098 }
00099 void endStatPeriod(
double delta_time_msecs) {
00100
m_stream_stats.
m_tuple_rate
00101 = Stats::average(
m_stream_stats.
m_tuple_rate,MSECS*
m_tuples/delta_time_msecs);
00102
m_stream_stats.
m_byte_rate
00103 = Stats::average(
m_stream_stats.
m_byte_rate,MSECS*
m_bytes/delta_time_msecs);
00104
m_tuples =
m_bytes = 0.0;
00105 }
00106 stream_stats getStreamStats() {
00107
return m_stream_stats;
00108 }
00109
00110 double n_tuples() {
return m_tuples;};
00111
00112
00113 };
00114
00115
BOREALIS_NAMESPACE_END
00116
00117
00118
#endif