00001
#ifndef SUNION_QBOX_H
00002
#define SUNION_QBOX_H
00003
00004
#include "UnionQBox.h"
00005
#include "JoinQBox.h"
00006
#include <vector>
00007
#include "SControlQBox.h"
00008
00009 BOREALIS_NAMESPACE_BEGIN;
00010
00011
00012
00013 class TuplesBuffer {
00014
public:
00015 static const int DEFAULT_SIZE = 1000;
00016
TuplesBuffer(size_t tuple_size);
00017
void insertTuple(
const Tuple& data);
00018 int nbTuples() {
return _cur_nb_tuples; }
00019
const Tuple
tupleAt(
int i);
00020
00021
void clear();
00022
00023
void clearAfter(
int tuple_id);
00024
00025
bool isStableBuffer();
00026
private:
00027 size_t _tuple_size;
00028
int _cur_nb_tuples;
00029
int _max_nb_tuples;
00030 ptr<dynbuf> _tuples;
00031 };
00032
00033
00034
00035
00042 class SUnionQBox :
public SControlQBox
00043 {
00044
00045
friend class SUninBoxState;
00046
00047
public:
00048
00050
static TupleDescription
createControlOutputDescription();
00051
00052
protected:
00053
virtual void setupImpl()
throw (
AuroraException);
00054
virtual void initImpl()
throw (
AuroraException);
00055
virtual void runImpl(
QBoxInvocation&)
throw (
AuroraException);
00056
void pureUnionRunImpl(
QBoxInvocation&)
throw (
AuroraException);
00057
00058
00059
void setPendingBoxState(ptr<AbstractBoxState> packed_box);
00060
bool isStableForCheckpoint();
00061 ptr<AbstractBoxState>
packState();
00062
void unpackState(ptr<AbstractBoxState> box_state);
00063
00064
private:
00065
00066
00067
static const int MAX_BUCKETS_PER_ITERATION = 5;
00068
00069
00070
static const double MAX_ALPHA = 0.9;
00071
00072
00073
00074
static const double MIN_ALPHA = 0.1;
00075
00076
00077
00078
00079
static const int SLACK = 10;
00080
00081
class Bucket;
00082
00084
void setupParameters();
00085
00087
void handleInputTuple(
const Tuple& tuple,
int stream_id);
00088
00090
void applyUndo(
const Tuple& tuple,
int stream_id,
long long interval_start);
00091
00093
void undoRedo(
long long interval_start);
00094
00096
00097
00099
00100
00102
void insertTupleIntoBucket(
const Tuple& tuple,
int stream_id,
long long interval_start);
00103
00105
bool forwardWithoutWaiting();
00106
00108
void produceOutput();
00109
bool processOutputStableBucket(Bucket& bucket,
long long interval_nb);
00110
bool processOutputUnStableBucket(Bucket& bucket,
long long interval_nb);
00111
00113
void producePunctuation(
Timestamp punctuation_time);
00114
00116
void checkpoint();
00117
00119
00120
00122
void emitTuples(Bucket& bucket, TupleType tuple_type);
00123
00125
00126
00128
00129
00131
void emitUndoTuple(
const Tuple& tuple);
00132
00133
00135
long long timestampToIntervalNb(
Timestamp timestamp);
00136
00138
Timestamp intervalNbToTimestamp(
long long interval_nb);
00139
00140
static const long MILLION = 1000000;
00141
static const long THOUSAND = 1000;
00142
00144
unsigned int _numberOfInputs;
00145
00147
int _interval;
00148
00150
int _max_delay;
00151
double _actual_delay;
00152
bool _stagger;
00153
00154
00155
struct State {
00156
public:
00158 State () : _state_value(STABLE_STATE), _unstable_start(Timestamp::now()),
00159 _unstable_interval_nb(0) {}
00160
00162
bool isStable() {
return _state_value == STABLE_STATE; }
00163
00165
bool isUnstable() {
return _state_value == UNSTABLE_STATE; }
00166
00168
bool canProcessUnstable();
00169
00171
void processingUnstableBucket(
Timestamp unstable_start,
long long interval_nb);
00172
00173
void undoing();
00174
00176
void newStableBucket();
00177
00179
bool somethingToCorrect();
00180
Timestamp correctSinceWhen();
00181
long long firstUnstableInterval();
00182
00184
void reconciling();
00185
bool isReconciling();
00186
void reconcilingRollforward();
00187
bool isReconcilingForward();
00188
00190
void reconciled();
00191
00193
void noReconciliation();
00194
00195
private:
00196
static const int STABLE_STATE = 0;
00197
static const int UNSTABLE_STATE = 1;
00198
static const int UNDONE_STATE = 2;
00199
static const int CONVERGED_STATE = 3;
00200
static const int RECONCILING_STATE_ROLLBACK = 4;
00201
static const int RECONCILING_STATE_ROLLFORWARD = 5;
00202
00203
int _state_value;
00204
Timestamp _unstable_start;
00205
long long _unstable_interval_nb;
00206
00207 };
00208 State _state;
00209
00210
00211
typedef map<int, ptr<TuplesBuffer> > TuplesBufferSet;
00212
struct Bucket {
00213
00214 Bucket(
int nb_buffers,
int tuple_size);
00215
00216 TuplesBufferSet _buffers;
00217
00218
bool _contains_unstable_tuples;
00219 map<int, bool> _punctuations;
00220
Timestamp _creation_time;
00221
00222
Timestamp getCreationTime();
00223
void clear();
00224
bool punctuated();
00225
00226
void resetStability();
00227 };
00228
00229
00230
typedef map<long long, ptr<Bucket> > Buckets;
00231
00232
00233 Buckets _buckets;
00234
long long _current_interval_nb;
00235
00237 int32 _last_stable_tuple_id;
00238
00239
00240
long long _checkpointed_interval_nb;
00241
00242
00243
Timestamp _last_punctuation_time;
00244
00245
00246
MicroBenchmarkType _benchmark;
00247
00248
bool _one_iteration_after_queues_cleared;
00249
00250
00251
bool _sort;
00252
00254
bool _should_reconcile;
00255
00256
AURORA_DECLARE_QBOX(
SUnionQBox,
"sunion");
00257 };
00258
00264 class SUnionBoxState :
public AbstractBoxState
00265 {
00266
public:
00267 SUnionBoxState() : _interval_nb(INIT) {}
00268 SUnionBoxState(
long long interval_nb) : _interval_nb(interval_nb) {}
00269 ~SUnionBoxState() {}
00270
00271 long long getIntervalNb() {
return _interval_nb; }
00272
NMSTL_SERIAL_SUBCLASS(
SUnionBoxState, AbstractBoxState, << _interval_nb);
00273
00274
private:
00275
long long _interval_nb;
00276 };
00277
00278 BOREALIS_NAMESPACE_END;
00279
00280
#endif