
LRWaitForQBox Class Reference

Inherits QBox.


Protected Member Functions

void setupImpl () throw (AuroraException)
void initImpl () throw (AuroraException)
void runImpl (QBoxInvocation &) throw (AuroraException)
 AURORA_DECLARE_QBOX (LRWaitForQBox,"LRWaitFor")


Member Function Documentation

LRWaitForQBox::AURORA_DECLARE_QBOX (LRWaitForQBox, "LRWaitFor") [protected]
 

void LRWaitForQBox::initImpl () throw (AuroraException) [protected, virtual]
 

Reimplemented from QBox.

void LRWaitForQBox::runImpl (QBoxInvocation & inv) throw (AuroraException) [protected, virtual]
 

Tuples from stream 1 (the control stream) are examined first to update max_keys, the largest key value seen so far for each group-by. The buffers are then scanned for tuples that can now be emitted. Finally, every tuple on stream 0 (the data stream) is read and either emitted or buffered.

GO THROUGH CONTROL STREAM UPDATING MAX_KEYS

GO THROUGH BUFFERS LOOKING FOR TUPLES TO EMIT

    // Now must try every valid key (from 0 to max_key inclusive) to look for every list
    // TODO: THIS ASSUMES KEYS ARE NEVER NEGATIVE (FIX: Get all keys from the hash)
    for ( int ii = 0; ii <= max_key; ++ii )
    {
        TuplesList &tuples = keyed_tuples[ ii ]; // The list of tuples for this key

        // Go through the list (if any) - all these are matches, output them!
        TuplesList::iterator j = tuples.begin();

        while ( j != tuples.end() )
        {
            outputted = true;
            emitTuple((*j)->getTuple());
            j = tuples.erase( j );
        }
    }

    for ( MinuteHash::iterator i = _max_keys.begin(); i != _max_keys.end(); ++i )
    {
        string key = (*i).first;    // key is a group-by string
        int max_key = (*i).second;  // this is the max minute for that group-by

        // the data tuples held corresponding to the current group-by
        TuplesList& tuples = _buffers[ key ];
        TuplesList::iterator j = tuples.begin();

        while ( j != tuples.end() )
        {
            ctxt.reset();
            ctxt.setTuple((*j)->getTuple());

            int data_key = (*j)->getKey(); // _data_key_field->eval<int32>(ctxt);

            // Data: emit if the key is within the control value, otherwise leave it buffered
            if ( data_key <= max_key )
            {
                outputted = true;
                emitTuple((*j)->getTuple());
                j = tuples.erase( j );
            }
            else
            {
                ++j;
            }
        }
    }
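The buffer scan above relies on the erase-while-iterating idiom for std::list. The following is a minimal standalone sketch of that step, not the Borealis implementation: BufferedTuple, TupleList and flushReady are hypothetical names introduced here, and plain standard containers stand in for MinuteHash, TuplesList and emitTuple.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for a buffered data tuple (not the real StoredTuple).
struct BufferedTuple {
    int key;              // value of the data key field
    std::string payload;  // opaque tuple contents
};

typedef std::list<BufferedTuple> TupleList;

// For every group with a known control value, move each buffered tuple whose
// key is <= that value into `out` ("emit") and leave the rest buffered.
// Returns the number of tuples released.
std::size_t flushReady(const std::map<std::string, int>& maxKeys,
                       std::map<std::string, TupleList>& buffers,
                       std::vector<BufferedTuple>& out)
{
    std::size_t released = 0;
    for (const auto& entry : maxKeys) {
        auto found = buffers.find(entry.first);
        if (found == buffers.end())
            continue;  // nothing buffered for this group
        TupleList& tuples = found->second;
        for (TupleList::iterator it = tuples.begin(); it != tuples.end(); ) {
            if (it->key <= entry.second) {
                out.push_back(*it);
                it = tuples.erase(it);  // erase returns the next valid iterator
                ++released;
            } else {
                ++it;                   // keep this tuple buffered
            }
        }
    }
    return released;
}
```

One deliberate difference from the listing above: the sketch looks groups up with find() rather than operator[], so scanning does not create an empty buffer entry for a group that has none.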

GO THROUGH ALL NEW DATA TUPLES AND EMIT OR BUFFER

    di = deq( 0 );
    max_tuples = inv.getMaxTuplesToDequeue( 0 );

    while ( max_tuples-- && di.avail() )
    {
        char* data_tuple = (char*) di.tuple();

        DEBUG << "LRWaitForQBox DATA IN: " << getInputDescription(0).tupleToString( data_tuple );

        ctxt.reset();
        ctxt.setTuple( data_tuple );
        string group_by = groupByForTupleData( ctxt );
        int32 data_key = _data_key_field->eval<int32>( ctxt );

        // If you know a control value for this group-by
        if ( _max_keys.find( group_by ) != _max_keys.end() )
        {
            if ( data_key <= _max_keys[ group_by ]) // and the key val <= control val, pass
            {
                DEBUG << "DATA tuple has key value " << data_key
                      << " <= known control value " << _max_keys[group_by]
                      << " for group by " << group_by;
                outputted = true;
                emitTuple( data_tuple );
            }
            else // else buffer
            {
                DEBUG << "DATA tuple key value " << data_key
                      << " > known control value " << _max_keys[group_by]
                      << " for group by " << group_by;
                DEBUG << "Buffering DATA tuple";

                // Buffer the tuple
                ptr<StoredTuple> p( new StoredTuple( data_tuple, _data_tuple_size, data_key ));

                _buffers[ group_by ].push_back( p );
            }

            ++di;
            continue;
        }

        // If you allow the data to provide the first control value,
        // implicitly pass the tuple and set the control value
        if ( _use_data_as_first_keyval )
        {
            DEBUG << "Taking first control value of " << data_key
                  << " from DATA tuple, and passing, for group by " << group_by;

            _max_keys[ group_by ] = data_key;
            outputted = true;
            emitTuple( data_tuple );
            ++di;
            continue;
        }

        // Try looking for a default, and if key <= n, emit
        if ( _use_start_keyval && data_key <= _start_keyval )
        {
            DEBUG << "Passing DATA tuple due to key value " << data_key
                  << " <= default start key-value (for all groups) of " << _start_keyval;

            outputted = true;
            emitTuple( data_tuple );
            ++di;
            continue;
        }

        // Oh well! Buffer
        DEBUG << "Buffering DATA tuple";

        // Buffer the tuple
        ptr<StoredTuple> p( new StoredTuple( data_tuple, _data_tuple_size, data_key ));

        _buffers[ group_by ].push_back( p );
        ++di;
    }
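The order of the checks applied to each data tuple can be summarized as a small decision function. This is a sketch under stated assumptions rather than the actual box code: Action, WaitForConfig and decide are hypothetical names, and the configuration fields merely mirror _use_data_as_first_keyval, _use_start_keyval and _start_keyval from the listing above.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical outcome of examining one data tuple.
enum class Action { Emit, EmitAndSetControl, Buffer };

// Hypothetical configuration mirroring the box's flags.
struct WaitForConfig {
    bool useDataAsFirstKeyval = false;  // first data tuple may set the control value
    bool useStartKeyval = false;        // a default control value applies to all groups
    int  startKeyval = 0;
};

// Decide what to do with a data tuple, in the same order as the listing:
// 1. known control value for the group: emit if key <= control, else buffer
// 2. otherwise, optionally take the data key as the first control value
// 3. otherwise, optionally compare against the default start value
// 4. otherwise, buffer
Action decide(std::map<std::string, int>& maxKeys, const WaitForConfig& cfg,
              const std::string& groupBy, int dataKey)
{
    auto known = maxKeys.find(groupBy);
    if (known != maxKeys.end())
        return dataKey <= known->second ? Action::Emit : Action::Buffer;

    if (cfg.useDataAsFirstKeyval) {
        maxKeys[groupBy] = dataKey;  // the data tuple supplies the control value
        return Action::EmitAndSetControl;
    }

    if (cfg.useStartKeyval && dataKey <= cfg.startKeyval)
        return Action::Emit;

    return Action::Buffer;
}
```

Separating the decision from the emit and buffer side effects makes the precedence of the three cases easy to test in isolation; whether that separation suits the real box is a design choice this sketch does not settle.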

Reimplemented from QBox.

void LRWaitForQBox::setupImpl () throw (AuroraException) [protected, virtual]
 

Implements QBox.


The documentation for this class was generated from the following file:
Generated on Fri Nov 12 15:15:23 2004 for Borealis by doxygen 1.3.8