00001
#ifndef PSEUDO_SCHEDULER_H
00002
#define PSEUDO_SCHEDULER_H
00003
00004
#include "TupleQueue.h"
00005
#include "Scheduler.h"
00006
00007 BOREALIS_NAMESPACE_BEGIN;
00008
00009
class QBox;
00010
00013 class PseudoScheduler :
public Scheduler {
00014
public:
00015
PseudoScheduler();
00016
00017
void start();
00018
void shutdown();
00019
void scheduleExclusiveTask(ptr<SchedulerTask>);
00020
void topologyChanged();
00021
void drain();
00022
00023
00024
void removeFromSchedule(
QBox* b);
00025
00026
00027 vector<string>
_disabled_boxes;
00028
00029
private:
00030
struct MyBoxTupleQueueListener :
public TupleQueueListener {
00031
void notify(
const TupleQueue&);
00032
00033
PseudoScheduler *_ps;
00034 };
00035
friend struct MyBoxTupleQueueListener;
00036
00037
struct MyInputTupleQueueListener :
public TupleQueueListener {
00038
void notify(
const TupleQueue&);
00039
00040
PseudoScheduler *_ps;
00041 };
00042
friend struct MyInputTupleQueueListener;
00043
00044
bool _die;
00045
bool _draining;
00046
00047
00048 pthread_t _ps_thread;
00049 pthread_mutex_t *_wait_mutex;
00050 pthread_cond_t *_wait_cond;
00051
00052
00053
00054
bool _use_shadow_boxmap;
00055
bool _use_shadow_qlength;
00056
unsigned long _last_boxcost_update;
00057
unsigned long _last_qlength_update;
00058
unsigned long _last_irate_update;
00059
00060
00061 set<QBox*> _setup_boxes;
00062
00063
00064
typedef vector<TupleQueueListener*> TQListeners;
00065 TQListeners _tqlisteners;
00066
00067
PtMutex _ps_lock;
00068
PtCondition _ps_condition;
00069 set<QBox*> _ext_boxes_to_run;
00070
bool _perf_stats;
00071
00072 set<QBox*> _boxes_to_run;
00073
00074
00075
00080
void invalidateBoxes( vector<string> boxNames );
00081
00089
void updateListeners( vector<string> boxNames,
bool add );
00090
00094
void chokeSubNetwork( vector<string> boxNames );
00095
00099
void suspendSubNetwork( vector<string> boxNames );
00100
00105
void resumeSubNetwork( vector<string> boxNames );
00106
00111
void drainSubNetwork( vector<string> boxNames );
00112
00113
00114
00115
00116
double use_cpu_time(
double time_to_use );
00117
00118
00119
00120
virtual void init_sched_stats(
int windowSize,
int historySize );
00121
00122
virtual string
to_string();
00123
00124
typedef vector<ptr<SchedulerTask> > Tasks;
00125 Tasks _tasks;
00126
00127
void setupListeners();
00128
00129
static void *launch(
void *pthis) { ((
PseudoScheduler*)pthis)->run();
return 0; }
00130
void run();
00131
00132
AURORA_DECLARE_SCHEDULER(
PseudoScheduler);
00133 };
00134
00135 BOREALIS_NAMESPACE_END;
00136
00137
#endif