00001
#ifndef AURORANODE_H
00002
#define AURORANODE_H
00003
00004
#include "Exceptions.h"
00005
00006
#include "ConfFile.h"
00007
#include "CpuStats.h"
00008
#include "QBox.h"
00009
#include "Stream.h"
00010
#include "TupleDescription.h"
00011
#include "Scheduler.h"
00012
#include "StreamProperties.h"
00013
#include "LockSet.h"
00014
00015
#include "StreamDef.h"
00016
00017
#include "FixLenTimeSeries.h"
00018
#include "StatsMgr.h"
00019
#include "TSStats.h"
00020
#include "VersionRWLock.h"
00021
00022
#ifdef USE_STREAMMON
00023
#include "StreamMon.h"
00024
#endif
00025
00026
#include "BoxPackage.h"
00027
#include "NodeTable.h"
00028
00029 BOREALIS_NAMESPACE_BEGIN;
00030
00031
class SchedulerTask;
00032
class ConverterBoxRunner;
00033
00034
class CP;
00035
class CPView;
00036
class CPViewDescription;
00037
00038
00065 class AuroraNode
00066 {
00069
struct Subscription
00070 { ptr<TupleQueue> _q;
00071
00072 };
00073
00074
00078
struct DequeueContext
00079 {
TuplesAvailableMailbox _mailbox;
00080 map<Stream*, Subscription> _subscriptions;
00081 list<ptr<TupleQueue> > _deleted_queues;
00082
00083
00084
void unsubscribeAll();
00085 };
00086
00087
00088
friend struct UnsubscribeAllTask;
00089
friend struct ModifyNetworkTask;
00090
friend struct GenericCheckpointTask;
00091
friend struct CheckpointTask;
00092
friend struct RecoveryTask;
00093
00094
public:
00096 enum EntityType
00097 {
00099
STREAM = 1,
00100
00102
SCHEMA = 2,
00103
00105
BOX = 3,
00106
00108
INPUT_STREAMS = 100,
00109
00112
OUTPUT_STREAMS = 101
00113 };
00114
00115
00116 double selval;
00117
00127 class DequeueHandle
00128 {
00129
public:
00130 DequeueHandle() : _node(0), _context() {}
00131
00132
00135
void subscribe( string name )
throw(
AuroraException );
00136
00137
00140
void unsubscribe( string name )
throw(
AuroraException );
00141
00142
00157 size_t
dequeue( string &stream_name,
00158 TupleDescription &td,
00159
void *buffer,
00160 size_t buffer_size
00161 )
00162
throw(
AuroraException );
00163
00166
void close();
00167
00168
private:
00169
AuroraNode *_node;
00170 ptr<DequeueContext> _context;
00171
00172
friend class AuroraNode;
00173
friend struct UnsubscribeAllTask;
00174 };
00175
00176
00177
friend class DequeueHandle;
00178
00179 typedef ::StreamProperties
StreamProperties;
00180
00181
00183
AuroraNode( ptr<ConfFile> conf = ptr<ConfFile>() )
00184
throw(
AuroraException );
00185
00186
00188
~AuroraNode();
00189
00190
00197
void modifyNetwork( string data,
00198 BoxPackage &boxes
00199 )
00200
throw(
AuroraException );
00201
00205
void load_box( string file_path )
00206
throw(
AuroraException );
00207
00208
00216
void typecheck( string data, map<string, TupleDescription> &streams )
00217
throw(
AuroraException );
00218
00219
00233
void enqueue(
const char *stream_name,
00234
const void *data,
00235 size_t tuple_length,
00236 size_t num_tuples,
00237
const void *schema_uuid = 0
00238 )
00239
throw(
AuroraException );
00240
00241
00243
DequeueHandle getDequeueHandle();
00244
00245
00247
void start() throw(
AuroraException );
00248
00249
00251
void shutdown() throw( AuroraException );
00252
00253
00257
void typecheck( string data, vector<
StreamProperties> &streams )
00258 throw( AuroraException );
00259
00260
00264
void validate( string data ) throw( AuroraException );
00265
00266
00269
StreamProperties getStreamProperties( string entity )
00270 throw( AuroraException );
00271
00272
00275 string describe( string entity ) throw( AuroraException );
00276
00277
00281
void listEntities( EntityType type, vector<string> &names )
00282 throw( AuroraException );
00283
00284
00289
void drain() throw( AuroraException );
00290
00291
00293
void chokeBoxes( vector<string> names );
00294
00296
void suspendBoxes( vector<string> names );
00297
00299
void resumeBoxes( vector<string> names );
00300
00301
00303
void packBoxes( string data, BoxPackage &boxes )
00304 throw( AuroraException );
00305
00311
void checkpoint(vector<
long long>& measurements);
00313
void recover();
00314
00316
void schedule_checkpoint_task(ptr<
GenericCheckpointTask> task);
00317
00319 string repr() const;
00320
00321
00323 Statistics &getStatsObject()
00324 {
return(
sm.
getStats() );
00325 }
00326
00327
00328 const Statistics &
getStatsObject()
const
00329
{
return(
sm.
getStats() );
00330 }
00331
00332
00334 set<ptr<QBox> >
getBoxes() const;
00335
00336
00338 ptr<
QBox> getBox( string name )
const
00339
{
return( lookup( _boxes, name ));
00340 }
00341
00342
00344 ptr<Stream>
getStream( string name )
const
00345
{
return( lookup( _streams, name ));
00346 }
00347
00348
00351 ptr<LockSet>
getLockSet( string name );
00352
00353
00355 TupleDescription
getTupleDescription( string name )
const
00356
{
return( lookup( _schemas, name ));
00357 }
00358
00359
00361 TupleDescription
getTupleDescriptionByHash( string hash )
const
00362
{
return( lookup( _schemas_by_hash, hash ));
00363 }
00364
00365
00368 ptr<Stream>
createStream( string name, TupleDescription desc )
00369
throw(
AuroraException );
00370
00371
00373
void create_cpview( CPViewDescription view_desc, StreamDef streamdef )
00374
throw(
AuroraException );
00375
00376
00378 ptr<ConfFile>
getProperties() const throw(
AuroraException )
00379 {
return( _conf );
00380 }
00381
00382
00384
unsigned int xGetInputQueueLengthSum() const;
00385
00386
00387 string getNetworkAsDot() const;
00388 string getNetworkAsDotSubgraph(string prefix, string attrs) const;
00389 string getNetworkAsDotStatements(string prefix) const;
00390
00391
00392 PagePool &getPagePool()
00393 {
return( *_page_pool );
00394 }
00395
00396
00397 const CpuStats &
getCpuStats()
const
00398
{
return( _cpu_stats );
00399 }
00400
00401
00406 StatsMgr sm;
00407
00408
private:
00409
typedef map<string, ptr<Stream> > StreamMap;
00410
typedef map<string, ptr<QBox> > BoxMap;
00411
typedef map<string, TupleDescription> SchemaMap;
00412
typedef map<string, TupleDescription> SchemaByHashMap;
00413
00414
00415
00416
struct Topology
00417 {
00418 StreamMap _streams;
00419 BoxMap _boxes;
00420 SchemaMap _schemas;
00421
TableMap _tables;
00422
00423
00424
bool containsEntity( string name )
const;
00425
00427
void copyFrom(
const Topology &other );
00428
00429 string
repr()
const;
00430 };
00431
00432
00433
void parseAddBoxXml(
const DOMElement *addNode,
00434 Topology &new_topology,
00435 BoxPackage &packedBoxes
00436 )
00437
throw(
AuroraException );
00438
00439
00440
void parseAddXml(
const DOMElement *addNode,
00441 Topology &new_topology,
00442 BoxPackage &packedBoxes
00443 )
00444
throw(
AuroraException );
00445
00446
00447
void parseRemoveBoxXml(
const DOMElement *removeBoxNode,
00448 Topology &top
00449 )
00450
throw(
AuroraException );
00451
00452
00453
void parseRemoveXml(
const DOMElement *removeNode,
00454 Topology &top,
00455 BoxPackage &box_package
00456 )
00457
throw(
AuroraException );
00458
00459
00460
00463
void handleModifyXml( string xml,
00464
bool commit,
00465 map<string, TupleDescription>&,
00466 BoxPackage &boxes
00467 )
00468
throw(
AuroraException );
00469
00471
void schedule( ptr<SchedulerTask> task );
00472
00473
void loadLibrary(string file)
throw (
AuroraException);
00474
00475
bool loadLibrariesInDirectory(string directory)
throw (
AuroraException);
00476
00477
void loadLibraries(
const ConfFile& conf);
00478
00479
enum State
00480 { STATE_INIT,
00481 STATE_RUNNING,
00482 STATE_STOPPED
00483 };
00484
00485 State _state;
00486
00487
00488
00489
00490
00491
00492
00493
00494
00495 ptr<PagePool> _page_pool;
00496
00498 Topology _topology;
00499
00501
NodeTable _node_table;
00502
00504 StreamMap &_streams;
00505 BoxMap &_boxes;
00506 SchemaMap &_schemas;
00508
00509
00510 SchemaByHashMap _schemas_by_hash;
00511
00512 map<string, ptr<LockSet> > _lock_sets;
00513
00514
00515
00516
bool addStats;
00517
00520
PtMutex _lock;
00521
00522
00524
typedef set< ptr<DequeueContext> > DeqContexts;
00525 DeqContexts _deqs;
00526
00527
00529 ptr<ConfFile> _conf;
00530 ptr<Scheduler> _scheduler;
00531
00532
00534 ptr<ConverterBoxRunner> _cbox_runner;
00535
00536
00539
typedef map<Name, ptr<CP> > CPs;
00540 CPs m_cps;
00541
00542
00544 vector<ptr<CPView> > m_cp_views;
00545
00546
00547
CpuStats _cpu_stats;
00548
00550 BoxPackage _checkpointed_state;
00551
00552
00553
#ifdef USE_STREAMMON
00554
00555 ptr<StreamMon> _stream_mon;
00556
#endif
00557
};
00558
00559 typedef AuroraNode::DequeueHandle DequeueHandle;
00560
00561 BOREALIS_NAMESPACE_END;
00562
00563
#endif // AURORANODE_H