00001
#ifndef BOREALIS_CONSISTENCYMNGR_H
00002
#define BOREALIS_CONSISTENCYMNGR_H
00003
00004
#include "common.h"
00005
#include "Query.h"
00006
#include "StreamEvent.h"
00007
#include "Timestamp.h"
00008
#include "Tuple.h"
00009
00010
#include <NMSTL/tqueuerpc>
00011
#include <NMSTL/tcprpc>
00012
#include <NMSTL/xmlrpc>
00013
#include <fstream>
00014
00015 BOREALIS_NAMESPACE_BEGIN;
00016
00017
class QueryProcessor;
00018
00019
00020 class MicroBenchmarkType {
00021
00022
public:
00023
00024 enum Type {
00025
00026
NONE = 0,
00027
TUPLE_TYPES,
00028
UNDO_LOG,
00029
CHECKPOINT,
00030
00031
00032
SUNION,
00033
SUNION_PUNCTUATION,
00034
SOUTPUT,
00035
00036
BLOCK,
00037
00038
00039
ALL_TOGETHER_UNDO_REDO_NO_MOVE,
00040
ALL_TOGETHER_CHECKPOINT_REDO_NO_MOVE,
00041
00042
00043
ALL_TOGETHER_UNDO_REDO,
00044
ALL_TOGETHER_CHECKPOINT_REDO,
00045
00046
00047
ADAPTIVE
00048
00049
00050 };
00051
00052 bool doKeepUndoLog()
00053 {
return ((
_type ==
UNDO_LOG)
00054 || (
_type ==
ALL_TOGETHER_UNDO_REDO_NO_MOVE)
00055 || (
_type ==
ALL_TOGETHER_UNDO_REDO)
00056 || (
_type ==
ADAPTIVE));
00057 }
00058
00059 bool doSerialize()
00060 {
return ((
_type ==
SUNION) || (
_type ==
SUNION_PUNCTUATION) || (
_type ==
BLOCK)
00061 || (
_type ==
ALL_TOGETHER_UNDO_REDO_NO_MOVE) || (
_type ==
ALL_TOGETHER_CHECKPOINT_REDO_NO_MOVE)
00062 || (
_type ==
ALL_TOGETHER_UNDO_REDO) || (
_type ==
ALL_TOGETHER_CHECKPOINT_REDO)
00063 || (
_type ==
ADAPTIVE));
00064 }
00065
00066 bool doPunctuation()
00067 {
return ((
_type ==
SUNION_PUNCTUATION) || (
_type ==
BLOCK)
00068 || (
_type ==
ALL_TOGETHER_UNDO_REDO_NO_MOVE) || (
_type ==
ALL_TOGETHER_CHECKPOINT_REDO_NO_MOVE)
00069 || (
_type ==
ALL_TOGETHER_UNDO_REDO) || (
_type ==
ALL_TOGETHER_CHECKPOINT_REDO)
00070 || (
_type ==
ADAPTIVE));
00071
00072 }
00073
00074 bool doCheckpoint()
00075 {
return ((
_type ==
CHECKPOINT)
00076 || (
_type ==
ALL_TOGETHER_CHECKPOINT_REDO_NO_MOVE)
00077 || (
_type ==
ALL_TOGETHER_CHECKPOINT_REDO)
00078 || (
_type ==
ADAPTIVE)
00079 );
00080 }
00081
00082 bool doBlock()
00083 {
return (
_type ==
BLOCK);
00084 }
00085
00086 bool doForceMoveSubscriptions()
00087 {
return ( (
_type ==
ALL_TOGETHER_UNDO_REDO) || (
_type ==
ALL_TOGETHER_CHECKPOINT_REDO));
00088 }
00089
00090 bool doUndoReconciliation()
00091 {
return ((
_type ==
ALL_TOGETHER_UNDO_REDO_NO_MOVE) || (
_type ==
ALL_TOGETHER_UNDO_REDO));
00092 }
00093
00094 bool doAdaptive()
00095 {
return (
_type ==
ADAPTIVE);
00096 }
00097
00098 MicroBenchmarkType(Type t = ALL_TOGETHER_UNDO_REDO)
00099 :
_type(t)
00100 {
00101 }
00102
00103 MicroBenchmarkType(string type)
00104 {
00105
if ( type ==
"none" || type ==
"NONE" )
00106
_type =
NONE;
00107
00108
else if ( type ==
"tuple_types" || type ==
"TUPLE_TYPES" )
00109
_type =
TUPLE_TYPES;
00110
00111
else if ( type ==
"undo_log" || type ==
"UNDO_LOG" )
00112
_type =
UNDO_LOG;
00113
00114
else if ( type ==
"checkpoint" || type ==
"CHECKPOINT" )
00115
_type =
CHECKPOINT;
00116
00117
else if ( type ==
"sunion" || type ==
"SUNION" )
00118
_type =
SUNION;
00119
00120
else if ( type ==
"sunion_punctuation" || type ==
"SUNION_PUNCTUATION" )
00121
_type =
SUNION_PUNCTUATION;
00122
00123
else if ( type ==
"soutput" || type ==
"SOUTPUT" )
00124
_type =
SOUTPUT;
00125
00126
else if ( type ==
"block" || type ==
"BLOCK" )
00127
_type =
BLOCK;
00128
00129
else if ( type ==
"all_together_checkpoint_redo" || type ==
"ALL_TOGETHER_CHECKPOINT_REDO" )
00130
_type =
ALL_TOGETHER_CHECKPOINT_REDO;
00131
00132
else if ( type ==
"all_together_checkpoint_redo_no_move" || type ==
"ALL_TOGETHER_CHECKPOINT_REDO_NO_MOVE" )
00133
_type =
ALL_TOGETHER_CHECKPOINT_REDO_NO_MOVE;
00134
00135
else if ( type ==
"all_together_undo_redo" || type ==
"ALL_TOGETHER_UNDO_REDO" )
00136
_type =
ALL_TOGETHER_UNDO_REDO;
00137
00138
else if ( type ==
"all_together_undo_redo_no_move" || type ==
"ALL_TOGETHER_UNDO_REDO_NO_MOVE" )
00139
_type =
ALL_TOGETHER_UNDO_REDO_NO_MOVE;
00140
00141
else _type =
ADAPTIVE;
00142
00143 }
00144
00145 string
as_string( )
const {
00146
switch (
_type) {
00147
case NONE:
return string(
"none");
00148
case TUPLE_TYPES :
return string(
"tuple_types");
00149
case UNDO_LOG:
return string(
"undo_log");
00150
case CHECKPOINT:
return string(
"checkpoint");
00151
case SUNION:
return string(
"sunion");
00152
case SUNION_PUNCTUATION:
return string(
"sunion_punctuation");
00153
case SOUTPUT:
return string(
"soutput");
00154
case BLOCK:
return string(
"block");
00155
case ALL_TOGETHER_CHECKPOINT_REDO_NO_MOVE:
return string(
"all_together_checkpoint_redo_no_move");
00156
case ALL_TOGETHER_CHECKPOINT_REDO:
return string(
"all_together_checkpoint_redo");
00157
case ALL_TOGETHER_UNDO_REDO_NO_MOVE :
return string(
"all_together_undo_redo_no_move");
00158
case ALL_TOGETHER_UNDO_REDO :
return string(
"all_together_undo_redo");
00159
case ADAPTIVE :
return string(
"adaptive");
00160
default:
return string(
"adaptive");
00161 }
00162 }
00163
00164 Type _type;
00165
00166 };
00167
00168
NMSTL_TO_STRING(
MicroBenchmarkType);
00169
00170
00177 class ConsistencyMngr {
00178
public:
00179
00185
ConsistencyMngr(
QueryProcessor& qp);
00186
00190
~ConsistencyMngr();
00191
00196
void checkForControlStreams(Query& q);
00197
00198
void set_replicas(vector<MedusaID> replicas);
00199
void add_subscription(Subscription sub);
00200
void remove_subscription(Subscription sub);
00201
00202
private:
00203
00205 Name findControlStreamName(
const Box::StreamBindings& outputs);
00206
00208
void monitorControlStream(Name control_stream_name);
00209
00211 Status handleControlTuples(ptr<StreamEvent> event);
00212
00214
void sendControlTuple(Name output_stream_name, TupleType type,
Timestamp since);
00215
00216
void startCheckpointing();
00217
00219
void checkpoint();
00220
00221
void launchReconciliation(Name control_stream,
Timestamp time);
00222
void reconcile(Name control_stream,
Timestamp time);
00223
void finishReconciliation();
00224
00226
void updateSubHistory();
00227
void moveSubscriptionsIfNeeded();
00228
void releaseDataPathIfNeeded();
00229
void moveSubscriptionsAway();
00230
void moveSubscriptionsAway2(MedusaID replica, RPC< vector<Subscription> > result);
00231
00232
void moveSubscriptionsBack();
00233
void moveSubscriptionsBack2(RPC< vector<Subscription> > result);
00234
00236
QueryProcessor& _qp;
00237
00239 vector<MedusaID> _replicas;
00240 MedusaID _sub_temp_location;
00241
00243 vector<Subscription> _subs;
00244
00250
typedef map< Name, pair<Name, int> > IOControl;
00251 IOControl _io_control;
00252
00254
bool _reconciliation_in_progress;
00255
00256
static const int CHECKPOINT_INTERVAL = 1000;
00257
int _checkpoint_interval;
00258
Timestamp _last_checkpoint_time;
00259
bool _checkpointing_started;
00260
bool _suspend_checkpoints;
00261
00262
00263
static const int MAX_FAILURE = 30000;
00264
static const int MAX_NO_SEND_DELAY = 10000;
00265
static const int MAX_DELAY_DEFAULT = 1000;
00266
static const double ALPHA_MAX = 0.6;
00267
int _max_delay;
00268
double _actual_delay;
00269
Timestamp _failure_start_time;
00270
00271
00272
MicroBenchmarkType _benchmark;
00273 ofstream _log_file;
00274 vector<long long> _measurements;
00275
int _state_size;
00276
00277 };
00278
00279 BOREALIS_NAMESPACE_END;
00280
00281
#endif