Main Page | Namespace List | Class Hierarchy | Class List | File List | Class Members | File Members

DataPath.h

Go to the documentation of this file.
00001 00002 #ifndef BOREALIS_DATAPATH_H 00003 #define BOREALIS_DATAPATH_H 00004 00005 #include <NMSTL/callback> 00006 #include <NMSTL/thread> 00007 #include <NMSTL/tqueue> 00008 #include <NMSTL/netioevent> 00009 #include <NMSTL/serial> 00010 00011 #include "runtime/AuroraNode.h" 00012 #include "Exceptions.h" 00013 00014 #include "MTuple.h" 00015 #include "StreamState.h" 00016 #include "DataHandler.h" 00017 #include "Recovery.h" 00018 00019 #include <sys/types.h> 00020 #include <sys/stat.h> 00021 #include <unistd.h> 00022 00023 BOREALIS_NAMESPACE_BEGIN; 00024 00025 00033 class DataPath { 00034 00035 00036 public: 00037 DataPath(AuroraNode& node, InetAddress data_add, bool m_no_dups = false); 00038 ~DataPath(); 00039 void init(); 00040 void stop_enqueue(); 00041 00042 // So we don't set-up queries and enqueue at the same time 00043 Mutex m_aurora_lock; 00044 00049 vector< pair< ptr<StreamState> , string> > last_input_tuples(); 00050 00055 vector< pair< ptr<StreamState> , string> > last_output_tuples(); 00056 00061 vector<StreamEvent> checkpoint_tuples(); 00062 00066 void set_recovery_method(RecoveryMethod method); 00067 00071 RecoveryMethod get_recovery_method(); 00072 00076 void set_primary_status(bool status); 00077 00081 bool is_primary(); 00082 00086 void suspend_dequeue(); 00087 00091 void resume_dequeue(); 00092 00093 // Forward events on that stream to Aurora 00094 void add_input_path(ptr<StreamState> str); 00095 bool is_input_path(ptr<StreamState> str); 00096 void remove_input_path(ptr<StreamState> str); 00097 string update_input_path(Name stream_name, StreamDef& sd); 00098 00099 // Forward events on that stream from Aurora to downstream nodes 00100 void add_output_path(ptr<StreamState> str); 00101 void remove_output_path(ptr<StreamState> str); 00102 void set_data_path(Name stream_name, MedusaID fast_dst, bool with_history, string first_tuple); 00103 void close_data_path(Name stream_name, MedusaID fast_dst); 00104 00109 Status send_downstream(vector<StreamEvent>& events_to_send); 00110 00114 void update(StreamEvent& stream_event); 00115 00119 void update(StreamID stream_id, string last_tuple); 00120 00121 00125 void subscribe_cb(Name stream_name, DataHandler::DHCallback cb); 00126 00127 00131 void enqueue(ptr<StreamEvent> event) { 00132 m_enq_thread.do_handle_input_event(event); 00133 } 00134 00135 private: 00136 00138 static const int MAX_PER_SENDER_OUTPUT_QUEUE = 10000000; // 10 MB 00139 00140 // Is the thread that enqueues into Aurora running? 00141 bool m_enqueuing; 00142 00143 // Stream processing engine 00144 AuroraNode& m_aurora; 00145 00146 // Handle through which we dequeue tuples from Aurora 00147 DequeueHandle m_deq_handle; 00148 00149 // Lists of all input and output streams 00150 typedef map< Name, ptr<StreamState> > IOPaths; 00151 IOPaths m_input_paths; 00152 Mutex m_input_paths_lock; 00153 IOPaths m_output_paths; 00154 Mutex m_output_paths_lock; 00155 00156 // If query network produce control streams, callbacks handle their output 00157 map< Name, DataHandler::DHCallback > m_control_callbacks; 00158 00159 // Are we the primary or secondary (the HA component has to tell us) 00160 bool m_is_primary; 00161 00162 // What is the recovery method (the HA component has to tell us) 00163 RecoveryMethod m_recovery_method; 00164 00165 // Should we eliminate duplicates on input streams? 00166 bool m_no_dups; 00167 00168 // -------------------------------------------------- 00176 friend class EnqueueThread : public Thread { 00177 DataPath& m_data_path; 00178 InetAddress m_data_add; // Adress where we listen for new data connections 00179 IOEventLoop m_data_loop; // Event loop for this thread only 00180 DataHandlers m_data_handlers; // Handlers that receive events on input streams 00181 00182 public: 00183 EnqueueThread(DataPath& data_path, InetAddress data_add) 00184 : m_data_path(data_path), m_data_add(data_add) {} 00185 virtual ~EnqueueThread() { stop(); } 00186 // Pushes one event into Aurora 00187 Status do_handle_input_event( ptr<StreamEvent> event); 00188 void stop() { m_data_loop.terminate(); } 00189 void run(); 00190 }; 00191 EnqueueThread m_enq_thread; 00192 00193 // -------------------------------------------------- 00195 friend class DequeueThread : public Thread { 00196 DataPath& m_data_path; 00197 bool m_done; 00198 public: 00199 DequeueThread(DataPath& data_path) : m_data_path(data_path) {} 00200 virtual ~DequeueThread() { stop(); } 00201 void stop() { m_done = true; } 00202 void run(); 00203 StreamID buffer(string stream_name, char* buf, size_t total_read, size_t tuple_size); 00204 00205 }; 00206 DequeueThread m_deq_thread; 00207 00208 // Should we suspend forwarding all events 00209 bool m_fwd_suspend; 00210 00212 list< ptr<StreamEvent> > m_events_to_send; 00213 00215 Mutex m_events_to_send_lock; 00216 00217 // Allows dequeue thread to signal forward thread in a non-blocking manner 00218 IOHandle m_deq_pipe_write, m_deq_pipe_read; 00219 00220 00221 // -------------------------------------------------- 00223 friend class FastDequeueForwarder : public Thread { 00224 DataPath& m_data_path; 00225 public: 00226 IOEventLoop m_data_fwd_loop; 00227 00232 friend class ForwardHandler : public IOHandler { 00233 public: 00234 ForwardHandler(IOEventLoop& loop, DataPath::FastDequeueForwarder& fwd_thread); 00235 void ravail(); 00236 private: 00237 DataPath& m_data_path; 00238 DataPath::FastDequeueForwarder& m_fast_dequeue_fwd; 00239 }; 00240 ptr<ForwardHandler> m_fwd_handler; 00241 00242 //list of data handlers, one list for each output stream 00243 typedef map< Name, list< pair<MedusaID, ptr<DataHandler> > > > DataHandlers; 00244 DataHandlers m_data_handlers; 00245 vector< ptr<DataHandler> > m_failed; 00246 00247 //lock on data handlers. 00248 Mutex m_data_handlers_lock; 00249 00250 FastDequeueForwarder(DataPath& data_path); 00251 ~FastDequeueForwarder(); 00252 00259 Status set_data_path(Name stream_name, MedusaID dst, 00260 bool with_history, string first_tuple); 00261 00262 // Stop forwarding a stream to a downstream node 00263 Status close_data_path(Name stream_name, MedusaID dst); 00264 00268 void close_all_data_paths(Name stream_name); 00269 00270 // Send one event on one stream (multiple destinations) 00271 Status send_data(Name stream_name, ptr<StreamEvent> event); 00272 00277 Status send_downstream(vector<StreamEvent>& events_to_send); 00278 00284 //bool send_one_event_persistently(ptr<StreamEvent> event, ptr<DataHandler> h); 00285 00286 void clean_failed_handlers(); 00287 00288 void run(); 00289 void stop(); 00290 }; 00291 FastDequeueForwarder m_fast_dequeue_fwd; 00292 00293 00294 00295 }; 00296 00297 BOREALIS_NAMESPACE_END; 00298 00299 #endif

Generated on Fri Nov 12 15:15:20 2004 for Borealis by doxygen 1.3.8