#ifndef BOREALIS_DATAPATH_H
#define BOREALIS_DATAPATH_H

#include <NMSTL/callback>
#include <NMSTL/thread>
#include <NMSTL/tqueue>
#include <NMSTL/netioevent>
#include <NMSTL/serial>

#include "runtime/AuroraNode.h"
#include "Exceptions.h"

#include "MTuple.h"
#include "StreamState.h"
#include "DataHandler.h"
#include "Recovery.h"

#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>

BOREALIS_NAMESPACE_BEGIN;

00033 class DataPath {
00034
00035
00036
public:
00037
DataPath(
AuroraNode& node, InetAddress data_add,
bool m_no_dups =
false);
00038
~DataPath();
00039
void init();
00040
void stop_enqueue();
00041
00042
00043 Mutex
m_aurora_lock;
00044
00049 vector< pair< ptr<StreamState> , string> >
last_input_tuples();
00050
00055 vector< pair< ptr<StreamState> , string> >
last_output_tuples();
00056
00061 vector<StreamEvent>
checkpoint_tuples();
00062
00066
void set_recovery_method(RecoveryMethod method);
00067
00071 RecoveryMethod
get_recovery_method();
00072
00076
void set_primary_status(
bool status);
00077
00081
bool is_primary();
00082
00086
void suspend_dequeue();
00087
00091
void resume_dequeue();
00092
00093
00094
void add_input_path(ptr<StreamState> str);
00095
bool is_input_path(ptr<StreamState> str);
00096
void remove_input_path(ptr<StreamState> str);
00097 string
update_input_path(Name stream_name, StreamDef& sd);
00098
00099
00100
void add_output_path(ptr<StreamState> str);
00101
void remove_output_path(ptr<StreamState> str);
00102
void set_data_path(Name stream_name, MedusaID fast_dst,
bool with_history, string first_tuple);
00103
void close_data_path(Name stream_name, MedusaID fast_dst);
00104
00109 Status
send_downstream(vector<StreamEvent>& events_to_send);
00110
00114
void update(StreamEvent& stream_event);
00115
00119
void update(StreamID stream_id, string last_tuple);
00120
00121
00125
void subscribe_cb(Name stream_name, DataHandler::DHCallback cb);
00126
00127
00131 void enqueue(ptr<StreamEvent> event) {
00132 m_enq_thread.do_handle_input_event(event);
00133 }
00134
00135
private:
00136
00138
static const int MAX_PER_SENDER_OUTPUT_QUEUE = 10000000;
00139
00140
00141
bool m_enqueuing;
00142
00143
00144
AuroraNode& m_aurora;
00145
00146
00147
DequeueHandle m_deq_handle;
00148
00149
00150
typedef map< Name, ptr<StreamState> > IOPaths;
00151 IOPaths m_input_paths;
00152 Mutex m_input_paths_lock;
00153 IOPaths m_output_paths;
00154 Mutex m_output_paths_lock;
00155
00156
00157 map< Name, DataHandler::DHCallback > m_control_callbacks;
00158
00159
00160
bool m_is_primary;
00161
00162
00163 RecoveryMethod m_recovery_method;
00164
00165
00166
bool m_no_dups;
00167
00168
00176
friend class EnqueueThread :
public Thread {
00177
DataPath& m_data_path;
00178 InetAddress m_data_add;
00179 IOEventLoop m_data_loop;
00180 DataHandlers m_data_handlers;
00181
00182
public:
00183 EnqueueThread(
DataPath& data_path, InetAddress data_add)
00184 : m_data_path(data_path), m_data_add(data_add) {}
00185
virtual ~EnqueueThread() { stop(); }
00186
00187 Status do_handle_input_event( ptr<StreamEvent> event);
00188
void stop() { m_data_loop.terminate(); }
00189
void run();
00190 };
00191 EnqueueThread m_enq_thread;
00192
00193
00195
friend class DequeueThread :
public Thread {
00196
DataPath& m_data_path;
00197
bool m_done;
00198
public:
00199 DequeueThread(
DataPath& data_path) : m_data_path(data_path) {}
00200
virtual ~DequeueThread() { stop(); }
00201
void stop() { m_done =
true; }
00202
void run();
00203 StreamID buffer(string stream_name,
char* buf, size_t total_read, size_t tuple_size);
00204
00205 };
00206 DequeueThread m_deq_thread;
00207
00208
00209
bool m_fwd_suspend;
00210
00212 list< ptr<StreamEvent> > m_events_to_send;
00213
00215 Mutex m_events_to_send_lock;
00216
00217
00218 IOHandle m_deq_pipe_write, m_deq_pipe_read;
00219
00220
00221
00223
friend class FastDequeueForwarder :
public Thread {
00224
DataPath& m_data_path;
00225
public:
00226 IOEventLoop m_data_fwd_loop;
00227
00232 friend class ForwardHandler :
public IOHandler {
00233
public:
00234
ForwardHandler(IOEventLoop& loop, DataPath::FastDequeueForwarder& fwd_thread);
00235
void ravail();
00236
private:
00237
DataPath& m_data_path;
00238 DataPath::FastDequeueForwarder& m_fast_dequeue_fwd;
00239 };
00240 ptr<ForwardHandler> m_fwd_handler;
00241
00242
00243
typedef map< Name, list< pair<MedusaID, ptr<DataHandler> > > > DataHandlers;
00244 DataHandlers m_data_handlers;
00245 vector< ptr<DataHandler> > m_failed;
00246
00247
00248 Mutex m_data_handlers_lock;
00249
00250 FastDequeueForwarder(
DataPath& data_path);
00251 ~FastDequeueForwarder();
00252
00259 Status
set_data_path(Name stream_name, MedusaID dst,
00260
bool with_history, string first_tuple);
00261
00262
00263 Status
close_data_path(Name stream_name, MedusaID dst);
00264
00268
void close_all_data_paths(Name stream_name);
00269
00270
00271 Status send_data(Name stream_name, ptr<StreamEvent> event);
00272
00277 Status
send_downstream(vector<StreamEvent>& events_to_send);
00278
00284
00285
00286
void clean_failed_handlers();
00287
00288
void run();
00289
void stop();
00290 };
00291 FastDequeueForwarder m_fast_dequeue_fwd;
00292
00293
00294
00295 };

BOREALIS_NAMESPACE_END;

#endif