00001
00002
#ifndef BOREALIS_ADMIN_H
00003
#define BOREALIS_ADMIN_H
00004
00005
#include "BasicComponent.h"
00006
#include "Schema.h"
00007
#include "Name.h"
00008
#include "StreamDef.h"
00009
#include "PendingQuery.h"
00010
#include "Query.h"
00011
#include "CPViewDescription.h"
00012
00013
BOREALIS_NAMESPACE_BEGIN
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 class Admin :
public BasicComponent {
00029
public:
00030
00031 Admin(string
id) : BasicComponent(id,
"Admin") {}
00032 ~Admin() {}
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042 AsyncRPC<void>
create_schema(Schema schema);
00043 AsyncRPC<void>
create_stream(StreamDef streamdef);
00044 AsyncRPC<void>
create_cp(CPViewDescription view_desc, StreamDef streamdef);
00045 AsyncRPC<void>
create_query(Query query);
00046 AsyncRPC<void>
create_query_xml(string query);
00048 AsyncRPC<void>
set_query_status(Name name, QueryStatus status);
00049 AsyncRPC<void>
set_queries_status(vector<Name> name, QueryStatus status);
00050
00051
00052 AsyncRPC<void>
subscribe(Subscription sub,
unsigned int add_or_remove);
00053 AsyncRPC<void>
subscribe_many(vector<Subscription> sub,
unsigned int add_or_remove);
00054
00055
00056 AsyncRPC<void>
create_and_start_queries(vector<Query> queries);
00057 AsyncRPC<void>
queries_and_subscriptions(vector<Query> queries,vector<Subscription> subs,
00058
unsigned int add_or_remove);
00059
00060 AsyncRPC<void>
move_load(vector<Name> query_names, string partner);
00061
00062
00063 AsyncRPC<void>
split_query(Name query_hunk_name, map<Name, vector<Name> > boxes_per_hunk);
00064 AsyncRPC<void>
merge_query(vector<Name> existing_query_hunk_names, Name new_query_hunk_name);
00065
00066
00075 MultiRPC<InputStreamInfo>
add_stream_observer();
00076
00077 MultiRPC<Query>
add_query_observer();
00078
00079 MultiRPC< vector<Subscription> >
add_sub_observer();
00080
00081
NMSTL_RPC_OBJECT(
Admin);
00082
00083
protected:
00084 void in_thread_init() {}
00085
00086
private:
00087
00088
void empty_cleanup();
00089
00090
void create_stream_2(AsyncRPC<void> completion, ptr<StreamDef> sd, RPC<void> result);
00091
00093 ptr<StreamDef> Admin::best_match(vector< ptr<Object> > list, StreamDef& target);
00094
void create_query_2(ptr<PendingQuery> pending, vector< ptr<StreamDef> > incomplete_stream_defs,
00095 RPC< vector< vector< ptr<Object> > > > streamdefs);
00096
void start_created_query_from_batch(ptr<PendingBatch> batch_queries,
00097 Name name, RPC<void> result);
00098
void created_query_from_batch(ptr<PendingBatch> batch_queries,
00099 Name name, RPC<void> result);
00100
void queries_and_subscriptions_2(AsyncRPC<void> completion, vector<Subscription> subs,
00101
unsigned int add_or_remove, RPC<void> result);
00102
00103
void distribute_query(ptr<PendingQuery>);
00104
void distribute_query_2(ptr<PendingQuery> pending, RPC<Query> typed_query);
00105
void register_query_streams_2(ptr<PendingQuery> pending, RPC<void> result);
00106
void register_responsability_response(ptr<Query> query, RPC<void> result);
00107
void remove_responsability_response(Name item_name, RPC<void> result);
00108
void remove_responsabilities_response(vector<Name> item_names, RPC<void> result);
00109
void subscribe_remote(AsyncRPC<void> completion, Subscription sub,
00110
unsigned int cmd,RPC< vector< ptr<Object> > > obj);
00111
void subscribe_2(AsyncRPC<void> completion,
unsigned int cmd,
int nb_subscriptions,
00112 RPC<vector<Subscription> > successful);
00113
void subscribe_3(AsyncRPC<void> completion,
int nb_subscriptions, RPC<void> res);
00114
00115
struct MoveData {
00116 vector<Name> query_names;
00117 ptr< vector<Query> > queries;
00118 ptr< vector<Subscription> > subs;
00119 MedusaID dst;
00120 MoveData(vector<Name> qn, ptr< vector<Query> > q, ptr< vector<Subscription> > s, MedusaID d)
00121 : query_names(qn), queries(q), subs(s), dst(d) {}
00122
00123 };
00124
00125
00126
void move_load_2(MoveData data, AsyncRPC<void> completion, RPC< vector<Query> > choked_queries);
00127
void move_load_3(MoveData data, AsyncRPC<void> completion, RPC< vector<Query> > updated_queries);
00128
void move_load_4(MoveData data, AsyncRPC<void> completion, RPC<void> result);
00129
void move_load_5(MoveData data, AsyncRPC<void> completion, RPC<void> result);
00130
void move_load_6(MoveData data, AsyncRPC<void> completion, RPC<void> result);
00131
00132
00133
void split_query2(Name query_hunk_name, vector<ptr<Query> > new_hunk_ptrs, AsyncRPC<void> completion, RPC<void> result);
00134
00135
00136
void merge_query2(vector<Name> existing_query_hunk_names, ptr<Query> new_query_hunk, AsyncRPC<void> completion, RPC<void> result);
00137
00141
void notify_stream_observers(ptr<StreamDef> sd, vector< ptr<Object> > alts = vector< ptr<Object> >());
00142
00146
void notify_query_observers(ptr<Query> query);
00147
00151
void notify_sub_observers(vector<Subscription> subs);
00152
00153
public:
00154 typedef map<Name, ptr<PendingQuery> >
PendingQueries;
00155 typedef map<Name, ptr<StreamDef> >
StreamMap;
00156 typedef map<Name, vector< ptr<StreamDef> > >
StreamVersionsMap;
00157 typedef map<Name, ptr<Query> >
LocalQueries;
00158
00159
00160 typedef map<Name, ptr<Subscription> >
SubscriptionMap;
00161 typedef map<Name, SubscriptionMap >
Subscriptions;
00162
00163 typedef map<Name, ptr<CPViewDescription> >
CPViewDescriptionMap;
00164
00165
private:
00166
00167
PendingQueries pending_queries;
00168
00169
00170
LocalQueries m_local_queries;
00171
00172
00173
StreamMap m_local_streams;
00174
00175
00176
CPViewDescriptionMap m_local_cpviews;
00177
00178
00179
StreamVersionsMap local_stream_versions;
00180
00181
00182
Subscriptions m_subscriptions;
00183
00184
00185
typedef vector< MultiRPC<InputStreamInfo> > StreamObserversList;
00186 StreamObserversList m_stream_observers;
00187
00188
00189
typedef vector< MultiRPC<Query> > QueryObserversList;
00190 QueryObserversList m_query_observers;
00191
00192
00193
typedef vector< MultiRPC< vector<Subscription> > > SubObserversList;
00194 SubObserversList m_sub_observers;
00195
00196 };
00197
00198
00199 typedef Callback< void >
CleanUpMethod;
00200
00207 class ParallelActions {
00208
public:
00209 ParallelActions(AsyncRPC<void> completion,
00210
int nb_parallel_tracks,
00211 string msg,
00212 CleanUpMethod clean_up =
CleanUpMethod())
00213 :
m_completion(completion),
00214
m_pending(nb_parallel_tracks),
00215
m_msg(msg),
00216
m_already_failed(false),
00217
m_actions_results(nb_parallel_tracks, true),
00218
m_clean_up_method(clean_up) {}
00219
00220 void clean_up() {
00221 WARN <<
"Should invoke a callback to clean-up state for "
00222 <<
m_msg <<
" and result vector " <<
m_actions_results;
00223
if (
m_clean_up_method )
00224
m_clean_up_method();
00225 }
00226
00227 string
as_string()
const {
00228
return m_msg;
00229 }
00230
00231 AsyncRPC<void>
m_completion;
00232 int m_pending;
00233 string
m_msg;
00234 bool m_already_failed;
00235 vector<bool>
m_actions_results;
00236 CleanUpMethod m_clean_up_method;
00237 };
00238
00239
// Registers ParallelActions with the NMSTL string-conversion macro.
// NOTE(review): presumably this relies on ParallelActions::as_string() --
// confirm against the macro definition in NMSTL.
NMSTL_TO_STRING(ParallelActions);
00240
00241
00249 class ParallelActionsMgr {
00250
public:
00251 static void action_result(ptr<ParallelActions> p,
int action_nb, RPC<void> result) {
00252
00253 WARN <<
"Number pending is " << p->m_pending;
00254 p->m_pending--;
00255
00256
00257
if (!result.valid() ) {
00258
00259
00260 p->m_actions_results[action_nb] =
false;
00261
00262
00263
if ( ! p->m_already_failed ) {
00264 p->m_already_failed =
true;
00265 ERROR <<
"One of several actions failed in group " << p->m_msg;
00266 p->m_completion.post(RPCFault(p->m_msg + result.stat()));
00267 }
00268 }
00269
00270
else if ( p->m_pending <= 0 ) {
00271 p->m_completion.post(
true);
00272 }
00273
00274
00275
if ( (p->m_pending <= 0) && (p->m_already_failed) ) {
00276 p->clean_up();
00277 }
00278 }
00279 };
00280
00281
00282
BOREALIS_NAMESPACE_END
00283
00284
#endif