00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
#ifndef ___QUERYPROCESSOR_RPC_QUERYPROCESSOR_H
00015
#define ___QUERYPROCESSOR_RPC_QUERYPROCESSOR_H
00016
00017
#include <NMSTL/rpc>
00018
#include "BoxPackage.h"
00019
#include "MTuple.h"
00020
#include "Name.h"
00021
#include "Query.h"
00022
#include "Recovery.h"
00023
#include "Stats.h"
00024
#include "StreamEvent.h"
00025
00026
namespace Borealis {
class QueryProcessor; };
00027
00028
00029 NMSTL_NAMESPACE_BEGIN;
00030
00031
#ifdef DOXYGEN_SKIP
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
template<
typename T>
class Remote;
00043 class Remote<Borealis::
QueryProcessor> {
00044
public:
00047
void typecheck(
const Callback<
void, RPC< Query > >& completion,
00048 Query query);
00049
00051 RPC< Query >
typecheck(Query query);
00052
00055
void setup_query(
const Callback<
void, RPC< void > >& completion,
00056 Query query);
00057
00059 RPC< void >
setup_query(Query query);
00060
00063
void typecheck_and_setup(
const Callback<
void, RPC< Query > >& completion,
00064 Query query);
00065
00067 RPC< Query >
typecheck_and_setup(Query query);
00068
00071
void load_box(
const Callback<
void, RPC< void > >& completion,
00072 string file_path);
00073
00075 RPC< void >
load_box(string file_path);
00076
00079
void choke_queries(
const Callback<
void, RPC< vector<Query> > >& completion,
00080 vector<Query> queries);
00081
00083 RPC< vector<Query> >
choke_queries(vector<Query> queries);
00084
00087
void resume_queries(
const Callback<
void, RPC< void > >& completion,
00088 vector<Query> queries);
00089
00091 RPC< void >
resume_queries(vector<Query> queries);
00092
00095
void pack_query(
const Callback<
void, RPC< Query > >& completion,
00096 Query query);
00097
00099 RPC< Query >
pack_query(Query query);
00100
00103
void pack_queries(
const Callback<
void, RPC< vector<Query> > >& completion,
00104 vector<Query> queries);
00105
00107 RPC< vector<Query> >
pack_queries(vector<Query> queries);
00108
00111
void remove_query(
const Callback<
void, RPC< void > >& completion,
00112 Query query);
00113
00115 RPC< void >
remove_query(Query query);
00116
00119
void replace_query(
const Callback<
void, RPC< void > >& completion,
00120 vector<Name> old_queries, vector<Query> new_queries);
00121
00123 RPC< void >
replace_query(vector<Name> old_queries, vector<Query> new_queries);
00124
00127
void set_query_status(
const Callback<
void, RPC< void > >& completion,
00128 Name name, QueryStatus status);
00129
00131 RPC< void >
set_query_status(Name name, QueryStatus status);
00132
00135
void set_queries_status(
const Callback<
void, RPC< void > >& completion,
00136 vector<Name> name, QueryStatus status);
00137
00139 RPC< void >
set_queries_status(vector<Name> name, QueryStatus status);
00140
00143
void subscribe(
const Callback<
void, RPC< void > >& completion,
00144 Subscription sub,
unsigned int add_or_remove);
00145
00147 RPC< void >
subscribe(Subscription sub,
unsigned int add_or_remove);
00148
00151
void subscribe_many(
const Callback<
void, RPC< vector<Subscription> > >& completion,
00152 vector<Subscription> sub,
unsigned int add_or_remove);
00153
00155 RPC< vector<Subscription> >
subscribe_many(vector<Subscription> sub,
unsigned int add_or_remove);
00156
00159
void get_subscriptions(
const Callback<
void, RPC< vector<Subscription> > >& completion,
00160 vector<Name> streams);
00161
00163 RPC< vector<Subscription> >
get_subscriptions(vector<Name> streams);
00164
00167
void get_stats(
const Callback<
void, RPC< vector<Stats> > >& completion);
00168
00170 RPC< vector<Stats> >
get_stats();
00171
00174
void get_sel(
const Callback<
void, RPC< double > >& completion);
00175
00177 RPC< double >
get_sel();
00178
00181
void create_stream(
const Callback<
void, RPC< void > >& completion,
00182 StreamDef stream);
00183
00185 RPC< void >
create_stream(StreamDef stream);
00186
00189
void create_cpview(
const Callback<
void, RPC< void > >& completion,
00190 CPViewDescription view_desc, StreamDef streamdef);
00191
00193 RPC< void >
create_cpview(CPViewDescription view_desc, StreamDef streamdef);
00194
00197
void update_stream(
const Callback<
void, RPC< void > >& completion,
00198 StreamDef old_sd, StreamDef new_sd);
00199
00201 RPC< void >
update_stream(StreamDef old_sd, StreamDef new_sd);
00202
00205
void ack(
const Callback<
void, RPC< void > >& completion,
00206 MedusaID node, StreamID
id, string last_tuple);
00207
00209 RPC< void >
ack(MedusaID node, StreamID
id, string last_tuple);
00210
00213
void trim(
const Callback<
void, RPC< void > >& completion,
00214 MedusaID node, StreamID
id, string last_tuple);
00215
00217 RPC< void >
trim(MedusaID node, StreamID
id, string last_tuple);
00218
00221
void set_recovery_method(
const Callback<
void, RPC< void > >& completion,
00222
int method);
00223
00225 RPC< void >
set_recovery_method(
int method);
00226
00229
void set_primary_status(
const Callback<
void, RPC< void > >& completion,
00230
bool status);
00231
00233 RPC< void >
set_primary_status(
bool status);
00234
00237
void set_secondaries(
const Callback<
void, RPC< void > >& completion,
00238 vector<MedusaID> secondaries);
00239
00241 RPC< void >
set_secondaries(vector<MedusaID> secondaries);
00242
00245
void set_replicas(
const Callback<
void, RPC< void > >& completion,
00246 vector<MedusaID> replicas);
00247
00249 RPC< void >
set_replicas(vector<MedusaID> replicas);
00250
00253
void checkpoint(
const Callback<
void, RPC< void > >& completion,
00254 vector<StreamEvent> tuples_to_checkpoint, map<StreamID,string> tuples_to_trim);
00255
00257 RPC< void >
checkpoint(vector<StreamEvent> tuples_to_checkpoint, map<StreamID,string> tuples_to_trim);
00258
00259 };
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272
#else // !defined(DOXYGEN_SKIP)
00273
00274
namespace __nmstl_tmp_rpc_defs {
00275
template<
typename X>
struct RPCDefs;
00276
template<
typename X>
struct Remote;
00277
using namespace Borealis;
00278
template<>
00279
struct RPCDefs<Borealis::
QueryProcessor> {
00280
static const int typecheck_METHOD_ID = 20000;
00281
/// Request message for the `typecheck` RPC; its argument is the public field `query`.
/// Derives from rpc_request_1< Query >, matching the RPC< Query > result of the call.
struct typecheck_IN : public rpc_request_1< Query > {
    Query query;

    typecheck_IN() {}
    // Taken by const reference: one copy (into the member) instead of two.
    typecheck_IN(const Query& query) : query(query) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new typecheck_IN(*this); }

    /// Formats the request as "typecheck_IN{query=<value>}".
    string as_string() const {
        string out;
        out << "typecheck_IN{" << "query=" << query << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(typecheck_IN, << Serial::tag("param") << query );
    NMSTL_SERIAL_TAG(typecheck_IN, "params");
};
00294
class typecheck_OUT {
00295 RPCStatus m_stat;
00296 Query m_val;
00297
public:
00298 operator const void * ()
const {
return m_stat; }
00299
const RPCStatus& stat()
const {
return m_stat; }
00300 Query get()
const {
return m_val; }
00301 };
00302
static const int setup_query_METHOD_ID = 20001;
00303
/// Request message for the `setup_query` RPC; its argument is the public field `query`.
/// Derives from rpc_request_1< void >, matching the RPC< void > result of the call.
struct setup_query_IN : public rpc_request_1< void > {
    Query query;

    setup_query_IN() {}
    // Taken by const reference: one copy (into the member) instead of two.
    setup_query_IN(const Query& query) : query(query) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new setup_query_IN(*this); }

    /// Formats the request as "setup_query_IN{query=<value>}".
    string as_string() const {
        string out;
        out << "setup_query_IN{" << "query=" << query << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(setup_query_IN, << Serial::tag("param") << query );
    NMSTL_SERIAL_TAG(setup_query_IN, "params");
};
00316
class setup_query_OUT {
00317 RPCStatus m_stat;
00318
public:
00319 operator const void * ()
const {
return m_stat; }
00320
const RPCStatus& stat()
const {
return m_stat; }
00321
bool get()
const {
return true; }
00322 };
00323
static const int typecheck_and_setup_METHOD_ID = 20002;
00324
/// Request message for the `typecheck_and_setup` RPC; its argument is the public field `query`.
/// Derives from rpc_request_1< Query >, matching the RPC< Query > result of the call.
struct typecheck_and_setup_IN : public rpc_request_1< Query > {
    Query query;

    typecheck_and_setup_IN() {}
    // Taken by const reference: one copy (into the member) instead of two.
    typecheck_and_setup_IN(const Query& query) : query(query) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new typecheck_and_setup_IN(*this); }

    /// Formats the request as "typecheck_and_setup_IN{query=<value>}".
    string as_string() const {
        string out;
        out << "typecheck_and_setup_IN{" << "query=" << query << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(typecheck_and_setup_IN, << Serial::tag("param") << query );
    NMSTL_SERIAL_TAG(typecheck_and_setup_IN, "params");
};
00337
class typecheck_and_setup_OUT {
00338 RPCStatus m_stat;
00339 Query m_val;
00340
public:
00341 operator const void * ()
const {
return m_stat; }
00342
const RPCStatus& stat()
const {
return m_stat; }
00343 Query get()
const {
return m_val; }
00344 };
00345
static const int load_box_METHOD_ID = 20003;
00346
/// Request message for the `load_box` RPC; its argument is the public field `file_path`.
/// Derives from rpc_request_1< void >, matching the RPC< void > result of the call.
struct load_box_IN : public rpc_request_1< void > {
    string file_path;

    load_box_IN() {}
    // Taken by const reference: one copy (into the member) instead of two.
    load_box_IN(const string& file_path) : file_path(file_path) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new load_box_IN(*this); }

    /// Formats the request as "load_box_IN{file_path=<value>}".
    string as_string() const {
        string out;
        out << "load_box_IN{" << "file_path=" << file_path << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(load_box_IN, << Serial::tag("param") << file_path );
    NMSTL_SERIAL_TAG(load_box_IN, "params");
};
00359
class load_box_OUT {
00360 RPCStatus m_stat;
00361
public:
00362 operator const void * ()
const {
return m_stat; }
00363
const RPCStatus& stat()
const {
return m_stat; }
00364
bool get()
const {
return true; }
00365 };
00366
static const int choke_queries_METHOD_ID = 20004;
00367
/// Request message for the `choke_queries` RPC; its argument is the public field `queries`.
/// Derives from rpc_request_1< vector<Query> >, matching the RPC< vector<Query> > result of the call.
struct choke_queries_IN : public rpc_request_1< vector<Query> > {
    vector<Query> queries;

    choke_queries_IN() {}
    // Taken by const reference: the vector is copied once (into the member) instead of twice.
    choke_queries_IN(const vector<Query>& queries) : queries(queries) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new choke_queries_IN(*this); }

    /// Formats the request as "choke_queries_IN{queries=<value>}".
    string as_string() const {
        string out;
        out << "choke_queries_IN{" << "queries=" << queries << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(choke_queries_IN, << Serial::tag("param") << queries );
    NMSTL_SERIAL_TAG(choke_queries_IN, "params");
};
00380
class choke_queries_OUT {
00381 RPCStatus m_stat;
00382 vector<Query> m_val;
00383
public:
00384 operator const void * ()
const {
return m_stat; }
00385
const RPCStatus& stat()
const {
return m_stat; }
00386 vector<Query> get()
const {
return m_val; }
00387 };
00388
static const int resume_queries_METHOD_ID = 20005;
00389
/// Request message for the `resume_queries` RPC; its argument is the public field `queries`.
/// Derives from rpc_request_1< void >, matching the RPC< void > result of the call.
struct resume_queries_IN : public rpc_request_1< void > {
    vector<Query> queries;

    resume_queries_IN() {}
    // Taken by const reference: the vector is copied once (into the member) instead of twice.
    resume_queries_IN(const vector<Query>& queries) : queries(queries) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new resume_queries_IN(*this); }

    /// Formats the request as "resume_queries_IN{queries=<value>}".
    string as_string() const {
        string out;
        out << "resume_queries_IN{" << "queries=" << queries << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(resume_queries_IN, << Serial::tag("param") << queries );
    NMSTL_SERIAL_TAG(resume_queries_IN, "params");
};
00402
class resume_queries_OUT {
00403 RPCStatus m_stat;
00404
public:
00405 operator const void * ()
const {
return m_stat; }
00406
const RPCStatus& stat()
const {
return m_stat; }
00407
bool get()
const {
return true; }
00408 };
00409
static const int pack_query_METHOD_ID = 20006;
00410
/// Request message for the `pack_query` RPC; its argument is the public field `query`.
/// Derives from rpc_request_1< Query >, matching the RPC< Query > result of the call.
struct pack_query_IN : public rpc_request_1< Query > {
    Query query;

    pack_query_IN() {}
    // Taken by const reference: one copy (into the member) instead of two.
    pack_query_IN(const Query& query) : query(query) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new pack_query_IN(*this); }

    /// Formats the request as "pack_query_IN{query=<value>}".
    string as_string() const {
        string out;
        out << "pack_query_IN{" << "query=" << query << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(pack_query_IN, << Serial::tag("param") << query );
    NMSTL_SERIAL_TAG(pack_query_IN, "params");
};
00423
class pack_query_OUT {
00424 RPCStatus m_stat;
00425 Query m_val;
00426
public:
00427 operator const void * ()
const {
return m_stat; }
00428
const RPCStatus& stat()
const {
return m_stat; }
00429 Query get()
const {
return m_val; }
00430 };
00431
static const int pack_queries_METHOD_ID = 20007;
00432
/// Request message for the `pack_queries` RPC; its argument is the public field `queries`.
/// Derives from rpc_request_1< vector<Query> >, matching the RPC< vector<Query> > result of the call.
struct pack_queries_IN : public rpc_request_1< vector<Query> > {
    vector<Query> queries;

    pack_queries_IN() {}
    // Taken by const reference: the vector is copied once (into the member) instead of twice.
    pack_queries_IN(const vector<Query>& queries) : queries(queries) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new pack_queries_IN(*this); }

    /// Formats the request as "pack_queries_IN{queries=<value>}".
    string as_string() const {
        string out;
        out << "pack_queries_IN{" << "queries=" << queries << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(pack_queries_IN, << Serial::tag("param") << queries );
    NMSTL_SERIAL_TAG(pack_queries_IN, "params");
};
00445
class pack_queries_OUT {
00446 RPCStatus m_stat;
00447 vector<Query> m_val;
00448
public:
00449 operator const void * ()
const {
return m_stat; }
00450
const RPCStatus& stat()
const {
return m_stat; }
00451 vector<Query> get()
const {
return m_val; }
00452 };
00453
static const int remove_query_METHOD_ID = 20008;
00454
/// Request message for the `remove_query` RPC; its argument is the public field `query`.
/// Derives from rpc_request_1< void >, matching the RPC< void > result of the call.
struct remove_query_IN : public rpc_request_1< void > {
    Query query;

    remove_query_IN() {}
    // Taken by const reference: one copy (into the member) instead of two.
    remove_query_IN(const Query& query) : query(query) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new remove_query_IN(*this); }

    /// Formats the request as "remove_query_IN{query=<value>}".
    string as_string() const {
        string out;
        out << "remove_query_IN{" << "query=" << query << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(remove_query_IN, << Serial::tag("param") << query );
    NMSTL_SERIAL_TAG(remove_query_IN, "params");
};
00467
class remove_query_OUT {
00468 RPCStatus m_stat;
00469
public:
00470 operator const void * ()
const {
return m_stat; }
00471
const RPCStatus& stat()
const {
return m_stat; }
00472
bool get()
const {
return true; }
00473 };
00474
static const int replace_query_METHOD_ID = 20009;
00475
/// Request message for the `replace_query` RPC; its arguments are the public
/// fields `old_queries` and `new_queries`.
/// Derives from rpc_request_1< void >, matching the RPC< void > result of the call.
struct replace_query_IN : public rpc_request_1< void > {
    vector<Name> old_queries;
    vector<Query> new_queries;

    replace_query_IN() {}
    // Taken by const reference: each vector is copied once (into its member) instead of twice.
    replace_query_IN(const vector<Name>& old_queries, const vector<Query>& new_queries)
        : old_queries(old_queries), new_queries(new_queries) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new replace_query_IN(*this); }

    /// Formats the request as "replace_query_IN{old_queries=<v>, new_queries=<v>}".
    string as_string() const {
        string out;
        out << "replace_query_IN{" << "old_queries=" << old_queries
            << ", new_queries=" << new_queries << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(replace_query_IN, << Serial::tag("param") << old_queries << new_queries );
    NMSTL_SERIAL_TAG(replace_query_IN, "params");
};
00488
class replace_query_OUT {
00489 RPCStatus m_stat;
00490
public:
00491 operator const void * ()
const {
return m_stat; }
00492
const RPCStatus& stat()
const {
return m_stat; }
00493
bool get()
const {
return true; }
00494 };
00495
static const int set_query_status_METHOD_ID = 20010;
00496
/// Request message for the `set_query_status` RPC; its arguments are the public
/// fields `name` and `status`.
/// Derives from rpc_request_1< void >, matching the RPC< void > result of the call.
struct set_query_status_IN : public rpc_request_1< void > {
    Name name;
    QueryStatus status;

    set_query_status_IN() {}
    // `name` is taken by const reference to avoid a second copy.  `status` stays
    // by value: QueryStatus is not visible here (presumably a small status type
    // -- confirm before changing).
    set_query_status_IN(const Name& name, QueryStatus status) : name(name), status(status) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new set_query_status_IN(*this); }

    /// Formats the request as "set_query_status_IN{name=<v>, status=<v>}".
    string as_string() const {
        string out;
        out << "set_query_status_IN{" << "name=" << name << ", status=" << status << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(set_query_status_IN, << Serial::tag("param") << name << status );
    NMSTL_SERIAL_TAG(set_query_status_IN, "params");
};
00509
class set_query_status_OUT {
00510 RPCStatus m_stat;
00511
public:
00512 operator const void * ()
const {
return m_stat; }
00513
const RPCStatus& stat()
const {
return m_stat; }
00514
bool get()
const {
return true; }
00515 };
00516
static const int set_queries_status_METHOD_ID = 20011;
00517
/// Request message for the `set_queries_status` RPC; its arguments are the public
/// fields `name` (a vector of names) and `status`.
/// Derives from rpc_request_1< void >, matching the RPC< void > result of the call.
struct set_queries_status_IN : public rpc_request_1< void > {
    vector<Name> name;
    QueryStatus status;

    set_queries_status_IN() {}
    // `name` is taken by const reference to avoid a second vector copy.  `status`
    // stays by value: QueryStatus is not visible here (presumably a small status
    // type -- confirm before changing).
    set_queries_status_IN(const vector<Name>& name, QueryStatus status) : name(name), status(status) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new set_queries_status_IN(*this); }

    /// Formats the request as "set_queries_status_IN{name=<v>, status=<v>}".
    string as_string() const {
        string out;
        out << "set_queries_status_IN{" << "name=" << name << ", status=" << status << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(set_queries_status_IN, << Serial::tag("param") << name << status );
    NMSTL_SERIAL_TAG(set_queries_status_IN, "params");
};
00530
class set_queries_status_OUT {
00531 RPCStatus m_stat;
00532
public:
00533 operator const void * ()
const {
return m_stat; }
00534
const RPCStatus& stat()
const {
return m_stat; }
00535
bool get()
const {
return true; }
00536 };
00537
static const int subscribe_METHOD_ID = 20012;
00538
/// Request message for the `subscribe` RPC; its arguments are the public fields
/// `sub` and `add_or_remove`.
/// Derives from rpc_request_1< void >, matching the RPC< void > result of the call.
struct subscribe_IN : public rpc_request_1< void > {
    Subscription sub;
    unsigned int add_or_remove;

    // Zero the scalar so a default-constructed request never exposes an
    // indeterminate value through as_string() or serialization.
    subscribe_IN() : add_or_remove(0) {}
    // `sub` is taken by const reference: one copy (into the member) instead of two.
    subscribe_IN(const Subscription& sub, unsigned int add_or_remove)
        : sub(sub), add_or_remove(add_or_remove) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new subscribe_IN(*this); }

    /// Formats the request as "subscribe_IN{sub=<v>, add_or_remove=<v>}".
    string as_string() const {
        string out;
        out << "subscribe_IN{" << "sub=" << sub << ", add_or_remove=" << add_or_remove << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(subscribe_IN, << Serial::tag("param") << sub << add_or_remove );
    NMSTL_SERIAL_TAG(subscribe_IN, "params");
};
00551
class subscribe_OUT {
00552 RPCStatus m_stat;
00553
public:
00554 operator const void * ()
const {
return m_stat; }
00555
const RPCStatus& stat()
const {
return m_stat; }
00556
bool get()
const {
return true; }
00557 };
00558
static const int subscribe_many_METHOD_ID = 20013;
00559
/// Request message for the `subscribe_many` RPC; its arguments are the public
/// fields `sub` (a vector of subscriptions) and `add_or_remove`.
/// Derives from rpc_request_1< vector<Subscription> >, matching the
/// RPC< vector<Subscription> > result of the call.
struct subscribe_many_IN : public rpc_request_1< vector<Subscription> > {
    vector<Subscription> sub;
    unsigned int add_or_remove;

    // Zero the scalar so a default-constructed request never exposes an
    // indeterminate value through as_string() or serialization.
    subscribe_many_IN() : add_or_remove(0) {}
    // `sub` is taken by const reference: the vector is copied once instead of twice.
    subscribe_many_IN(const vector<Subscription>& sub, unsigned int add_or_remove)
        : sub(sub), add_or_remove(add_or_remove) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new subscribe_many_IN(*this); }

    /// Formats the request as "subscribe_many_IN{sub=<v>, add_or_remove=<v>}".
    string as_string() const {
        string out;
        out << "subscribe_many_IN{" << "sub=" << sub << ", add_or_remove=" << add_or_remove << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(subscribe_many_IN, << Serial::tag("param") << sub << add_or_remove );
    NMSTL_SERIAL_TAG(subscribe_many_IN, "params");
};
00572
class subscribe_many_OUT {
00573 RPCStatus m_stat;
00574 vector<Subscription> m_val;
00575
public:
00576 operator const void * ()
const {
return m_stat; }
00577
const RPCStatus& stat()
const {
return m_stat; }
00578 vector<Subscription> get()
const {
return m_val; }
00579 };
00580
static const int get_subscriptions_METHOD_ID = 20014;
00581
/// Request message for the `get_subscriptions` RPC; its argument is the public field `streams`.
/// Derives from rpc_request_1< vector<Subscription> >, matching the
/// RPC< vector<Subscription> > result of the call.
struct get_subscriptions_IN : public rpc_request_1< vector<Subscription> > {
    vector<Name> streams;

    get_subscriptions_IN() {}
    // Taken by const reference: the vector is copied once (into the member) instead of twice.
    get_subscriptions_IN(const vector<Name>& streams) : streams(streams) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new get_subscriptions_IN(*this); }

    /// Formats the request as "get_subscriptions_IN{streams=<value>}".
    string as_string() const {
        string out;
        out << "get_subscriptions_IN{" << "streams=" << streams << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(get_subscriptions_IN, << Serial::tag("param") << streams );
    NMSTL_SERIAL_TAG(get_subscriptions_IN, "params");
};
00594
class get_subscriptions_OUT {
00595 RPCStatus m_stat;
00596 vector<Subscription> m_val;
00597
public:
00598 operator const void * ()
const {
return m_stat; }
00599
const RPCStatus& stat()
const {
return m_stat; }
00600 vector<Subscription> get()
const {
return m_val; }
00601 };
00602
static const int get_stats_METHOD_ID = 20015;
00603
/// Request message for the `get_stats` RPC, which takes no arguments.
/// Derives from rpc_request_1< vector<Stats> >, matching the RPC< vector<Stats> > result of the call.
struct get_stats_IN : public rpc_request_1< vector<Stats> > {
    get_stats_IN() {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new get_stats_IN(*this); }

    /// Formats the request as "get_stats_IN{}".
    string as_string() const {
        string out;
        out << "get_stats_IN{" << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(get_stats_IN, << Serial::tag("param") );
    NMSTL_SERIAL_TAG(get_stats_IN, "params");
};
00615
class get_stats_OUT {
00616 RPCStatus m_stat;
00617 vector<Stats> m_val;
00618
public:
00619 operator const void * ()
const {
return m_stat; }
00620
const RPCStatus& stat()
const {
return m_stat; }
00621 vector<Stats> get()
const {
return m_val; }
00622 };
00623
static const int get_sel_METHOD_ID = 20016;
00624
/// Request message for the `get_sel` RPC, which takes no arguments.
/// Derives from rpc_request_1< double >, matching the RPC< double > result of the call.
struct get_sel_IN : public rpc_request_1< double > {
    get_sel_IN() {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new get_sel_IN(*this); }

    /// Formats the request as "get_sel_IN{}".
    string as_string() const {
        string out;
        out << "get_sel_IN{" << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(get_sel_IN, << Serial::tag("param") );
    NMSTL_SERIAL_TAG(get_sel_IN, "params");
};
00636
class get_sel_OUT {
00637 RPCStatus m_stat;
00638
double m_val;
00639
public:
00640 operator const void * ()
const {
return m_stat; }
00641
const RPCStatus& stat()
const {
return m_stat; }
00642
double get()
const {
return m_val; }
00643 };
00644
static const int create_stream_METHOD_ID = 20017;
00645
/// Request message for the `create_stream` RPC; its argument is the public field `stream`.
/// Derives from rpc_request_1< void >, matching the RPC< void > result of the call.
struct create_stream_IN : public rpc_request_1< void > {
    StreamDef stream;

    create_stream_IN() {}
    // Taken by const reference: one copy (into the member) instead of two.
    create_stream_IN(const StreamDef& stream) : stream(stream) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new create_stream_IN(*this); }

    /// Formats the request as "create_stream_IN{stream=<value>}".
    string as_string() const {
        string out;
        out << "create_stream_IN{" << "stream=" << stream << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(create_stream_IN, << Serial::tag("param") << stream );
    NMSTL_SERIAL_TAG(create_stream_IN, "params");
};
00658
class create_stream_OUT {
00659 RPCStatus m_stat;
00660
public:
00661 operator const void * ()
const {
return m_stat; }
00662
const RPCStatus& stat()
const {
return m_stat; }
00663
bool get()
const {
return true; }
00664 };
00665
static const int create_cpview_METHOD_ID = 20018;
00666
/// Request message for the `create_cpview` RPC; its arguments are the public
/// fields `view_desc` and `streamdef`.
/// Derives from rpc_request_1< void >, matching the RPC< void > result of the call.
struct create_cpview_IN : public rpc_request_1< void > {
    CPViewDescription view_desc;
    StreamDef streamdef;

    create_cpview_IN() {}
    // Taken by const reference: each argument is copied once (into its member) instead of twice.
    create_cpview_IN(const CPViewDescription& view_desc, const StreamDef& streamdef)
        : view_desc(view_desc), streamdef(streamdef) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new create_cpview_IN(*this); }

    /// Formats the request as "create_cpview_IN{view_desc=<v>, streamdef=<v>}".
    string as_string() const {
        string out;
        out << "create_cpview_IN{" << "view_desc=" << view_desc << ", streamdef=" << streamdef << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(create_cpview_IN, << Serial::tag("param") << view_desc << streamdef );
    NMSTL_SERIAL_TAG(create_cpview_IN, "params");
};
00679
class create_cpview_OUT {
00680 RPCStatus m_stat;
00681
public:
00682 operator const void * ()
const {
return m_stat; }
00683
const RPCStatus& stat()
const {
return m_stat; }
00684
bool get()
const {
return true; }
00685 };
00686
static const int update_stream_METHOD_ID = 20019;
00687
/// Request message for the `update_stream` RPC; its arguments are the public
/// fields `old_sd` and `new_sd`.
/// Derives from rpc_request_1< void >, matching the RPC< void > result of the call.
struct update_stream_IN : public rpc_request_1< void > {
    StreamDef old_sd;
    StreamDef new_sd;

    update_stream_IN() {}
    // Taken by const reference: each argument is copied once (into its member) instead of twice.
    update_stream_IN(const StreamDef& old_sd, const StreamDef& new_sd)
        : old_sd(old_sd), new_sd(new_sd) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new update_stream_IN(*this); }

    /// Formats the request as "update_stream_IN{old_sd=<v>, new_sd=<v>}".
    string as_string() const {
        string out;
        out << "update_stream_IN{" << "old_sd=" << old_sd << ", new_sd=" << new_sd << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(update_stream_IN, << Serial::tag("param") << old_sd << new_sd );
    NMSTL_SERIAL_TAG(update_stream_IN, "params");
};
00700
class update_stream_OUT {
00701 RPCStatus m_stat;
00702
public:
00703 operator const void * ()
const {
return m_stat; }
00704
const RPCStatus& stat()
const {
return m_stat; }
00705
bool get()
const {
return true; }
00706 };
00707
static const int ack_METHOD_ID = 20020;
00708
/// Request message for the `ack` RPC; its arguments are the public fields
/// `node`, `id` and `last_tuple`.
/// Derives from rpc_request_1< void >, matching the RPC< void > result of the call.
struct ack_IN : public rpc_request_1< void > {
    MedusaID node;
    StreamID id;
    string last_tuple;

    ack_IN() {}
    // Taken by const reference: each argument is copied once (into its member) instead of twice.
    ack_IN(const MedusaID& node, const StreamID& id, const string& last_tuple)
        : node(node), id(id), last_tuple(last_tuple) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new ack_IN(*this); }

    /// Formats the request as "ack_IN{node=<v>, id=<v>, last_tuple=<v>}".
    string as_string() const {
        string out;
        out << "ack_IN{" << "node=" << node << ", id=" << id
            << ", last_tuple=" << last_tuple << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(ack_IN, << Serial::tag("param") << node << id << last_tuple );
    NMSTL_SERIAL_TAG(ack_IN, "params");
};
00721
class ack_OUT {
00722 RPCStatus m_stat;
00723
public:
00724 operator const void * ()
const {
return m_stat; }
00725
const RPCStatus& stat()
const {
return m_stat; }
00726
bool get()
const {
return true; }
00727 };
00728
static const int trim_METHOD_ID = 20021;
00729
/// Request message for the `trim` RPC; its arguments are the public fields
/// `node`, `id` and `last_tuple`.
/// Derives from rpc_request_1< void >, matching the RPC< void > result of the call.
struct trim_IN : public rpc_request_1< void > {
    MedusaID node;
    StreamID id;
    string last_tuple;

    trim_IN() {}
    // Taken by const reference: each argument is copied once (into its member) instead of twice.
    trim_IN(const MedusaID& node, const StreamID& id, const string& last_tuple)
        : node(node), id(id), last_tuple(last_tuple) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new trim_IN(*this); }

    /// Formats the request as "trim_IN{node=<v>, id=<v>, last_tuple=<v>}".
    string as_string() const {
        string out;
        out << "trim_IN{" << "node=" << node << ", id=" << id
            << ", last_tuple=" << last_tuple << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(trim_IN, << Serial::tag("param") << node << id << last_tuple );
    NMSTL_SERIAL_TAG(trim_IN, "params");
};
00742
class trim_OUT {
00743 RPCStatus m_stat;
00744
public:
00745 operator const void * ()
const {
return m_stat; }
00746
const RPCStatus& stat()
const {
return m_stat; }
00747
bool get()
const {
return true; }
00748 };
00749
static const int set_recovery_method_METHOD_ID = 20022;
00750
/// Request message for the `set_recovery_method` RPC; its argument is the public field `method`.
/// Derives from rpc_request_1< void >, matching the RPC< void > result of the call.
struct set_recovery_method_IN : public rpc_request_1< void > {
    int method;

    // Zero the scalar so a default-constructed request never exposes an
    // indeterminate value through as_string() or serialization.
    set_recovery_method_IN() : method(0) {}
    set_recovery_method_IN(int method) : method(method) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new set_recovery_method_IN(*this); }

    /// Formats the request as "set_recovery_method_IN{method=<value>}".
    string as_string() const {
        string out;
        out << "set_recovery_method_IN{" << "method=" << method << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(set_recovery_method_IN, << Serial::tag("param") << method );
    NMSTL_SERIAL_TAG(set_recovery_method_IN, "params");
};
00763
class set_recovery_method_OUT {
00764 RPCStatus m_stat;
00765
public:
00766 operator const void * ()
const {
return m_stat; }
00767
const RPCStatus& stat()
const {
return m_stat; }
00768
bool get()
const {
return true; }
00769 };
00770
static const int set_primary_status_METHOD_ID = 20023;
00771
/// Request message for the `set_primary_status` RPC; its argument is the public field `status`.
/// Derives from rpc_request_1< void >, matching the RPC< void > result of the call.
struct set_primary_status_IN : public rpc_request_1< void > {
    bool status;

    // Initialise the flag so a default-constructed request never exposes an
    // indeterminate bool through as_string() or serialization.
    set_primary_status_IN() : status(false) {}
    set_primary_status_IN(bool status) : status(status) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new set_primary_status_IN(*this); }

    /// Formats the request as "set_primary_status_IN{status=<value>}".
    string as_string() const {
        string out;
        out << "set_primary_status_IN{" << "status=" << status << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(set_primary_status_IN, << Serial::tag("param") << status );
    NMSTL_SERIAL_TAG(set_primary_status_IN, "params");
};
00784
class set_primary_status_OUT {
00785 RPCStatus m_stat;
00786
public:
00787 operator const void * ()
const {
return m_stat; }
00788
const RPCStatus& stat()
const {
return m_stat; }
00789
bool get()
const {
return true; }
00790 };
00791
static const int set_secondaries_METHOD_ID = 20024;
00792
/// Request message for the `set_secondaries` RPC; its argument is the public field `secondaries`.
/// Derives from rpc_request_1< void >, matching the RPC< void > result of the call.
struct set_secondaries_IN : public rpc_request_1< void > {
    vector<MedusaID> secondaries;

    set_secondaries_IN() {}
    // Taken by const reference: the vector is copied once (into the member) instead of twice.
    set_secondaries_IN(const vector<MedusaID>& secondaries) : secondaries(secondaries) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new set_secondaries_IN(*this); }

    /// Formats the request as "set_secondaries_IN{secondaries=<value>}".
    string as_string() const {
        string out;
        out << "set_secondaries_IN{" << "secondaries=" << secondaries << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(set_secondaries_IN, << Serial::tag("param") << secondaries );
    NMSTL_SERIAL_TAG(set_secondaries_IN, "params");
};
00805
class set_secondaries_OUT {
00806 RPCStatus m_stat;
00807
public:
00808 operator const void * ()
const {
return m_stat; }
00809
const RPCStatus& stat()
const {
return m_stat; }
00810
bool get()
const {
return true; }
00811 };
00812
static const int set_replicas_METHOD_ID = 20025;
00813
/// Request message for the `set_replicas` RPC; its argument is the public field `replicas`.
/// Derives from rpc_request_1< void >, matching the RPC< void > result of the call.
struct set_replicas_IN : public rpc_request_1< void > {
    vector<MedusaID> replicas;

    set_replicas_IN() {}
    // Taken by const reference: the vector is copied once (into the member) instead of twice.
    set_replicas_IN(const vector<MedusaID>& replicas) : replicas(replicas) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new set_replicas_IN(*this); }

    /// Formats the request as "set_replicas_IN{replicas=<value>}".
    string as_string() const {
        string out;
        out << "set_replicas_IN{" << "replicas=" << replicas << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(set_replicas_IN, << Serial::tag("param") << replicas );
    NMSTL_SERIAL_TAG(set_replicas_IN, "params");
};
00826
class set_replicas_OUT {
00827 RPCStatus m_stat;
00828
public:
00829 operator const void * ()
const {
return m_stat; }
00830
const RPCStatus& stat()
const {
return m_stat; }
00831
bool get()
const {
return true; }
00832 };
00833
static const int checkpoint_METHOD_ID = 20026;
00834
/// Request message for the `checkpoint` RPC; its arguments are the public fields
/// `tuples_to_checkpoint` and `tuples_to_trim`.
/// Derives from rpc_request_1< void >, matching the RPC< void > result of the call.
struct checkpoint_IN : public rpc_request_1< void > {
    vector<StreamEvent> tuples_to_checkpoint;
    map<StreamID,string> tuples_to_trim;

    checkpoint_IN() {}
    // Taken by const reference: each container is copied once (into its member) instead of twice.
    checkpoint_IN(const vector<StreamEvent>& tuples_to_checkpoint,
                  const map<StreamID,string>& tuples_to_trim)
        : tuples_to_checkpoint(tuples_to_checkpoint), tuples_to_trim(tuples_to_trim) {}

    /// Returns a newly allocated copy of this request.
    rpc_message *clone() const { return new checkpoint_IN(*this); }

    /// Formats the request as "checkpoint_IN{tuples_to_checkpoint=<v>, tuples_to_trim=<v>}".
    string as_string() const {
        string out;
        out << "checkpoint_IN{" << "tuples_to_checkpoint=" << tuples_to_checkpoint
            << ", tuples_to_trim=" << tuples_to_trim << "}";
        return out;
    }

    NMSTL_SIMPLY_SERIALIZABLE(checkpoint_IN, << Serial::tag("param") << tuples_to_checkpoint << tuples_to_trim );
    NMSTL_SERIAL_TAG(checkpoint_IN, "params");
};
00847
class checkpoint_OUT {
00848 RPCStatus m_stat;
00849
public:
00850 operator const void * ()
const {
return m_stat; }
00851
const RPCStatus& stat()
const {
return m_stat; }
00852
bool get()
const {
return true; }
00853 };
00854
/// Returns the table of (method name, method id) pairs for every RPC of
/// Borealis::QueryProcessor.  The table is a function-local static, so it is
/// built on the first call and the same vector is returned afterwards.
static const vector<pair<string, int> >& method_list() {
    static const pair<string, int> methods[] = {
        pair<string, int>("typecheck", 20000),
        pair<string, int>("setup_query", 20001),
        pair<string, int>("typecheck_and_setup", 20002),
        pair<string, int>("load_box", 20003),
        pair<string, int>("choke_queries", 20004),
        pair<string, int>("resume_queries", 20005),
        pair<string, int>("pack_query", 20006),
        pair<string, int>("pack_queries", 20007),
        pair<string, int>("remove_query", 20008),
        pair<string, int>("replace_query", 20009),
        pair<string, int>("set_query_status", 20010),
        pair<string, int>("set_queries_status", 20011),
        pair<string, int>("subscribe", 20012),
        pair<string, int>("subscribe_many", 20013),
        pair<string, int>("get_subscriptions", 20014),
        pair<string, int>("get_stats", 20015),
        pair<string, int>("get_sel", 20016),
        pair<string, int>("create_stream", 20017),
        pair<string, int>("create_cpview", 20018),
        pair<string, int>("update_stream", 20019),
        pair<string, int>("ack", 20020),
        pair<string, int>("trim", 20021),
        pair<string, int>("set_recovery_method", 20022),
        pair<string, int>("set_primary_status", 20023),
        pair<string, int>("set_secondaries", 20024),
        pair<string, int>("set_replicas", 20025),
        pair<string, int>("checkpoint", 20026)
    };
    // The element count is derived from the array itself, so it follows the table.
    static const vector<pair<string, int> > ret(methods, methods + sizeof methods / sizeof methods[0]);
    return ret;
}
00887
/// Returns a name -> method id map built from method_list().  The map is a
/// function-local static, constructed on the first call.
static const map<string, int>& method_id_by_name() {
    static const map<string, int> ret(method_list().begin(), method_list().end());
    return ret;
}
00891
/// Returns a method id -> name map, obtained by passing method_id_by_name()
/// to RPCObject::reverse_map.  The map is a function-local static, constructed
/// on the first call.
static const map<int, string>& method_name_by_id() {
    static const map<int, string> ret = RPCObject::reverse_map(method_id_by_name());
    return ret;
}
00895 };
00896
template<>
00897
class Remote<Borealis::
QueryProcessor> :
public RemoteObject {
00898
friend class RPCClient;
00899
protected:
00900 Remote<Borealis::QueryProcessor>(RPCClient& cli, rpc_object_id object_id) :
00901 RemoteObject(cli, object_id) {}
00902
00903
public:
00904 Remote<Borealis::QueryProcessor>() {}
00905
void typecheck(
const Callback<
void, RPC< Query > >& completion, Query query)
const {
00906 RPCDefs<Borealis::QueryProcessor>::typecheck_IN *req =
00907
new RPCDefs<Borealis::QueryProcessor>::typecheck_IN(query);
00908 req->object_id = object_id();
00909 req->method_id = RPCDefs<Borealis::QueryProcessor>::typecheck_METHOD_ID;
00910 req->method_name =
"typecheck";
00911 req->cb = completion;
00912 client().send_request(ptr<rpc_request>(req));
00913 }
00914 RPC< Query > typecheck(Query query)
const {
00915 RPCDefs<Borealis::QueryProcessor>::typecheck_IN *req =
00916
new RPCDefs<Borealis::QueryProcessor>::typecheck_IN(query);
00917 ptr<rpc_request> preq(req);
00918 req->object_id = object_id();
00919 req->method_id = RPCDefs<Borealis::QueryProcessor>::typecheck_METHOD_ID;
00920 req->method_name =
"typecheck";
00921
bool done =
false; RPC< Query > ret;
00922 req->cb = wrap(&rpc_sync_completion< Query >, &done, &ret);
00923 client().send_request(preq);
00924
while (!done && client().block()) ;
00925 req->cb = Callback<void, RPC< Query > >();
00926
return ret;
00927 }
00928
// Const-reference typedef for the typecheck_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::typecheck_OUT &typecheck_result;
00929
void setup_query(
const Callback<
void, RPC< void > >& completion, Query query)
const {
00930 RPCDefs<Borealis::QueryProcessor>::setup_query_IN *req =
00931
new RPCDefs<Borealis::QueryProcessor>::setup_query_IN(query);
00932 req->object_id = object_id();
00933 req->method_id = RPCDefs<Borealis::QueryProcessor>::setup_query_METHOD_ID;
00934 req->method_name =
"setup_query";
00935 req->cb = completion;
00936 client().send_request(ptr<rpc_request>(req));
00937 }
00938 RPC< void > setup_query(Query query)
const {
00939 RPCDefs<Borealis::QueryProcessor>::setup_query_IN *req =
00940
new RPCDefs<Borealis::QueryProcessor>::setup_query_IN(query);
00941 ptr<rpc_request> preq(req);
00942 req->object_id = object_id();
00943 req->method_id = RPCDefs<Borealis::QueryProcessor>::setup_query_METHOD_ID;
00944 req->method_name =
"setup_query";
00945
bool done =
false; RPC< void > ret;
00946 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret);
00947 client().send_request(preq);
00948
while (!done && client().block()) ;
00949 req->cb = Callback<void, RPC< void > >();
00950
return ret;
00951 }
00952
// Const-reference typedef for the setup_query_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::setup_query_OUT &setup_query_result;
00953
void typecheck_and_setup(
const Callback<
void, RPC< Query > >& completion, Query query)
const {
00954 RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_IN *req =
00955
new RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_IN(query);
00956 req->object_id = object_id();
00957 req->method_id = RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_METHOD_ID;
00958 req->method_name =
"typecheck_and_setup";
00959 req->cb = completion;
00960 client().send_request(ptr<rpc_request>(req));
00961 }
00962 RPC< Query > typecheck_and_setup(Query query)
const {
00963 RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_IN *req =
00964
new RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_IN(query);
00965 ptr<rpc_request> preq(req);
00966 req->object_id = object_id();
00967 req->method_id = RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_METHOD_ID;
00968 req->method_name =
"typecheck_and_setup";
00969
bool done =
false; RPC< Query > ret;
00970 req->cb = wrap(&rpc_sync_completion< Query >, &done, &ret);
00971 client().send_request(preq);
00972
while (!done && client().block()) ;
00973 req->cb = Callback<void, RPC< Query > >();
00974
return ret;
00975 }
00976
// Const-reference typedef for the typecheck_and_setup_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_OUT &typecheck_and_setup_result;
00977
void load_box(
const Callback<
void, RPC< void > >& completion, string file_path)
const {
00978 RPCDefs<Borealis::QueryProcessor>::load_box_IN *req =
00979
new RPCDefs<Borealis::QueryProcessor>::load_box_IN(file_path);
00980 req->object_id = object_id();
00981 req->method_id = RPCDefs<Borealis::QueryProcessor>::load_box_METHOD_ID;
00982 req->method_name =
"load_box";
00983 req->cb = completion;
00984 client().send_request(ptr<rpc_request>(req));
00985 }
00986 RPC< void > load_box(string file_path)
const {
00987 RPCDefs<Borealis::QueryProcessor>::load_box_IN *req =
00988
new RPCDefs<Borealis::QueryProcessor>::load_box_IN(file_path);
00989 ptr<rpc_request> preq(req);
00990 req->object_id = object_id();
00991 req->method_id = RPCDefs<Borealis::QueryProcessor>::load_box_METHOD_ID;
00992 req->method_name =
"load_box";
00993
bool done =
false; RPC< void > ret;
00994 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret);
00995 client().send_request(preq);
00996
while (!done && client().block()) ;
00997 req->cb = Callback<void, RPC< void > >();
00998
return ret;
00999 }
01000
// Const-reference typedef for the load_box_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::load_box_OUT &load_box_result;
01001
void choke_queries(
const Callback<
void, RPC< vector<Query> > >& completion, vector<Query> queries)
const {
01002 RPCDefs<Borealis::QueryProcessor>::choke_queries_IN *req =
01003
new RPCDefs<Borealis::QueryProcessor>::choke_queries_IN(queries);
01004 req->object_id = object_id();
01005 req->method_id = RPCDefs<Borealis::QueryProcessor>::choke_queries_METHOD_ID;
01006 req->method_name =
"choke_queries";
01007 req->cb = completion;
01008 client().send_request(ptr<rpc_request>(req));
01009 }
01010 RPC< vector<Query> > choke_queries(vector<Query> queries)
const {
01011 RPCDefs<Borealis::QueryProcessor>::choke_queries_IN *req =
01012
new RPCDefs<Borealis::QueryProcessor>::choke_queries_IN(queries);
01013 ptr<rpc_request> preq(req);
01014 req->object_id = object_id();
01015 req->method_id = RPCDefs<Borealis::QueryProcessor>::choke_queries_METHOD_ID;
01016 req->method_name =
"choke_queries";
01017
bool done =
false; RPC< vector<Query> > ret;
01018 req->cb = wrap(&rpc_sync_completion< vector<Query> >, &done, &ret);
01019 client().send_request(preq);
01020
while (!done && client().block()) ;
01021 req->cb = Callback<void, RPC< vector<Query> > >();
01022
return ret;
01023 }
01024
// Const-reference typedef for the choke_queries_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::choke_queries_OUT &choke_queries_result;
01025
void resume_queries(
const Callback<
void, RPC< void > >& completion, vector<Query> queries)
const {
01026 RPCDefs<Borealis::QueryProcessor>::resume_queries_IN *req =
01027
new RPCDefs<Borealis::QueryProcessor>::resume_queries_IN(queries);
01028 req->object_id = object_id();
01029 req->method_id = RPCDefs<Borealis::QueryProcessor>::resume_queries_METHOD_ID;
01030 req->method_name =
"resume_queries";
01031 req->cb = completion;
01032 client().send_request(ptr<rpc_request>(req));
01033 }
01034 RPC< void > resume_queries(vector<Query> queries)
const {
01035 RPCDefs<Borealis::QueryProcessor>::resume_queries_IN *req =
01036
new RPCDefs<Borealis::QueryProcessor>::resume_queries_IN(queries);
01037 ptr<rpc_request> preq(req);
01038 req->object_id = object_id();
01039 req->method_id = RPCDefs<Borealis::QueryProcessor>::resume_queries_METHOD_ID;
01040 req->method_name =
"resume_queries";
01041
bool done =
false; RPC< void > ret;
01042 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret);
01043 client().send_request(preq);
01044
while (!done && client().block()) ;
01045 req->cb = Callback<void, RPC< void > >();
01046
return ret;
01047 }
01048
// Const-reference typedef for the resume_queries_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::resume_queries_OUT &resume_queries_result;
01049
void pack_query(
const Callback<
void, RPC< Query > >& completion, Query query)
const {
01050 RPCDefs<Borealis::QueryProcessor>::pack_query_IN *req =
01051
new RPCDefs<Borealis::QueryProcessor>::pack_query_IN(query);
01052 req->object_id = object_id();
01053 req->method_id = RPCDefs<Borealis::QueryProcessor>::pack_query_METHOD_ID;
01054 req->method_name =
"pack_query";
01055 req->cb = completion;
01056 client().send_request(ptr<rpc_request>(req));
01057 }
01058 RPC< Query > pack_query(Query query)
const {
01059 RPCDefs<Borealis::QueryProcessor>::pack_query_IN *req =
01060
new RPCDefs<Borealis::QueryProcessor>::pack_query_IN(query);
01061 ptr<rpc_request> preq(req);
01062 req->object_id = object_id();
01063 req->method_id = RPCDefs<Borealis::QueryProcessor>::pack_query_METHOD_ID;
01064 req->method_name =
"pack_query";
01065
bool done =
false; RPC< Query > ret;
01066 req->cb = wrap(&rpc_sync_completion< Query >, &done, &ret);
01067 client().send_request(preq);
01068
while (!done && client().block()) ;
01069 req->cb = Callback<void, RPC< Query > >();
01070
return ret;
01071 }
01072
// Const-reference typedef for the pack_query_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::pack_query_OUT &pack_query_result;
01073
void pack_queries(
const Callback<
void, RPC< vector<Query> > >& completion, vector<Query> queries)
const {
01074 RPCDefs<Borealis::QueryProcessor>::pack_queries_IN *req =
01075
new RPCDefs<Borealis::QueryProcessor>::pack_queries_IN(queries);
01076 req->object_id = object_id();
01077 req->method_id = RPCDefs<Borealis::QueryProcessor>::pack_queries_METHOD_ID;
01078 req->method_name =
"pack_queries";
01079 req->cb = completion;
01080 client().send_request(ptr<rpc_request>(req));
01081 }
01082 RPC< vector<Query> > pack_queries(vector<Query> queries)
const {
01083 RPCDefs<Borealis::QueryProcessor>::pack_queries_IN *req =
01084
new RPCDefs<Borealis::QueryProcessor>::pack_queries_IN(queries);
01085 ptr<rpc_request> preq(req);
01086 req->object_id = object_id();
01087 req->method_id = RPCDefs<Borealis::QueryProcessor>::pack_queries_METHOD_ID;
01088 req->method_name =
"pack_queries";
01089
bool done =
false; RPC< vector<Query> > ret;
01090 req->cb = wrap(&rpc_sync_completion< vector<Query> >, &done, &ret);
01091 client().send_request(preq);
01092
while (!done && client().block()) ;
01093 req->cb = Callback<void, RPC< vector<Query> > >();
01094
return ret;
01095 }
01096
// Const-reference typedef for the pack_queries_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::pack_queries_OUT &pack_queries_result;
01097
void remove_query(
const Callback<
void, RPC< void > >& completion, Query query)
const {
01098 RPCDefs<Borealis::QueryProcessor>::remove_query_IN *req =
01099
new RPCDefs<Borealis::QueryProcessor>::remove_query_IN(query);
01100 req->object_id = object_id();
01101 req->method_id = RPCDefs<Borealis::QueryProcessor>::remove_query_METHOD_ID;
01102 req->method_name =
"remove_query";
01103 req->cb = completion;
01104 client().send_request(ptr<rpc_request>(req));
01105 }
01106 RPC< void > remove_query(Query query)
const {
01107 RPCDefs<Borealis::QueryProcessor>::remove_query_IN *req =
01108
new RPCDefs<Borealis::QueryProcessor>::remove_query_IN(query);
01109 ptr<rpc_request> preq(req);
01110 req->object_id = object_id();
01111 req->method_id = RPCDefs<Borealis::QueryProcessor>::remove_query_METHOD_ID;
01112 req->method_name =
"remove_query";
01113
bool done =
false; RPC< void > ret;
01114 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret);
01115 client().send_request(preq);
01116
while (!done && client().block()) ;
01117 req->cb = Callback<void, RPC< void > >();
01118
return ret;
01119 }
01120
// Const-reference typedef for the remove_query_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::remove_query_OUT &remove_query_result;
01121
void replace_query(
const Callback<
void, RPC< void > >& completion, vector<Name> old_queries, vector<Query> new_queries)
const {
01122 RPCDefs<Borealis::QueryProcessor>::replace_query_IN *req =
01123
new RPCDefs<Borealis::QueryProcessor>::replace_query_IN(old_queries, new_queries);
01124 req->object_id = object_id();
01125 req->method_id = RPCDefs<Borealis::QueryProcessor>::replace_query_METHOD_ID;
01126 req->method_name =
"replace_query";
01127 req->cb = completion;
01128 client().send_request(ptr<rpc_request>(req));
01129 }
01130 RPC< void > replace_query(vector<Name> old_queries, vector<Query> new_queries)
const {
01131 RPCDefs<Borealis::QueryProcessor>::replace_query_IN *req =
01132
new RPCDefs<Borealis::QueryProcessor>::replace_query_IN(old_queries, new_queries);
01133 ptr<rpc_request> preq(req);
01134 req->object_id = object_id();
01135 req->method_id = RPCDefs<Borealis::QueryProcessor>::replace_query_METHOD_ID;
01136 req->method_name =
"replace_query";
01137
bool done =
false; RPC< void > ret;
01138 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret);
01139 client().send_request(preq);
01140
while (!done && client().block()) ;
01141 req->cb = Callback<void, RPC< void > >();
01142
return ret;
01143 }
01144
// Const-reference typedef for the replace_query_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::replace_query_OUT &replace_query_result;
01145
void set_query_status(
const Callback<
void, RPC< void > >& completion, Name name, QueryStatus status)
const {
01146 RPCDefs<Borealis::QueryProcessor>::set_query_status_IN *req =
01147
new RPCDefs<Borealis::QueryProcessor>::set_query_status_IN(name, status);
01148 req->object_id = object_id();
01149 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_query_status_METHOD_ID;
01150 req->method_name =
"set_query_status";
01151 req->cb = completion;
01152 client().send_request(ptr<rpc_request>(req));
01153 }
01154 RPC< void > set_query_status(Name name, QueryStatus status)
const {
01155 RPCDefs<Borealis::QueryProcessor>::set_query_status_IN *req =
01156
new RPCDefs<Borealis::QueryProcessor>::set_query_status_IN(name, status);
01157 ptr<rpc_request> preq(req);
01158 req->object_id = object_id();
01159 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_query_status_METHOD_ID;
01160 req->method_name =
"set_query_status";
01161
bool done =
false; RPC< void > ret;
01162 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret);
01163 client().send_request(preq);
01164
while (!done && client().block()) ;
01165 req->cb = Callback<void, RPC< void > >();
01166
return ret;
01167 }
01168
// Const-reference typedef for the set_query_status_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::set_query_status_OUT &set_query_status_result;
01169
void set_queries_status(
const Callback<
void, RPC< void > >& completion, vector<Name> name, QueryStatus status)
const {
01170 RPCDefs<Borealis::QueryProcessor>::set_queries_status_IN *req =
01171
new RPCDefs<Borealis::QueryProcessor>::set_queries_status_IN(name, status);
01172 req->object_id = object_id();
01173 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_queries_status_METHOD_ID;
01174 req->method_name =
"set_queries_status";
01175 req->cb = completion;
01176 client().send_request(ptr<rpc_request>(req));
01177 }
01178 RPC< void > set_queries_status(vector<Name> name, QueryStatus status)
const {
01179 RPCDefs<Borealis::QueryProcessor>::set_queries_status_IN *req =
01180
new RPCDefs<Borealis::QueryProcessor>::set_queries_status_IN(name, status);
01181 ptr<rpc_request> preq(req);
01182 req->object_id = object_id();
01183 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_queries_status_METHOD_ID;
01184 req->method_name =
"set_queries_status";
01185
bool done =
false; RPC< void > ret;
01186 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret);
01187 client().send_request(preq);
01188
while (!done && client().block()) ;
01189 req->cb = Callback<void, RPC< void > >();
01190
return ret;
01191 }
01192
// Const-reference typedef for the set_queries_status_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::set_queries_status_OUT &set_queries_status_result;
01193
void subscribe(
const Callback<
void, RPC< void > >& completion, Subscription sub,
unsigned int add_or_remove)
const {
01194 RPCDefs<Borealis::QueryProcessor>::subscribe_IN *req =
01195
new RPCDefs<Borealis::QueryProcessor>::subscribe_IN(sub, add_or_remove);
01196 req->object_id = object_id();
01197 req->method_id = RPCDefs<Borealis::QueryProcessor>::subscribe_METHOD_ID;
01198 req->method_name =
"subscribe";
01199 req->cb = completion;
01200 client().send_request(ptr<rpc_request>(req));
01201 }
01202 RPC< void > subscribe(Subscription sub,
unsigned int add_or_remove)
const {
01203 RPCDefs<Borealis::QueryProcessor>::subscribe_IN *req =
01204
new RPCDefs<Borealis::QueryProcessor>::subscribe_IN(sub, add_or_remove);
01205 ptr<rpc_request> preq(req);
01206 req->object_id = object_id();
01207 req->method_id = RPCDefs<Borealis::QueryProcessor>::subscribe_METHOD_ID;
01208 req->method_name =
"subscribe";
01209
bool done =
false; RPC< void > ret;
01210 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret);
01211 client().send_request(preq);
01212
while (!done && client().block()) ;
01213 req->cb = Callback<void, RPC< void > >();
01214
return ret;
01215 }
01216
// Const-reference typedef for the subscribe_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::subscribe_OUT &subscribe_result;
01217
void subscribe_many(
const Callback<
void, RPC< vector<Subscription> > >& completion, vector<Subscription> sub,
unsigned int add_or_remove)
const {
01218 RPCDefs<Borealis::QueryProcessor>::subscribe_many_IN *req =
01219
new RPCDefs<Borealis::QueryProcessor>::subscribe_many_IN(sub, add_or_remove);
01220 req->object_id = object_id();
01221 req->method_id = RPCDefs<Borealis::QueryProcessor>::subscribe_many_METHOD_ID;
01222 req->method_name =
"subscribe_many";
01223 req->cb = completion;
01224 client().send_request(ptr<rpc_request>(req));
01225 }
01226 RPC< vector<Subscription> > subscribe_many(vector<Subscription> sub,
unsigned int add_or_remove)
const {
01227 RPCDefs<Borealis::QueryProcessor>::subscribe_many_IN *req =
01228
new RPCDefs<Borealis::QueryProcessor>::subscribe_many_IN(sub, add_or_remove);
01229 ptr<rpc_request> preq(req);
01230 req->object_id = object_id();
01231 req->method_id = RPCDefs<Borealis::QueryProcessor>::subscribe_many_METHOD_ID;
01232 req->method_name =
"subscribe_many";
01233
bool done =
false; RPC< vector<Subscription> > ret;
01234 req->cb = wrap(&rpc_sync_completion< vector<Subscription> >, &done, &ret);
01235 client().send_request(preq);
01236
while (!done && client().block()) ;
01237 req->cb = Callback<void, RPC< vector<Subscription> > >();
01238
return ret;
01239 }
01240
// Const-reference typedef for the subscribe_many_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::subscribe_many_OUT &subscribe_many_result;
01241
void get_subscriptions(
const Callback<
void, RPC< vector<Subscription> > >& completion, vector<Name> streams)
const {
01242 RPCDefs<Borealis::QueryProcessor>::get_subscriptions_IN *req =
01243
new RPCDefs<Borealis::QueryProcessor>::get_subscriptions_IN(streams);
01244 req->object_id = object_id();
01245 req->method_id = RPCDefs<Borealis::QueryProcessor>::get_subscriptions_METHOD_ID;
01246 req->method_name =
"get_subscriptions";
01247 req->cb = completion;
01248 client().send_request(ptr<rpc_request>(req));
01249 }
01250 RPC< vector<Subscription> > get_subscriptions(vector<Name> streams)
const {
01251 RPCDefs<Borealis::QueryProcessor>::get_subscriptions_IN *req =
01252
new RPCDefs<Borealis::QueryProcessor>::get_subscriptions_IN(streams);
01253 ptr<rpc_request> preq(req);
01254 req->object_id = object_id();
01255 req->method_id = RPCDefs<Borealis::QueryProcessor>::get_subscriptions_METHOD_ID;
01256 req->method_name =
"get_subscriptions";
01257
bool done =
false; RPC< vector<Subscription> > ret;
01258 req->cb = wrap(&rpc_sync_completion< vector<Subscription> >, &done, &ret);
01259 client().send_request(preq);
01260
while (!done && client().block()) ;
01261 req->cb = Callback<void, RPC< vector<Subscription> > >();
01262
return ret;
01263 }
01264
// Const-reference typedef for the get_subscriptions_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::get_subscriptions_OUT &get_subscriptions_result;
01265
void get_stats(
const Callback<
void, RPC< vector<Stats> > >& completion)
const {
01266 RPCDefs<Borealis::QueryProcessor>::get_stats_IN *req =
01267
new RPCDefs<Borealis::QueryProcessor>::get_stats_IN();
01268 req->object_id = object_id();
01269 req->method_id = RPCDefs<Borealis::QueryProcessor>::get_stats_METHOD_ID;
01270 req->method_name =
"get_stats";
01271 req->cb = completion;
01272 client().send_request(ptr<rpc_request>(req));
01273 }
01274 RPC< vector<Stats> > get_stats()
const {
01275 RPCDefs<Borealis::QueryProcessor>::get_stats_IN *req =
01276
new RPCDefs<Borealis::QueryProcessor>::get_stats_IN();
01277 ptr<rpc_request> preq(req);
01278 req->object_id = object_id();
01279 req->method_id = RPCDefs<Borealis::QueryProcessor>::get_stats_METHOD_ID;
01280 req->method_name =
"get_stats";
01281
bool done =
false; RPC< vector<Stats> > ret;
01282 req->cb = wrap(&rpc_sync_completion< vector<Stats> >, &done, &ret);
01283 client().send_request(preq);
01284
while (!done && client().block()) ;
01285 req->cb = Callback<void, RPC< vector<Stats> > >();
01286
return ret;
01287 }
01288
// Const-reference typedef for the get_stats_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::get_stats_OUT &get_stats_result;
01289
void get_sel(
const Callback<
void, RPC< double > >& completion)
const {
01290 RPCDefs<Borealis::QueryProcessor>::get_sel_IN *req =
01291
new RPCDefs<Borealis::QueryProcessor>::get_sel_IN();
01292 req->object_id = object_id();
01293 req->method_id = RPCDefs<Borealis::QueryProcessor>::get_sel_METHOD_ID;
01294 req->method_name =
"get_sel";
01295 req->cb = completion;
01296 client().send_request(ptr<rpc_request>(req));
01297 }
01298 RPC< double > get_sel()
const {
01299 RPCDefs<Borealis::QueryProcessor>::get_sel_IN *req =
01300
new RPCDefs<Borealis::QueryProcessor>::get_sel_IN();
01301 ptr<rpc_request> preq(req);
01302 req->object_id = object_id();
01303 req->method_id = RPCDefs<Borealis::QueryProcessor>::get_sel_METHOD_ID;
01304 req->method_name =
"get_sel";
01305
bool done =
false; RPC< double > ret;
01306 req->cb = wrap(&rpc_sync_completion< double >, &done, &ret);
01307 client().send_request(preq);
01308
while (!done && client().block()) ;
01309 req->cb = Callback<void, RPC< double > >();
01310
return ret;
01311 }
01312
// Const-reference typedef for the get_sel_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::get_sel_OUT &get_sel_result;
01313
void create_stream(
const Callback<
void, RPC< void > >& completion, StreamDef stream)
const {
01314 RPCDefs<Borealis::QueryProcessor>::create_stream_IN *req =
01315
new RPCDefs<Borealis::QueryProcessor>::create_stream_IN(stream);
01316 req->object_id = object_id();
01317 req->method_id = RPCDefs<Borealis::QueryProcessor>::create_stream_METHOD_ID;
01318 req->method_name =
"create_stream";
01319 req->cb = completion;
01320 client().send_request(ptr<rpc_request>(req));
01321 }
01322 RPC< void > create_stream(StreamDef stream)
const {
01323 RPCDefs<Borealis::QueryProcessor>::create_stream_IN *req =
01324
new RPCDefs<Borealis::QueryProcessor>::create_stream_IN(stream);
01325 ptr<rpc_request> preq(req);
01326 req->object_id = object_id();
01327 req->method_id = RPCDefs<Borealis::QueryProcessor>::create_stream_METHOD_ID;
01328 req->method_name =
"create_stream";
01329
bool done =
false; RPC< void > ret;
01330 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret);
01331 client().send_request(preq);
01332
while (!done && client().block()) ;
01333 req->cb = Callback<void, RPC< void > >();
01334
return ret;
01335 }
01336
// Const-reference typedef for the create_stream_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::create_stream_OUT &create_stream_result;
01337
void create_cpview(
const Callback<
void, RPC< void > >& completion, CPViewDescription view_desc, StreamDef streamdef)
const {
01338 RPCDefs<Borealis::QueryProcessor>::create_cpview_IN *req =
01339
new RPCDefs<Borealis::QueryProcessor>::create_cpview_IN(view_desc, streamdef);
01340 req->object_id = object_id();
01341 req->method_id = RPCDefs<Borealis::QueryProcessor>::create_cpview_METHOD_ID;
01342 req->method_name =
"create_cpview";
01343 req->cb = completion;
01344 client().send_request(ptr<rpc_request>(req));
01345 }
01346 RPC< void > create_cpview(CPViewDescription view_desc, StreamDef streamdef)
const {
01347 RPCDefs<Borealis::QueryProcessor>::create_cpview_IN *req =
01348
new RPCDefs<Borealis::QueryProcessor>::create_cpview_IN(view_desc, streamdef);
01349 ptr<rpc_request> preq(req);
01350 req->object_id = object_id();
01351 req->method_id = RPCDefs<Borealis::QueryProcessor>::create_cpview_METHOD_ID;
01352 req->method_name =
"create_cpview";
01353
bool done =
false; RPC< void > ret;
01354 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret);
01355 client().send_request(preq);
01356
while (!done && client().block()) ;
01357 req->cb = Callback<void, RPC< void > >();
01358
return ret;
01359 }
01360
// Const-reference typedef for the create_cpview_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::create_cpview_OUT &create_cpview_result;
01361
void update_stream(
const Callback<
void, RPC< void > >& completion, StreamDef old_sd, StreamDef new_sd)
const {
01362 RPCDefs<Borealis::QueryProcessor>::update_stream_IN *req =
01363
new RPCDefs<Borealis::QueryProcessor>::update_stream_IN(old_sd, new_sd);
01364 req->object_id = object_id();
01365 req->method_id = RPCDefs<Borealis::QueryProcessor>::update_stream_METHOD_ID;
01366 req->method_name =
"update_stream";
01367 req->cb = completion;
01368 client().send_request(ptr<rpc_request>(req));
01369 }
01370 RPC< void > update_stream(StreamDef old_sd, StreamDef new_sd)
const {
01371 RPCDefs<Borealis::QueryProcessor>::update_stream_IN *req =
01372
new RPCDefs<Borealis::QueryProcessor>::update_stream_IN(old_sd, new_sd);
01373 ptr<rpc_request> preq(req);
01374 req->object_id = object_id();
01375 req->method_id = RPCDefs<Borealis::QueryProcessor>::update_stream_METHOD_ID;
01376 req->method_name =
"update_stream";
01377
bool done =
false; RPC< void > ret;
01378 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret);
01379 client().send_request(preq);
01380
while (!done && client().block()) ;
01381 req->cb = Callback<void, RPC< void > >();
01382
return ret;
01383 }
01384
// Const-reference typedef for the update_stream_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::update_stream_OUT &update_stream_result;
01385
void ack(
const Callback<
void, RPC< void > >& completion, MedusaID node, StreamID
id, string last_tuple)
const {
01386 RPCDefs<Borealis::QueryProcessor>::ack_IN *req =
01387
new RPCDefs<Borealis::QueryProcessor>::ack_IN(node,
id, last_tuple);
01388 req->object_id = object_id();
01389 req->method_id = RPCDefs<Borealis::QueryProcessor>::ack_METHOD_ID;
01390 req->method_name =
"ack";
01391 req->cb = completion;
01392 client().send_request(ptr<rpc_request>(req));
01393 }
01394 RPC< void > ack(MedusaID node, StreamID
id, string last_tuple)
const {
01395 RPCDefs<Borealis::QueryProcessor>::ack_IN *req =
01396
new RPCDefs<Borealis::QueryProcessor>::ack_IN(node,
id, last_tuple);
01397 ptr<rpc_request> preq(req);
01398 req->object_id = object_id();
01399 req->method_id = RPCDefs<Borealis::QueryProcessor>::ack_METHOD_ID;
01400 req->method_name =
"ack";
01401
bool done =
false; RPC< void > ret;
01402 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret);
01403 client().send_request(preq);
01404
while (!done && client().block()) ;
01405 req->cb = Callback<void, RPC< void > >();
01406
return ret;
01407 }
01408
// Const-reference typedef for the ack_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::ack_OUT &ack_result;
01409
void trim(
const Callback<
void, RPC< void > >& completion, MedusaID node, StreamID
id, string last_tuple)
const {
01410 RPCDefs<Borealis::QueryProcessor>::trim_IN *req =
01411
new RPCDefs<Borealis::QueryProcessor>::trim_IN(node,
id, last_tuple);
01412 req->object_id = object_id();
01413 req->method_id = RPCDefs<Borealis::QueryProcessor>::trim_METHOD_ID;
01414 req->method_name =
"trim";
01415 req->cb = completion;
01416 client().send_request(ptr<rpc_request>(req));
01417 }
01418 RPC< void > trim(MedusaID node, StreamID
id, string last_tuple)
const {
01419 RPCDefs<Borealis::QueryProcessor>::trim_IN *req =
01420
new RPCDefs<Borealis::QueryProcessor>::trim_IN(node,
id, last_tuple);
01421 ptr<rpc_request> preq(req);
01422 req->object_id = object_id();
01423 req->method_id = RPCDefs<Borealis::QueryProcessor>::trim_METHOD_ID;
01424 req->method_name =
"trim";
01425
bool done =
false; RPC< void > ret;
01426 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret);
01427 client().send_request(preq);
01428
while (!done && client().block()) ;
01429 req->cb = Callback<void, RPC< void > >();
01430
return ret;
01431 }
01432
// Const-reference typedef for the trim_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::trim_OUT &trim_result;
01433
void set_recovery_method(
const Callback<
void, RPC< void > >& completion,
int method)
const {
01434 RPCDefs<Borealis::QueryProcessor>::set_recovery_method_IN *req =
01435
new RPCDefs<Borealis::QueryProcessor>::set_recovery_method_IN(method);
01436 req->object_id = object_id();
01437 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_recovery_method_METHOD_ID;
01438 req->method_name =
"set_recovery_method";
01439 req->cb = completion;
01440 client().send_request(ptr<rpc_request>(req));
01441 }
01442 RPC< void > set_recovery_method(
int method)
const {
01443 RPCDefs<Borealis::QueryProcessor>::set_recovery_method_IN *req =
01444
new RPCDefs<Borealis::QueryProcessor>::set_recovery_method_IN(method);
01445 ptr<rpc_request> preq(req);
01446 req->object_id = object_id();
01447 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_recovery_method_METHOD_ID;
01448 req->method_name =
"set_recovery_method";
01449
bool done =
false; RPC< void > ret;
01450 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret);
01451 client().send_request(preq);
01452
while (!done && client().block()) ;
01453 req->cb = Callback<void, RPC< void > >();
01454
return ret;
01455 }
01456
// Const-reference typedef for the set_recovery_method_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::set_recovery_method_OUT &set_recovery_method_result;
01457
void set_primary_status(
const Callback<
void, RPC< void > >& completion,
bool status)
const {
01458 RPCDefs<Borealis::QueryProcessor>::set_primary_status_IN *req =
01459
new RPCDefs<Borealis::QueryProcessor>::set_primary_status_IN(status);
01460 req->object_id = object_id();
01461 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_primary_status_METHOD_ID;
01462 req->method_name =
"set_primary_status";
01463 req->cb = completion;
01464 client().send_request(ptr<rpc_request>(req));
01465 }
01466 RPC< void > set_primary_status(
bool status)
const {
01467 RPCDefs<Borealis::QueryProcessor>::set_primary_status_IN *req =
01468
new RPCDefs<Borealis::QueryProcessor>::set_primary_status_IN(status);
01469 ptr<rpc_request> preq(req);
01470 req->object_id = object_id();
01471 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_primary_status_METHOD_ID;
01472 req->method_name =
"set_primary_status";
01473
bool done =
false; RPC< void > ret;
01474 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret);
01475 client().send_request(preq);
01476
while (!done && client().block()) ;
01477 req->cb = Callback<void, RPC< void > >();
01478
return ret;
01479 }
01480
// Const-reference typedef for the set_primary_status_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::set_primary_status_OUT &set_primary_status_result;
01481
void set_secondaries(
const Callback<
void, RPC< void > >& completion, vector<MedusaID> secondaries)
const {
01482 RPCDefs<Borealis::QueryProcessor>::set_secondaries_IN *req =
01483
new RPCDefs<Borealis::QueryProcessor>::set_secondaries_IN(secondaries);
01484 req->object_id = object_id();
01485 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_secondaries_METHOD_ID;
01486 req->method_name =
"set_secondaries";
01487 req->cb = completion;
01488 client().send_request(ptr<rpc_request>(req));
01489 }
01490 RPC< void > set_secondaries(vector<MedusaID> secondaries)
const {
01491 RPCDefs<Borealis::QueryProcessor>::set_secondaries_IN *req =
01492
new RPCDefs<Borealis::QueryProcessor>::set_secondaries_IN(secondaries);
01493 ptr<rpc_request> preq(req);
01494 req->object_id = object_id();
01495 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_secondaries_METHOD_ID;
01496 req->method_name =
"set_secondaries";
01497
bool done =
false; RPC< void > ret;
01498 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret);
01499 client().send_request(preq);
01500
while (!done && client().block()) ;
01501 req->cb = Callback<void, RPC< void > >();
01502
return ret;
01503 }
01504
// Const-reference typedef for the set_secondaries_OUT type in RPCDefs<Borealis::QueryProcessor>.
typedef const RPCDefs<Borealis::QueryProcessor>::set_secondaries_OUT &set_secondaries_result;
01505
void set_replicas(
const Callback<
void, RPC< void > >& completion, vector<MedusaID> replicas)
const {
01506 RPCDefs<Borealis::QueryProcessor>::set_replicas_IN *req =
01507
new RPCDefs<Borealis::QueryProcessor>::set_replicas_IN(replicas);
01508 req->object_id = object_id();
01509 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_replicas_METHOD_ID;
01510 req->method_name =
"set_replicas";
01511 req->cb = completion;
01512 client().send_request(ptr<rpc_request>(req));
01513 }
01514 RPC< void > set_replicas(vector<MedusaID> replicas)
const {
01515 RPCDefs<Borealis::QueryProcessor>::set_replicas_IN *req =
01516
new RPCDefs<Borealis::QueryProcessor>::set_replicas_IN(replicas);
01517 ptr<rpc_request> preq(req);
01518 req->object_id = object_id();
01519 req->method_id = RPCDefs<Borealis::QueryProcessor>::set_replicas_METHOD_ID;
01520 req->method_name =
"set_replicas";
01521
bool done =
false; RPC< void > ret;
01522 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret);
01523 client().send_request(preq);
01524
while (!done && client().block()) ;
01525 req->cb = Callback<void, RPC< void > >();
01526
return ret;
01527 }
01528
// Alias for a const reference to RPCDefs<Borealis::QueryProcessor>::set_replicas_OUT.
typedef const RPCDefs<Borealis::QueryProcessor>::set_replicas_OUT &set_replicas_result;
01529
void checkpoint(
const Callback<
void, RPC< void > >& completion, vector<StreamEvent> tuples_to_checkpoint, map<StreamID,string> tuples_to_trim)
const {
01530 RPCDefs<Borealis::QueryProcessor>::checkpoint_IN *req =
01531
new RPCDefs<Borealis::QueryProcessor>::checkpoint_IN(tuples_to_checkpoint, tuples_to_trim);
01532 req->object_id = object_id();
01533 req->method_id = RPCDefs<Borealis::QueryProcessor>::checkpoint_METHOD_ID;
01534 req->method_name =
"checkpoint";
01535 req->cb = completion;
01536 client().send_request(ptr<rpc_request>(req));
01537 }
01538 RPC< void > checkpoint(vector<StreamEvent> tuples_to_checkpoint, map<StreamID,string> tuples_to_trim)
const {
01539 RPCDefs<Borealis::QueryProcessor>::checkpoint_IN *req =
01540
new RPCDefs<Borealis::QueryProcessor>::checkpoint_IN(tuples_to_checkpoint, tuples_to_trim);
01541 ptr<rpc_request> preq(req);
01542 req->object_id = object_id();
01543 req->method_id = RPCDefs<Borealis::QueryProcessor>::checkpoint_METHOD_ID;
01544 req->method_name =
"checkpoint";
01545
bool done =
false; RPC< void > ret;
01546 req->cb = wrap(&rpc_sync_completion< void >, &done, &ret);
01547 client().send_request(preq);
01548
while (!done && client().block()) ;
01549 req->cb = Callback<void, RPC< void > >();
01550
return ret;
01551 }
01552
// Alias for a const reference to RPCDefs<Borealis::QueryProcessor>::checkpoint_OUT.
typedef const RPCDefs<Borealis::QueryProcessor>::checkpoint_OUT &checkpoint_result;
01553 };
01554 };
01555
// Explicit specialization of RPCDefs for Borealis::QueryProcessor. It adds no
// members of its own; everything is inherited from the definition in the
// __nmstl_tmp_rpc_defs namespace.
template<>
struct RPCDefs<Borealis::
QueryProcessor> : __nmstl_tmp_rpc_defs::RPCDefs<Borealis::QueryProcessor> {};
01556
// Explicit specialization of Remote for Borealis::QueryProcessor. All RPC
// methods are inherited from __nmstl_tmp_rpc_defs::Remote<Borealis::QueryProcessor>;
// this type only supplies constructors that forward to that base.
//
// The constructors are named with the injected-class-name `Remote` rather than
// the template-id `Remote<Borealis::QueryProcessor>`: naming a constructor with
// a template-id is ill-formed from C++20 on, while the plain name is valid in
// every language version.
template<>
struct Remote<Borealis::QueryProcessor> : __nmstl_tmp_rpc_defs::Remote<Borealis::QueryProcessor> {
    // Default-constructs the base proxy.
    Remote() {}

    // Constructs a proxy from an RPC client and an object id, both forwarded to the base.
    Remote(RPCClient& cli, rpc_object_id object_id) :
        __nmstl_tmp_rpc_defs::Remote<Borealis::QueryProcessor>(cli, object_id) {}

    // Converting constructor from the base proxy type (copy-constructs the base from `x`).
    Remote(const __nmstl_tmp_rpc_defs::Remote<Borealis::QueryProcessor>& x) :
        __nmstl_tmp_rpc_defs::Remote<Borealis::QueryProcessor>(x) {}
};
01564
01565
01566
#endif // !defined(DOXYGEN_SKIP)
01567
01568 NMSTL_NAMESPACE_END;
01569
01570
#endif // !defined(___QUERYPROCESSOR_RPC_QUERYPROCESSOR_H)
01571
01572
01573
01574
01575
01576
#ifdef NMSTL_RPC_DEFINE
01577
01578
#include <string>
01579
#include <map>
01580
#include <NMSTL/rpc>
01581
01582
void Borealis::QueryProcessor::handle_request(
const NMSTL::RPCObject::response_cb& cb,
const NMSTL::ptr<NMSTL::rpc_request>& req) {
01583
switch(req->method_id) {
01584
case NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_METHOD_ID:
01585 {
01586
const NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_IN* inp =
01587 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_IN*>(req.get());
01588
01589
if (!inp) {
01590 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
01591
" to Borealis::QueryProcessor::typecheck; expected " +
01592
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_IN);
01593 WARN << err;
01594 cb(err, 0,
true);
01595
return;
01596 }
01597
01598
const NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_IN& in = *inp;
01599 RPC< Query > result = typecheck(in.query);
01600
if (result.valid()) {
01601 NMSTL::rpc_message_1< Query > ret(*result);
01602 cb(
true, &ret,
true);
01603 }
else {
01604 cb(result.stat(), 0,
true);
01605 }
01606
break;
01607 }
01608
case NMSTL::RPCDefs<Borealis::QueryProcessor>::setup_query_METHOD_ID:
01609 {
01610
const NMSTL::RPCDefs<Borealis::QueryProcessor>::setup_query_IN* inp =
01611 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::setup_query_IN*>(req.get());
01612
01613
if (!inp) {
01614 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
01615
" to Borealis::QueryProcessor::setup_query; expected " +
01616
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::setup_query_IN);
01617 WARN << err;
01618 cb(err, 0,
true);
01619
return;
01620 }
01621
01622
const NMSTL::RPCDefs<Borealis::QueryProcessor>::setup_query_IN& in = *inp;
01623 NMSTL::AsyncRPC< void > pending = setup_query(in.query);
01624 pending.set_response_handler(cb);
01625
break;
01626 }
01627
case NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_METHOD_ID:
01628 {
01629
const NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_IN* inp =
01630 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_IN*>(req.get());
01631
01632
if (!inp) {
01633 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
01634
" to Borealis::QueryProcessor::typecheck_and_setup; expected " +
01635
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_IN);
01636 WARN << err;
01637 cb(err, 0,
true);
01638
return;
01639 }
01640
01641
const NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_IN& in = *inp;
01642 RPC< Query > result = typecheck_and_setup(in.query);
01643
if (result.valid()) {
01644 NMSTL::rpc_message_1< Query > ret(*result);
01645 cb(
true, &ret,
true);
01646 }
else {
01647 cb(result.stat(), 0,
true);
01648 }
01649
break;
01650 }
01651
case NMSTL::RPCDefs<Borealis::QueryProcessor>::load_box_METHOD_ID:
01652 {
01653
const NMSTL::RPCDefs<Borealis::QueryProcessor>::load_box_IN* inp =
01654 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::load_box_IN*>(req.get());
01655
01656
if (!inp) {
01657 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
01658
" to Borealis::QueryProcessor::load_box; expected " +
01659
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::load_box_IN);
01660 WARN << err;
01661 cb(err, 0,
true);
01662
return;
01663 }
01664
01665
const NMSTL::RPCDefs<Borealis::QueryProcessor>::load_box_IN& in = *inp;
01666 NMSTL::AsyncRPC< void > pending = load_box(in.file_path);
01667 pending.set_response_handler(cb);
01668
break;
01669 }
01670
case NMSTL::RPCDefs<Borealis::QueryProcessor>::choke_queries_METHOD_ID:
01671 {
01672
const NMSTL::RPCDefs<Borealis::QueryProcessor>::choke_queries_IN* inp =
01673 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::choke_queries_IN*>(req.get());
01674
01675
if (!inp) {
01676 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
01677
" to Borealis::QueryProcessor::choke_queries; expected " +
01678
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::choke_queries_IN);
01679 WARN << err;
01680 cb(err, 0,
true);
01681
return;
01682 }
01683
01684
const NMSTL::RPCDefs<Borealis::QueryProcessor>::choke_queries_IN& in = *inp;
01685 RPC< vector<Query> > result = choke_queries(in.queries);
01686
if (result.valid()) {
01687 NMSTL::rpc_message_1< vector<Query> > ret(*result);
01688 cb(
true, &ret,
true);
01689 }
else {
01690 cb(result.stat(), 0,
true);
01691 }
01692
break;
01693 }
01694
case NMSTL::RPCDefs<Borealis::QueryProcessor>::resume_queries_METHOD_ID:
01695 {
01696
const NMSTL::RPCDefs<Borealis::QueryProcessor>::resume_queries_IN* inp =
01697 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::resume_queries_IN*>(req.get());
01698
01699
if (!inp) {
01700 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
01701
" to Borealis::QueryProcessor::resume_queries; expected " +
01702
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::resume_queries_IN);
01703 WARN << err;
01704 cb(err, 0,
true);
01705
return;
01706 }
01707
01708
const NMSTL::RPCDefs<Borealis::QueryProcessor>::resume_queries_IN& in = *inp;
01709 RPC< void > result = resume_queries(in.queries);
01710
if (result.valid()) {
01711 NMSTL::rpc_message_1< void > ret;
01712 cb(
true, &ret,
true);
01713 }
else {
01714 cb(result.stat(), 0,
true);
01715 }
01716
break;
01717 }
01718
case NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_query_METHOD_ID:
01719 {
01720
const NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_query_IN* inp =
01721 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_query_IN*>(req.get());
01722
01723
if (!inp) {
01724 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
01725
" to Borealis::QueryProcessor::pack_query; expected " +
01726
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_query_IN);
01727 WARN << err;
01728 cb(err, 0,
true);
01729
return;
01730 }
01731
01732
const NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_query_IN& in = *inp;
01733 RPC< Query > result = pack_query(in.query);
01734
if (result.valid()) {
01735 NMSTL::rpc_message_1< Query > ret(*result);
01736 cb(
true, &ret,
true);
01737 }
else {
01738 cb(result.stat(), 0,
true);
01739 }
01740
break;
01741 }
01742
case NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_queries_METHOD_ID:
01743 {
01744
const NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_queries_IN* inp =
01745 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_queries_IN*>(req.get());
01746
01747
if (!inp) {
01748 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
01749
" to Borealis::QueryProcessor::pack_queries; expected " +
01750
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_queries_IN);
01751 WARN << err;
01752 cb(err, 0,
true);
01753
return;
01754 }
01755
01756
const NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_queries_IN& in = *inp;
01757 RPC< vector<Query> > result = pack_queries(in.queries);
01758
if (result.valid()) {
01759 NMSTL::rpc_message_1< vector<Query> > ret(*result);
01760 cb(
true, &ret,
true);
01761 }
else {
01762 cb(result.stat(), 0,
true);
01763 }
01764
break;
01765 }
01766
case NMSTL::RPCDefs<Borealis::QueryProcessor>::remove_query_METHOD_ID:
01767 {
01768
const NMSTL::RPCDefs<Borealis::QueryProcessor>::remove_query_IN* inp =
01769 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::remove_query_IN*>(req.get());
01770
01771
if (!inp) {
01772 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
01773
" to Borealis::QueryProcessor::remove_query; expected " +
01774
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::remove_query_IN);
01775 WARN << err;
01776 cb(err, 0,
true);
01777
return;
01778 }
01779
01780
const NMSTL::RPCDefs<Borealis::QueryProcessor>::remove_query_IN& in = *inp;
01781 RPC< void > result = remove_query(in.query);
01782
if (result.valid()) {
01783 NMSTL::rpc_message_1< void > ret;
01784 cb(
true, &ret,
true);
01785 }
else {
01786 cb(result.stat(), 0,
true);
01787 }
01788
break;
01789 }
01790
case NMSTL::RPCDefs<Borealis::QueryProcessor>::replace_query_METHOD_ID:
01791 {
01792
const NMSTL::RPCDefs<Borealis::QueryProcessor>::replace_query_IN* inp =
01793 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::replace_query_IN*>(req.get());
01794
01795
if (!inp) {
01796 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
01797
" to Borealis::QueryProcessor::replace_query; expected " +
01798
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::replace_query_IN);
01799 WARN << err;
01800 cb(err, 0,
true);
01801
return;
01802 }
01803
01804
const NMSTL::RPCDefs<Borealis::QueryProcessor>::replace_query_IN& in = *inp;
01805 RPC< void > result = replace_query(in.old_queries, in.new_queries);
01806
if (result.valid()) {
01807 NMSTL::rpc_message_1< void > ret;
01808 cb(
true, &ret,
true);
01809 }
else {
01810 cb(result.stat(), 0,
true);
01811 }
01812
break;
01813 }
01814
case NMSTL::RPCDefs<Borealis::QueryProcessor>::set_query_status_METHOD_ID:
01815 {
01816
const NMSTL::RPCDefs<Borealis::QueryProcessor>::set_query_status_IN* inp =
01817 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::set_query_status_IN*>(req.get());
01818
01819
if (!inp) {
01820 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
01821
" to Borealis::QueryProcessor::set_query_status; expected " +
01822
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::set_query_status_IN);
01823 WARN << err;
01824 cb(err, 0,
true);
01825
return;
01826 }
01827
01828
const NMSTL::RPCDefs<Borealis::QueryProcessor>::set_query_status_IN& in = *inp;
01829 RPC< void > result = set_query_status(in.name, in.status);
01830
if (result.valid()) {
01831 NMSTL::rpc_message_1< void > ret;
01832 cb(
true, &ret,
true);
01833 }
else {
01834 cb(result.stat(), 0,
true);
01835 }
01836
break;
01837 }
01838
case NMSTL::RPCDefs<Borealis::QueryProcessor>::set_queries_status_METHOD_ID:
01839 {
01840
const NMSTL::RPCDefs<Borealis::QueryProcessor>::set_queries_status_IN* inp =
01841 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::set_queries_status_IN*>(req.get());
01842
01843
if (!inp) {
01844 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
01845
" to Borealis::QueryProcessor::set_queries_status; expected " +
01846
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::set_queries_status_IN);
01847 WARN << err;
01848 cb(err, 0,
true);
01849
return;
01850 }
01851
01852
const NMSTL::RPCDefs<Borealis::QueryProcessor>::set_queries_status_IN& in = *inp;
01853 RPC< void > result = set_queries_status(in.name, in.status);
01854
if (result.valid()) {
01855 NMSTL::rpc_message_1< void > ret;
01856 cb(
true, &ret,
true);
01857 }
else {
01858 cb(result.stat(), 0,
true);
01859 }
01860
break;
01861 }
01862
case NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_METHOD_ID:
01863 {
01864
const NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_IN* inp =
01865 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_IN*>(req.get());
01866
01867
if (!inp) {
01868 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
01869
" to Borealis::QueryProcessor::subscribe; expected " +
01870
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_IN);
01871 WARN << err;
01872 cb(err, 0,
true);
01873
return;
01874 }
01875
01876
const NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_IN& in = *inp;
01877 RPC< void > result = subscribe(in.sub, in.add_or_remove);
01878
if (result.valid()) {
01879 NMSTL::rpc_message_1< void > ret;
01880 cb(
true, &ret,
true);
01881 }
else {
01882 cb(result.stat(), 0,
true);
01883 }
01884
break;
01885 }
01886
case NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_many_METHOD_ID:
01887 {
01888
const NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_many_IN* inp =
01889 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_many_IN*>(req.get());
01890
01891
if (!inp) {
01892 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
01893
" to Borealis::QueryProcessor::subscribe_many; expected " +
01894
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_many_IN);
01895 WARN << err;
01896 cb(err, 0,
true);
01897
return;
01898 }
01899
01900
const NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_many_IN& in = *inp;
01901 RPC< vector<Subscription> > result = subscribe_many(in.sub, in.add_or_remove);
01902
if (result.valid()) {
01903 NMSTL::rpc_message_1< vector<Subscription> > ret(*result);
01904 cb(
true, &ret,
true);
01905 }
else {
01906 cb(result.stat(), 0,
true);
01907 }
01908
break;
01909 }
01910
case NMSTL::RPCDefs<Borealis::QueryProcessor>::get_subscriptions_METHOD_ID:
01911 {
01912
const NMSTL::RPCDefs<Borealis::QueryProcessor>::get_subscriptions_IN* inp =
01913 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::get_subscriptions_IN*>(req.get());
01914
01915
if (!inp) {
01916 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
01917
" to Borealis::QueryProcessor::get_subscriptions; expected " +
01918
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::get_subscriptions_IN);
01919 WARN << err;
01920 cb(err, 0,
true);
01921
return;
01922 }
01923
01924
const NMSTL::RPCDefs<Borealis::QueryProcessor>::get_subscriptions_IN& in = *inp;
01925 RPC< vector<Subscription> > result = get_subscriptions(in.streams);
01926
if (result.valid()) {
01927 NMSTL::rpc_message_1< vector<Subscription> > ret(*result);
01928 cb(
true, &ret,
true);
01929 }
else {
01930 cb(result.stat(), 0,
true);
01931 }
01932
break;
01933 }
01934
case NMSTL::RPCDefs<Borealis::QueryProcessor>::get_stats_METHOD_ID:
01935 {
01936
const NMSTL::RPCDefs<Borealis::QueryProcessor>::get_stats_IN* inp =
01937 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::get_stats_IN*>(req.get());
01938
01939
if (!inp) {
01940 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
01941
" to Borealis::QueryProcessor::get_stats; expected " +
01942
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::get_stats_IN);
01943 WARN << err;
01944 cb(err, 0,
true);
01945
return;
01946 }
01947
01948 RPC< vector<Stats> > result = get_stats();
01949
if (result.valid()) {
01950 NMSTL::rpc_message_1< vector<Stats> > ret(*result);
01951 cb(
true, &ret,
true);
01952 }
else {
01953 cb(result.stat(), 0,
true);
01954 }
01955
break;
01956 }
01957
case NMSTL::RPCDefs<Borealis::QueryProcessor>::get_sel_METHOD_ID:
01958 {
01959
const NMSTL::RPCDefs<Borealis::QueryProcessor>::get_sel_IN* inp =
01960 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::get_sel_IN*>(req.get());
01961
01962
if (!inp) {
01963 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
01964
" to Borealis::QueryProcessor::get_sel; expected " +
01965
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::get_sel_IN);
01966 WARN << err;
01967 cb(err, 0,
true);
01968
return;
01969 }
01970
01971 RPC< double > result = get_sel();
01972
if (result.valid()) {
01973 NMSTL::rpc_message_1< double > ret(*result);
01974 cb(
true, &ret,
true);
01975 }
else {
01976 cb(result.stat(), 0,
true);
01977 }
01978
break;
01979 }
01980
case NMSTL::RPCDefs<Borealis::QueryProcessor>::create_stream_METHOD_ID:
01981 {
01982
const NMSTL::RPCDefs<Borealis::QueryProcessor>::create_stream_IN* inp =
01983 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::create_stream_IN*>(req.get());
01984
01985
if (!inp) {
01986 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
01987
" to Borealis::QueryProcessor::create_stream; expected " +
01988
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::create_stream_IN);
01989 WARN << err;
01990 cb(err, 0,
true);
01991
return;
01992 }
01993
01994
const NMSTL::RPCDefs<Borealis::QueryProcessor>::create_stream_IN& in = *inp;
01995 RPC< void > result = create_stream(in.stream);
01996
if (result.valid()) {
01997 NMSTL::rpc_message_1< void > ret;
01998 cb(
true, &ret,
true);
01999 }
else {
02000 cb(result.stat(), 0,
true);
02001 }
02002
break;
02003 }
02004
case NMSTL::RPCDefs<Borealis::QueryProcessor>::create_cpview_METHOD_ID:
02005 {
02006
const NMSTL::RPCDefs<Borealis::QueryProcessor>::create_cpview_IN* inp =
02007 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::create_cpview_IN*>(req.get());
02008
02009
if (!inp) {
02010 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
02011
" to Borealis::QueryProcessor::create_cpview; expected " +
02012
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::create_cpview_IN);
02013 WARN << err;
02014 cb(err, 0,
true);
02015
return;
02016 }
02017
02018
const NMSTL::RPCDefs<Borealis::QueryProcessor>::create_cpview_IN& in = *inp;
02019 RPC< void > result = create_cpview(in.view_desc, in.streamdef);
02020
if (result.valid()) {
02021 NMSTL::rpc_message_1< void > ret;
02022 cb(
true, &ret,
true);
02023 }
else {
02024 cb(result.stat(), 0,
true);
02025 }
02026
break;
02027 }
02028
case NMSTL::RPCDefs<Borealis::QueryProcessor>::update_stream_METHOD_ID:
02029 {
02030
const NMSTL::RPCDefs<Borealis::QueryProcessor>::update_stream_IN* inp =
02031 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::update_stream_IN*>(req.get());
02032
02033
if (!inp) {
02034 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
02035
" to Borealis::QueryProcessor::update_stream; expected " +
02036
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::update_stream_IN);
02037 WARN << err;
02038 cb(err, 0,
true);
02039
return;
02040 }
02041
02042
const NMSTL::RPCDefs<Borealis::QueryProcessor>::update_stream_IN& in = *inp;
02043 RPC< void > result = update_stream(in.old_sd, in.new_sd);
02044
if (result.valid()) {
02045 NMSTL::rpc_message_1< void > ret;
02046 cb(
true, &ret,
true);
02047 }
else {
02048 cb(result.stat(), 0,
true);
02049 }
02050
break;
02051 }
02052
case NMSTL::RPCDefs<Borealis::QueryProcessor>::ack_METHOD_ID:
02053 {
02054
const NMSTL::RPCDefs<Borealis::QueryProcessor>::ack_IN* inp =
02055 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::ack_IN*>(req.get());
02056
02057
if (!inp) {
02058 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
02059
" to Borealis::QueryProcessor::ack; expected " +
02060
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::ack_IN);
02061 WARN << err;
02062 cb(err, 0,
true);
02063
return;
02064 }
02065
02066
const NMSTL::RPCDefs<Borealis::QueryProcessor>::ack_IN& in = *inp;
02067 RPC< void > result = ack(in.node, in.id, in.last_tuple);
02068
if (result.valid()) {
02069 NMSTL::rpc_message_1< void > ret;
02070 cb(
true, &ret,
true);
02071 }
else {
02072 cb(result.stat(), 0,
true);
02073 }
02074
break;
02075 }
02076
case NMSTL::RPCDefs<Borealis::QueryProcessor>::trim_METHOD_ID:
02077 {
02078
const NMSTL::RPCDefs<Borealis::QueryProcessor>::trim_IN* inp =
02079 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::trim_IN*>(req.get());
02080
02081
if (!inp) {
02082 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
02083
" to Borealis::QueryProcessor::trim; expected " +
02084
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::trim_IN);
02085 WARN << err;
02086 cb(err, 0,
true);
02087
return;
02088 }
02089
02090
const NMSTL::RPCDefs<Borealis::QueryProcessor>::trim_IN& in = *inp;
02091 RPC< void > result = trim(in.node, in.id, in.last_tuple);
02092
if (result.valid()) {
02093 NMSTL::rpc_message_1< void > ret;
02094 cb(
true, &ret,
true);
02095 }
else {
02096 cb(result.stat(), 0,
true);
02097 }
02098
break;
02099 }
02100
case NMSTL::RPCDefs<Borealis::QueryProcessor>::set_recovery_method_METHOD_ID:
02101 {
02102
const NMSTL::RPCDefs<Borealis::QueryProcessor>::set_recovery_method_IN* inp =
02103 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::set_recovery_method_IN*>(req.get());
02104
02105
if (!inp) {
02106 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
02107
" to Borealis::QueryProcessor::set_recovery_method; expected " +
02108
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::set_recovery_method_IN);
02109 WARN << err;
02110 cb(err, 0,
true);
02111
return;
02112 }
02113
02114
const NMSTL::RPCDefs<Borealis::QueryProcessor>::set_recovery_method_IN& in = *inp;
02115 RPC< void > result = set_recovery_method(in.method);
02116
if (result.valid()) {
02117 NMSTL::rpc_message_1< void > ret;
02118 cb(
true, &ret,
true);
02119 }
else {
02120 cb(result.stat(), 0,
true);
02121 }
02122
break;
02123 }
02124
case NMSTL::RPCDefs<Borealis::QueryProcessor>::set_primary_status_METHOD_ID:
02125 {
02126
const NMSTL::RPCDefs<Borealis::QueryProcessor>::set_primary_status_IN* inp =
02127 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::set_primary_status_IN*>(req.get());
02128
02129
if (!inp) {
02130 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
02131
" to Borealis::QueryProcessor::set_primary_status; expected " +
02132
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::set_primary_status_IN);
02133 WARN << err;
02134 cb(err, 0,
true);
02135
return;
02136 }
02137
02138
const NMSTL::RPCDefs<Borealis::QueryProcessor>::set_primary_status_IN& in = *inp;
02139 RPC< void > result = set_primary_status(in.status);
02140
if (result.valid()) {
02141 NMSTL::rpc_message_1< void > ret;
02142 cb(
true, &ret,
true);
02143 }
else {
02144 cb(result.stat(), 0,
true);
02145 }
02146
break;
02147 }
02148
case NMSTL::RPCDefs<Borealis::QueryProcessor>::set_secondaries_METHOD_ID:
02149 {
02150
const NMSTL::RPCDefs<Borealis::QueryProcessor>::set_secondaries_IN* inp =
02151 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::set_secondaries_IN*>(req.get());
02152
02153
if (!inp) {
02154 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
02155
" to Borealis::QueryProcessor::set_secondaries; expected " +
02156
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::set_secondaries_IN);
02157 WARN << err;
02158 cb(err, 0,
true);
02159
return;
02160 }
02161
02162
const NMSTL::RPCDefs<Borealis::QueryProcessor>::set_secondaries_IN& in = *inp;
02163 RPC< void > result = set_secondaries(in.secondaries);
02164
if (result.valid()) {
02165 NMSTL::rpc_message_1< void > ret;
02166 cb(
true, &ret,
true);
02167 }
else {
02168 cb(result.stat(), 0,
true);
02169 }
02170
break;
02171 }
02172
case NMSTL::RPCDefs<Borealis::QueryProcessor>::set_replicas_METHOD_ID:
02173 {
02174
const NMSTL::RPCDefs<Borealis::QueryProcessor>::set_replicas_IN* inp =
02175 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::set_replicas_IN*>(req.get());
02176
02177
if (!inp) {
02178 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
02179
" to Borealis::QueryProcessor::set_replicas; expected " +
02180
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::set_replicas_IN);
02181 WARN << err;
02182 cb(err, 0,
true);
02183
return;
02184 }
02185
02186
const NMSTL::RPCDefs<Borealis::QueryProcessor>::set_replicas_IN& in = *inp;
02187 RPC< void > result = set_replicas(in.replicas);
02188
if (result.valid()) {
02189 NMSTL::rpc_message_1< void > ret;
02190 cb(
true, &ret,
true);
02191 }
else {
02192 cb(result.stat(), 0,
true);
02193 }
02194
break;
02195 }
02196
case NMSTL::RPCDefs<Borealis::QueryProcessor>::checkpoint_METHOD_ID:
02197 {
02198
const NMSTL::RPCDefs<Borealis::QueryProcessor>::checkpoint_IN* inp =
02199 dynamic_cast<const NMSTL::RPCDefs<Borealis::QueryProcessor>::checkpoint_IN*>(req.get());
02200
02201
if (!inp) {
02202 string err = string(
"Invalid RPC request type ") +
typeid(*(req.get())) +
02203
" to Borealis::QueryProcessor::checkpoint; expected " +
02204
typeid(NMSTL::RPCDefs<Borealis::QueryProcessor>::checkpoint_IN);
02205 WARN << err;
02206 cb(err, 0,
true);
02207
return;
02208 }
02209
02210
const NMSTL::RPCDefs<Borealis::QueryProcessor>::checkpoint_IN& in = *inp;
02211 RPC< void > result = checkpoint(in.tuples_to_checkpoint, in.tuples_to_trim);
02212
if (result.valid()) {
02213 NMSTL::rpc_message_1< void > ret;
02214 cb(
true, &ret,
true);
02215 }
else {
02216 cb(result.stat(), 0,
true);
02217 }
02218
break;
02219 }
02220
default:
02221 string err = string(
"Unknown method_id ") + req->method_id +
" in Borealis::QueryProcessor::handle_request";
02222 WARN << err;
02223 cb(err, 0,
true);
02224 }
02225 }
02226
void Borealis::QueryProcessor::handle_request(
const response_cb& cb, rpc_id method_id, ISerial& is) {
02227
switch(method_id) {
02228
case RPCDefs<Borealis::QueryProcessor>::typecheck_METHOD_ID:
02229 {
02230 RPCDefs<Borealis::QueryProcessor>::typecheck_IN in;
02231 is >> in;
02232
if (!is) {
02233 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::typecheck: " << is.stat();
02234 cb(is.stat(), 0,
true);
02235
return;
02236 }
02237 DEBUG <<
"Handling typecheck";
02238 RPC< Query > result = typecheck(in.query);
02239
if (result.valid()) {
02240 NMSTL::rpc_message_1< Query > ret(*result);
02241 cb(
true, &ret,
true);
02242 }
else {
02243 cb(result.stat(), 0,
true);
02244 }
02245
break;
02246 }
02247
case RPCDefs<Borealis::QueryProcessor>::setup_query_METHOD_ID:
02248 {
02249 RPCDefs<Borealis::QueryProcessor>::setup_query_IN in;
02250 is >> in;
02251
if (!is) {
02252 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::setup_query: " << is.stat();
02253 cb(is.stat(), 0,
true);
02254
return;
02255 }
02256 DEBUG <<
"Handling setup_query";
02257 AsyncRPC< void > pending = setup_query(in.query);
02258 pending.set_response_handler(cb);
02259
break;
02260 }
02261
case RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_METHOD_ID:
02262 {
02263 RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_IN in;
02264 is >> in;
02265
if (!is) {
02266 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::typecheck_and_setup: " << is.stat();
02267 cb(is.stat(), 0,
true);
02268
return;
02269 }
02270 DEBUG <<
"Handling typecheck_and_setup";
02271 RPC< Query > result = typecheck_and_setup(in.query);
02272
if (result.valid()) {
02273 NMSTL::rpc_message_1< Query > ret(*result);
02274 cb(
true, &ret,
true);
02275 }
else {
02276 cb(result.stat(), 0,
true);
02277 }
02278
break;
02279 }
02280
case RPCDefs<Borealis::QueryProcessor>::load_box_METHOD_ID:
02281 {
02282 RPCDefs<Borealis::QueryProcessor>::load_box_IN in;
02283 is >> in;
02284
if (!is) {
02285 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::load_box: " << is.stat();
02286 cb(is.stat(), 0,
true);
02287
return;
02288 }
02289 DEBUG <<
"Handling load_box";
02290 AsyncRPC< void > pending = load_box(in.file_path);
02291 pending.set_response_handler(cb);
02292
break;
02293 }
02294
case RPCDefs<Borealis::QueryProcessor>::choke_queries_METHOD_ID:
02295 {
02296 RPCDefs<Borealis::QueryProcessor>::choke_queries_IN in;
02297 is >> in;
02298
if (!is) {
02299 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::choke_queries: " << is.stat();
02300 cb(is.stat(), 0,
true);
02301
return;
02302 }
02303 DEBUG <<
"Handling choke_queries";
02304 RPC< vector<Query> > result = choke_queries(in.queries);
02305
if (result.valid()) {
02306 NMSTL::rpc_message_1< vector<Query> > ret(*result);
02307 cb(
true, &ret,
true);
02308 }
else {
02309 cb(result.stat(), 0,
true);
02310 }
02311
break;
02312 }
02313
case RPCDefs<Borealis::QueryProcessor>::resume_queries_METHOD_ID:
02314 {
02315 RPCDefs<Borealis::QueryProcessor>::resume_queries_IN in;
02316 is >> in;
02317
if (!is) {
02318 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::resume_queries: " << is.stat();
02319 cb(is.stat(), 0,
true);
02320
return;
02321 }
02322 DEBUG <<
"Handling resume_queries";
02323 RPC< void > result = resume_queries(in.queries);
02324
if (result.valid()) {
02325 NMSTL::rpc_message_1< void > ret;
02326 cb(
true, &ret,
true);
02327 }
else {
02328 cb(result.stat(), 0,
true);
02329 }
02330
break;
02331 }
02332
case RPCDefs<Borealis::QueryProcessor>::pack_query_METHOD_ID:
02333 {
02334 RPCDefs<Borealis::QueryProcessor>::pack_query_IN in;
02335 is >> in;
02336
if (!is) {
02337 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::pack_query: " << is.stat();
02338 cb(is.stat(), 0,
true);
02339
return;
02340 }
02341 DEBUG <<
"Handling pack_query";
02342 RPC< Query > result = pack_query(in.query);
02343
if (result.valid()) {
02344 NMSTL::rpc_message_1< Query > ret(*result);
02345 cb(
true, &ret,
true);
02346 }
else {
02347 cb(result.stat(), 0,
true);
02348 }
02349
break;
02350 }
02351
case RPCDefs<Borealis::QueryProcessor>::pack_queries_METHOD_ID:
02352 {
02353 RPCDefs<Borealis::QueryProcessor>::pack_queries_IN in;
02354 is >> in;
02355
if (!is) {
02356 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::pack_queries: " << is.stat();
02357 cb(is.stat(), 0,
true);
02358
return;
02359 }
02360 DEBUG <<
"Handling pack_queries";
02361 RPC< vector<Query> > result = pack_queries(in.queries);
02362
if (result.valid()) {
02363 NMSTL::rpc_message_1< vector<Query> > ret(*result);
02364 cb(
true, &ret,
true);
02365 }
else {
02366 cb(result.stat(), 0,
true);
02367 }
02368
break;
02369 }
02370
case RPCDefs<Borealis::QueryProcessor>::remove_query_METHOD_ID:
02371 {
02372 RPCDefs<Borealis::QueryProcessor>::remove_query_IN in;
02373 is >> in;
02374
if (!is) {
02375 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::remove_query: " << is.stat();
02376 cb(is.stat(), 0,
true);
02377
return;
02378 }
02379 DEBUG <<
"Handling remove_query";
02380 RPC< void > result = remove_query(in.query);
02381
if (result.valid()) {
02382 NMSTL::rpc_message_1< void > ret;
02383 cb(
true, &ret,
true);
02384 }
else {
02385 cb(result.stat(), 0,
true);
02386 }
02387
break;
02388 }
02389
case RPCDefs<Borealis::QueryProcessor>::replace_query_METHOD_ID:
02390 {
02391 RPCDefs<Borealis::QueryProcessor>::replace_query_IN in;
02392 is >> in;
02393
if (!is) {
02394 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::replace_query: " << is.stat();
02395 cb(is.stat(), 0,
true);
02396
return;
02397 }
02398 DEBUG <<
"Handling replace_query";
02399 RPC< void > result = replace_query(in.old_queries, in.new_queries);
02400
if (result.valid()) {
02401 NMSTL::rpc_message_1< void > ret;
02402 cb(
true, &ret,
true);
02403 }
else {
02404 cb(result.stat(), 0,
true);
02405 }
02406
break;
02407 }
02408
case RPCDefs<Borealis::QueryProcessor>::set_query_status_METHOD_ID:
02409 {
02410 RPCDefs<Borealis::QueryProcessor>::set_query_status_IN in;
02411 is >> in;
02412
if (!is) {
02413 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::set_query_status: " << is.stat();
02414 cb(is.stat(), 0,
true);
02415
return;
02416 }
02417 DEBUG <<
"Handling set_query_status";
02418 RPC< void > result = set_query_status(in.name, in.status);
02419
if (result.valid()) {
02420 NMSTL::rpc_message_1< void > ret;
02421 cb(
true, &ret,
true);
02422 }
else {
02423 cb(result.stat(), 0,
true);
02424 }
02425
break;
02426 }
02427
case RPCDefs<Borealis::QueryProcessor>::set_queries_status_METHOD_ID:
02428 {
02429 RPCDefs<Borealis::QueryProcessor>::set_queries_status_IN in;
02430 is >> in;
02431
if (!is) {
02432 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::set_queries_status: " << is.stat();
02433 cb(is.stat(), 0,
true);
02434
return;
02435 }
02436 DEBUG <<
"Handling set_queries_status";
02437 RPC< void > result = set_queries_status(in.name, in.status);
02438
if (result.valid()) {
02439 NMSTL::rpc_message_1< void > ret;
02440 cb(
true, &ret,
true);
02441 }
else {
02442 cb(result.stat(), 0,
true);
02443 }
02444
break;
02445 }
02446
case RPCDefs<Borealis::QueryProcessor>::subscribe_METHOD_ID:
02447 {
02448 RPCDefs<Borealis::QueryProcessor>::subscribe_IN in;
02449 is >> in;
02450
if (!is) {
02451 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::subscribe: " << is.stat();
02452 cb(is.stat(), 0,
true);
02453
return;
02454 }
02455 DEBUG <<
"Handling subscribe";
02456 RPC< void > result = subscribe(in.sub, in.add_or_remove);
02457
if (result.valid()) {
02458 NMSTL::rpc_message_1< void > ret;
02459 cb(
true, &ret,
true);
02460 }
else {
02461 cb(result.stat(), 0,
true);
02462 }
02463
break;
02464 }
02465
case RPCDefs<Borealis::QueryProcessor>::subscribe_many_METHOD_ID:
02466 {
02467 RPCDefs<Borealis::QueryProcessor>::subscribe_many_IN in;
02468 is >> in;
02469
if (!is) {
02470 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::subscribe_many: " << is.stat();
02471 cb(is.stat(), 0,
true);
02472
return;
02473 }
02474 DEBUG <<
"Handling subscribe_many";
02475 RPC< vector<Subscription> > result = subscribe_many(in.sub, in.add_or_remove);
02476
if (result.valid()) {
02477 NMSTL::rpc_message_1< vector<Subscription> > ret(*result);
02478 cb(
true, &ret,
true);
02479 }
else {
02480 cb(result.stat(), 0,
true);
02481 }
02482
break;
02483 }
02484
case RPCDefs<Borealis::QueryProcessor>::get_subscriptions_METHOD_ID:
02485 {
02486 RPCDefs<Borealis::QueryProcessor>::get_subscriptions_IN in;
02487 is >> in;
02488
if (!is) {
02489 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::get_subscriptions: " << is.stat();
02490 cb(is.stat(), 0,
true);
02491
return;
02492 }
02493 DEBUG <<
"Handling get_subscriptions";
02494 RPC< vector<Subscription> > result = get_subscriptions(in.streams);
02495
if (result.valid()) {
02496 NMSTL::rpc_message_1< vector<Subscription> > ret(*result);
02497 cb(
true, &ret,
true);
02498 }
else {
02499 cb(result.stat(), 0,
true);
02500 }
02501
break;
02502 }
02503
case RPCDefs<Borealis::QueryProcessor>::get_stats_METHOD_ID:
02504 {
02505 RPCDefs<Borealis::QueryProcessor>::get_stats_IN in;
02506 is >> in;
02507
if (!is) {
02508 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::get_stats: " << is.stat();
02509 cb(is.stat(), 0,
true);
02510
return;
02511 }
02512 DEBUG <<
"Handling get_stats";
02513 RPC< vector<Stats> > result = get_stats();
02514
if (result.valid()) {
02515 NMSTL::rpc_message_1< vector<Stats> > ret(*result);
02516 cb(
true, &ret,
true);
02517 }
else {
02518 cb(result.stat(), 0,
true);
02519 }
02520
break;
02521 }
02522
case RPCDefs<Borealis::QueryProcessor>::get_sel_METHOD_ID:
02523 {
02524 RPCDefs<Borealis::QueryProcessor>::get_sel_IN in;
02525 is >> in;
02526
if (!is) {
02527 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::get_sel: " << is.stat();
02528 cb(is.stat(), 0,
true);
02529
return;
02530 }
02531 DEBUG <<
"Handling get_sel";
02532 RPC< double > result = get_sel();
02533
if (result.valid()) {
02534 NMSTL::rpc_message_1< double > ret(*result);
02535 cb(
true, &ret,
true);
02536 }
else {
02537 cb(result.stat(), 0,
true);
02538 }
02539
break;
02540 }
02541
case RPCDefs<Borealis::QueryProcessor>::create_stream_METHOD_ID:
02542 {
02543 RPCDefs<Borealis::QueryProcessor>::create_stream_IN in;
02544 is >> in;
02545
if (!is) {
02546 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::create_stream: " << is.stat();
02547 cb(is.stat(), 0,
true);
02548
return;
02549 }
02550 DEBUG <<
"Handling create_stream";
02551 RPC< void > result = create_stream(in.stream);
02552
if (result.valid()) {
02553 NMSTL::rpc_message_1< void > ret;
02554 cb(
true, &ret,
true);
02555 }
else {
02556 cb(result.stat(), 0,
true);
02557 }
02558
break;
02559 }
02560
case RPCDefs<Borealis::QueryProcessor>::create_cpview_METHOD_ID:
02561 {
02562 RPCDefs<Borealis::QueryProcessor>::create_cpview_IN in;
02563 is >> in;
02564
if (!is) {
02565 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::create_cpview: " << is.stat();
02566 cb(is.stat(), 0,
true);
02567
return;
02568 }
02569 DEBUG <<
"Handling create_cpview";
02570 RPC< void > result = create_cpview(in.view_desc, in.streamdef);
02571
if (result.valid()) {
02572 NMSTL::rpc_message_1< void > ret;
02573 cb(
true, &ret,
true);
02574 }
else {
02575 cb(result.stat(), 0,
true);
02576 }
02577
break;
02578 }
02579
case RPCDefs<Borealis::QueryProcessor>::update_stream_METHOD_ID:
02580 {
02581 RPCDefs<Borealis::QueryProcessor>::update_stream_IN in;
02582 is >> in;
02583
if (!is) {
02584 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::update_stream: " << is.stat();
02585 cb(is.stat(), 0,
true);
02586
return;
02587 }
02588 DEBUG <<
"Handling update_stream";
02589 RPC< void > result = update_stream(in.old_sd, in.new_sd);
02590
if (result.valid()) {
02591 NMSTL::rpc_message_1< void > ret;
02592 cb(
true, &ret,
true);
02593 }
else {
02594 cb(result.stat(), 0,
true);
02595 }
02596
break;
02597 }
02598
case RPCDefs<Borealis::QueryProcessor>::ack_METHOD_ID:
02599 {
02600 RPCDefs<Borealis::QueryProcessor>::ack_IN in;
02601 is >> in;
02602
if (!is) {
02603 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::ack: " << is.stat();
02604 cb(is.stat(), 0,
true);
02605
return;
02606 }
02607 DEBUG <<
"Handling ack";
02608 RPC< void > result = ack(in.node, in.id, in.last_tuple);
02609
if (result.valid()) {
02610 NMSTL::rpc_message_1< void > ret;
02611 cb(
true, &ret,
true);
02612 }
else {
02613 cb(result.stat(), 0,
true);
02614 }
02615
break;
02616 }
02617
case RPCDefs<Borealis::QueryProcessor>::trim_METHOD_ID:
02618 {
02619 RPCDefs<Borealis::QueryProcessor>::trim_IN in;
02620 is >> in;
02621
if (!is) {
02622 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::trim: " << is.stat();
02623 cb(is.stat(), 0,
true);
02624
return;
02625 }
02626 DEBUG <<
"Handling trim";
02627 RPC< void > result = trim(in.node, in.id, in.last_tuple);
02628
if (result.valid()) {
02629 NMSTL::rpc_message_1< void > ret;
02630 cb(
true, &ret,
true);
02631 }
else {
02632 cb(result.stat(), 0,
true);
02633 }
02634
break;
02635 }
02636
case RPCDefs<Borealis::QueryProcessor>::set_recovery_method_METHOD_ID:
02637 {
02638 RPCDefs<Borealis::QueryProcessor>::set_recovery_method_IN in;
02639 is >> in;
02640
if (!is) {
02641 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::set_recovery_method: " << is.stat();
02642 cb(is.stat(), 0,
true);
02643
return;
02644 }
02645 DEBUG <<
"Handling set_recovery_method";
02646 RPC< void > result = set_recovery_method(in.method);
02647
if (result.valid()) {
02648 NMSTL::rpc_message_1< void > ret;
02649 cb(
true, &ret,
true);
02650 }
else {
02651 cb(result.stat(), 0,
true);
02652 }
02653
break;
02654 }
02655
case RPCDefs<Borealis::QueryProcessor>::set_primary_status_METHOD_ID:
02656 {
02657 RPCDefs<Borealis::QueryProcessor>::set_primary_status_IN in;
02658 is >> in;
02659
if (!is) {
02660 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::set_primary_status: " << is.stat();
02661 cb(is.stat(), 0,
true);
02662
return;
02663 }
02664 DEBUG <<
"Handling set_primary_status";
02665 RPC< void > result = set_primary_status(in.status);
02666
if (result.valid()) {
02667 NMSTL::rpc_message_1< void > ret;
02668 cb(
true, &ret,
true);
02669 }
else {
02670 cb(result.stat(), 0,
true);
02671 }
02672
break;
02673 }
02674
case RPCDefs<Borealis::QueryProcessor>::set_secondaries_METHOD_ID:
02675 {
02676 RPCDefs<Borealis::QueryProcessor>::set_secondaries_IN in;
02677 is >> in;
02678
if (!is) {
02679 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::set_secondaries: " << is.stat();
02680 cb(is.stat(), 0,
true);
02681
return;
02682 }
02683 DEBUG <<
"Handling set_secondaries";
02684 RPC< void > result = set_secondaries(in.secondaries);
02685
if (result.valid()) {
02686 NMSTL::rpc_message_1< void > ret;
02687 cb(
true, &ret,
true);
02688 }
else {
02689 cb(result.stat(), 0,
true);
02690 }
02691
break;
02692 }
02693
case RPCDefs<Borealis::QueryProcessor>::set_replicas_METHOD_ID:
02694 {
02695 RPCDefs<Borealis::QueryProcessor>::set_replicas_IN in;
02696 is >> in;
02697
if (!is) {
02698 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::set_replicas: " << is.stat();
02699 cb(is.stat(), 0,
true);
02700
return;
02701 }
02702 DEBUG <<
"Handling set_replicas";
02703 RPC< void > result = set_replicas(in.replicas);
02704
if (result.valid()) {
02705 NMSTL::rpc_message_1< void > ret;
02706 cb(
true, &ret,
true);
02707 }
else {
02708 cb(result.stat(), 0,
true);
02709 }
02710
break;
02711 }
02712
case RPCDefs<Borealis::QueryProcessor>::checkpoint_METHOD_ID:
02713 {
02714 RPCDefs<Borealis::QueryProcessor>::checkpoint_IN in;
02715 is >> in;
02716
if (!is) {
02717 WARN <<
"Invalid RPC request to Borealis::QueryProcessor::checkpoint: " << is.stat();
02718 cb(is.stat(), 0,
true);
02719
return;
02720 }
02721 DEBUG <<
"Handling checkpoint";
02722 RPC< void > result = checkpoint(in.tuples_to_checkpoint, in.tuples_to_trim);
02723
if (result.valid()) {
02724 NMSTL::rpc_message_1< void > ret;
02725 cb(
true, &ret,
true);
02726 }
else {
02727 cb(result.stat(), 0,
true);
02728 }
02729
break;
02730 }
02731
default:
02732 string err = string(
"Unknown method_id ") + method_id +
" in Borealis::QueryProcessor::handle_request";
02733 WARN << err;
02734 cb(err, 0,
true);
02735 }
02736 }
02737 NMSTL::rpc_id Borealis::QueryProcessor::method_id_for_name(string method_name) {
02738 map<string, int>::const_iterator i = RPCDefs<Borealis::QueryProcessor>::method_id_by_name().find(method_name);
02739
return (i == RPCDefs<Borealis::QueryProcessor>::method_id_by_name().end()) ? rpc_id() : i->second;
02740 }
02741 string Borealis::QueryProcessor::method_name_for_id(NMSTL::rpc_id method_id) {
02742 map<int, string>::const_iterator i = RPCDefs<Borealis::QueryProcessor>::method_name_by_id().find(method_id);
02743
return (i == RPCDefs<Borealis::QueryProcessor>::method_name_by_id().end()) ? string() : i->second;
02744 }
02745 NMSTL::ptr<NMSTL::rpc_request> Borealis::QueryProcessor::make_request(NMSTL::rpc_id method_id, NMSTL::ISerial& is) {
02746
switch(method_id) {
02747
case NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_METHOD_ID:
02748 {
02749 NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_IN;
02750 NMSTL::ptr<NMSTL::rpc_request> req(in);
02751
if (is >> *in)
return req;
02752 }
02753
case NMSTL::RPCDefs<Borealis::QueryProcessor>::setup_query_METHOD_ID:
02754 {
02755 NMSTL::RPCDefs<Borealis::QueryProcessor>::setup_query_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::setup_query_IN;
02756 NMSTL::ptr<NMSTL::rpc_request> req(in);
02757
if (is >> *in)
return req;
02758 }
02759
case NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_METHOD_ID:
02760 {
02761 NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::typecheck_and_setup_IN;
02762 NMSTL::ptr<NMSTL::rpc_request> req(in);
02763
if (is >> *in)
return req;
02764 }
02765
case NMSTL::RPCDefs<Borealis::QueryProcessor>::load_box_METHOD_ID:
02766 {
02767 NMSTL::RPCDefs<Borealis::QueryProcessor>::load_box_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::load_box_IN;
02768 NMSTL::ptr<NMSTL::rpc_request> req(in);
02769
if (is >> *in)
return req;
02770 }
02771
case NMSTL::RPCDefs<Borealis::QueryProcessor>::choke_queries_METHOD_ID:
02772 {
02773 NMSTL::RPCDefs<Borealis::QueryProcessor>::choke_queries_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::choke_queries_IN;
02774 NMSTL::ptr<NMSTL::rpc_request> req(in);
02775
if (is >> *in)
return req;
02776 }
02777
case NMSTL::RPCDefs<Borealis::QueryProcessor>::resume_queries_METHOD_ID:
02778 {
02779 NMSTL::RPCDefs<Borealis::QueryProcessor>::resume_queries_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::resume_queries_IN;
02780 NMSTL::ptr<NMSTL::rpc_request> req(in);
02781
if (is >> *in)
return req;
02782 }
02783
case NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_query_METHOD_ID:
02784 {
02785 NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_query_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_query_IN;
02786 NMSTL::ptr<NMSTL::rpc_request> req(in);
02787
if (is >> *in)
return req;
02788 }
02789
case NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_queries_METHOD_ID:
02790 {
02791 NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_queries_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::pack_queries_IN;
02792 NMSTL::ptr<NMSTL::rpc_request> req(in);
02793
if (is >> *in)
return req;
02794 }
02795
case NMSTL::RPCDefs<Borealis::QueryProcessor>::remove_query_METHOD_ID:
02796 {
02797 NMSTL::RPCDefs<Borealis::QueryProcessor>::remove_query_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::remove_query_IN;
02798 NMSTL::ptr<NMSTL::rpc_request> req(in);
02799
if (is >> *in)
return req;
02800 }
02801
case NMSTL::RPCDefs<Borealis::QueryProcessor>::replace_query_METHOD_ID:
02802 {
02803 NMSTL::RPCDefs<Borealis::QueryProcessor>::replace_query_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::replace_query_IN;
02804 NMSTL::ptr<NMSTL::rpc_request> req(in);
02805
if (is >> *in)
return req;
02806 }
02807
case NMSTL::RPCDefs<Borealis::QueryProcessor>::set_query_status_METHOD_ID:
02808 {
02809 NMSTL::RPCDefs<Borealis::QueryProcessor>::set_query_status_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::set_query_status_IN;
02810 NMSTL::ptr<NMSTL::rpc_request> req(in);
02811
if (is >> *in)
return req;
02812 }
02813
case NMSTL::RPCDefs<Borealis::QueryProcessor>::set_queries_status_METHOD_ID:
02814 {
02815 NMSTL::RPCDefs<Borealis::QueryProcessor>::set_queries_status_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::set_queries_status_IN;
02816 NMSTL::ptr<NMSTL::rpc_request> req(in);
02817
if (is >> *in)
return req;
02818 }
02819
case NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_METHOD_ID:
02820 {
02821 NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_IN;
02822 NMSTL::ptr<NMSTL::rpc_request> req(in);
02823
if (is >> *in)
return req;
02824 }
02825
case NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_many_METHOD_ID:
02826 {
02827 NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_many_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::subscribe_many_IN;
02828 NMSTL::ptr<NMSTL::rpc_request> req(in);
02829
if (is >> *in)
return req;
02830 }
02831
case NMSTL::RPCDefs<Borealis::QueryProcessor>::get_subscriptions_METHOD_ID:
02832 {
02833 NMSTL::RPCDefs<Borealis::QueryProcessor>::get_subscriptions_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::get_subscriptions_IN;
02834 NMSTL::ptr<NMSTL::rpc_request> req(in);
02835
if (is >> *in)
return req;
02836 }
02837
case NMSTL::RPCDefs<Borealis::QueryProcessor>::get_stats_METHOD_ID:
02838 {
02839 NMSTL::RPCDefs<Borealis::QueryProcessor>::get_stats_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::get_stats_IN;
02840 NMSTL::ptr<NMSTL::rpc_request> req(in);
02841
if (is >> *in)
return req;
02842 }
02843
case NMSTL::RPCDefs<Borealis::QueryProcessor>::get_sel_METHOD_ID:
02844 {
02845 NMSTL::RPCDefs<Borealis::QueryProcessor>::get_sel_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::get_sel_IN;
02846 NMSTL::ptr<NMSTL::rpc_request> req(in);
02847
if (is >> *in)
return req;
02848 }
02849
case NMSTL::RPCDefs<Borealis::QueryProcessor>::create_stream_METHOD_ID:
02850 {
02851 NMSTL::RPCDefs<Borealis::QueryProcessor>::create_stream_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::create_stream_IN;
02852 NMSTL::ptr<NMSTL::rpc_request> req(in);
02853
if (is >> *in)
return req;
02854 }
02855
case NMSTL::RPCDefs<Borealis::QueryProcessor>::create_cpview_METHOD_ID:
02856 {
02857 NMSTL::RPCDefs<Borealis::QueryProcessor>::create_cpview_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::create_cpview_IN;
02858 NMSTL::ptr<NMSTL::rpc_request> req(in);
02859
if (is >> *in)
return req;
02860 }
02861
case NMSTL::RPCDefs<Borealis::QueryProcessor>::update_stream_METHOD_ID:
02862 {
02863 NMSTL::RPCDefs<Borealis::QueryProcessor>::update_stream_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::update_stream_IN;
02864 NMSTL::ptr<NMSTL::rpc_request> req(in);
02865
if (is >> *in)
return req;
02866 }
02867
case NMSTL::RPCDefs<Borealis::QueryProcessor>::ack_METHOD_ID:
02868 {
02869 NMSTL::RPCDefs<Borealis::QueryProcessor>::ack_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::ack_IN;
02870 NMSTL::ptr<NMSTL::rpc_request> req(in);
02871
if (is >> *in)
return req;
02872 }
02873
case NMSTL::RPCDefs<Borealis::QueryProcessor>::trim_METHOD_ID:
02874 {
02875 NMSTL::RPCDefs<Borealis::QueryProcessor>::trim_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::trim_IN;
02876 NMSTL::ptr<NMSTL::rpc_request> req(in);
02877
if (is >> *in)
return req;
02878 }
02879
case NMSTL::RPCDefs<Borealis::QueryProcessor>::set_recovery_method_METHOD_ID:
02880 {
02881 NMSTL::RPCDefs<Borealis::QueryProcessor>::set_recovery_method_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::set_recovery_method_IN;
02882 NMSTL::ptr<NMSTL::rpc_request> req(in);
02883
if (is >> *in)
return req;
02884 }
02885
case NMSTL::RPCDefs<Borealis::QueryProcessor>::set_primary_status_METHOD_ID:
02886 {
02887 NMSTL::RPCDefs<Borealis::QueryProcessor>::set_primary_status_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::set_primary_status_IN;
02888 NMSTL::ptr<NMSTL::rpc_request> req(in);
02889
if (is >> *in)
return req;
02890 }
02891
case NMSTL::RPCDefs<Borealis::QueryProcessor>::set_secondaries_METHOD_ID:
02892 {
02893 NMSTL::RPCDefs<Borealis::QueryProcessor>::set_secondaries_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::set_secondaries_IN;
02894 NMSTL::ptr<NMSTL::rpc_request> req(in);
02895
if (is >> *in)
return req;
02896 }
02897
case NMSTL::RPCDefs<Borealis::QueryProcessor>::set_replicas_METHOD_ID:
02898 {
02899 NMSTL::RPCDefs<Borealis::QueryProcessor>::set_replicas_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::set_replicas_IN;
02900 NMSTL::ptr<NMSTL::rpc_request> req(in);
02901
if (is >> *in)
return req;
02902 }
02903
case NMSTL::RPCDefs<Borealis::QueryProcessor>::checkpoint_METHOD_ID:
02904 {
02905 NMSTL::RPCDefs<Borealis::QueryProcessor>::checkpoint_IN *in =
new NMSTL::RPCDefs<Borealis::QueryProcessor>::checkpoint_IN;
02906 NMSTL::ptr<NMSTL::rpc_request> req(in);
02907
if (is >> *in)
return req;
02908 }
02909 }
02910
return NMSTL::ptr<NMSTL::rpc_request>();
02911 }
02912
02913
02914
#endif
02915