// Include guard for the QueryProcessor declaration.
#ifndef BOREALIS_QUERYPROCESSOR_H
#define BOREALIS_QUERYPROCESSOR_H

// Project headers. The original include order is preserved on purpose:
// whether these headers are order-independent cannot be verified from here.
#include "AuroraNode.h"
#include "BasicComponent.h"
#include "DataPath.h"
#include "StreamState.h"
#include "QueryState.h"
#include "Recovery.h"
#include "ConsistencyMngr.h"

#include "QBoxState.h"

#include "CP.h"

BOREALIS_NAMESPACE_BEGIN;

00022 class QueryProcessor :
public BasicComponent {
00023
00024
public:
00025
00026
QueryProcessor(string
id, InetAddress data_add,
00027
int ha_interval = HA_INTERVAL,
00028
bool no_dups =
false);
00029
virtual ~QueryProcessor();
00030
00031
00032
00033
00034
00035
00036
00037
00038
00045 RPC<Query>
typecheck(Query query);
00046
00052 AsyncRPC<void>
setup_query(Query query);
00053
00058 RPC<Query>
typecheck_and_setup(Query query);
00059
00060
00064 AsyncRPC<void>
load_box(string file_path);
00065
00070 RPC< vector<Query> >
choke_queries(vector<Query> queries);
00071
00075 RPC<void>
resume_queries(vector<Query> queries);
00076
00080 RPC<Query>
pack_query(Query query);
00081
00086 RPC< vector<Query> >
pack_queries(vector<Query> queries);
00087
00091 RPC<void>
remove_query(Query query);
00092
00093
00097 RPC<void>
replace_query(vector<Name> old_queries, vector<Query> new_queries);
00098
00103 RPC<void>
set_query_status(Name name, QueryStatus status);
00104
00108 RPC<void>
set_queries_status( vector<Name> name, QueryStatus status);
00109
00115 RPC<void>
subscribe(Subscription sub,
unsigned int add_or_remove);
00116
00121 RPC< vector<Subscription> >
subscribe_many(vector<Subscription> sub,
unsigned int add_or_remove);
00122
00123
00128 RPC< vector<Subscription> >
get_subscriptions(vector<Name> streams);
00129
00134 RPC<vector<Stats> >
get_stats();
00135
00136 RPC<double>
get_sel();
00137
00141 RPC<void>
create_stream(StreamDef stream);
00142
00146 RPC<void>
create_cpview(CPViewDescription view_desc, StreamDef streamdef);
00147
00153 RPC<void>
update_stream(StreamDef old_sd, StreamDef new_sd);
00154
00158 RPC<void>
ack(MedusaID node, StreamID
id, string last_tuple);
00159
00163 RPC<void>
trim(MedusaID node, StreamID
id, string last_tuple);
00164
00168 RPC<void>
set_recovery_method(
int method);
00169
00173 RPC<void>
set_primary_status(
bool status);
00174
00179 RPC<void>
set_secondaries(vector<MedusaID> secondaries);
00180
00184 RPC<void>
set_replicas(vector<MedusaID> replicas);
00185
00189 RPC<void>
checkpoint(vector<StreamEvent> tuples_to_checkpoint, map<StreamID,string> tuples_to_trim);
00190
00191
00192
NMSTL_RPC_OBJECT(
QueryProcessor);
00193
00194
private:
00195
00196
void init();
00197
void in_thread_init();
00198 Status do_subscribe(Subscription sub,
unsigned int add_or_remove);
00199 Status local_subscribe(Name stream_name,
unsigned int add_or_remove, MedusaID fast_dst = MedusaID());
00200 Status local_typecheck(Query& new_query);
00201 Status local_setup_query(Query& query);
00202 Status local_load_box(string file_path);
00203 Status local_choke_query(Query& query);
00204 Status local_resume_query(Query& query);
00205 Status local_pack_query(Query& query);
00206 Status local_remove_query(Query query);
00207 Status local_replace_query(vector<Name> old_queries, vector<Query> new_queries);
00208 Status local_set_query_status(Name name, QueryStatus status);
00209 Status local_start_query(
QueryState& qs);
00210 Status check_dependencies_moved_query(Query query, MedusaID new_location);
00211 Status subscribe_self(MedusaID owner, ptr<StreamState> st,
bool with_history);
00212 Status unsubscribe_self(ptr<StreamState> st);
00213
void update_stream_defs(Query& new_query,
00214 map<string,TupleDescription>& simple_stream_defs,
00215 vector<Name>& cp_ids);
00216
void update_stats(Time start_time);
00217
void do_ha_tasks();
00218
void send_checkpoints();
00219
void checkpoint_resp(MedusaID secondary, vector<StreamEvent> checkpointed_tuples, RPC<void> result);
00220
void send_downstream(vector<StreamEvent> tuples_to_send);
00221
00222
void handle_subscription_response(ptr<StreamState>,
00223
unsigned int add_or_remove,
00224 RPC<void>);
00225
00226 Status delete_query_state(ptr<QueryState> qs);
00227 Status remove_from_engine(string as_xml);
00228 Status update_local_streams(Query& query);
00229
00233
void send_acks();
00234
00235
void send_ack(MedusaID node, StreamID stream_id, string tuple);
00236
00237
void send_queue_trimming_message(MedusaID node, StreamID stream_id, string tuple);
00238
00242
void handle_ack_response(MedusaID node, StreamID
id, string last_tuple, RPC<void> result);
00243
void handle_trim_response(MedusaID node, StreamID
id, string last_tuple, RPC<void> result);
00244
00245
private:
00246
00248
AuroraNode m_aurora;
00249
bool m_aurora_started;
00250
00252 InetAddress m_data_add;
00253
00255
DataPath m_data_path;
00256
00258 vector<Name> m_injected_streams;
00259
00261
typedef map<Name, ptr<QueryState> > QueryStates;
00262 QueryStates m_queries;
00263
00265
typedef map<Name, ptr<StreamState> > StreamStates;
00266 StreamStates m_streams;
00267
00268
00269 Time m_start_period;
00270
00275 vector<MedusaID> m_secondary_ids;
00276
bool m_clear_secondary_endpoints;
00277
00282 map<MedusaID,bool> m_pending_ack;
00283 map<MedusaID,bool> m_pending_trim;
00284
00289
int m_ha_interval;
00290
00291 Time m_next_time_stats_computed;
00292
00293
typedef vector< pair< ptr<StreamState> ,string> > TrimPairs;
00294
00295
friend class ConsistencyMngr;
00296
00298
ConsistencyMngr m_constman;
00299
00300 };
00301
00302 BOREALIS_NAMESPACE_END;
00303
00304
#endif