Main Page | Namespace List | Class Hierarchy | Class List | File List | Class Members | File Members

AuroraNode.h

Go to the documentation of this file.
00001 #ifndef AURORANODE_H 00002 #define AURORANODE_H 00003 00004 #include "Exceptions.h" 00005 00006 #include "ConfFile.h" 00007 #include "CpuStats.h" 00008 #include "QBox.h" 00009 #include "Stream.h" 00010 #include "TupleDescription.h" 00011 #include "Scheduler.h" 00012 #include "StreamProperties.h" 00013 #include "LockSet.h" 00014 00015 #include "StreamDef.h" 00016 00017 #include "FixLenTimeSeries.h" 00018 #include "StatsMgr.h" 00019 #include "TSStats.h" 00020 #include "VersionRWLock.h" 00021 00022 #ifdef USE_STREAMMON 00023 #include "StreamMon.h" 00024 #endif 00025 00026 #include "BoxPackage.h" 00027 #include "NodeTable.h" 00028 00029 BOREALIS_NAMESPACE_BEGIN; 00030 00031 class SchedulerTask; 00032 class ConverterBoxRunner; 00033 00034 class CP; 00035 class CPView; 00036 class CPViewDescription; 00037 00038 00065 class AuroraNode 00066 { 00069 struct Subscription 00070 { ptr<TupleQueue> _q; 00071 00072 }; 00073 00074 00078 struct DequeueContext 00079 { TuplesAvailableMailbox _mailbox; 00080 map<Stream*, Subscription> _subscriptions; 00081 list<ptr<TupleQueue> > _deleted_queues; 00082 00083 // Called by UnsubscribeAllTask 00084 void unsubscribeAll(); 00085 }; 00086 00087 00088 friend struct UnsubscribeAllTask; 00089 friend struct ModifyNetworkTask; 00090 friend struct GenericCheckpointTask; 00091 friend struct CheckpointTask; 00092 friend struct RecoveryTask; 00093 00094 public: 00096 enum EntityType 00097 { 00099 STREAM = 1, 00100 00102 SCHEMA = 2, 00103 00105 BOX = 3, 00106 00108 INPUT_STREAMS = 100, 00109 00112 OUTPUT_STREAMS = 101 00113 }; 00114 00115 00116 double selval; 00117 00127 class DequeueHandle 00128 { 00129 public: 00130 DequeueHandle() : _node(0), _context() {} 00131 00132 00135 void subscribe( string name ) throw( AuroraException ); 00136 00137 00140 void unsubscribe( string name ) throw( AuroraException ); 00141 00142 00157 size_t dequeue( string &stream_name, 00158 TupleDescription &td, 00159 void *buffer, 00160 size_t buffer_size 00161 ) 00162 throw( AuroraException ); 00163 00166 void close(); 00167 00168 private: 00169 AuroraNode *_node; 00170 ptr<DequeueContext> _context; 00171 00172 friend class AuroraNode; 00173 friend struct UnsubscribeAllTask; 00174 }; 00175 00176 00177 friend class DequeueHandle; 00178 00179 typedef ::StreamProperties StreamProperties; 00180 00181 00183 AuroraNode( ptr<ConfFile> conf = ptr<ConfFile>() ) 00184 throw( AuroraException ); 00185 00186 00188 ~AuroraNode(); 00189 00190 00197 void modifyNetwork( string data, 00198 BoxPackage &boxes 00199 ) 00200 throw( AuroraException ); 00201 00205 void load_box( string file_path ) 00206 throw( AuroraException ); 00207 00208 00216 void typecheck( string data, map<string, TupleDescription> &streams ) 00217 throw( AuroraException ); 00218 00219 00233 void enqueue( const char *stream_name, 00234 const void *data, 00235 size_t tuple_length, 00236 size_t num_tuples, 00237 const void *schema_uuid = 0 00238 ) 00239 throw( AuroraException ); 00240 00241 00243 DequeueHandle getDequeueHandle(); 00244 00245 00247 void start() throw( AuroraException ); 00248 00249 00251 void shutdown() throw( AuroraException ); 00252 00253 00257 void typecheck( string data, vector<StreamProperties> &streams ) 00258 throw( AuroraException ); 00259 00260 00264 void validate( string data ) throw( AuroraException ); 00265 00266 00269 StreamProperties getStreamProperties( string entity ) 00270 throw( AuroraException ); 00271 00272 00275 string describe( string entity ) throw( AuroraException ); 00276 00277 00281 void listEntities( EntityType type, vector<string> &names ) 00282 throw( AuroraException ); 00283 00284 00289 void drain() throw( AuroraException ); 00290 00291 00293 void chokeBoxes( vector<string> names ); 00294 00296 void suspendBoxes( vector<string> names ); 00297 00299 void resumeBoxes( vector<string> names ); 00300 00301 00303 void packBoxes( string data, BoxPackage &boxes ) 00304 throw( AuroraException ); 00305 00311 void checkpoint(vector<long long>& measurements); 00313 void recover(); 00314 00316 void schedule_checkpoint_task(ptr<GenericCheckpointTask> task); 00317 00319 string repr() const; 00320 00321 00323 Statistics &getStatsObject() 00324 { return( sm.getStats() ); 00325 } 00326 00327 00328 const Statistics &getStatsObject() const 00329 { return( sm.getStats() ); 00330 } 00331 00332 00334 set<ptr<QBox> > getBoxes() const; 00335 00336 00338 ptr<QBox> getBox( string name ) const 00339 { return( lookup( _boxes, name )); 00340 } 00341 00342 00344 ptr<Stream> getStream( string name ) const 00345 { return( lookup( _streams, name )); 00346 } 00347 00348 00351 ptr<LockSet> getLockSet( string name ); 00352 00353 00355 TupleDescription getTupleDescription( string name ) const 00356 { return( lookup( _schemas, name )); 00357 } 00358 00359 00361 TupleDescription getTupleDescriptionByHash( string hash ) const 00362 { return( lookup( _schemas_by_hash, hash )); 00363 } 00364 00365 00368 ptr<Stream> createStream( string name, TupleDescription desc ) 00369 throw( AuroraException ); 00370 00371 00373 void create_cpview( CPViewDescription view_desc, StreamDef streamdef ) 00374 throw( AuroraException ); 00375 00376 00378 ptr<ConfFile> getProperties() const throw( AuroraException ) 00379 { return( _conf ); 00380 } 00381 00382 00384 unsigned int xGetInputQueueLengthSum() const; 00385 00386 00387 string getNetworkAsDot() const; 00388 string getNetworkAsDotSubgraph(string prefix, string attrs) const; 00389 string getNetworkAsDotStatements(string prefix) const; 00390 00391 00392 PagePool &getPagePool() 00393 { return( *_page_pool ); 00394 } 00395 00396 00397 const CpuStats &getCpuStats() const 00398 { return( _cpu_stats ); 00399 } 00400 00401 00406 StatsMgr sm; 00407 00408 private: 00409 typedef map<string, ptr<Stream> > StreamMap; 00410 typedef map<string, ptr<QBox> > BoxMap; 00411 typedef map<string, TupleDescription> SchemaMap; 00412 typedef map<string, TupleDescription> SchemaByHashMap; 00413 00414 // typedef map<string, ptr<CPoint> > CPointMap; 00415 00416 struct Topology 00417 { 00418 StreamMap _streams; 00419 BoxMap _boxes; 00420 SchemaMap _schemas; 00421 TableMap _tables; 00422 //CPointMap _cpoints; 00423 00424 bool containsEntity( string name ) const; 00425 00427 void copyFrom( const Topology &other ); 00428 00429 string repr() const; 00430 }; 00431 00432 00433 void parseAddBoxXml( const DOMElement *addNode, 00434 Topology &new_topology, 00435 BoxPackage &packedBoxes 00436 ) 00437 throw( AuroraException ); 00438 00439 00440 void parseAddXml( const DOMElement *addNode, 00441 Topology &new_topology, 00442 BoxPackage &packedBoxes 00443 ) 00444 throw( AuroraException ); 00445 00446 00447 void parseRemoveBoxXml( const DOMElement *removeBoxNode, 00448 Topology &top 00449 ) 00450 throw( AuroraException ); 00451 00452 00453 void parseRemoveXml( const DOMElement *removeNode, 00454 Topology &top, 00455 BoxPackage &box_package 00456 ) 00457 throw( AuroraException ); 00458 00459 00460 00463 void handleModifyXml( string xml, 00464 bool commit, 00465 map<string, TupleDescription>&, 00466 BoxPackage &boxes 00467 ) 00468 throw( AuroraException ); 00469 00471 void schedule( ptr<SchedulerTask> task ); 00472 00473 void loadLibrary(string file) throw (AuroraException); 00474 00475 bool loadLibrariesInDirectory(string directory) throw (AuroraException); 00476 00477 void loadLibraries(const ConfFile& conf); 00478 00479 enum State 00480 { STATE_INIT, 00481 STATE_RUNNING, 00482 STATE_STOPPED 00483 }; 00484 00485 State _state; 00486 00487 // Note that order is important here; the streams/boxes may depend 00488 // on the PagePool, so they must be destroyed *before* the 00489 // PagePool. 00490 // 00491 // Note that these are not currently protected by a mutex (which 00492 // is fine because the network cannot be modified after it has 00493 // started). 00494 00495 ptr<PagePool> _page_pool; 00496 00498 Topology _topology; 00499 00501 NodeTable _node_table; 00502 00504 StreamMap &_streams; 00505 BoxMap &_boxes; 00506 SchemaMap &_schemas; 00508 // CPointMap &_cpoints; 00509 00510 SchemaByHashMap _schemas_by_hash; 00511 00512 map<string, ptr<LockSet> > _lock_sets; 00513 00514 // please ignore this hack. it is implemented until Ying fixes the 00515 // stats manager and implements a removeTSStat 00516 bool addStats; 00517 00520 PtMutex _lock; 00521 00522 00524 typedef set< ptr<DequeueContext> > DeqContexts; 00525 DeqContexts _deqs; 00526 00527 00529 ptr<ConfFile> _conf; 00530 ptr<Scheduler> _scheduler; 00531 00532 00534 ptr<ConverterBoxRunner> _cbox_runner; 00535 00536 00539 typedef map<Name, ptr<CP> > CPs; 00540 CPs m_cps; 00541 00542 00544 vector<ptr<CPView> > m_cp_views; 00545 00546 00547 CpuStats _cpu_stats; 00548 00550 BoxPackage _checkpointed_state; 00551 00552 00553 #ifdef USE_STREAMMON 00554 00555 ptr<StreamMon> _stream_mon; 00556 #endif 00557 }; 00558 00559 typedef AuroraNode::DequeueHandle DequeueHandle; 00560 00561 BOREALIS_NAMESPACE_END; 00562 00563 #endif // AURORANODE_H

Generated on Fri Nov 12 15:15:20 2004 for Borealis by doxygen 1.3.8