00001
#ifndef AURORA_QBOXSTATE_H
00002
#define AURORA_QBOXSTATE_H
00003
00004
#include <vector>
00005
#include <map>
00006
#include <NMSTL/serial>
00007
#include <NMSTL/iserial>
00008
#include <NMSTL/oserial>
00009
00010
#include "Stream.h"
00011
#include "TupleQueue.h"
00012
#include "BoxPackage.h"
00013
00014 BOREALIS_NAMESPACE_BEGIN;
00015
00016 class QueueState :
public AbstractQueueState
00017 {
00018
public:
00019 QueueState(){}
00020
00021 QueueState(
const QueueState& qs) { _serialized_queues = qs._serialized_queues; }
00022
00023 QueueState(vector<ptr<TupleQueue> > queues) {
00024
for(vector<ptr<TupleQueue> >::iterator i = queues.begin(); i != queues.end(); i++) {
00025
Stream* stream = i->get()->getStream();
00026
00027
00028
TupleQueue::SeekIterator dq = i->get()->seek_iterator();
00029
00030
unsigned int tuple_size = i->get()->getTupleSize();
00031
unsigned int packed_tuples = 0;
00032
00033 DEBUG <<
"Queue for stream " << stream->
getName() <<
" has "
00034 << i->get()->size() <<
" tuples "
00035 <<
" of size " << i->get()->getTupleSize();
00036
00037 OSerialString serial_queue(OSerial::binary|OSerial::nosignature);
00038
00039
while(dq.
avail()) {
00040
const void* data = dq.
tuple();
00041 string tuple((
const char*) data, tuple_size);
00042 serial_queue << tuple;
00043 ++dq;
00044 ++packed_tuples;
00045 }
00046
00047 _serialized_queues[stream->
getName()] = serial_queue.str();
00048
00049 DEBUG <<
"Packed " << packed_tuples <<
" tuples for stream: " << stream->
getName();
00050 }
00051 }
00052
00053 ~QueueState() {}
00054
00055
NMSTL_SERIAL_SUBCLASS(
QueueState, AbstractQueueState, );
00056 };
00057
00058 BOREALIS_NAMESPACE_END;
00059
00060
#endif