Main Page | Namespace List | Class Hierarchy | Class List | File List | Class Members | File Members

Admin.h

Go to the documentation of this file.
00001 00002 #ifndef BOREALIS_ADMIN_H 00003 #define BOREALIS_ADMIN_H 00004 00005 #include "BasicComponent.h" 00006 #include "Schema.h" 00007 #include "Name.h" 00008 #include "StreamDef.h" 00009 #include "PendingQuery.h" 00010 #include "Query.h" 00011 #include "CPViewDescription.h" 00012 00013 BOREALIS_NAMESPACE_BEGIN 00014 00015 // create_query: the actual request from the clients. Sends lookups 00016 // for input streams; lookup responses go to create_query_2. 00017 // 00018 // create_query_2: handles a lookup response. When all responses 00019 // are received, calls distribute_query 00020 // 00021 // distribute_query: sends the query to the local QP for schema 00022 // resolution 00023 // 00024 // distribute_query_2: handles result of schema resolution. 00025 // Sets-up query locally (no automatic distribution here) 00026 // 00027 // 00028 class Admin : public BasicComponent { 00029 public: 00030 00031 Admin(string id) : BasicComponent(id,"Admin") {} 00032 ~Admin() {} 00033 00034 // RPC: #include "Schema.h" 00035 // RPC: #include "StreamDef.h" 00036 // RPC: #include "Query.h" 00037 // RPC: #include "Name.h" 00038 // RPC: #include "Stats.h" 00039 // RPC: #include "CPViewDescription.h" 00040 00041 // Client interface 00042 AsyncRPC<void> create_schema(Schema schema); 00043 AsyncRPC<void> create_stream(StreamDef streamdef); 00044 AsyncRPC<void> create_cp(CPViewDescription view_desc, StreamDef streamdef); 00045 AsyncRPC<void> create_query(Query query); 00046 AsyncRPC<void> create_query_xml(string query); 00048 AsyncRPC<void> set_query_status(Name name, QueryStatus status); 00049 AsyncRPC<void> set_queries_status(vector<Name> name, QueryStatus status); 00050 00051 // (Un)subscribe to receive events on a stream 00052 AsyncRPC<void> subscribe(Subscription sub, unsigned int add_or_remove); 00053 AsyncRPC<void> subscribe_many(vector<Subscription> sub, unsigned int add_or_remove); 00054 00055 // The two methods below are for batch processing 00056 AsyncRPC<void> create_and_start_queries(vector<Query> queries); 00057 AsyncRPC<void> queries_and_subscriptions(vector<Query> queries,vector<Subscription> subs, 00058 unsigned int add_or_remove); 00059 // Move load described by boxes to partner 00060 AsyncRPC<void> move_load(vector<Name> query_names, string partner); 00061 00062 // Hunk management for fine-grained box movement. 00063 AsyncRPC<void> split_query(Name query_hunk_name, map<Name, vector<Name> > boxes_per_hunk); 00064 AsyncRPC<void> merge_query(vector<Name> existing_query_hunk_names, Name new_query_hunk_name); 00065 00066 00075 MultiRPC<InputStreamInfo> add_stream_observer(); 00076 00077 MultiRPC<Query> add_query_observer(); 00078 00079 MultiRPC< vector<Subscription> > add_sub_observer(); 00080 00081 NMSTL_RPC_OBJECT(Admin); 00082 00083 protected: 00084 void in_thread_init() {} 00085 00086 private: 00087 00088 void empty_cleanup(); 00089 00090 void create_stream_2(AsyncRPC<void> completion, ptr<StreamDef> sd, RPC<void> result); 00091 00093 ptr<StreamDef> Admin::best_match(vector< ptr<Object> > list, StreamDef& target); 00094 void create_query_2(ptr<PendingQuery> pending, vector< ptr<StreamDef> > incomplete_stream_defs, 00095 RPC< vector< vector< ptr<Object> > > > streamdefs); 00096 void start_created_query_from_batch(ptr<PendingBatch> batch_queries, 00097 Name name, RPC<void> result); 00098 void created_query_from_batch(ptr<PendingBatch> batch_queries, 00099 Name name, RPC<void> result); 00100 void queries_and_subscriptions_2(AsyncRPC<void> completion, vector<Subscription> subs, 00101 unsigned int add_or_remove, RPC<void> result); 00102 00103 void distribute_query(ptr<PendingQuery>); 00104 void distribute_query_2(ptr<PendingQuery> pending, RPC<Query> typed_query); 00105 void register_query_streams_2(ptr<PendingQuery> pending, RPC<void> result); 00106 void register_responsability_response(ptr<Query> query, RPC<void> result); 00107 void remove_responsability_response(Name item_name, RPC<void> result); 00108 void remove_responsabilities_response(vector<Name> item_names, RPC<void> result); 00109 void subscribe_remote(AsyncRPC<void> completion, Subscription sub, 00110 unsigned int cmd,RPC< vector< ptr<Object> > > obj); 00111 void subscribe_2(AsyncRPC<void> completion, unsigned int cmd, int nb_subscriptions, 00112 RPC<vector<Subscription> > successful); 00113 void subscribe_3(AsyncRPC<void> completion, int nb_subscriptions, RPC<void> res); 00114 00115 struct MoveData { 00116 vector<Name> query_names; 00117 ptr< vector<Query> > queries; 00118 ptr< vector<Subscription> > subs; 00119 MedusaID dst; 00120 MoveData(vector<Name> qn, ptr< vector<Query> > q, ptr< vector<Subscription> > s, MedusaID d) 00121 : query_names(qn), queries(q), subs(s), dst(d) {} 00122 00123 }; 00124 00125 // Move a set of queries to another node 00126 void move_load_2(MoveData data, AsyncRPC<void> completion, RPC< vector<Query> > choked_queries); 00127 void move_load_3(MoveData data, AsyncRPC<void> completion, RPC< vector<Query> > updated_queries); 00128 void move_load_4(MoveData data, AsyncRPC<void> completion, RPC<void> result); 00129 void move_load_5(MoveData data, AsyncRPC<void> completion, RPC<void> result); 00130 void move_load_6(MoveData data, AsyncRPC<void> completion, RPC<void> result); 00131 00132 // Split a query 00133 void split_query2(Name query_hunk_name, vector<ptr<Query> > new_hunk_ptrs, AsyncRPC<void> completion, RPC<void> result); 00134 00135 // Merge a query 00136 void merge_query2(vector<Name> existing_query_hunk_names, ptr<Query> new_query_hunk, AsyncRPC<void> completion, RPC<void> result); 00137 00141 void notify_stream_observers(ptr<StreamDef> sd, vector< ptr<Object> > alts = vector< ptr<Object> >()); 00142 00146 void notify_query_observers(ptr<Query> query); 00147 00151 void notify_sub_observers(vector<Subscription> subs); 00152 00153 public: 00154 typedef map<Name, ptr<PendingQuery> > PendingQueries; 00155 typedef map<Name, ptr<StreamDef> > StreamMap; 00156 typedef map<Name, vector< ptr<StreamDef> > > StreamVersionsMap; 00157 typedef map<Name, ptr<Query> > LocalQueries; 00158 00159 // For each stream name, we hold a map of subscription names to subscriptions themselves 00160 typedef map<Name, ptr<Subscription> > SubscriptionMap; 00161 typedef map<Name, SubscriptionMap > Subscriptions; 00162 00163 typedef map<Name, ptr<CPViewDescription> > CPViewDescriptionMap; 00164 00165 private: 00166 // Queries that are (being) setup. Once, setup, we remove queries from this data structure 00167 PendingQueries pending_queries; 00168 00169 // Once set-up, we only remember the hunks that are running locally 00170 LocalQueries m_local_queries; 00171 00172 // Cache of locally owned streams 00173 StreamMap m_local_streams; 00174 00175 // locally owned cps 00176 CPViewDescriptionMap m_local_cpviews; 00177 00178 // As a first try we will keep info about stream versions in a separate structure 00179 StreamVersionsMap local_stream_versions; 00180 00181 // Cache of responsabilities 00182 Subscriptions m_subscriptions; 00183 00184 // List of Observers for new input streams 00185 typedef vector< MultiRPC<InputStreamInfo> > StreamObserversList; 00186 StreamObserversList m_stream_observers; 00187 00188 // List of Observers for new queries 00189 typedef vector< MultiRPC<Query> > QueryObserversList; 00190 QueryObserversList m_query_observers; 00191 00192 // List of Observers for new subscriptions 00193 typedef vector< MultiRPC< vector<Subscription> > > SubObserversList; 00194 SubObserversList m_sub_observers; 00195 00196 }; 00197 00198 00199 typedef Callback< void > CleanUpMethod; 00200 00207 class ParallelActions { 00208 public: 00209 ParallelActions(AsyncRPC<void> completion, 00210 int nb_parallel_tracks, 00211 string msg, 00212 CleanUpMethod clean_up = CleanUpMethod()) 00213 : m_completion(completion), 00214 m_pending(nb_parallel_tracks), 00215 m_msg(msg), 00216 m_already_failed(false), 00217 m_actions_results(nb_parallel_tracks, true), 00218 m_clean_up_method(clean_up) {} 00219 00220 void clean_up() { 00221 WARN << "Should invoke a callback to clean-up state for " 00222 << m_msg << " and result vector " << m_actions_results; 00223 if ( m_clean_up_method ) 00224 m_clean_up_method(); 00225 } 00226 00227 string as_string() const { 00228 return m_msg; 00229 } 00230 00231 AsyncRPC<void> m_completion; 00232 int m_pending; 00233 string m_msg; 00234 bool m_already_failed; 00235 vector<bool> m_actions_results; 00236 CleanUpMethod m_clean_up_method; 00237 }; 00238 00239 NMSTL_TO_STRING(ParallelActions); 00240 00241 00249 class ParallelActionsMgr { 00250 public: 00251 static void action_result(ptr<ParallelActions> p, int action_nb, RPC<void> result) { 00252 00253 WARN << "Number pending is " << p->m_pending; 00254 p->m_pending--; 00255 00256 // Action has failed 00257 if (!result.valid() ) { 00258 00259 // Make a note of the failure in the vector of results 00260 p->m_actions_results[action_nb] = false; 00261 00262 // Return failure message to client 00263 if ( ! p->m_already_failed ) { 00264 p->m_already_failed = true; 00265 ERROR << "One of several actions failed in group " << p->m_msg; 00266 p->m_completion.post(RPCFault(p->m_msg + result.stat())); 00267 } 00268 } 00269 // If all actions have succeeded, we can return success message to client 00270 else if ( p->m_pending <= 0 ) { 00271 p->m_completion.post(true); 00272 } 00273 00274 // If we're all done and something failed, then we need to clean-up 00275 if ( (p->m_pending <= 0) && (p->m_already_failed) ) { 00276 p->clean_up(); 00277 } 00278 } 00279 }; 00280 00281 00282 BOREALIS_NAMESPACE_END 00283 00284 #endif

Generated on Fri Nov 12 15:15:20 2004 for Borealis by doxygen 1.3.8