Main Page | Namespace List | Class Hierarchy | Class List | File List | Class Members | File Members

rpc_QueryProcessor.h

Go to the documentation of this file.
00001 /***** 00002 * 00003 * ../queryProcessor/rpc_QueryProcessor.h 00004 * 00005 * Include this file to gain access to the following classes: 00006 * 00007 * - Remote<QueryProcessor> 00008 * 00009 * This file is automatically generated by nmstl-rpcgen. 00010 * Do not modify it! 00011 * 00012 *****/ 00013 00014 #ifndef ___QUERYPROCESSOR_RPC_QUERYPROCESSOR_H 00015 #define ___QUERYPROCESSOR_RPC_QUERYPROCESSOR_H 00016 00017 #include <NMSTL/rpc> 00018 #include "BoxPackage.h" 00019 #include "MTuple.h" 00020 #include "Name.h" 00021 #include "Query.h" 00022 #include "Recovery.h" 00023 #include "Stats.h" 00024 #include "StreamEvent.h" 00025 00026 namespace Borealis { class QueryProcessor; }; 00027 00028 00029 NMSTL_NAMESPACE_BEGIN; 00030 00031 #ifdef DOXYGEN_SKIP 00032 00033 00034 00035 00036 /***** 00037 * 00038 * Following are the public interfaces to RPC objects. 00039 * 00040 *****/ 00041 00042 template<typename T> class Remote; 00043 class Remote<Borealis::QueryProcessor> { 00044 public: 00047 void typecheck(const Callback<void, RPC< Query > >& completion, 00048 Query query); 00049 00051 RPC< Query > typecheck(Query query); 00052 00055 void setup_query(const Callback<void, RPC< void > >& completion, 00056 Query query); 00057 00059 RPC< void > setup_query(Query query); 00060 00063 void typecheck_and_setup(const Callback<void, RPC< Query > >& completion, 00064 Query query); 00065 00067 RPC< Query > typecheck_and_setup(Query query); 00068 00071 void load_box(const Callback<void, RPC< void > >& completion, 00072 string file_path); 00073 00075 RPC< void > load_box(string file_path); 00076 00079 void choke_queries(const Callback<void, RPC< vector<Query> > >& completion, 00080 vector<Query> queries); 00081 00083 RPC< vector<Query> > choke_queries(vector<Query> queries); 00084 00087 void resume_queries(const Callback<void, RPC< void > >& completion, 00088 vector<Query> queries); 00089 00091 RPC< void > resume_queries(vector<Query> queries); 00092 00095 void pack_query(const 
Callback<void, RPC< Query > >& completion, 00096 Query query); 00097 00099 RPC< Query > pack_query(Query query); 00100 00103 void pack_queries(const Callback<void, RPC< vector<Query> > >& completion, 00104 vector<Query> queries); 00105 00107 RPC< vector<Query> > pack_queries(vector<Query> queries); 00108 00111 void remove_query(const Callback<void, RPC< void > >& completion, 00112 Query query); 00113 00115 RPC< void > remove_query(Query query); 00116 00119 void replace_query(const Callback<void, RPC< void > >& completion, 00120 vector<Name> old_queries, vector<Query> new_queries); 00121 00123 RPC< void > replace_query(vector<Name> old_queries, vector<Query> new_queries); 00124 00127 void set_query_status(const Callback<void, RPC< void > >& completion, 00128 Name name, QueryStatus status); 00129 00131 RPC< void > set_query_status(Name name, QueryStatus status); 00132 00135 void set_queries_status(const Callback<void, RPC< void > >& completion, 00136 vector<Name> name, QueryStatus status); 00137 00139 RPC< void > set_queries_status(vector<Name> name, QueryStatus status); 00140 00143 void subscribe(const Callback<void, RPC< void > >& completion, 00144 Subscription sub, unsigned int add_or_remove); 00145 00147 RPC< void > subscribe(Subscription sub, unsigned int add_or_remove); 00148 00151 void subscribe_many(const Callback<void, RPC< vector<Subscription> > >& completion, 00152 vector<Subscription> sub, unsigned int add_or_remove); 00153 00155 RPC< vector<Subscription> > subscribe_many(vector<Subscription> sub, unsigned int add_or_remove); 00156 00159 void get_subscriptions(const Callback<void, RPC< vector<Subscription> > >& completion, 00160 vector<Name> streams); 00161 00163 RPC< vector<Subscription> > get_subscriptions(vector<Name> streams); 00164 00167 void get_stats(const Callback<void, RPC< vector<Stats> > >& completion); 00168 00170 RPC< vector<Stats> > get_stats(); 00171 00174 void get_sel(const Callback<void, RPC< double > >& completion); 00175 00177 RPC< 
double > get_sel(); 00178 00181 void create_stream(const Callback<void, RPC< void > >& completion, 00182 StreamDef stream); 00183 00185 RPC< void > create_stream(StreamDef stream); 00186 00189 void create_cpview(const Callback<void, RPC< void > >& completion, 00190 CPViewDescription view_desc, StreamDef streamdef); 00191 00193 RPC< void > create_cpview(CPViewDescription view_desc, StreamDef streamdef); 00194 00197 void update_stream(const Callback<void, RPC< void > >& completion, 00198 StreamDef old_sd, StreamDef new_sd); 00199 00201 RPC< void > update_stream(StreamDef old_sd, StreamDef new_sd); 00202 00205 void ack(const Callback<void, RPC< void > >& completion, 00206 MedusaID node, StreamID id, string last_tuple); 00207 00209 RPC< void > ack(MedusaID node, StreamID id, string last_tuple); 00210 00213 void trim(const Callback<void, RPC< void > >& completion, 00214 MedusaID node, StreamID id, string last_tuple); 00215 00217 RPC< void > trim(MedusaID node, StreamID id, string last_tuple); 00218 00221 void set_recovery_method(const Callback<void, RPC< void > >& completion, 00222 int method); 00223 00225 RPC< void > set_recovery_method(int method); 00226 00229 void set_primary_status(const Callback<void, RPC< void > >& completion, 00230 bool status); 00231 00233 RPC< void > set_primary_status(bool status); 00234 00237 void set_secondaries(const Callback<void, RPC< void > >& completion, 00238 vector<MedusaID> secondaries); 00239 00241 RPC< void > set_secondaries(vector<MedusaID> secondaries); 00242 00245 void set_replicas(const Callback<void, RPC< void > >& completion, 00246 vector<MedusaID> replicas); 00247 00249 RPC< void > set_replicas(vector<MedusaID> replicas); 00250 00253 void checkpoint(const Callback<void, RPC< void > >& completion, 00254 vector<StreamEvent> tuples_to_checkpoint, map<StreamID,string> tuples_to_trim); 00255 00257 RPC< void > checkpoint(vector<StreamEvent> tuples_to_checkpoint, map<StreamID,string> tuples_to_trim); 00258 00259 }; 00260 00261 
00262 /***** 00263 * 00264 * You can safely ignore everything after here. 00265 * 00266 *****/ 00267 00268 00269 00270 00271 00272 #else // !defined(DOXYGEN_SKIP) 00273 00274 namespace __nmstl_tmp_rpc_defs { 00275 template<typename X> struct RPCDefs; 00276 template<typename X> struct Remote; 00277 using namespace Borealis; 00278 template<> 00279 struct RPCDefs<Borealis::QueryProcessor> { 00280 static const int typecheck_METHOD_ID = 20000; 00281 struct typecheck_IN : public rpc_request_1< Query > { 00282 Query query; 00283 typecheck_IN() {} 00284 typecheck_IN(Query query) : query(query) {} 00285 rpc_message *clone() const { return new typecheck_IN(*this); } 00286 string as_string() const { 00287 string out; 00288 out << "typecheck_IN{" << "query=" << query << "}"; 00289 return out; 00290 } 00291 NMSTL_SIMPLY_SERIALIZABLE(typecheck_IN, << Serial::tag("param") << query ); 00292 NMSTL_SERIAL_TAG(typecheck_IN, "params"); 00293 }; 00294 class typecheck_OUT { 00295 RPCStatus m_stat; 00296 Query m_val; 00297 public: 00298 operator const void * () const { return m_stat; } 00299 const RPCStatus& stat() const { return m_stat; } 00300 Query get() const { return m_val; } 00301 }; 00302 static const int setup_query_METHOD_ID = 20001; 00303 struct setup_query_IN : public rpc_request_1< void > { 00304 Query query; 00305 setup_query_IN() {} 00306 setup_query_IN(Query query) : query(query) {} 00307 rpc_message *clone() const { return new setup_query_IN(*this); } 00308 string as_string() const { 00309 string out; 00310 out << "setup_query_IN{" << "query=" << query << "}"; 00311 return out; 00312 } 00313 NMSTL_SIMPLY_SERIALIZABLE(setup_query_IN, << Serial::tag("param") << query ); 00314 NMSTL_SERIAL_TAG(setup_query_IN, "params"); 00315 }; 00316 class setup_query_OUT { 00317 RPCStatus m_stat; 00318 public: 00319 operator const void * () const { return m_stat; } 00320 const RPCStatus& stat() const { return m_stat; } 00321 bool get() const { return true; } 00322 }; 00323 static const int 
typecheck_and_setup_METHOD_ID = 20002; 00324 struct typecheck_and_setup_IN : public rpc_request_1< Query > { 00325 Query query; 00326 typecheck_and_setup_IN() {} 00327 typecheck_and_setup_IN(Query query) : query(query) {} 00328 rpc_message *clone() const { return new typecheck_and_setup_IN(*this); } 00329 string as_string() const { 00330 string out; 00331 out << "typecheck_and_setup_IN{" << "query=" << query << "}"; 00332 return out; 00333 } 00334 NMSTL_SIMPLY_SERIALIZABLE(typecheck_and_setup_IN, << Serial::tag("param") << query ); 00335 NMSTL_SERIAL_TAG(typecheck_and_setup_IN, "params"); 00336 }; 00337 class typecheck_and_setup_OUT { 00338 RPCStatus m_stat; 00339 Query m_val; 00340 public: 00341 operator const void * () const { return m_stat; } 00342 const RPCStatus& stat() const { return m_stat; } 00343 Query get() const { return m_val; } 00344 }; 00345 static const int load_box_METHOD_ID = 20003; 00346 struct load_box_IN : public rpc_request_1< void > { 00347 string file_path; 00348 load_box_IN() {} 00349 load_box_IN(string file_path) : file_path(file_path) {} 00350 rpc_message *clone() const { return new load_box_IN(*this); } 00351 string as_string() const { 00352 string out; 00353 out << "load_box_IN{" << "file_path=" << file_path << "}"; 00354 return out; 00355 } 00356 NMSTL_SIMPLY_SERIALIZABLE(load_box_IN, << Serial::tag("param") << file_path ); 00357 NMSTL_SERIAL_TAG(load_box_IN, "params"); 00358 }; 00359 class load_box_OUT { 00360 RPCStatus m_stat; 00361 public: 00362 operator const void * () const { return m_stat; } 00363 const RPCStatus& stat() const { return m_stat; } 00364 bool get() const { return true; } 00365 }; 00366 static const int choke_queries_METHOD_ID = 20004; 00367 struct choke_queries_IN : public rpc_request_1< vector<Query> > { 00368 vector<Query> queries; 00369 choke_queries_IN() {} 00370 choke_queries_IN(vector<Query> queries) : queries(queries) {} 00371 rpc_message *clone() const { return new choke_queries_IN(*this); } 00372 string 
as_string() const { 00373 string out; 00374 out << "choke_queries_IN{" << "queries=" << queries << "}"; 00375 return out; 00376 } 00377 NMSTL_SIMPLY_SERIALIZABLE(choke_queries_IN, << Serial::tag("param") << queries ); 00378 NMSTL_SERIAL_TAG(choke_queries_IN, "params"); 00379 }; 00380 class choke_queries_OUT { 00381 RPCStatus m_stat; 00382 vector<Query> m_val; 00383 public: 00384 operator const void * () const { return m_stat; } 00385 const RPCStatus& stat() const { return m_stat; } 00386 vector<Query> get() const { return m_val; } 00387 }; 00388 static const int resume_queries_METHOD_ID = 20005; 00389 struct resume_queries_IN : public rpc_request_1< void > { 00390 vector<Query> queries; 00391 resume_queries_IN() {} 00392 resume_queries_IN(vector<Query> queries) : queries(queries) {} 00393 rpc_message *clone() const { return new resume_queries_IN(*this); } 00394 string as_string() const { 00395 string out; 00396 out << "resume_queries_IN{" << "queries=" << queries << "}"; 00397 return out; 00398 } 00399 NMSTL_SIMPLY_SERIALIZABLE(resume_queries_IN, << Serial::tag("param") << queries ); 00400 NMSTL_SERIAL_TAG(resume_queries_IN, "params"); 00401 }; 00402 class resume_queries_OUT { 00403 RPCStatus m_stat; 00404 public: 00405 operator const void * () const { return m_stat; } 00406 const RPCStatus& stat() const { return m_stat; } 00407 bool get() const { return true; } 00408 }; 00409 static const int pack_query_METHOD_ID = 20006; 00410 struct pack_query_IN : public rpc_request_1< Query > { 00411 Query query; 00412 pack_query_IN() {} 00413 pack_query_IN(Query query) : query(query) {} 00414 rpc_message *clone() const { return new pack_query_IN(*this); } 00415 string as_string() const { 00416 string out; 00417 out << "pack_query_IN{" << "query=" << query << "}"; 00418 return out; 00419 } 00420 NMSTL_SIMPLY_SERIALIZABLE(pack_query_IN, << Serial::tag("param") << query ); 00421 NMSTL_SERIAL_TAG(pack_query_IN, "params"); 00422 }; 00423 class pack_query_OUT { 00424 RPCStatus 
m_stat; 00425 Query m_val; 00426 public: 00427 operator const void * () const { return m_stat; } 00428 const RPCStatus& stat() const { return m_stat; } 00429 Query get() const { return m_val; } 00430 }; 00431 static const int pack_queries_METHOD_ID = 20007; 00432 struct pack_queries_IN : public rpc_request_1< vector<Query> > { 00433 vector<Query> queries; 00434 pack_queries_IN() {} 00435 pack_queries_IN(vector<Query> queries) : queries(queries) {} 00436 rpc_message *clone() const { return new pack_queries_IN(*this); } 00437 string as_string() const { 00438 string out; 00439 out << "pack_queries_IN{" << "queries=" << queries << "}"; 00440 return out; 00441 } 00442 NMSTL_SIMPLY_SERIALIZABLE(pack_queries_IN, << Serial::tag("param") << queries ); 00443 NMSTL_SERIAL_TAG(pack_queries_IN, "params"); 00444 }; 00445 class pack_queries_OUT { 00446 RPCStatus m_stat; 00447 vector<Query> m_val; 00448 public: 00449 operator const void * () const { return m_stat; } 00450 const RPCStatus& stat() const { return m_stat; } 00451 vector<Query> get() const { return m_val; } 00452 }; 00453 static const int remove_query_METHOD_ID = 20008; 00454 struct remove_query_IN : public rpc_request_1< void > { 00455 Query query; 00456 remove_query_IN() {} 00457 remove_query_IN(Query query) : query(query) {} 00458 rpc_message *clone() const { return new remove_query_IN(*this); } 00459 string as_string() const { 00460 string out; 00461 out << "remove_query_IN{" << "query=" << query << "}"; 00462 return out; 00463 } 00464 NMSTL_SIMPLY_SERIALIZABLE(remove_query_IN, << Serial::tag("param") << query ); 00465 NMSTL_SERIAL_TAG(remove_query_IN, "params"); 00466 }; 00467 class remove_query_OUT { 00468 RPCStatus m_stat; 00469 public: 00470 operator const void * () const { return m_stat; } 00471 const RPCStatus& stat() const { return m_stat; } 00472 bool get() const { return true; } 00473 }; 00474 static const int replace_query_METHOD_ID = 20009; 00475 struct replace_query_IN : public rpc_request_1< void > { 
00476 vector<Name> old_queries; vector<Query> new_queries; 00477 replace_query_IN() {} 00478 replace_query_IN(vector<Name> old_queries, vector<Query> new_queries) : old_queries(old_queries), new_queries(new_queries) {} 00479 rpc_message *clone() const { return new replace_query_IN(*this); } 00480 string as_string() const { 00481 string out; 00482 out << "replace_query_IN{" << "old_queries=" << old_queries << ", new_queries=" << new_queries << "}"; 00483 return out; 00484 } 00485 NMSTL_SIMPLY_SERIALIZABLE(replace_query_IN, << Serial::tag("param") << old_queries << new_queries ); 00486 NMSTL_SERIAL_TAG(replace_query_IN, "params"); 00487 }; 00488 class replace_query_OUT { 00489 RPCStatus m_stat; 00490 public: 00491 operator const void * () const { return m_stat; } 00492 const RPCStatus& stat() const { return m_stat; } 00493 bool get() const { return true; } 00494 }; 00495 static const int set_query_status_METHOD_ID = 20010; 00496 struct set_query_status_IN : public rpc_request_1< void > { 00497 Name name; QueryStatus status; 00498 set_query_status_IN() {} 00499 set_query_status_IN(Name name, QueryStatus status) : name(name), status(status) {} 00500 rpc_message *clone() const { return new set_query_status_IN(*this); } 00501 string as_string() const { 00502 string out; 00503 out << "set_query_status_IN{" << "name=" << name << ", status=" << status << "}"; 00504 return out; 00505 } 00506 NMSTL_SIMPLY_SERIALIZABLE(set_query_status_IN, << Serial::tag("param") << name << status ); 00507 NMSTL_SERIAL_TAG(set_query_status_IN, "params"); 00508 }; 00509 class set_query_status_OUT { 00510 RPCStatus m_stat; 00511 public: 00512 operator const void * () const { return m_stat; } 00513 const RPCStatus& stat() const { return m_stat; } 00514 bool get() const { return true; } 00515 }; 00516 static const int set_queries_status_METHOD_ID = 20011; 00517 struct set_queries_status_IN : public rpc_request_1< void > { 00518 vector<Name> name; QueryStatus status; 00519 set_queries_status_IN() 
{} 00520 set_queries_status_IN(vector<Name> name, QueryStatus status) : name(name), status(status) {} 00521 rpc_message *clone() const { return new set_queries_status_IN(*this); } 00522 string as_string() const { 00523 string out; 00524 out << "set_queries_status_IN{" << "name=" << name << ", status=" << status << "}"; 00525 return out; 00526 } 00527 NMSTL_SIMPLY_SERIALIZABLE(set_queries_status_IN, << Serial::tag("param") << name << status ); 00528 NMSTL_SERIAL_TAG(set_queries_status_IN, "params"); 00529 }; 00530 class set_queries_status_OUT { 00531 RPCStatus m_stat; 00532 public: 00533 operator const void * () const { return m_stat; } 00534 const RPCStatus& stat() const { return m_stat; } 00535 bool get() const { return true; } 00536 }; 00537 static const int subscribe_METHOD_ID = 20012; 00538 struct subscribe_IN : public rpc_request_1< void > { 00539 Subscription sub; unsigned int add_or_remove; 00540 subscribe_IN() {} 00541 subscribe_IN(Subscription sub, unsigned int add_or_remove) : sub(sub), add_or_remove(add_or_remove) {} 00542 rpc_message *clone() const { return new subscribe_IN(*this); } 00543 string as_string() const { 00544 string out; 00545 out << "subscribe_IN{" << "sub=" << sub << ", add_or_remove=" << add_or_remove << "}"; 00546 return out; 00547 } 00548 NMSTL_SIMPLY_SERIALIZABLE(subscribe_IN, << Serial::tag("param") << sub << add_or_remove ); 00549 NMSTL_SERIAL_TAG(subscribe_IN, "params"); 00550 }; 00551 class subscribe_OUT { 00552 RPCStatus m_stat; 00553 public: 00554 operator const void * () const { return m_stat; } 00555 const RPCStatus& stat() const { return m_stat; } 00556 bool get() const { return true; } 00557 }; 00558 static const int subscribe_many_METHOD_ID = 20013; 00559 struct subscribe_many_IN : public rpc_request_1< vector<Subscription> > { 00560 vector<Subscription> sub; unsigned int add_or_remove; 00561 subscribe_many_IN() {} 00562 subscribe_many_IN(vector<Subscription> sub, unsigned int add_or_remove) : sub(sub), 
add_or_remove(add_or_remove) {} 00563 rpc_message *clone() const { return new subscribe_many_IN(*this); } 00564 string as_string() const { 00565 string out; 00566 out << "subscribe_many_IN{" << "sub=" << sub << ", add_or_remove=" << add_or_remove << "}"; 00567 return out; 00568 } 00569 NMSTL_SIMPLY_SERIALIZABLE(subscribe_many_IN, << Serial::tag("param") << sub << add_or_remove ); 00570 NMSTL_SERIAL_TAG(subscribe_many_IN, "params"); 00571 }; 00572 class subscribe_many_OUT { 00573 RPCStatus m_stat; 00574 vector<Subscription> m_val; 00575 public: 00576 operator const void * () const { return m_stat; } 00577 const RPCStatus& stat() const { return m_stat; } 00578 vector<Subscription> get() const { return m_val; } 00579 }; 00580 static const int get_subscriptions_METHOD_ID = 20014; 00581 struct get_subscriptions_IN : public rpc_request_1< vector<Subscription> > { 00582 vector<Name> streams; 00583 get_subscriptions_IN() {} 00584 get_subscriptions_IN(vector<Name> streams) : streams(streams) {} 00585 rpc_message *clone() const { return new get_subscriptions_IN(*this); } 00586 string as_string() const { 00587 string out; 00588 out << "get_subscriptions_IN{" << "streams=" << streams << "}"; 00589 return out; 00590 } 00591 NMSTL_SIMPLY_SERIALIZABLE(get_subscriptions_IN, << Serial::tag("param") << streams ); 00592 NMSTL_SERIAL_TAG(get_subscriptions_IN, "params"); 00593 }; 00594 class get_subscriptions_OUT { 00595 RPCStatus m_stat; 00596 vector<Subscription> m_val; 00597 public: 00598 operator const void * () const { return m_stat; } 00599 const RPCStatus& stat() const { return m_stat; } 00600 vector<Subscription> get() const { return m_val; } 00601 }; 00602 static const int get_stats_METHOD_ID = 20015; 00603 struct get_stats_IN : public rpc_request_1< vector<Stats> > { 00604 00605 get_stats_IN() {} 00606 rpc_message *clone() const { return new get_stats_IN(*this); } 00607 string as_string() const { 00608 string out; 00609 out << "get_stats_IN{" << "}"; 00610 return out; 00611 } 
00612 NMSTL_SIMPLY_SERIALIZABLE(get_stats_IN, << Serial::tag("param") ); 00613 NMSTL_SERIAL_TAG(get_stats_IN, "params"); 00614 }; 00615 class get_stats_OUT { 00616 RPCStatus m_stat; 00617 vector<Stats> m_val; 00618 public: 00619 operator const void * () const { return m_stat; } 00620 const RPCStatus& stat() const { return m_stat; } 00621 vector<Stats> get() const { return m_val; } 00622 }; 00623 static const int get_sel_METHOD_ID = 20016; 00624 struct get_sel_IN : public rpc_request_1< double > { 00625 00626 get_sel_IN() {} 00627 rpc_message *clone() const { return new get_sel_IN(*this); } 00628 string as_string() const { 00629 string out; 00630 out << "get_sel_IN{" << "}"; 00631 return out; 00632 } 00633 NMSTL_SIMPLY_SERIALIZABLE(get_sel_IN, << Serial::tag("param") ); 00634 NMSTL_SERIAL_TAG(get_sel_IN, "params"); 00635 }; 00636 class get_sel_OUT { 00637 RPCStatus m_stat; 00638 double m_val; 00639 public: 00640 operator const void * () const { return m_stat; } 00641 const RPCStatus& stat() const { return m_stat; } 00642 double get() const { return m_val; } 00643 }; 00644 static const int create_stream_METHOD_ID = 20017; 00645 struct create_stream_IN : public rpc_request_1< void > { 00646 StreamDef stream; 00647 create_stream_IN() {} 00648 create_stream_IN(StreamDef stream) : stream(stream) {} 00649 rpc_message *clone() const { return new create_stream_IN(*this); } 00650 string as_string() const { 00651 string out; 00652 out << "create_stream_IN{" << "stream=" << stream << "}"; 00653 return out; 00654 } 00655 NMSTL_SIMPLY_SERIALIZABLE(create_stream_IN, << Serial::tag("param") << stream ); 00656 NMSTL_SERIAL_TAG(create_stream_IN, "params"); 00657 }; 00658 class create_stream_OUT { 00659 RPCStatus m_stat; 00660 public: 00661 operator const void * () const { return m_stat; } 00662 const RPCStatus& stat() const { return m_stat; } 00663 bool get() const { return true; } 00664 }; 00665 static const int create_cpview_METHOD_ID = 20018; 00666 struct create_cpview_IN : 
public rpc_request_1< void > { 00667 CPViewDescription view_desc; StreamDef streamdef; 00668 create_cpview_IN() {} 00669 create_cpview_IN(CPViewDescription view_desc, StreamDef streamdef) : view_desc(view_desc), streamdef(streamdef) {} 00670 rpc_message *clone() const { return new create_cpview_IN(*this); } 00671 string as_string() const { 00672 string out; 00673 out << "create_cpview_IN{" << "view_desc=" << view_desc << ", streamdef=" << streamdef << "}"; 00674 return out; 00675 } 00676 NMSTL_SIMPLY_SERIALIZABLE(create_cpview_IN, << Serial::tag("param") << view_desc << streamdef ); 00677 NMSTL_SERIAL_TAG(create_cpview_IN, "params"); 00678 }; 00679 class create_cpview_OUT { 00680 RPCStatus m_stat; 00681 public: 00682 operator const void * () const { return m_stat; } 00683 const RPCStatus& stat() const { return m_stat; } 00684 bool get() const { return true; } 00685 }; 00686 static const int update_stream_METHOD_ID = 20019; 00687 struct update_stream_IN : public rpc_request_1< void > { 00688 StreamDef old_sd; StreamDef new_sd; 00689 update_stream_IN() {} 00690 update_stream_IN(StreamDef old_sd, StreamDef new_sd) : old_sd(old_sd), new_sd(new_sd) {} 00691 rpc_message *clone() const { return new update_stream_IN(*this); } 00692 string as_string() const { 00693 string out; 00694 out << "update_stream_IN{" << "old_sd=" << old_sd << ", new_sd=" << new_sd << "}"; 00695 return out; 00696 } 00697 NMSTL_SIMPLY_SERIALIZABLE(update_stream_IN, << Serial::tag("param") << old_sd << new_sd ); 00698 NMSTL_SERIAL_TAG(update_stream_IN, "params"); 00699 }; 00700 class update_stream_OUT { 00701 RPCStatus m_stat; 00702 public: 00703 operator const void * () const { return m_stat; } 00704 const RPCStatus& stat() const { return m_stat; } 00705 bool get() const { return true; } 00706 }; 00707 static const int ack_METHOD_ID = 20020; 00708 struct ack_IN : public rpc_request_1< void > { 00709 MedusaID node; StreamID id; string last_tuple; 00710 ack_IN() {} 00711 ack_IN(MedusaID node, StreamID 
id, string last_tuple) : node(node), id(id), last_tuple(last_tuple) {} 00712 rpc_message *clone() const { return new ack_IN(*this); } 00713 string as_string() const { 00714 string out; 00715 out << "ack_IN{" << "node=" << node << ", id=" << id << ", last_tuple=" << last_tuple << "}"; 00716 return out; 00717 } 00718 NMSTL_SIMPLY_SERIALIZABLE(ack_IN, << Serial::tag("param") << node << id << last_tuple ); 00719 NMSTL_SERIAL_TAG(ack_IN, "params"); 00720 }; 00721 class ack_OUT { 00722 RPCStatus m_stat; 00723 public: 00724 operator const void * () const { return m_stat; } 00725 const RPCStatus& stat() const { return m_stat; } 00726 bool get() const { return true; } 00727 }; 00728 static const int trim_METHOD_ID = 20021; 00729 struct trim_IN : public rpc_request_1< void > { 00730 MedusaID node; StreamID id; string last_tuple; 00731 trim_IN() {} 00732 trim_IN(MedusaID node, StreamID id, string last_tuple) : node(node), id(id), last_tuple(last_tuple) {} 00733 rpc_message *clone() const { return new trim_IN(*this); } 00734 string as_string() const { 00735 string out; 00736 out << "trim_IN{" << "node=" << node << ", id=" << id << ", last_tuple=" << last_tuple << "}"; 00737 return out; 00738 } 00739 NMSTL_SIMPLY_SERIALIZABLE(trim_IN, << Serial::tag("param") << node << id << last_tuple ); 00740 NMSTL_SERIAL_TAG(trim_IN, "params"); 00741 }; 00742 class trim_OUT { 00743 RPCStatus m_stat; 00744 public: 00745 operator const void * () const { return m_stat; } 00746 const RPCStatus& stat() const { return m_stat; } 00747 bool get() const { return true; } 00748 }; 00749 static const int set_recovery_method_METHOD_ID = 20022; 00750 struct set_recovery_method_IN : public rpc_request_1< void > { 00751 int method; 00752 set_recovery_method_IN() {} 00753 set_recovery_method_IN(int method) : method(method) {} 00754 rpc_message *clone() const { return new set_recovery_method_IN(*this); } 00755 string as_string() const { 00756 string out; 00757 out << "set_recovery_method_IN{" << "method=" << 
method << "}"; 00758 return out; 00759 } 00760 NMSTL_SIMPLY_SERIALIZABLE(set_recovery_method_IN, << Serial::tag("param") << method ); 00761 NMSTL_SERIAL_TAG(set_recovery_method_IN, "params"); 00762 }; 00763 class set_recovery_method_OUT { 00764 RPCStatus m_stat; 00765 public: 00766 operator const void * () const { return m_stat; } 00767 const RPCStatus& stat() const { return m_stat; } 00768 bool get() const { return true; } 00769 }; 00770 static const int set_primary_status_METHOD_ID = 20023; 00771 struct set_primary_status_IN : public rpc_request_1< void > { 00772 bool status; 00773 set_primary_status_IN() {} 00774 set_primary_status_IN(bool status) : status(status) {} 00775 rpc_message *clone() const { return new set_primary_status_IN(*this); } 00776 string as_string() const { 00777 string out; 00778 out << "set_primary_status_IN{" << "status=" << status << "}"; 00779 return out; 00780 } 00781 NMSTL_SIMPLY_SERIALIZABLE(set_primary_status_IN, << Serial::tag("param") << status ); 00782 NMSTL_SERIAL_TAG(set_primary_status_IN, "params"); 00783 }; 00784 class set_primary_status_OUT { 00785 RPCStatus m_stat; 00786 public: 00787 operator const void * () const { return m_stat; } 00788 const RPCStatus& stat() const { return m_stat; } 00789 bool get() const { return true; } 00790 }; 00791 static const int set_secondaries_METHOD_ID = 20024; 00792 struct set_secondaries_IN : public rpc_request_1< void > { 00793 vector<MedusaID> secondaries; 00794 set_secondaries_IN() {} 00795 set_secondaries_IN(vector<MedusaID> secondaries) : secondaries(secondaries) {} 00796 rpc_message *clone() const { return new set_secondaries_IN(*this); } 00797 string as_string() const { 00798 string out; 00799 out << "set_secondaries_IN{" << "secondaries=" << secondaries << "}"; 00800 return out; 00801 } 00802 NMSTL_SIMPLY_SERIALIZABLE(set_secondaries_IN, << Serial::tag("param") << secondaries ); 00803 NMSTL_SERIAL_TAG(set_secondaries_IN, "params"); 00804 }; 00805 class set_secondaries_OUT { 00806 
RPCStatus m_stat; 00807 public: 00808 operator const void * () const { return m_stat; } 00809 const RPCStatus& stat() const { return m_stat; } 00810 bool get() const { return true; } 00811 }; 00812 static const int set_replicas_METHOD_ID = 20025; 00813 struct set_replicas_IN : public rpc_request_1< void > { 00814 vector<MedusaID> replicas; 00815 set_replicas_IN() {} 00816 set_replicas_IN(vector<MedusaID> replicas) : replicas(replicas) {} 00817 rpc_message *clone() const { return new set_replicas_IN(*this); } 00818 string as_string() const { 00819 string out; 00820 out << "set_replicas_IN{" << "replicas=" << replicas << "}"; 00821 return out; 00822 } 00823 NMSTL_SIMPLY_SERIALIZABLE(set_replicas_IN, << Serial::tag("param") << replicas ); 00824 NMSTL_SERIAL_TAG(set_replicas_IN, "params"); 00825 }; 00826 class set_replicas_OUT { 00827 RPCStatus m_stat; 00828 public: 00829 operator const void * () const { return m_stat; } 00830 const RPCStatus& stat() const { return m_stat; } 00831 bool get() const { return true; } 00832 }; 00833 static const int checkpoint_METHOD_ID = 20026; 00834 struct checkpoint_IN : public rpc_request_1< void > { 00835 vector<StreamEvent> tuples_to_checkpoint; map<StreamID,string> tuples_to_trim; 00836 checkpoint_IN() {} 00837 checkpoint_IN(vector<StreamEvent> tuples_to_checkpoint, map<StreamID,string> tuples_to_trim) : tuples_to_checkpoint(tuples_to_checkpoint), tuples_to_trim(tuples_to_trim) {} 00838 rpc_message *clone() const { return new checkpoint_IN(*this); } 00839 string as_string() const { 00840 string out; 00841 out << "checkpoint_IN{" << "tuples_to_checkpoint=" << tuples_to_checkpoint << ", tuples_to_trim=" << tuples_to_trim << "}"; 00842 return out; 00843 } 00844 NMSTL_SIMPLY_SERIALIZABLE(checkpoint_IN, << Serial::tag("param") << tuples_to_checkpoint << tuples_to_trim ); 00845 NMSTL_SERIAL_TAG(checkpoint_IN, "params"); 00846 }; 00847 class checkpoint_OUT { 00848 RPCStatus m_stat; 00849 public: 00850 operator const void * () const { 
return m_stat; } 00851 const RPCStatus& stat() const { return m_stat; } 00852 bool get() const { return true; } 00853 }; 00854 static const vector<pair<string, int> >& method_list() { 00855 static const pair<string, int> methods[] = { 00856 pair<string, int>("typecheck", 20000), 00857 pair<string, int>("setup_query", 20001), 00858 pair<string, int>("typecheck_and_setup", 20002), 00859 pair<string, int>("load_box", 20003), 00860 pair<string, int>("choke_queries", 20004), 00861 pair<string, int>("resume_queries", 20005), 00862 pair<string, int>("pack_query", 20006), 00863 pair<string, int>("pack_queries", 20007), 00864 pair<string, int>("remove_query", 20008), 00865 pair<string, int>("replace_query", 20009), 00866 pair<string, int>("set_query_status", 20010), 00867 pair<string, int>("set_queries_status", 20011), 00868 pair<string, int>("subscribe", 20012), 00869 pair<string, int>("subscribe_many", 20013), 00870 pair<string, int>("get_subscriptions", 20014), 00871 pair<string, int>("get_stats", 20015), 00872 pair<string, int>("get_sel", 20016), 00873 pair<string, int>("create_stream", 20017), 00874 pair<string, int>("create_cpview", 20018), 00875 pair<string, int>("update_stream", 20019), 00876 pair<string, int>("ack", 20020), 00877 pair<string, int>("trim", 20021), 00878 pair<string, int>("set_recovery_method", 20022), 00879 pair<string, int>("set_primary_status", 20023), 00880 pair<string, int>("set_secondaries", 20024), 00881 pair<string, int>("set_replicas", 20025), 00882 pair<string, int>("checkpoint", 20026) 00883 }; 00884 static const vector<pair<string, int> > ret(methods, methods + sizeof methods / sizeof methods[0]); 00885 return ret; 00886 }; 00887 static const map<string, int>& method_id_by_name() { 00888 static const map<string, int> ret(method_list().begin(), method_list().end()); 00889 return ret; 00890 } 00891 static const map<int, string>& method_name_by_id() { 00892 static const map<int, string> ret = RPCObject::reverse_map(method_id_by_name()); 
00893 return ret; 00894 } 00895 }; 00896 template<> 00897 class Remote<Borealis::QueryProcessor> : public RemoteObject { 00898 friend class RPCClient; 00899 protected: 00900 Remote<Borealis::QueryProcessor>(RPCClient& cli, rpc_object_id object_id) : 00901 RemoteObject(cli, object_id) {} 00902 00903 public: 00904 Remote<Borealis::QueryProcessor>() {} 00905 void typecheck(const Callback<void, RPC< Query > >& completion, Query query) const { 00906 RPCDefs<Borealis::QueryProcessor>::typecheck_IN *req = 00907 new RPCDefs<Borealis::QueryProcessor>::typecheck_IN(query); 00908 req->object_id = object_id(); 00909 req->method_id = RPCDefs<Borealis::QueryProcessor>::typecheck_METHOD_ID; 00910 req->method_name = "typecheck"; 00911 req->cb = completion; 00912 client().send_request(ptr<rpc_request>(req)); 00913 } 00914 RPC< Query > typecheck(Query query) const { 00915 RPCDefs<Borealis::QueryProcessor>::typecheck_IN *req = 00916 new RPCDefs<Borealis::QueryProcessor>::typecheck_IN(query); 00917 ptr<rpc_request> preq(req); 00918 req->object_id = object_id(); 00919 req->method_id = RPCDefs<Borealis::QueryProcessor>::typecheck_METHOD_ID; 00920 req->method_name = "typecheck"; 00921 bool done = false; RPC< Query > ret; 00922 req->cb = wrap(&rpc_sync_completion< Query >, &done, &ret); 00923 client().send_request(preq); 00924 while (!done && client().block()) ; 00925 req->cb = Callback<void, RPC< Query > >(); 00926 return ret; 00927 } 00928 typedef const RPCDefs<Borealis::QueryProcessor>::typecheck_OUT &typecheck_result; 00929 void setup_query(const Callback<void, RPC< void > >& completion, Query query) const { 00930 RPCDefs<Borealis::QueryProcessor>::setup_query_IN *req = 00931 new RPCDefs<Borealis::QueryProcessor>::setup_query_IN(query); 00932 req->object_id = object_id(); 00933 req->method_id = RPCDefs<Borealis::QueryProcessor>::setup_query_METHOD_ID; 00934 req->method_name = "setup_query"; 00935 req->cb = completion; 00936 client().send_request(ptr<rpc_request>(req)); 00937 } 00938 
RPC< void > setup_query(Query query) const { 00939 RPCDefs<Borealis::QueryProcessor>::setup_query_IN *req = 00940 new RPCDefs<Borealis::QueryProcessor>::setup_query_IN(query); 00941 ptr<rpc_request> preq(req); 00942 req->object_id = object_id(); 00943 req->method_id = RPCDefs<Borealis::QueryProcessor>::setup_query_METHOD_ID; 00944 req->method_name = "setup_query"; 00945 bool done = false; RPC< void > ret; 00946 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret); 00947 client().send_request(preq); 00948 while (!done && client().block()) ; 00949 req->cb = Callback<void, RPC< void > >(); 00950 return ret; 00951 } 00952 typedef const RPCDefs<Borealis::QueryProcessor>::setup_query_OUT &setup_query_result; 00953 void typecheck_and_setup(const Callback<void, RPC< Query > >& completion, Query query) const { 00954 RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_IN *req = 00955 new RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_IN(query); 00956 req->object_id = object_id(); 00957 req->method_id = RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_METHOD_ID; 00958 req->method_name = "typecheck_and_setup"; 00959 req->cb = completion; 00960 client().send_request(ptr<rpc_request>(req)); 00961 } 00962 RPC< Query > typecheck_and_setup(Query query) const { 00963 RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_IN *req = 00964 new RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_IN(query); 00965 ptr<rpc_request> preq(req); 00966 req->object_id = object_id(); 00967 req->method_id = RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_METHOD_ID; 00968 req->method_name = "typecheck_and_setup"; 00969 bool done = false; RPC< Query > ret; 00970 req->cb = wrap(&rpc_sync_completion< Query >, &done, &ret); 00971 client().send_request(preq); 00972 while (!done && client().block()) ; 00973 req->cb = Callback<void, RPC< Query > >(); 00974 return ret; 00975 } 00976 typedef const RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_OUT 
&typecheck_and_setup_result; 00977 void load_box(const Callback<void, RPC< void > >& completion, string file_path) const { 00978 RPCDefs<Borealis::QueryProcessor>::load_box_IN *req = 00979 new RPCDefs<Borealis::QueryProcessor>::load_box_IN(file_path); 00980 req->object_id = object_id(); 00981 req->method_id = RPCDefs<Borealis::QueryProcessor>::load_box_METHOD_ID; 00982 req->method_name = "load_box"; 00983 req->cb = completion; 00984 client().send_request(ptr<rpc_request>(req)); 00985 } 00986 RPC< void > load_box(string file_path) const { 00987 RPCDefs<Borealis::QueryProcessor>::load_box_IN *req = 00988 new RPCDefs<Borealis::QueryProcessor>::load_box_IN(file_path); 00989 ptr<rpc_request> preq(req); 00990 req->object_id = object_id(); 00991 req->method_id = RPCDefs<Borealis::QueryProcessor>::load_box_METHOD_ID; 00992 req->method_name = "load_box"; 00993 bool done = false; RPC< void > ret; 00994 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret); 00995 client().send_request(preq); 00996 while (!done && client().block()) ; 00997 req->cb = Callback<void, RPC< void > >(); 00998 return ret; 00999 } 01000 typedef const RPCDefs<Borealis::QueryProcessor>::load_box_OUT &load_box_result; 01001 void choke_queries(const Callback<void, RPC< vector<Query> > >& completion, vector<Query> queries) const { 01002 RPCDefs<Borealis::QueryProcessor>::choke_queries_IN *req = 01003 new RPCDefs<Borealis::QueryProcessor>::choke_queries_IN(queries); 01004 req->object_id = object_id(); 01005 req->method_id = RPCDefs<Borealis::QueryProcessor>::choke_queries_METHOD_ID; 01006 req->method_name = "choke_queries"; 01007 req->cb = completion; 01008 client().send_request(ptr<rpc_request>(req)); 01009 } 01010 RPC< vector<Query> > choke_queries(vector<Query> queries) const { 01011 RPCDefs<Borealis::QueryProcessor>::choke_queries_IN *req = 01012 new RPCDefs<Borealis::QueryProcessor>::choke_queries_IN(queries); 01013 ptr<rpc_request> preq(req); 01014 req->object_id = object_id(); 01015 
req->method_id = RPCDefs<Borealis::QueryProcessor>::choke_queries_METHOD_ID; 01016 req->method_name = "choke_queries"; 01017 bool done = false; RPC< vector<Query> > ret; 01018 req->cb = wrap(&rpc_sync_completion< vector<Query> >, &done, &ret); 01019 client().send_request(preq); 01020 while (!done && client().block()) ; 01021 req->cb = Callback<void, RPC< vector<Query> > >(); 01022 return ret; 01023 } 01024 typedef const RPCDefs<Borealis::QueryProcessor>::choke_queries_OUT &choke_queries_result; 01025 void resume_queries(const Callback<void, RPC< void > >& completion, vector<Query> queries) const { 01026 RPCDefs<Borealis::QueryProcessor>::resume_queries_IN *req = 01027 new RPCDefs<Borealis::QueryProcessor>::resume_queries_IN(queries); 01028 req->object_id = object_id(); 01029 req->method_id = RPCDefs<Borealis::QueryProcessor>::resume_queries_METHOD_ID; 01030 req->method_name = "resume_queries"; 01031 req->cb = completion; 01032 client().send_request(ptr<rpc_request>(req)); 01033 } 01034 RPC< void > resume_queries(vector<Query> queries) const { 01035 RPCDefs<Borealis::QueryProcessor>::resume_queries_IN *req = 01036 new RPCDefs<Borealis::QueryProcessor>::resume_queries_IN(queries); 01037 ptr<rpc_request> preq(req); 01038 req->object_id = object_id(); 01039 req->method_id = RPCDefs<Borealis::QueryProcessor>::resume_queries_METHOD_ID; 01040 req->method_name = "resume_queries"; 01041 bool done = false; RPC< void > ret; 01042 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret); 01043 client().send_request(preq); 01044 while (!done && client().block()) ; 01045 req->cb = Callback<void, RPC< void > >(); 01046 return ret; 01047 } 01048 typedef const RPCDefs<Borealis::QueryProcessor>::resume_queries_OUT &resume_queries_result; 01049 void pack_query(const Callback<void, RPC< Query > >& completion, Query query) const { 01050 RPCDefs<Borealis::QueryProcessor>::pack_query_IN *req = 01051 new RPCDefs<Borealis::QueryProcessor>::pack_query_IN(query); 01052 req->object_id = 
object_id(); 01053 req->method_id = RPCDefs<Borealis::QueryProcessor>::pack_query_METHOD_ID; 01054 req->method_name = "pack_query"; 01055 req->cb = completion; 01056 client().send_request(ptr<rpc_request>(req)); 01057 } 01058 RPC< Query > pack_query(Query query) const { 01059 RPCDefs<Borealis::QueryProcessor>::pack_query_IN *req = 01060 new RPCDefs<Borealis::QueryProcessor>::pack_query_IN(query); 01061 ptr<rpc_request> preq(req); 01062 req->object_id = object_id(); 01063 req->method_id = RPCDefs<Borealis::QueryProcessor>::pack_query_METHOD_ID; 01064 req->method_name = "pack_query"; 01065 bool done = false; RPC< Query > ret; 01066 req->cb = wrap(&rpc_sync_completion< Query >, &done, &ret); 01067 client().send_request(preq); 01068 while (!done && client().block()) ; 01069 req->cb = Callback<void, RPC< Query > >(); 01070 return ret; 01071 } 01072 typedef const RPCDefs<Borealis::QueryProcessor>::pack_query_OUT &pack_query_result; 01073 void pack_queries(const Callback<void, RPC< vector<Query> > >& completion, vector<Query> queries) const { 01074 RPCDefs<Borealis::QueryProcessor>::pack_queries_IN *req = 01075 new RPCDefs<Borealis::QueryProcessor>::pack_queries_IN(queries); 01076 req->object_id = object_id(); 01077 req->method_id = RPCDefs<Borealis::QueryProcessor>::pack_queries_METHOD_ID; 01078 req->method_name = "pack_queries"; 01079 req->cb = completion; 01080 client().send_request(ptr<rpc_request>(req)); 01081 } 01082 RPC< vector<Query> > pack_queries(vector<Query> queries) const { 01083 RPCDefs<Borealis::QueryProcessor>::pack_queries_IN *req = 01084 new RPCDefs<Borealis::QueryProcessor>::pack_queries_IN(queries); 01085 ptr<rpc_request> preq(req); 01086 req->object_id = object_id(); 01087 req->method_id = RPCDefs<Borealis::QueryProcessor>::pack_queries_METHOD_ID; 01088 req->method_name = "pack_queries"; 01089 bool done = false; RPC< vector<Query> > ret; 01090 req->cb = wrap(&rpc_sync_completion< vector<Query> >, &done, &ret); 01091 client().send_request(preq); 01092 
while (!done && client().block()) ; 01093 req->cb = Callback<void, RPC< vector<Query> > >(); 01094 return ret; 01095 } 01096 typedef const RPCDefs<Borealis::QueryProcessor>::pack_queries_OUT &pack_queries_result; 01097 void remove_query(const Callback<void, RPC< void > >& completion, Query query) const { 01098 RPCDefs<Borealis::QueryProcessor>::remove_query_IN *req = 01099 new RPCDefs<Borealis::QueryProcessor>::remove_query_IN(query); 01100 req->object_id = object_id(); 01101 req->method_id = RPCDefs<Borealis::QueryProcessor>::remove_query_METHOD_ID; 01102 req->method_name = "remove_query"; 01103 req->cb = completion; 01104 client().send_request(ptr<rpc_request>(req)); 01105 } 01106 RPC< void > remove_query(Query query) const { 01107 RPCDefs<Borealis::QueryProcessor>::remove_query_IN *req = 01108 new RPCDefs<Borealis::QueryProcessor>::remove_query_IN(query); 01109 ptr<rpc_request> preq(req); 01110 req->object_id = object_id(); 01111 req->method_id = RPCDefs<Borealis::QueryProcessor>::remove_query_METHOD_ID; 01112 req->method_name = "remove_query"; 01113 bool done = false; RPC< void > ret; 01114 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret); 01115 client().send_request(preq); 01116 while (!done && client().block()) ; 01117 req->cb = Callback<void, RPC< void > >(); 01118 return ret; 01119 } 01120 typedef const RPCDefs<Borealis::QueryProcessor>::remove_query_OUT &remove_query_result; 01121 void replace_query(const Callback<void, RPC< void > >& completion, vector<Name> old_queries, vector<Query> new_queries) const { 01122 RPCDefs<Borealis::QueryProcessor>::replace_query_IN *req = 01123 new RPCDefs<Borealis::QueryProcessor>::replace_query_IN(old_queries, new_queries); 01124 req->object_id = object_id(); 01125 req->method_id = RPCDefs<Borealis::QueryProcessor>::replace_query_METHOD_ID; 01126 req->method_name = "replace_query"; 01127 req->cb = completion; 01128 client().send_request(ptr<rpc_request>(req)); 01129 } 01130 RPC< void > replace_query(vector<Name> 
old_queries, vector<Query> new_queries) const { 01131 RPCDefs<Borealis::QueryProcessor>::replace_query_IN *req = 01132 new RPCDefs<Borealis::QueryProcessor>::replace_query_IN(old_queries, new_queries); 01133 ptr<rpc_request> preq(req); 01134 req->object_id = object_id(); 01135 req->method_id = RPCDefs<Borealis::QueryProcessor>::replace_query_METHOD_ID; 01136 req->method_name = "replace_query"; 01137 bool done = false; RPC< void > ret; 01138 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret); 01139 client().send_request(preq); 01140 while (!done && client().block()) ; 01141 req->cb = Callback<void, RPC< void > >(); 01142 return ret; 01143 } 01144 typedef const RPCDefs<Borealis::QueryProcessor>::replace_query_OUT &replace_query_result; 01145 void set_query_status(const Callback<void, RPC< void > >& completion, Name name, QueryStatus status) const { 01146 RPCDefs<Borealis::QueryProcessor>::set_query_status_IN *req = 01147 new RPCDefs<Borealis::QueryProcessor>::set_query_status_IN(name, status); 01148 req->object_id = object_id(); 01149 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_query_status_METHOD_ID; 01150 req->method_name = "set_query_status"; 01151 req->cb = completion; 01152 client().send_request(ptr<rpc_request>(req)); 01153 } 01154 RPC< void > set_query_status(Name name, QueryStatus status) const { 01155 RPCDefs<Borealis::QueryProcessor>::set_query_status_IN *req = 01156 new RPCDefs<Borealis::QueryProcessor>::set_query_status_IN(name, status); 01157 ptr<rpc_request> preq(req); 01158 req->object_id = object_id(); 01159 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_query_status_METHOD_ID; 01160 req->method_name = "set_query_status"; 01161 bool done = false; RPC< void > ret; 01162 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret); 01163 client().send_request(preq); 01164 while (!done && client().block()) ; 01165 req->cb = Callback<void, RPC< void > >(); 01166 return ret; 01167 } 01168 typedef const 
RPCDefs<Borealis::QueryProcessor>::set_query_status_OUT &set_query_status_result; 01169 void set_queries_status(const Callback<void, RPC< void > >& completion, vector<Name> name, QueryStatus status) const { 01170 RPCDefs<Borealis::QueryProcessor>::set_queries_status_IN *req = 01171 new RPCDefs<Borealis::QueryProcessor>::set_queries_status_IN(name, status); 01172 req->object_id = object_id(); 01173 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_queries_status_METHOD_ID; 01174 req->method_name = "set_queries_status"; 01175 req->cb = completion; 01176 client().send_request(ptr<rpc_request>(req)); 01177 } 01178 RPC< void > set_queries_status(vector<Name> name, QueryStatus status) const { 01179 RPCDefs<Borealis::QueryProcessor>::set_queries_status_IN *req = 01180 new RPCDefs<Borealis::QueryProcessor>::set_queries_status_IN(name, status); 01181 ptr<rpc_request> preq(req); 01182 req->object_id = object_id(); 01183 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_queries_status_METHOD_ID; 01184 req->method_name = "set_queries_status"; 01185 bool done = false; RPC< void > ret; 01186 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret); 01187 client().send_request(preq); 01188 while (!done && client().block()) ; 01189 req->cb = Callback<void, RPC< void > >(); 01190 return ret; 01191 } 01192 typedef const RPCDefs<Borealis::QueryProcessor>::set_queries_status_OUT &set_queries_status_result; 01193 void subscribe(const Callback<void, RPC< void > >& completion, Subscription sub, unsigned int add_or_remove) const { 01194 RPCDefs<Borealis::QueryProcessor>::subscribe_IN *req = 01195 new RPCDefs<Borealis::QueryProcessor>::subscribe_IN(sub, add_or_remove); 01196 req->object_id = object_id(); 01197 req->method_id = RPCDefs<Borealis::QueryProcessor>::subscribe_METHOD_ID; 01198 req->method_name = "subscribe"; 01199 req->cb = completion; 01200 client().send_request(ptr<rpc_request>(req)); 01201 } 01202 RPC< void > subscribe(Subscription sub, unsigned int 
add_or_remove) const { 01203 RPCDefs<Borealis::QueryProcessor>::subscribe_IN *req = 01204 new RPCDefs<Borealis::QueryProcessor>::subscribe_IN(sub, add_or_remove); 01205 ptr<rpc_request> preq(req); 01206 req->object_id = object_id(); 01207 req->method_id = RPCDefs<Borealis::QueryProcessor>::subscribe_METHOD_ID; 01208 req->method_name = "subscribe"; 01209 bool done = false; RPC< void > ret; 01210 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret); 01211 client().send_request(preq); 01212 while (!done && client().block()) ; 01213 req->cb = Callback<void, RPC< void > >(); 01214 return ret; 01215 } 01216 typedef const RPCDefs<Borealis::QueryProcessor>::subscribe_OUT &subscribe_result; 01217 void subscribe_many(const Callback<void, RPC< vector<Subscription> > >& completion, vector<Subscription> sub, unsigned int add_or_remove) const { 01218 RPCDefs<Borealis::QueryProcessor>::subscribe_many_IN *req = 01219 new RPCDefs<Borealis::QueryProcessor>::subscribe_many_IN(sub, add_or_remove); 01220 req->object_id = object_id(); 01221 req->method_id = RPCDefs<Borealis::QueryProcessor>::subscribe_many_METHOD_ID; 01222 req->method_name = "subscribe_many"; 01223 req->cb = completion; 01224 client().send_request(ptr<rpc_request>(req)); 01225 } 01226 RPC< vector<Subscription> > subscribe_many(vector<Subscription> sub, unsigned int add_or_remove) const { 01227 RPCDefs<Borealis::QueryProcessor>::subscribe_many_IN *req = 01228 new RPCDefs<Borealis::QueryProcessor>::subscribe_many_IN(sub, add_or_remove); 01229 ptr<rpc_request> preq(req); 01230 req->object_id = object_id(); 01231 req->method_id = RPCDefs<Borealis::QueryProcessor>::subscribe_many_METHOD_ID; 01232 req->method_name = "subscribe_many"; 01233 bool done = false; RPC< vector<Subscription> > ret; 01234 req->cb = wrap(&rpc_sync_completion< vector<Subscription> >, &done, &ret); 01235 client().send_request(preq); 01236 while (!done && client().block()) ; 01237 req->cb = Callback<void, RPC< vector<Subscription> > >(); 01238 return 
ret; 01239 } 01240 typedef const RPCDefs<Borealis::QueryProcessor>::subscribe_many_OUT &subscribe_many_result; 01241 void get_subscriptions(const Callback<void, RPC< vector<Subscription> > >& completion, vector<Name> streams) const { 01242 RPCDefs<Borealis::QueryProcessor>::get_subscriptions_IN *req = 01243 new RPCDefs<Borealis::QueryProcessor>::get_subscriptions_IN(streams); 01244 req->object_id = object_id(); 01245 req->method_id = RPCDefs<Borealis::QueryProcessor>::get_subscriptions_METHOD_ID; 01246 req->method_name = "get_subscriptions"; 01247 req->cb = completion; 01248 client().send_request(ptr<rpc_request>(req)); 01249 } 01250 RPC< vector<Subscription> > get_subscriptions(vector<Name> streams) const { 01251 RPCDefs<Borealis::QueryProcessor>::get_subscriptions_IN *req = 01252 new RPCDefs<Borealis::QueryProcessor>::get_subscriptions_IN(streams); 01253 ptr<rpc_request> preq(req); 01254 req->object_id = object_id(); 01255 req->method_id = RPCDefs<Borealis::QueryProcessor>::get_subscriptions_METHOD_ID; 01256 req->method_name = "get_subscriptions"; 01257 bool done = false; RPC< vector<Subscription> > ret; 01258 req->cb = wrap(&rpc_sync_completion< vector<Subscription> >, &done, &ret); 01259 client().send_request(preq); 01260 while (!done && client().block()) ; 01261 req->cb = Callback<void, RPC< vector<Subscription> > >(); 01262 return ret; 01263 } 01264 typedef const RPCDefs<Borealis::QueryProcessor>::get_subscriptions_OUT &get_subscriptions_result; 01265 void get_stats(const Callback<void, RPC< vector<Stats> > >& completion) const { 01266 RPCDefs<Borealis::QueryProcessor>::get_stats_IN *req = 01267 new RPCDefs<Borealis::QueryProcessor>::get_stats_IN(); 01268 req->object_id = object_id(); 01269 req->method_id = RPCDefs<Borealis::QueryProcessor>::get_stats_METHOD_ID; 01270 req->method_name = "get_stats"; 01271 req->cb = completion; 01272 client().send_request(ptr<rpc_request>(req)); 01273 } 01274 RPC< vector<Stats> > get_stats() const { 01275 
RPCDefs<Borealis::QueryProcessor>::get_stats_IN *req = 01276 new RPCDefs<Borealis::QueryProcessor>::get_stats_IN(); 01277 ptr<rpc_request> preq(req); 01278 req->object_id = object_id(); 01279 req->method_id = RPCDefs<Borealis::QueryProcessor>::get_stats_METHOD_ID; 01280 req->method_name = "get_stats"; 01281 bool done = false; RPC< vector<Stats> > ret; 01282 req->cb = wrap(&rpc_sync_completion< vector<Stats> >, &done, &ret); 01283 client().send_request(preq); 01284 while (!done && client().block()) ; 01285 req->cb = Callback<void, RPC< vector<Stats> > >(); 01286 return ret; 01287 } 01288 typedef const RPCDefs<Borealis::QueryProcessor>::get_stats_OUT &get_stats_result; 01289 void get_sel(const Callback<void, RPC< double > >& completion) const { 01290 RPCDefs<Borealis::QueryProcessor>::get_sel_IN *req = 01291 new RPCDefs<Borealis::QueryProcessor>::get_sel_IN(); 01292 req->object_id = object_id(); 01293 req->method_id = RPCDefs<Borealis::QueryProcessor>::get_sel_METHOD_ID; 01294 req->method_name = "get_sel"; 01295 req->cb = completion; 01296 client().send_request(ptr<rpc_request>(req)); 01297 } 01298 RPC< double > get_sel() const { 01299 RPCDefs<Borealis::QueryProcessor>::get_sel_IN *req = 01300 new RPCDefs<Borealis::QueryProcessor>::get_sel_IN(); 01301 ptr<rpc_request> preq(req); 01302 req->object_id = object_id(); 01303 req->method_id = RPCDefs<Borealis::QueryProcessor>::get_sel_METHOD_ID; 01304 req->method_name = "get_sel"; 01305 bool done = false; RPC< double > ret; 01306 req->cb = wrap(&rpc_sync_completion< double >, &done, &ret); 01307 client().send_request(preq); 01308 while (!done && client().block()) ; 01309 req->cb = Callback<void, RPC< double > >(); 01310 return ret; 01311 } 01312 typedef const RPCDefs<Borealis::QueryProcessor>::get_sel_OUT &get_sel_result; 01313 void create_stream(const Callback<void, RPC< void > >& completion, StreamDef stream) const { 01314 RPCDefs<Borealis::QueryProcessor>::create_stream_IN *req = 01315 new 
RPCDefs<Borealis::QueryProcessor>::create_stream_IN(stream); 01316 req->object_id = object_id(); 01317 req->method_id = RPCDefs<Borealis::QueryProcessor>::create_stream_METHOD_ID; 01318 req->method_name = "create_stream"; 01319 req->cb = completion; 01320 client().send_request(ptr<rpc_request>(req)); 01321 } 01322 RPC< void > create_stream(StreamDef stream) const { 01323 RPCDefs<Borealis::QueryProcessor>::create_stream_IN *req = 01324 new RPCDefs<Borealis::QueryProcessor>::create_stream_IN(stream); 01325 ptr<rpc_request> preq(req); 01326 req->object_id = object_id(); 01327 req->method_id = RPCDefs<Borealis::QueryProcessor>::create_stream_METHOD_ID; 01328 req->method_name = "create_stream"; 01329 bool done = false; RPC< void > ret; 01330 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret); 01331 client().send_request(preq); 01332 while (!done && client().block()) ; 01333 req->cb = Callback<void, RPC< void > >(); 01334 return ret; 01335 } 01336 typedef const RPCDefs<Borealis::QueryProcessor>::create_stream_OUT &create_stream_result; 01337 void create_cpview(const Callback<void, RPC< void > >& completion, CPViewDescription view_desc, StreamDef streamdef) const { 01338 RPCDefs<Borealis::QueryProcessor>::create_cpview_IN *req = 01339 new RPCDefs<Borealis::QueryProcessor>::create_cpview_IN(view_desc, streamdef); 01340 req->object_id = object_id(); 01341 req->method_id = RPCDefs<Borealis::QueryProcessor>::create_cpview_METHOD_ID; 01342 req->method_name = "create_cpview"; 01343 req->cb = completion; 01344 client().send_request(ptr<rpc_request>(req)); 01345 } 01346 RPC< void > create_cpview(CPViewDescription view_desc, StreamDef streamdef) const { 01347 RPCDefs<Borealis::QueryProcessor>::create_cpview_IN *req = 01348 new RPCDefs<Borealis::QueryProcessor>::create_cpview_IN(view_desc, streamdef); 01349 ptr<rpc_request> preq(req); 01350 req->object_id = object_id(); 01351 req->method_id = RPCDefs<Borealis::QueryProcessor>::create_cpview_METHOD_ID; 01352 req->method_name 
= "create_cpview"; 01353 bool done = false; RPC< void > ret; 01354 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret); 01355 client().send_request(preq); 01356 while (!done && client().block()) ; 01357 req->cb = Callback<void, RPC< void > >(); 01358 return ret; 01359 } 01360 typedef const RPCDefs<Borealis::QueryProcessor>::create_cpview_OUT &create_cpview_result; 01361 void update_stream(const Callback<void, RPC< void > >& completion, StreamDef old_sd, StreamDef new_sd) const { 01362 RPCDefs<Borealis::QueryProcessor>::update_stream_IN *req = 01363 new RPCDefs<Borealis::QueryProcessor>::update_stream_IN(old_sd, new_sd); 01364 req->object_id = object_id(); 01365 req->method_id = RPCDefs<Borealis::QueryProcessor>::update_stream_METHOD_ID; 01366 req->method_name = "update_stream"; 01367 req->cb = completion; 01368 client().send_request(ptr<rpc_request>(req)); 01369 } 01370 RPC< void > update_stream(StreamDef old_sd, StreamDef new_sd) const { 01371 RPCDefs<Borealis::QueryProcessor>::update_stream_IN *req = 01372 new RPCDefs<Borealis::QueryProcessor>::update_stream_IN(old_sd, new_sd); 01373 ptr<rpc_request> preq(req); 01374 req->object_id = object_id(); 01375 req->method_id = RPCDefs<Borealis::QueryProcessor>::update_stream_METHOD_ID; 01376 req->method_name = "update_stream"; 01377 bool done = false; RPC< void > ret; 01378 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret); 01379 client().send_request(preq); 01380 while (!done && client().block()) ; 01381 req->cb = Callback<void, RPC< void > >(); 01382 return ret; 01383 } 01384 typedef const RPCDefs<Borealis::QueryProcessor>::update_stream_OUT &update_stream_result; 01385 void ack(const Callback<void, RPC< void > >& completion, MedusaID node, StreamID id, string last_tuple) const { 01386 RPCDefs<Borealis::QueryProcessor>::ack_IN *req = 01387 new RPCDefs<Borealis::QueryProcessor>::ack_IN(node, id, last_tuple); 01388 req->object_id = object_id(); 01389 req->method_id = 
RPCDefs<Borealis::QueryProcessor>::ack_METHOD_ID; 01390 req->method_name = "ack"; 01391 req->cb = completion; 01392 client().send_request(ptr<rpc_request>(req)); 01393 } 01394 RPC< void > ack(MedusaID node, StreamID id, string last_tuple) const { 01395 RPCDefs<Borealis::QueryProcessor>::ack_IN *req = 01396 new RPCDefs<Borealis::QueryProcessor>::ack_IN(node, id, last_tuple); 01397 ptr<rpc_request> preq(req); 01398 req->object_id = object_id(); 01399 req->method_id = RPCDefs<Borealis::QueryProcessor>::ack_METHOD_ID; 01400 req->method_name = "ack"; 01401 bool done = false; RPC< void > ret; 01402 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret); 01403 client().send_request(preq); 01404 while (!done && client().block()) ; 01405 req->cb = Callback<void, RPC< void > >(); 01406 return ret; 01407 } 01408 typedef const RPCDefs<Borealis::QueryProcessor>::ack_OUT &ack_result; 01409 void trim(const Callback<void, RPC< void > >& completion, MedusaID node, StreamID id, string last_tuple) const { 01410 RPCDefs<Borealis::QueryProcessor>::trim_IN *req = 01411 new RPCDefs<Borealis::QueryProcessor>::trim_IN(node, id, last_tuple); 01412 req->object_id = object_id(); 01413 req->method_id = RPCDefs<Borealis::QueryProcessor>::trim_METHOD_ID; 01414 req->method_name = "trim"; 01415 req->cb = completion; 01416 client().send_request(ptr<rpc_request>(req)); 01417 } 01418 RPC< void > trim(MedusaID node, StreamID id, string last_tuple) const { 01419 RPCDefs<Borealis::QueryProcessor>::trim_IN *req = 01420 new RPCDefs<Borealis::QueryProcessor>::trim_IN(node, id, last_tuple); 01421 ptr<rpc_request> preq(req); 01422 req->object_id = object_id(); 01423 req->method_id = RPCDefs<Borealis::QueryProcessor>::trim_METHOD_ID; 01424 req->method_name = "trim"; 01425 bool done = false; RPC< void > ret; 01426 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret); 01427 client().send_request(preq); 01428 while (!done && client().block()) ; 01429 req->cb = Callback<void, RPC< void > >(); 01430 return 
ret; 01431 } 01432 typedef const RPCDefs<Borealis::QueryProcessor>::trim_OUT &trim_result; 01433 void set_recovery_method(const Callback<void, RPC< void > >& completion, int method) const { 01434 RPCDefs<Borealis::QueryProcessor>::set_recovery_method_IN *req = 01435 new RPCDefs<Borealis::QueryProcessor>::set_recovery_method_IN(method); 01436 req->object_id = object_id(); 01437 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_recovery_method_METHOD_ID; 01438 req->method_name = "set_recovery_method"; 01439 req->cb = completion; 01440 client().send_request(ptr<rpc_request>(req)); 01441 } 01442 RPC< void > set_recovery_method(int method) const { 01443 RPCDefs<Borealis::QueryProcessor>::set_recovery_method_IN *req = 01444 new RPCDefs<Borealis::QueryProcessor>::set_recovery_method_IN(method); 01445 ptr<rpc_request> preq(req); 01446 req->object_id = object_id(); 01447 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_recovery_method_METHOD_ID; 01448 req->method_name = "set_recovery_method"; 01449 bool done = false; RPC< void > ret; 01450 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret); 01451 client().send_request(preq); 01452 while (!done && client().block()) ; 01453 req->cb = Callback<void, RPC< void > >(); 01454 return ret; 01455 } 01456 typedef const RPCDefs<Borealis::QueryProcessor>::set_recovery_method_OUT &set_recovery_method_result; 01457 void set_primary_status(const Callback<void, RPC< void > >& completion, bool status) const { 01458 RPCDefs<Borealis::QueryProcessor>::set_primary_status_IN *req = 01459 new RPCDefs<Borealis::QueryProcessor>::set_primary_status_IN(status); 01460 req->object_id = object_id(); 01461 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_primary_status_METHOD_ID; 01462 req->method_name = "set_primary_status"; 01463 req->cb = completion; 01464 client().send_request(ptr<rpc_request>(req)); 01465 } 01466 RPC< void > set_primary_status(bool status) const { 01467 
RPCDefs<Borealis::QueryProcessor>::set_primary_status_IN *req = 01468 new RPCDefs<Borealis::QueryProcessor>::set_primary_status_IN(status); 01469 ptr<rpc_request> preq(req); 01470 req->object_id = object_id(); 01471 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_primary_status_METHOD_ID; 01472 req->method_name = "set_primary_status"; 01473 bool done = false; RPC< void > ret; 01474 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret); 01475 client().send_request(preq); 01476 while (!done && client().block()) ; 01477 req->cb = Callback<void, RPC< void > >(); 01478 return ret; 01479 } 01480 typedef const RPCDefs<Borealis::QueryProcessor>::set_primary_status_OUT &set_primary_status_result; 01481 void set_secondaries(const Callback<void, RPC< void > >& completion, vector<MedusaID> secondaries) const { 01482 RPCDefs<Borealis::QueryProcessor>::set_secondaries_IN *req = 01483 new RPCDefs<Borealis::QueryProcessor>::set_secondaries_IN(secondaries); 01484 req->object_id = object_id(); 01485 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_secondaries_METHOD_ID; 01486 req->method_name = "set_secondaries"; 01487 req->cb = completion; 01488 client().send_request(ptr<rpc_request>(req)); 01489 } 01490 RPC< void > set_secondaries(vector<MedusaID> secondaries) const { 01491 RPCDefs<Borealis::QueryProcessor>::set_secondaries_IN *req = 01492 new RPCDefs<Borealis::QueryProcessor>::set_secondaries_IN(secondaries); 01493 ptr<rpc_request> preq(req); 01494 req->object_id = object_id(); 01495 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_secondaries_METHOD_ID; 01496 req->method_name = "set_secondaries"; 01497 bool done = false; RPC< void > ret; 01498 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret); 01499 client().send_request(preq); 01500 while (!done && client().block()) ; 01501 req->cb = Callback<void, RPC< void > >(); 01502 return ret; 01503 } 01504 typedef const RPCDefs<Borealis::QueryProcessor>::set_secondaries_OUT &set_secondaries_result; 01505 
void set_replicas(const Callback<void, RPC< void > >& completion, vector<MedusaID> replicas) const { 01506 RPCDefs<Borealis::QueryProcessor>::set_replicas_IN *req = 01507 new RPCDefs<Borealis::QueryProcessor>::set_replicas_IN(replicas); 01508 req->object_id = object_id(); 01509 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_replicas_METHOD_ID; 01510 req->method_name = "set_replicas"; 01511 req->cb = completion; 01512 client().send_request(ptr<rpc_request>(req)); 01513 } 01514 RPC< void > set_replicas(vector<MedusaID> replicas) const { 01515 RPCDefs<Borealis::QueryProcessor>::set_replicas_IN *req = 01516 new RPCDefs<Borealis::QueryProcessor>::set_replicas_IN(replicas); 01517 ptr<rpc_request> preq(req); 01518 req->object_id = object_id(); 01519 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_replicas_METHOD_ID; 01520 req->method_name = "set_replicas"; 01521 bool done = false; RPC< void > ret; 01522 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret); 01523 client().send_request(preq); 01524 while (!done && client().block()) ; 01525 req->cb = Callback<void, RPC< void > >(); 01526 return ret; 01527 } 01528 typedef const RPCDefs<Borealis::QueryProcessor>::set_replicas_OUT &set_replicas_result; 01529 void checkpoint(const Callback<void, RPC< void > >& completion, vector<StreamEvent> tuples_to_checkpoint, map<StreamID,string> tuples_to_trim) const { 01530 RPCDefs<Borealis::QueryProcessor>::checkpoint_IN *req = 01531 new RPCDefs<Borealis::QueryProcessor>::checkpoint_IN(tuples_to_checkpoint, tuples_to_trim); 01532 req->object_id = object_id(); 01533 req->method_id = RPCDefs<Borealis::QueryProcessor>::checkpoint_METHOD_ID; 01534 req->method_name = "checkpoint"; 01535 req->cb = completion; 01536 client().send_request(ptr<rpc_request>(req)); 01537 } 01538 RPC< void > checkpoint(vector<StreamEvent> tuples_to_checkpoint, map<StreamID,string> tuples_to_trim) const { 01539 RPCDefs<Borealis::QueryProcessor>::checkpoint_IN *req = 01540 new 
RPCDefs<Borealis::QueryProcessor>::checkpoint_IN(tuples_to_checkpoint, tuples_to_trim); 01541 ptr<rpc_request> preq(req); 01542 req->object_id = object_id(); 01543 req->method_id = RPCDefs<Borealis::QueryProcessor>::checkpoint_METHOD_ID; 01544 req->method_name = "checkpoint"; 01545 bool done = false; RPC< void > ret; 01546 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret); 01547 client().send_request(preq); 01548 while (!done && client().block()) ; 01549 req->cb = Callback<void, RPC< void > >(); 01550 return ret; 01551 } 01552 typedef const RPCDefs<Borealis::QueryProcessor>::checkpoint_OUT &checkpoint_result; 01553 }; 01554 }; 01555 template<> struct RPCDefs<Borealis::QueryProcessor> : __nmstl_tmp_rpc_defs::RPCDefs<Borealis::QueryProcessor> {}; 01556 template<> struct Remote<Borealis::QueryProcessor> : __nmstl_tmp_rpc_defs::Remote<Borealis::QueryProcessor> { 01557 Remote<Borealis::QueryProcessor>() {} 01558 Remote<Borealis::QueryProcessor>(RPCClient& cli, rpc_object_id object_id) : 01559 __nmstl_tmp_rpc_defs::Remote<Borealis::QueryProcessor>(cli, object_id) {} 01560 01561 Remote<Borealis::QueryProcessor>(const __nmstl_tmp_rpc_defs::Remote<Borealis::QueryProcessor>& x) : 01562 __nmstl_tmp_rpc_defs::Remote<Borealis::QueryProcessor>(x) {} 01563 }; 01564 01565 01566 #endif // !defined(DOXYGEN_SKIP) 01567 01568 NMSTL_NAMESPACE_END; 01569 01570 #endif // !defined(___QUERYPROCESSOR_RPC_QUERYPROCESSOR_H) 01571 01572 01573 01574 01575 01576 #ifdef NMSTL_RPC_DEFINE 01577 01578 #include <string> 01579 #include <map> 01580 #include <NMSTL/rpc> 01581

// Dispatches an already-typed RPC request to the matching QueryProcessor
// method.
//
// The request's method_id selects a case. Each case downcasts the request to
// that method's generated *_IN type; if the downcast fails, a warning is
// logged, the same message is passed to `cb`, and the function returns.
// Otherwise the local method is called with the request's fields:
//   - methods returning RPC<T>: a valid result is wrapped in an
//     rpc_message_1<T> and passed to `cb`; an invalid one passes
//     result.stat() to `cb` instead;
//   - methods returning AsyncRPC<void> (setup_query, load_box): `cb` is
//     installed as the response handler of the pending call.
// An unrecognised method_id is logged and reported through `cb`.
//
// cb   response callback; every call made here passes `true` as the third
//      argument (presumably "final response" -- confirm in NMSTL)
// req  the request to dispatch
void Borealis::QueryProcessor::handle_request(const NMSTL::RPCObject::response_cb& cb, const NMSTL::ptr<NMSTL::rpc_request>& req) {
    // Local shorthand for the generated per-method ids and argument structs.
    typedef NMSTL::RPCDefs<Borealis::QueryProcessor> Defs;

    switch (req->method_id) {
    case Defs::typecheck_METHOD_ID:
    {
        const Defs::typecheck_IN* args = dynamic_cast<const Defs::typecheck_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::typecheck; expected " +
                typeid(Defs::typecheck_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< Query > outcome = typecheck(args->query);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< Query > reply(*outcome);
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::setup_query_METHOD_ID:
    {
        const Defs::setup_query_IN* args = dynamic_cast<const Defs::setup_query_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::setup_query; expected " +
                typeid(Defs::setup_query_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        // Asynchronous method: the reply is delivered through the handler.
        NMSTL::AsyncRPC< void > deferred = setup_query(args->query);
        deferred.set_response_handler(cb);
        break;
    }
    case Defs::typecheck_and_setup_METHOD_ID:
    {
        const Defs::typecheck_and_setup_IN* args = dynamic_cast<const Defs::typecheck_and_setup_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::typecheck_and_setup; expected " +
                typeid(Defs::typecheck_and_setup_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< Query > outcome = typecheck_and_setup(args->query);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< Query > reply(*outcome);
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::load_box_METHOD_ID:
    {
        const Defs::load_box_IN* args = dynamic_cast<const Defs::load_box_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::load_box; expected " +
                typeid(Defs::load_box_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        // Asynchronous method: the reply is delivered through the handler.
        NMSTL::AsyncRPC< void > deferred = load_box(args->file_path);
        deferred.set_response_handler(cb);
        break;
    }
    case Defs::choke_queries_METHOD_ID:
    {
        const Defs::choke_queries_IN* args = dynamic_cast<const Defs::choke_queries_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::choke_queries; expected " +
                typeid(Defs::choke_queries_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< vector<Query> > outcome = choke_queries(args->queries);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< vector<Query> > reply(*outcome);
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::resume_queries_METHOD_ID:
    {
        const Defs::resume_queries_IN* args = dynamic_cast<const Defs::resume_queries_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::resume_queries; expected " +
                typeid(Defs::resume_queries_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< void > outcome = resume_queries(args->queries);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< void > reply;
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::pack_query_METHOD_ID:
    {
        const Defs::pack_query_IN* args = dynamic_cast<const Defs::pack_query_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::pack_query; expected " +
                typeid(Defs::pack_query_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< Query > outcome = pack_query(args->query);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< Query > reply(*outcome);
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::pack_queries_METHOD_ID:
    {
        const Defs::pack_queries_IN* args = dynamic_cast<const Defs::pack_queries_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::pack_queries; expected " +
                typeid(Defs::pack_queries_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< vector<Query> > outcome = pack_queries(args->queries);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< vector<Query> > reply(*outcome);
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::remove_query_METHOD_ID:
    {
        const Defs::remove_query_IN* args = dynamic_cast<const Defs::remove_query_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::remove_query; expected " +
                typeid(Defs::remove_query_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< void > outcome = remove_query(args->query);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< void > reply;
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::replace_query_METHOD_ID:
    {
        const Defs::replace_query_IN* args = dynamic_cast<const Defs::replace_query_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::replace_query; expected " +
                typeid(Defs::replace_query_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< void > outcome = replace_query(args->old_queries, args->new_queries);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< void > reply;
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::set_query_status_METHOD_ID:
    {
        const Defs::set_query_status_IN* args = dynamic_cast<const Defs::set_query_status_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::set_query_status; expected " +
                typeid(Defs::set_query_status_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< void > outcome = set_query_status(args->name, args->status);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< void > reply;
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::set_queries_status_METHOD_ID:
    {
        const Defs::set_queries_status_IN* args = dynamic_cast<const Defs::set_queries_status_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::set_queries_status; expected " +
                typeid(Defs::set_queries_status_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< void > outcome = set_queries_status(args->name, args->status);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< void > reply;
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::subscribe_METHOD_ID:
    {
        const Defs::subscribe_IN* args = dynamic_cast<const Defs::subscribe_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::subscribe; expected " +
                typeid(Defs::subscribe_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< void > outcome = subscribe(args->sub, args->add_or_remove);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< void > reply;
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::subscribe_many_METHOD_ID:
    {
        const Defs::subscribe_many_IN* args = dynamic_cast<const Defs::subscribe_many_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::subscribe_many; expected " +
                typeid(Defs::subscribe_many_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< vector<Subscription> > outcome = subscribe_many(args->sub, args->add_or_remove);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< vector<Subscription> > reply(*outcome);
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::get_subscriptions_METHOD_ID:
    {
        const Defs::get_subscriptions_IN* args = dynamic_cast<const Defs::get_subscriptions_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::get_subscriptions; expected " +
                typeid(Defs::get_subscriptions_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< vector<Subscription> > outcome = get_subscriptions(args->streams);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< vector<Subscription> > reply(*outcome);
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::get_stats_METHOD_ID:
    {
        // No arguments are read; the downcast only validates the request type.
        const Defs::get_stats_IN* args = dynamic_cast<const Defs::get_stats_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::get_stats; expected " +
                typeid(Defs::get_stats_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< vector<Stats> > outcome = get_stats();
        if (outcome.valid()) {
            NMSTL::rpc_message_1< vector<Stats> > reply(*outcome);
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::get_sel_METHOD_ID:
    {
        // No arguments are read; the downcast only validates the request type.
        const Defs::get_sel_IN* args = dynamic_cast<const Defs::get_sel_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::get_sel; expected " +
                typeid(Defs::get_sel_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< double > outcome = get_sel();
        if (outcome.valid()) {
            NMSTL::rpc_message_1< double > reply(*outcome);
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::create_stream_METHOD_ID:
    {
        const Defs::create_stream_IN* args = dynamic_cast<const Defs::create_stream_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::create_stream; expected " +
                typeid(Defs::create_stream_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< void > outcome = create_stream(args->stream);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< void > reply;
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::create_cpview_METHOD_ID:
    {
        const Defs::create_cpview_IN* args = dynamic_cast<const Defs::create_cpview_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::create_cpview; expected " +
                typeid(Defs::create_cpview_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< void > outcome = create_cpview(args->view_desc, args->streamdef);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< void > reply;
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::update_stream_METHOD_ID:
    {
        const Defs::update_stream_IN* args = dynamic_cast<const Defs::update_stream_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::update_stream; expected " +
                typeid(Defs::update_stream_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< void > outcome = update_stream(args->old_sd, args->new_sd);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< void > reply;
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::ack_METHOD_ID:
    {
        const Defs::ack_IN* args = dynamic_cast<const Defs::ack_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::ack; expected " +
                typeid(Defs::ack_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< void > outcome = ack(args->node, args->id, args->last_tuple);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< void > reply;
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::trim_METHOD_ID:
    {
        const Defs::trim_IN* args = dynamic_cast<const Defs::trim_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::trim; expected " +
                typeid(Defs::trim_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< void > outcome = trim(args->node, args->id, args->last_tuple);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< void > reply;
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::set_recovery_method_METHOD_ID:
    {
        const Defs::set_recovery_method_IN* args = dynamic_cast<const Defs::set_recovery_method_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::set_recovery_method; expected " +
                typeid(Defs::set_recovery_method_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< void > outcome = set_recovery_method(args->method);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< void > reply;
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::set_primary_status_METHOD_ID:
    {
        const Defs::set_primary_status_IN* args = dynamic_cast<const Defs::set_primary_status_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::set_primary_status; expected " +
                typeid(Defs::set_primary_status_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< void > outcome = set_primary_status(args->status);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< void > reply;
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::set_secondaries_METHOD_ID:
    {
        const Defs::set_secondaries_IN* args = dynamic_cast<const Defs::set_secondaries_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::set_secondaries; expected " +
                typeid(Defs::set_secondaries_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< void > outcome = set_secondaries(args->secondaries);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< void > reply;
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::set_replicas_METHOD_ID:
    {
        const Defs::set_replicas_IN* args = dynamic_cast<const Defs::set_replicas_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::set_replicas; expected " +
                typeid(Defs::set_replicas_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< void > outcome = set_replicas(args->replicas);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< void > reply;
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    case Defs::checkpoint_METHOD_ID:
    {
        const Defs::checkpoint_IN* args = dynamic_cast<const Defs::checkpoint_IN*>(req.get());
        if (!args) {
            string msg = string("Invalid RPC request type ") + typeid(*(req.get())) +
                " to Borealis::QueryProcessor::checkpoint; expected " +
                typeid(Defs::checkpoint_IN);
            WARN << msg;
            cb(msg, 0, true);
            return;
        }

        RPC< void > outcome = checkpoint(args->tuples_to_checkpoint, args->tuples_to_trim);
        if (outcome.valid()) {
            NMSTL::rpc_message_1< void > reply;
            cb(true, &reply, true);
        } else {
            cb(outcome.stat(), 0, true);
        }
        break;
    }
    default:
    {
        // Not one of the generated method ids: log it and report it to the caller.
        string msg = string("Unknown method_id ") + req->method_id + " in Borealis::QueryProcessor::handle_request";
        WARN << msg;
        cb(msg, 0, true);
    }
    }
}
void Borealis::QueryProcessor::handle_request(const response_cb& cb, rpc_id method_id, ISerial& is) { 02227
switch(method_id) { 02228 case RPCDefs<Borealis::QueryProcessor>::typecheck_METHOD_ID: 02229 { 02230 RPCDefs<Borealis::QueryProcessor>::typecheck_IN in; 02231 is >> in; 02232 if (!is) { 02233 WARN << "Invalid RPC request to Borealis::QueryProcessor::typecheck: " << is.stat(); 02234 cb(is.stat(), 0, true); 02235 return; 02236 } 02237 DEBUG << "Handling typecheck"; 02238 RPC< Query > result = typecheck(in.query); 02239 if (result.valid()) { 02240 NMSTL::rpc_message_1< Query > ret(*result); 02241 cb(true, &ret, true); 02242 } else { 02243 cb(result.stat(), 0, true); 02244 } 02245 break; 02246 } 02247 case RPCDefs<Borealis::QueryProcessor>::setup_query_METHOD_ID: 02248 { 02249 RPCDefs<Borealis::QueryProcessor>::setup_query_IN in; 02250 is >> in; 02251 if (!is) { 02252 WARN << "Invalid RPC request to Borealis::QueryProcessor::setup_query: " << is.stat(); 02253 cb(is.stat(), 0, true); 02254 return; 02255 } 02256 DEBUG << "Handling setup_query"; 02257 AsyncRPC< void > pending = setup_query(in.query); 02258 pending.set_response_handler(cb); 02259 break; 02260 } 02261 case RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_METHOD_ID: 02262 { 02263 RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_IN in; 02264 is >> in; 02265 if (!is) { 02266 WARN << "Invalid RPC request to Borealis::QueryProcessor::typecheck_and_setup: " << is.stat(); 02267 cb(is.stat(), 0, true); 02268 return; 02269 } 02270 DEBUG << "Handling typecheck_and_setup"; 02271 RPC< Query > result = typecheck_and_setup(in.query); 02272 if (result.valid()) { 02273 NMSTL::rpc_message_1< Query > ret(*result); 02274 cb(true, &ret, true); 02275 } else { 02276 cb(result.stat(), 0, true); 02277 } 02278 break; 02279 } 02280 case RPCDefs<Borealis::QueryProcessor>::load_box_METHOD_ID: 02281 { 02282 RPCDefs<Borealis::QueryProcessor>::load_box_IN in; 02283 is >> in; 02284 if (!is) { 02285 WARN << "Invalid RPC request to Borealis::QueryProcessor::load_box: " << is.stat(); 02286 cb(is.stat(), 0, true); 02287 return; 
02288 } 02289 DEBUG << "Handling load_box"; 02290 AsyncRPC< void > pending = load_box(in.file_path); 02291 pending.set_response_handler(cb); 02292 break; 02293 } 02294 case RPCDefs<Borealis::QueryProcessor>::choke_queries_METHOD_ID: 02295 { 02296 RPCDefs<Borealis::QueryProcessor>::choke_queries_IN in; 02297 is >> in; 02298 if (!is) { 02299 WARN << "Invalid RPC request to Borealis::QueryProcessor::choke_queries: " << is.stat(); 02300 cb(is.stat(), 0, true); 02301 return; 02302 } 02303 DEBUG << "Handling choke_queries"; 02304 RPC< vector<Query> > result = choke_queries(in.queries); 02305 if (result.valid()) { 02306 NMSTL::rpc_message_1< vector<Query> > ret(*result); 02307 cb(true, &ret, true); 02308 } else { 02309 cb(result.stat(), 0, true); 02310 } 02311 break; 02312 } 02313 case RPCDefs<Borealis::QueryProcessor>::resume_queries_METHOD_ID: 02314 { 02315 RPCDefs<Borealis::QueryProcessor>::resume_queries_IN in; 02316 is >> in; 02317 if (!is) { 02318 WARN << "Invalid RPC request to Borealis::QueryProcessor::resume_queries: " << is.stat(); 02319 cb(is.stat(), 0, true); 02320 return; 02321 } 02322 DEBUG << "Handling resume_queries"; 02323 RPC< void > result = resume_queries(in.queries); 02324 if (result.valid()) { 02325 NMSTL::rpc_message_1< void > ret; 02326 cb(true, &ret, true); 02327 } else { 02328 cb(result.stat(), 0, true); 02329 } 02330 break; 02331 } 02332 case RPCDefs<Borealis::QueryProcessor>::pack_query_METHOD_ID: 02333 { 02334 RPCDefs<Borealis::QueryProcessor>::pack_query_IN in; 02335 is >> in; 02336 if (!is) { 02337 WARN << "Invalid RPC request to Borealis::QueryProcessor::pack_query: " << is.stat(); 02338 cb(is.stat(), 0, true); 02339 return; 02340 } 02341 DEBUG << "Handling pack_query"; 02342 RPC< Query > result = pack_query(in.query); 02343 if (result.valid()) { 02344 NMSTL::rpc_message_1< Query > ret(*result); 02345 cb(true, &ret, true); 02346 } else { 02347 cb(result.stat(), 0, true); 02348 } 02349 break; 02350 } 02351 case 
RPCDefs<Borealis::QueryProcessor>::pack_queries_METHOD_ID: 02352 { 02353 RPCDefs<Borealis::QueryProcessor>::pack_queries_IN in; 02354 is >> in; 02355 if (!is) { 02356 WARN << "Invalid RPC request to Borealis::QueryProcessor::pack_queries: " << is.stat(); 02357 cb(is.stat(), 0, true); 02358 return; 02359 } 02360 DEBUG << "Handling pack_queries"; 02361 RPC< vector<Query> > result = pack_queries(in.queries); 02362 if (result.valid()) { 02363 NMSTL::rpc_message_1< vector<Query> > ret(*result); 02364 cb(true, &ret, true); 02365 } else { 02366 cb(result.stat(), 0, true); 02367 } 02368 break; 02369 } 02370 case RPCDefs<Borealis::QueryProcessor>::remove_query_METHOD_ID: 02371 { 02372 RPCDefs<Borealis::QueryProcessor>::remove_query_IN in; 02373 is >> in; 02374 if (!is) { 02375 WARN << "Invalid RPC request to Borealis::QueryProcessor::remove_query: " << is.stat(); 02376 cb(is.stat(), 0, true); 02377 return; 02378 } 02379 DEBUG << "Handling remove_query"; 02380 RPC< void > result = remove_query(in.query); 02381 if (result.valid()) { 02382 NMSTL::rpc_message_1< void > ret; 02383 cb(true, &ret, true); 02384 } else { 02385 cb(result.stat(), 0, true); 02386 } 02387 break; 02388 } 02389 case RPCDefs<Borealis::QueryProcessor>::replace_query_METHOD_ID: 02390 { 02391 RPCDefs<Borealis::QueryProcessor>::replace_query_IN in; 02392 is >> in; 02393 if (!is) { 02394 WARN << "Invalid RPC request to Borealis::QueryProcessor::replace_query: " << is.stat(); 02395 cb(is.stat(), 0, true); 02396 return; 02397 } 02398 DEBUG << "Handling replace_query"; 02399 RPC< void > result = replace_query(in.old_queries, in.new_queries); 02400 if (result.valid()) { 02401 NMSTL::rpc_message_1< void > ret; 02402 cb(true, &ret, true); 02403 } else { 02404 cb(result.stat(), 0, true); 02405 } 02406 break; 02407 } 02408 case RPCDefs<Borealis::QueryProcessor>::set_query_status_METHOD_ID: 02409 { 02410 RPCDefs<Borealis::QueryProcessor>::set_query_status_IN in; 02411 is >> in; 02412 if (!is) { 02413 WARN << "Invalid 
RPC request to Borealis::QueryProcessor::set_query_status: " << is.stat(); 02414 cb(is.stat(), 0, true); 02415 return; 02416 } 02417 DEBUG << "Handling set_query_status"; 02418 RPC< void > result = set_query_status(in.name, in.status); 02419 if (result.valid()) { 02420 NMSTL::rpc_message_1< void > ret; 02421 cb(true, &ret, true); 02422 } else { 02423 cb(result.stat(), 0, true); 02424 } 02425 break; 02426 } 02427 case RPCDefs<Borealis::QueryProcessor>::set_queries_status_METHOD_ID: 02428 { 02429 RPCDefs<Borealis::QueryProcessor>::set_queries_status_IN in; 02430 is >> in; 02431 if (!is) { 02432 WARN << "Invalid RPC request to Borealis::QueryProcessor::set_queries_status: " << is.stat(); 02433 cb(is.stat(), 0, true); 02434 return; 02435 } 02436 DEBUG << "Handling set_queries_status"; 02437 RPC< void > result = set_queries_status(in.name, in.status); 02438 if (result.valid()) { 02439 NMSTL::rpc_message_1< void > ret; 02440 cb(true, &ret, true); 02441 } else { 02442 cb(result.stat(), 0, true); 02443 } 02444 break; 02445 } 02446 case RPCDefs<Borealis::QueryProcessor>::subscribe_METHOD_ID: 02447 { 02448 RPCDefs<Borealis::QueryProcessor>::subscribe_IN in; 02449 is >> in; 02450 if (!is) { 02451 WARN << "Invalid RPC request to Borealis::QueryProcessor::subscribe: " << is.stat(); 02452 cb(is.stat(), 0, true); 02453 return; 02454 } 02455 DEBUG << "Handling subscribe"; 02456 RPC< void > result = subscribe(in.sub, in.add_or_remove); 02457 if (result.valid()) { 02458 NMSTL::rpc_message_1< void > ret; 02459 cb(true, &ret, true); 02460 } else { 02461 cb(result.stat(), 0, true); 02462 } 02463 break; 02464 } 02465 case RPCDefs<Borealis::QueryProcessor>::subscribe_many_METHOD_ID: 02466 { 02467 RPCDefs<Borealis::QueryProcessor>::subscribe_many_IN in; 02468 is >> in; 02469 if (!is) { 02470 WARN << "Invalid RPC request to Borealis::QueryProcessor::subscribe_many: " << is.stat(); 02471 cb(is.stat(), 0, true); 02472 return; 02473 } 02474 DEBUG << "Handling subscribe_many"; 02475 RPC< 
vector<Subscription> > result = subscribe_many(in.sub, in.add_or_remove); 02476 if (result.valid()) { 02477 NMSTL::rpc_message_1< vector<Subscription> > ret(*result); 02478 cb(true, &ret, true); 02479 } else { 02480 cb(result.stat(), 0, true); 02481 } 02482 break; 02483 } 02484 case RPCDefs<Borealis::QueryProcessor>::get_subscriptions_METHOD_ID: 02485 { 02486 RPCDefs<Borealis::QueryProcessor>::get_subscriptions_IN in; 02487 is >> in; 02488 if (!is) { 02489 WARN << "Invalid RPC request to Borealis::QueryProcessor::get_subscriptions: " << is.stat(); 02490 cb(is.stat(), 0, true); 02491 return; 02492 } 02493 DEBUG << "Handling get_subscriptions"; 02494 RPC< vector<Subscription> > result = get_subscriptions(in.streams); 02495 if (result.valid()) { 02496 NMSTL::rpc_message_1< vector<Subscription> > ret(*result); 02497 cb(true, &ret, true); 02498 } else { 02499 cb(result.stat(), 0, true); 02500 } 02501 break; 02502 } 02503 case RPCDefs<Borealis::QueryProcessor>::get_stats_METHOD_ID: 02504 { 02505 RPCDefs<Borealis::QueryProcessor>::get_stats_IN in; 02506 is >> in; 02507 if (!is) { 02508 WARN << "Invalid RPC request to Borealis::QueryProcessor::get_stats: " << is.stat(); 02509 cb(is.stat(), 0, true); 02510 return; 02511 } 02512 DEBUG << "Handling get_stats"; 02513 RPC< vector<Stats> > result = get_stats(); 02514 if (result.valid()) { 02515 NMSTL::rpc_message_1< vector<Stats> > ret(*result); 02516 cb(true, &ret, true); 02517 } else { 02518 cb(result.stat(), 0, true); 02519 } 02520 break; 02521 } 02522 case RPCDefs<Borealis::QueryProcessor>::get_sel_METHOD_ID: 02523 { 02524 RPCDefs<Borealis::QueryProcessor>::get_sel_IN in; 02525 is >> in; 02526 if (!is) { 02527 WARN << "Invalid RPC request to Borealis::QueryProcessor::get_sel: " << is.stat(); 02528 cb(is.stat(), 0, true); 02529 return; 02530 } 02531 DEBUG << "Handling get_sel"; 02532 RPC< double > result = get_sel(); 02533 if (result.valid()) { 02534 NMSTL::rpc_message_1< double > ret(*result); 02535 cb(true, &ret, true); 
02536 } else { 02537 cb(result.stat(), 0, true); 02538 } 02539 break; 02540 } 02541 case RPCDefs<Borealis::QueryProcessor>::create_stream_METHOD_ID: 02542 { 02543 RPCDefs<Borealis::QueryProcessor>::create_stream_IN in; 02544 is >> in; 02545 if (!is) { 02546 WARN << "Invalid RPC request to Borealis::QueryProcessor::create_stream: " << is.stat(); 02547 cb(is.stat(), 0, true); 02548 return; 02549 } 02550 DEBUG << "Handling create_stream"; 02551 RPC< void > result = create_stream(in.stream); 02552 if (result.valid()) { 02553 NMSTL::rpc_message_1< void > ret; 02554 cb(true, &ret, true); 02555 } else { 02556 cb(result.stat(), 0, true); 02557 } 02558 break; 02559 } 02560 case RPCDefs<Borealis::QueryProcessor>::create_cpview_METHOD_ID: 02561 { 02562 RPCDefs<Borealis::QueryProcessor>::create_cpview_IN in; 02563 is >> in; 02564 if (!is) { 02565 WARN << "Invalid RPC request to Borealis::QueryProcessor::create_cpview: " << is.stat(); 02566 cb(is.stat(), 0, true); 02567 return; 02568 } 02569 DEBUG << "Handling create_cpview"; 02570 RPC< void > result = create_cpview(in.view_desc, in.streamdef); 02571 if (result.valid()) { 02572 NMSTL::rpc_message_1< void > ret; 02573 cb(true, &ret, true); 02574 } else { 02575 cb(result.stat(), 0, true); 02576 } 02577 break; 02578 } 02579 case RPCDefs<Borealis::QueryProcessor>::update_stream_METHOD_ID: 02580 { 02581 RPCDefs<Borealis::QueryProcessor>::update_stream_IN in; 02582 is >> in; 02583 if (!is) { 02584 WARN << "Invalid RPC request to Borealis::QueryProcessor::update_stream: " << is.stat(); 02585 cb(is.stat(), 0, true); 02586 return; 02587 } 02588 DEBUG << "Handling update_stream"; 02589 RPC< void > result = update_stream(in.old_sd, in.new_sd); 02590 if (result.valid()) { 02591 NMSTL::rpc_message_1< void > ret; 02592 cb(true, &ret, true); 02593 } else { 02594 cb(result.stat(), 0, true); 02595 } 02596 break; 02597 } 02598 case RPCDefs<Borealis::QueryProcessor>::ack_METHOD_ID: 02599 { 02600 RPCDefs<Borealis::QueryProcessor>::ack_IN in; 02601 
is >> in; 02602 if (!is) { 02603 WARN << "Invalid RPC request to Borealis::QueryProcessor::ack: " << is.stat(); 02604 cb(is.stat(), 0, true); 02605 return; 02606 } 02607 DEBUG << "Handling ack"; 02608 RPC< void > result = ack(in.node, in.id, in.last_tuple); 02609 if (result.valid()) { 02610 NMSTL::rpc_message_1< void > ret; 02611 cb(true, &ret, true); 02612 } else { 02613 cb(result.stat(), 0, true); 02614 } 02615 break; 02616 } 02617 case RPCDefs<Borealis::QueryProcessor>::trim_METHOD_ID: 02618 { 02619 RPCDefs<Borealis::QueryProcessor>::trim_IN in; 02620 is >> in; 02621 if (!is) { 02622 WARN << "Invalid RPC request to Borealis::QueryProcessor::trim: " << is.stat(); 02623 cb(is.stat(), 0, true); 02624 return; 02625 } 02626 DEBUG << "Handling trim"; 02627 RPC< void > result = trim(in.node, in.id, in.last_tuple); 02628 if (result.valid()) { 02629 NMSTL::rpc_message_1< void > ret; 02630 cb(true, &ret, true); 02631 } else { 02632 cb(result.stat(), 0, true); 02633 } 02634 break; 02635 } 02636 case RPCDefs<Borealis::QueryProcessor>::set_recovery_method_METHOD_ID: 02637 { 02638 RPCDefs<Borealis::QueryProcessor>::set_recovery_method_IN in; 02639 is >> in; 02640 if (!is) { 02641 WARN << "Invalid RPC request to Borealis::QueryProcessor::set_recovery_method: " << is.stat(); 02642 cb(is.stat(), 0, true); 02643 return; 02644 } 02645 DEBUG << "Handling set_recovery_method"; 02646 RPC< void > result = set_recovery_method(in.method); 02647 if (result.valid()) { 02648 NMSTL::rpc_message_1< void > ret; 02649 cb(true, &ret, true); 02650 } else { 02651 cb(result.stat(), 0, true); 02652 } 02653 break; 02654 } 02655 case RPCDefs<Borealis::QueryProcessor>::set_primary_status_METHOD_ID: 02656 { 02657 RPCDefs<Borealis::QueryProcessor>::set_primary_status_IN in; 02658 is >> in; 02659 if (!is) { 02660 WARN << "Invalid RPC request to Borealis::QueryProcessor::set_primary_status: " << is.stat(); 02661 cb(is.stat(), 0, true); 02662 return; 02663 } 02664 DEBUG << "Handling set_primary_status"; 
02665 RPC< void > result = set_primary_status(in.status); 02666 if (result.valid()) { 02667 NMSTL::rpc_message_1< void > ret; 02668 cb(true, &ret, true); 02669 } else { 02670 cb(result.stat(), 0, true); 02671 } 02672 break; 02673 } 02674 case RPCDefs<Borealis::QueryProcessor>::set_secondaries_METHOD_ID: 02675 { 02676 RPCDefs<Borealis::QueryProcessor>::set_secondaries_IN in; 02677 is >> in; 02678 if (!is) { 02679 WARN << "Invalid RPC request to Borealis::QueryProcessor::set_secondaries: " << is.stat(); 02680 cb(is.stat(), 0, true); 02681 return; 02682 } 02683 DEBUG << "Handling set_secondaries"; 02684 RPC< void > result = set_secondaries(in.secondaries); 02685 if (result.valid()) { 02686 NMSTL::rpc_message_1< void > ret; 02687 cb(true, &ret, true); 02688 } else { 02689 cb(result.stat(), 0, true); 02690 } 02691 break; 02692 } 02693 case RPCDefs<Borealis::QueryProcessor>::set_replicas_METHOD_ID: 02694 { 02695 RPCDefs<Borealis::QueryProcessor>::set_replicas_IN in; 02696 is >> in; 02697 if (!is) { 02698 WARN << "Invalid RPC request to Borealis::QueryProcessor::set_replicas: " << is.stat(); 02699 cb(is.stat(), 0, true); 02700 return; 02701 } 02702 DEBUG << "Handling set_replicas"; 02703 RPC< void > result = set_replicas(in.replicas); 02704 if (result.valid()) { 02705 NMSTL::rpc_message_1< void > ret; 02706 cb(true, &ret, true); 02707 } else { 02708 cb(result.stat(), 0, true); 02709 } 02710 break; 02711 } 02712 case RPCDefs<Borealis::QueryProcessor>::checkpoint_METHOD_ID: 02713 { 02714 RPCDefs<Borealis::QueryProcessor>::checkpoint_IN in; 02715 is >> in; 02716 if (!is) { 02717 WARN << "Invalid RPC request to Borealis::QueryProcessor::checkpoint: " << is.stat(); 02718 cb(is.stat(), 0, true); 02719 return; 02720 } 02721 DEBUG << "Handling checkpoint"; 02722 RPC< void > result = checkpoint(in.tuples_to_checkpoint, in.tuples_to_trim); 02723 if (result.valid()) { 02724 NMSTL::rpc_message_1< void > ret; 02725 cb(true, &ret, true); 02726 } else { 02727 cb(result.stat(), 0, true); 
02728 } 02729 break; 02730 } 02731 default: 02732 string err = string("Unknown method_id ") + method_id + " in Borealis::QueryProcessor::handle_request"; 02733 WARN << err; 02734 cb(err, 0, true); 02735 } 02736 } 02737 NMSTL::rpc_id Borealis::QueryProcessor::method_id_for_name(string method_name) { 02738 map<string, int>::const_iterator i = RPCDefs<Borealis::QueryProcessor>::method_id_by_name().find(method_name); 02739 return (i == RPCDefs<Borealis::QueryProcessor>::method_id_by_name().end()) ? rpc_id() : i->second; 02740 } 02741 string Borealis::QueryProcessor::method_name_for_id(NMSTL::rpc_id method_id) { 02742 map<int, string>::const_iterator i = RPCDefs<Borealis::QueryProcessor>::method_name_by_id().find(method_id); 02743 return (i == RPCDefs<Borealis::QueryProcessor>::method_name_by_id().end()) ? string() : i->second; 02744 } 02745 NMSTL::ptr<NMSTL::rpc_request> Borealis::QueryProcessor::make_request(NMSTL::rpc_id method_id, NMSTL::ISerial& is) { 02746 switch(method_id) { 02747 case NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_METHOD_ID: 02748 { 02749 NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_IN; 02750 NMSTL::ptr<NMSTL::rpc_request> req(in); 02751 if (is >> *in) return req; 02752 } 02753 case NMSTL::RPCDefs<Borealis::QueryProcessor>::setup_query_METHOD_ID: 02754 { 02755 NMSTL::RPCDefs<Borealis::QueryProcessor>::setup_query_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::setup_query_IN; 02756 NMSTL::ptr<NMSTL::rpc_request> req(in); 02757 if (is >> *in) return req; 02758 } 02759 case NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_METHOD_ID: 02760 { 02761 NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_IN; 02762 NMSTL::ptr<NMSTL::rpc_request> req(in); 02763 if (is >> *in) return req; 02764 } 02765 case NMSTL::RPCDefs<Borealis::QueryProcessor>::load_box_METHOD_ID: 02766 { 
02767 NMSTL::RPCDefs<Borealis::QueryProcessor>::load_box_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::load_box_IN; 02768 NMSTL::ptr<NMSTL::rpc_request> req(in); 02769 if (is >> *in) return req; 02770 } 02771 case NMSTL::RPCDefs<Borealis::QueryProcessor>::choke_queries_METHOD_ID: 02772 { 02773 NMSTL::RPCDefs<Borealis::QueryProcessor>::choke_queries_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::choke_queries_IN; 02774 NMSTL::ptr<NMSTL::rpc_request> req(in); 02775 if (is >> *in) return req; 02776 } 02777 case NMSTL::RPCDefs<Borealis::QueryProcessor>::resume_queries_METHOD_ID: 02778 { 02779 NMSTL::RPCDefs<Borealis::QueryProcessor>::resume_queries_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::resume_queries_IN; 02780 NMSTL::ptr<NMSTL::rpc_request> req(in); 02781 if (is >> *in) return req; 02782 } 02783 case NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_query_METHOD_ID: 02784 { 02785 NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_query_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_query_IN; 02786 NMSTL::ptr<NMSTL::rpc_request> req(in); 02787 if (is >> *in) return req; 02788 } 02789 case NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_queries_METHOD_ID: 02790 { 02791 NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_queries_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_queries_IN; 02792 NMSTL::ptr<NMSTL::rpc_request> req(in); 02793 if (is >> *in) return req; 02794 } 02795 case NMSTL::RPCDefs<Borealis::QueryProcessor>::remove_query_METHOD_ID: 02796 { 02797 NMSTL::RPCDefs<Borealis::QueryProcessor>::remove_query_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::remove_query_IN; 02798 NMSTL::ptr<NMSTL::rpc_request> req(in); 02799 if (is >> *in) return req; 02800 } 02801 case NMSTL::RPCDefs<Borealis::QueryProcessor>::replace_query_METHOD_ID: 02802 { 02803 NMSTL::RPCDefs<Borealis::QueryProcessor>::replace_query_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::replace_query_IN; 02804 NMSTL::ptr<NMSTL::rpc_request> 
req(in); 02805 if (is >> *in) return req; 02806 } 02807 case NMSTL::RPCDefs<Borealis::QueryProcessor>::set_query_status_METHOD_ID: 02808 { 02809 NMSTL::RPCDefs<Borealis::QueryProcessor>::set_query_status_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::set_query_status_IN; 02810 NMSTL::ptr<NMSTL::rpc_request> req(in); 02811 if (is >> *in) return req; 02812 } 02813 case NMSTL::RPCDefs<Borealis::QueryProcessor>::set_queries_status_METHOD_ID: 02814 { 02815 NMSTL::RPCDefs<Borealis::QueryProcessor>::set_queries_status_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::set_queries_status_IN; 02816 NMSTL::ptr<NMSTL::rpc_request> req(in); 02817 if (is >> *in) return req; 02818 } 02819 case NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_METHOD_ID: 02820 { 02821 NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_IN; 02822 NMSTL::ptr<NMSTL::rpc_request> req(in); 02823 if (is >> *in) return req; 02824 } 02825 case NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_many_METHOD_ID: 02826 { 02827 NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_many_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_many_IN; 02828 NMSTL::ptr<NMSTL::rpc_request> req(in); 02829 if (is >> *in) return req; 02830 } 02831 case NMSTL::RPCDefs<Borealis::QueryProcessor>::get_subscriptions_METHOD_ID: 02832 { 02833 NMSTL::RPCDefs<Borealis::QueryProcessor>::get_subscriptions_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::get_subscriptions_IN; 02834 NMSTL::ptr<NMSTL::rpc_request> req(in); 02835 if (is >> *in) return req; 02836 } 02837 case NMSTL::RPCDefs<Borealis::QueryProcessor>::get_stats_METHOD_ID: 02838 { 02839 NMSTL::RPCDefs<Borealis::QueryProcessor>::get_stats_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::get_stats_IN; 02840 NMSTL::ptr<NMSTL::rpc_request> req(in); 02841 if (is >> *in) return req; 02842 } 02843 case NMSTL::RPCDefs<Borealis::QueryProcessor>::get_sel_METHOD_ID: 02844 { 02845 
NMSTL::RPCDefs<Borealis::QueryProcessor>::get_sel_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::get_sel_IN; 02846 NMSTL::ptr<NMSTL::rpc_request> req(in); 02847 if (is >> *in) return req; 02848 } 02849 case NMSTL::RPCDefs<Borealis::QueryProcessor>::create_stream_METHOD_ID: 02850 { 02851 NMSTL::RPCDefs<Borealis::QueryProcessor>::create_stream_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::create_stream_IN; 02852 NMSTL::ptr<NMSTL::rpc_request> req(in); 02853 if (is >> *in) return req; 02854 } 02855 case NMSTL::RPCDefs<Borealis::QueryProcessor>::create_cpview_METHOD_ID: 02856 { 02857 NMSTL::RPCDefs<Borealis::QueryProcessor>::create_cpview_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::create_cpview_IN; 02858 NMSTL::ptr<NMSTL::rpc_request> req(in); 02859 if (is >> *in) return req; 02860 } 02861 case NMSTL::RPCDefs<Borealis::QueryProcessor>::update_stream_METHOD_ID: 02862 { 02863 NMSTL::RPCDefs<Borealis::QueryProcessor>::update_stream_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::update_stream_IN; 02864 NMSTL::ptr<NMSTL::rpc_request> req(in); 02865 if (is >> *in) return req; 02866 } 02867 case NMSTL::RPCDefs<Borealis::QueryProcessor>::ack_METHOD_ID: 02868 { 02869 NMSTL::RPCDefs<Borealis::QueryProcessor>::ack_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::ack_IN; 02870 NMSTL::ptr<NMSTL::rpc_request> req(in); 02871 if (is >> *in) return req; 02872 } 02873 case NMSTL::RPCDefs<Borealis::QueryProcessor>::trim_METHOD_ID: 02874 { 02875 NMSTL::RPCDefs<Borealis::QueryProcessor>::trim_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::trim_IN; 02876 NMSTL::ptr<NMSTL::rpc_request> req(in); 02877 if (is >> *in) return req; 02878 } 02879 case NMSTL::RPCDefs<Borealis::QueryProcessor>::set_recovery_method_METHOD_ID: 02880 { 02881 NMSTL::RPCDefs<Borealis::QueryProcessor>::set_recovery_method_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::set_recovery_method_IN; 02882 NMSTL::ptr<NMSTL::rpc_request> req(in); 02883 if (is >> *in) return 
req; 02884 } 02885 case NMSTL::RPCDefs<Borealis::QueryProcessor>::set_primary_status_METHOD_ID: 02886 { 02887 NMSTL::RPCDefs<Borealis::QueryProcessor>::set_primary_status_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::set_primary_status_IN; 02888 NMSTL::ptr<NMSTL::rpc_request> req(in); 02889 if (is >> *in) return req; 02890 } 02891 case NMSTL::RPCDefs<Borealis::QueryProcessor>::set_secondaries_METHOD_ID: 02892 { 02893 NMSTL::RPCDefs<Borealis::QueryProcessor>::set_secondaries_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::set_secondaries_IN; 02894 NMSTL::ptr<NMSTL::rpc_request> req(in); 02895 if (is >> *in) return req; 02896 } 02897 case NMSTL::RPCDefs<Borealis::QueryProcessor>::set_replicas_METHOD_ID: 02898 { 02899 NMSTL::RPCDefs<Borealis::QueryProcessor>::set_replicas_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::set_replicas_IN; 02900 NMSTL::ptr<NMSTL::rpc_request> req(in); 02901 if (is >> *in) return req; 02902 } 02903 case NMSTL::RPCDefs<Borealis::QueryProcessor>::checkpoint_METHOD_ID: 02904 { 02905 NMSTL::RPCDefs<Borealis::QueryProcessor>::checkpoint_IN *in = new NMSTL::RPCDefs<Borealis::QueryProcessor>::checkpoint_IN; 02906 NMSTL::ptr<NMSTL::rpc_request> req(in); 02907 if (is >> *in) return req; 02908 } 02909 } 02910 return NMSTL::ptr<NMSTL::rpc_request>(); 02911 } 02912 02913 02914 #endif 02915

Generated on Fri Nov 12 15:15:21 2004 for Borealis by doxygen 1.3.8