00001
#ifndef PRIORITY_SCHEDULER_H
00002
#define PRIORITY_SCHEDULER_H
00003
00004
#include "TupleQueue.h"
00005
#include "Scheduler.h"
00006
#include <queue>
00007
00008 BOREALIS_NAMESPACE_BEGIN;
00009
00010
class QBox;
00011
00014 class PriorityScheduler :
public Scheduler {
00015
public:
00016
PriorityScheduler();
00017
00018
void start();
00019
void shutdown();
00020
void scheduleExclusiveTask(ptr<SchedulerTask>);
00021
void topologyChanged();
00022
void drain();
00023
00024
00025 vector<string>
_disabled_boxes;
00026
00027
private:
00028
00029
00030
struct MyBoxData :
public BoxData {
00031 MyBoxData(
QBox& box);
00032
00033
QBox *_box;
00034
int _priority;
00035
int _weight;
00036
bool _scheduled;
00037
bool _in_pending;
00038
int _running_time;
00039 };
00040
struct PriorityLevel {
00041 PriorityLevel() : _scheduled(false) {}
00042
00043 vector<MyBoxData*> _boxes_next;
00044 vector<MyBoxData*> _boxes_to_run;
00045
bool _scheduled;
00046 };
00047 map<int, PriorityLevel> _schedules;
00048
typedef priority_queue<int, vector<int>, greater<int> > PrioritiesToRun;
00049 PrioritiesToRun _priorities_to_run;
00050
00051
void scheduleBox(MyBoxData&);
00052
void schedulePending();
00053
bool checkRep() const;
00054
00055 MyBoxData& getMyBoxData(
QBox& box)
const {
00056 MyBoxData *data = static_cast<MyBoxData*>(
Scheduler::getBoxData(box));
00057 ASSERT(data);
00058
return *data;
00059 }
00060
00061
bool _die;
00062
bool _draining;
00063
00064 pthread_t _ps_thread;
00065
00066
virtual void invalidateBoxes( vector<string> boxNames );
00067
00068
virtual void updateListeners( vector<string> boxNames,
bool add );
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
virtual void init_sched_stats(
int windowSize,
int historySize );
00079
00080
virtual string
to_string();
00081
00082
struct MyBoxTupleQueueListener :
public TupleQueueListener {
00083
void notify(
const TupleQueue&);
00084
00085
PriorityScheduler *_ps;
00086 };
00087
friend struct MyBoxTupleQueueListener;
00088
00089
struct MyInputTupleQueueListener :
public TupleQueueListener {
00090
void notify(
const TupleQueue&);
00091
00092
PriorityScheduler *_ps;
00093 };
00094
friend struct MyInputTupleQueueListener;
00095
00096
00097 set<QBox*> _setup_boxes;
00098
00099
typedef vector<ptr<TupleQueueListener> > TQListeners;
00100 TQListeners _tqlisteners;
00101
00102
PtMutex _ps_lock;
00103
PtCondition _ps_condition;
00104
00105 vector<MyBoxData*> _ext_boxes_to_run;
00106
00107
typedef vector<ptr<SchedulerTask> > Tasks;
00108 Tasks _tasks;
00109
00110
void setupListeners();
00111
00112
static void *launch(
void *pthis) { ((
PriorityScheduler*)pthis)->run();
return 0; }
00113
void run();
00114
00115
AURORA_DECLARE_SCHEDULER(
PriorityScheduler);
00116 };
00117
00118 BOREALIS_NAMESPACE_END;
00119
00120
#endif