Main Page | Namespace List | Class Hierarchy | Class List | File List | Class Members | File Members

QueryProcessor.h

Go to the documentation of this file.
00001 00002 #ifndef BOREALIS_QUERYPROCESSOR_H 00003 #define BOREALIS_QUERYPROCESSOR_H 00004 00005 #include "AuroraNode.h" 00006 #include "BasicComponent.h" 00007 #include "DataPath.h" 00008 #include "StreamState.h" 00009 #include "QueryState.h" 00010 #include "Recovery.h" 00011 #include "ConsistencyMngr.h" 00012 00013 #include "QBoxState.h" 00014 00015 #include "CP.h" 00016 00017 BOREALIS_NAMESPACE_BEGIN; 00018 00022 class QueryProcessor : public BasicComponent { 00023 00024 public: 00025 00026 QueryProcessor(string id, InetAddress data_add, 00027 int ha_interval = HA_INTERVAL, 00028 bool no_dups = false); 00029 virtual ~QueryProcessor(); 00030 00031 // RPC: #include "MTuple.h" 00032 // RPC: #include "Name.h" 00033 // RPC: #include "Query.h" 00034 // RPC: #include "Stats.h" 00035 // RPC: #include "Recovery.h" 00036 // RPC: #include "StreamEvent.h" 00037 // RPC: #include "BoxPackage.h" 00038 00045 RPC<Query> typecheck(Query query); 00046 00052 AsyncRPC<void> setup_query(Query query); 00053 00058 RPC<Query> typecheck_and_setup(Query query); 00059 00060 00064 AsyncRPC<void> load_box(string file_path); 00065 00070 RPC< vector<Query> > choke_queries(vector<Query> queries); 00071 00075 RPC<void> resume_queries(vector<Query> queries); 00076 00080 RPC<Query> pack_query(Query query); 00081 00086 RPC< vector<Query> > pack_queries(vector<Query> queries); 00087 00091 RPC<void> remove_query(Query query); 00092 00093 00097 RPC<void> replace_query(vector<Name> old_queries, vector<Query> new_queries); 00098 00103 RPC<void> set_query_status(Name name, QueryStatus status); 00104 00108 RPC<void> set_queries_status( vector<Name> name, QueryStatus status); 00109 00115 RPC<void> subscribe(Subscription sub, unsigned int add_or_remove); 00116 00121 RPC< vector<Subscription> > subscribe_many(vector<Subscription> sub, unsigned int add_or_remove); 00122 00123 00128 RPC< vector<Subscription> > get_subscriptions(vector<Name> streams); 00129 00134 RPC<vector<Stats> > get_stats(); 00135 00136 RPC<double> get_sel(); 00137 00141 RPC<void> create_stream(StreamDef stream); 00142 00146 RPC<void> create_cpview(CPViewDescription view_desc, StreamDef streamdef); 00147 00153 RPC<void> update_stream(StreamDef old_sd, StreamDef new_sd); 00154 00158 RPC<void> ack(MedusaID node, StreamID id, string last_tuple); 00159 00163 RPC<void> trim(MedusaID node, StreamID id, string last_tuple); 00164 00168 RPC<void> set_recovery_method(int method); 00169 00173 RPC<void> set_primary_status(bool status); 00174 00179 RPC<void> set_secondaries(vector<MedusaID> secondaries); 00180 00184 RPC<void> set_replicas(vector<MedusaID> replicas); 00185 00189 RPC<void> checkpoint(vector<StreamEvent> tuples_to_checkpoint, map<StreamID,string> tuples_to_trim); 00190 00191 00192 NMSTL_RPC_OBJECT(QueryProcessor); 00193 00194 private: // methods 00195 00196 void init(); 00197 void in_thread_init(); 00198 Status do_subscribe(Subscription sub, unsigned int add_or_remove); 00199 Status local_subscribe(Name stream_name, unsigned int add_or_remove, MedusaID fast_dst = MedusaID()); 00200 Status local_typecheck(Query& new_query); 00201 Status local_setup_query(Query& query); 00202 Status local_load_box(string file_path); 00203 Status local_choke_query(Query& query); 00204 Status local_resume_query(Query& query); 00205 Status local_pack_query(Query& query); 00206 Status local_remove_query(Query query); 00207 Status local_replace_query(vector<Name> old_queries, vector<Query> new_queries); 00208 Status local_set_query_status(Name name, QueryStatus status); 00209 Status local_start_query(QueryState& qs); 00210 Status check_dependencies_moved_query(Query query, MedusaID new_location); 00211 Status subscribe_self(MedusaID owner, ptr<StreamState> st, bool with_history); 00212 Status unsubscribe_self(ptr<StreamState> st); 00213 void update_stream_defs(Query& new_query, 00214 map<string,TupleDescription>& simple_stream_defs, 00215 vector<Name>& cp_ids); 00216 void update_stats(Time start_time); 00217 void do_ha_tasks(); 00218 void send_checkpoints(); 00219 void checkpoint_resp(MedusaID secondary, vector<StreamEvent> checkpointed_tuples, RPC<void> result); 00220 void send_downstream(vector<StreamEvent> tuples_to_send); 00221 00222 void handle_subscription_response(ptr<StreamState>, 00223 unsigned int add_or_remove, 00224 RPC<void>); 00225 00226 Status delete_query_state(ptr<QueryState> qs); 00227 Status remove_from_engine(string as_xml); 00228 Status update_local_streams(Query& query); 00229 00233 void send_acks(); 00234 00235 void send_ack(MedusaID node, StreamID stream_id, string tuple); 00236 00237 void send_queue_trimming_message(MedusaID node, StreamID stream_id, string tuple); 00238 00242 void handle_ack_response(MedusaID node, StreamID id, string last_tuple, RPC<void> result); 00243 void handle_trim_response(MedusaID node, StreamID id, string last_tuple, RPC<void> result); 00244 00245 private: // fields 00246 00248 AuroraNode m_aurora; 00249 bool m_aurora_started; 00250 00252 InetAddress m_data_add; 00253 00255 DataPath m_data_path; 00256 00258 vector<Name> m_injected_streams; 00259 00261 typedef map<Name, ptr<QueryState> > QueryStates; 00262 QueryStates m_queries; 00263 00265 typedef map<Name, ptr<StreamState> > StreamStates; 00266 StreamStates m_streams; 00267 00268 // For load management (measuring load) 00269 Time m_start_period; 00270 00275 vector<MedusaID> m_secondary_ids; 00276 bool m_clear_secondary_endpoints; 00277 00282 map<MedusaID,bool> m_pending_ack; 00283 map<MedusaID,bool> m_pending_trim; 00284 00289 int m_ha_interval; 00290 00291 Time m_next_time_stats_computed; 00292 00293 typedef vector< pair< ptr<StreamState> ,string> > TrimPairs; 00294 00295 friend class ConsistencyMngr; 00296 00298 ConsistencyMngr m_constman; 00299 00300 }; 00301 00302 BOREALIS_NAMESPACE_END; 00303 00304 #endif

Generated on Fri Nov 12 15:15:21 2004 for Borealis by doxygen 1.3.8