00001
#ifndef CLOSABLEFIFOQUEUE_H
00002
#define CLOSABLEFIFOQUEUE_H
00003
00004
#include "Exceptions.h"
00005
#include "PtMutex.h"
00006
#include "PtCondition.h"
00007
#include "LockHolder.h"
00008
#include "BinarySem.h"
00009
00010
#include <queue>
00011
#include <assert.h>
00012
#include <iostream>
00013
#include <string>
00014
00015 BOREALIS_NAMESPACE_BEGIN;
00016
00017
template<
typename T>
00018 class ClosableFifoQueue
00019 {
00020
public:
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
ClosableFifoQueue(
PtCondition *pNonEmptyCond,
00033
BinarySem * pNonEmptyBinarySem,
00034
BinarySem * pClosedAndEmpty,
00035 string name =
"(No name given)")
00036
throw (exception);
00037
00038
virtual ~ClosableFifoQueue();
00039
00040
00041
00042
00043
00044
00045
void close()
00046
throw (exception);
00047
00048
00049
void enqueue(
const T &newValue)
00050
throw (exception,
00051 AuroraClosedException);
00052
00053
00054 T
dequeueBlocking()
00055
throw (exception,
00056 AuroraClosedException);
00057
00058
00059
00060
00061
bool dequeueNonBlocking(T &value)
00062
throw (exception,
00063 AuroraClosedException);
00064
00065
00066
unsigned int size()
00067
throw (exception);
00068
00069
bool isEmpty()
00070
throw (exception);
00071
00072
bool isClosed()
00073
throw (exception);
00074
private:
00075
00076
ClosableFifoQueue(
const ClosableFifoQueue &rhs) {assert(
false);}
00077
ClosableFifoQueue & operator =(
const ClosableFifoQueue &rhs) {assert(
false);}
00078
00079
bool _closed;
00080 queue<T> _dataqueue;
00081
00082
PtMutex _mutex;
00083
PtCondition _cond;
00084
PtCondition * _pNonEmptyCond;
00085
BinarySem * _pNonEmptyBinarySem;
00086
BinarySem * _pClosedAndEmpty;
00087 string _name;
00088 };
00089
00090
00091
00092
00093 template<
typename T>
ClosableFifoQueue<T>::ClosableFifoQueue(
PtCondition *pNonEmptyCond,
00094
BinarySem * pNonEmptyBinarySem,
00095
BinarySem * pClosedAndEmpty,
00096 string name)
00097
throw (exception) :
00098 _closed(
false),
00099 _pNonEmptyCond(pNonEmptyCond),
00100 _pNonEmptyBinarySem(pNonEmptyBinarySem),
00101 _pClosedAndEmpty(pClosedAndEmpty),
00102 _name(name)
00103 {
00104 }
00105
00106
00107
00108 template<
typename T>
ClosableFifoQueue<T>::~ClosableFifoQueue()
00109 {
00110 }
00111
00112
00113
00114 template<
typename T>
void ClosableFifoQueue<T>::enqueue(
const T &newValue)
00115
throw (exception,
00116 AuroraClosedException)
00117
00118 {
00119
LockHolder holder(_mutex);
00120
00121
if (_closed)
00122 {
00123
Throw(AuroraClosedException,
"ClosableFifoQueue is closed.");
00124 }
00125
00126 _dataqueue.push(newValue);
00127
00128
if (_dataqueue.size() == 1)
00129 {
00130
if (_pNonEmptyCond != NULL)
00131 {
00132 _pNonEmptyCond->broadcast();
00133 }
00134
00135
if (_pNonEmptyBinarySem != NULL)
00136 {
00137
00138 _pNonEmptyBinarySem->post();
00139 }
00140 }
00141
00142 _cond.broadcast();
00143 }
00144
00145
00146
00147 template<
typename T>
void ClosableFifoQueue<T>::close()
00148 throw (exception)
00149 {
00150
LockHolder holder(_mutex);
00151
00152 _closed =
true;
00153 _cond.
broadcast();
00154
00155
if ((_pClosedAndEmpty != NULL) && _dataqueue.empty())
00156 {
00157
00158 _pClosedAndEmpty->
post();
00159 }
00160 }
00161
00162
00163
00164 template<
typename T> T
ClosableFifoQueue<T>::dequeueBlocking()
00165 throw (AuroraClosedException,
00166 exception)
00167 {
00168
LockHolder holder(_mutex);
00169
00170
while (
true)
00171 {
00172
if (_dataqueue.empty() && _closed)
00173 {
00174
Throw(AuroraClosedException,
"ClosableFifoQueue is closed and empty.");
00175 }
00176
00177
if (! _dataqueue.empty())
00178 {
00179
00180 T returnVal = _dataqueue.front();
00181 _dataqueue.pop();
00182
00183
if ((_pClosedAndEmpty != NULL) && _dataqueue.empty() && _closed)
00184 {
00185
00186
00187 _pClosedAndEmpty->
post();
00188 }
00189
00190 _cond.
broadcast();
00191
return returnVal;
00192 }
00193
00194 _mutex.
waitCond(_cond);
00195 }
00196 }
00197
00198
00199
00200
00201
00202
00203
00204 template<
typename T>
bool ClosableFifoQueue<T>::dequeueNonBlocking(T &value)
00205
throw (exception,
00206 AuroraClosedException)
00207 {
00208
LockHolder holder(_mutex);
00209
00210
if (_closed)
00211 {
00212
Throw(AuroraClosedException,
"ClosableFifoQueue is closed and empty.");
00213 }
00214
00215
if (_dataqueue.empty())
00216 {
00217
return false;
00218 }
00219
else
00220 {
00221 value = _dataqueue.front();
00222 _dataqueue.pop();
00223
00224
if ((_pClosedAndEmpty != NULL) && _dataqueue.empty() && _closed)
00225 {
00226
00227
00228 _pClosedAndEmpty->post();
00229 }
00230
00231 _cond.broadcast();
00232
return true;
00233 }
00234 }
00235
00236
00237
00238 template<
typename T>
unsigned int ClosableFifoQueue<T>::size()
00239 throw (exception)
00240 {
00241
LockHolder holder(_mutex);
00242
bool returnVal = _dataqueue.size();
00243
return returnVal;
00244 }
00245
00246
00247
00248 template<
typename T>
bool ClosableFifoQueue<T>::isEmpty()
00249 throw (exception)
00250 {
00251
LockHolder holder(_mutex);
00252
bool returnVal = _dataqueue.empty();
00253
return returnVal;
00254 }
00255
00256
00257
00258 template<
typename T>
bool ClosableFifoQueue<T>::isClosed()
00259 throw (exception)
00260 {
00261
LockHolder holder(_mutex);
00262
return _closed;
00263 }
00264
00265
00266 BOREALIS_NAMESPACE_END;
00267
00268
#endif