Protected Member Functions | |
void | setupImpl () throw (AuroraException) |
void | initImpl () throw (AuroraException) |
void | runImpl (QBoxInvocation &) throw (AuroraException) |
AURORA_DECLARE_QBOX (LRWaitForQBox,"LRWaitFor") |
|
|
|
Reimplemented from QBox.
|
|
Tuples from stream 1 (the CONTROL STREAM) are looked at first to update max_keys. The buffers are then scanned for buffered tuples that can now be emitted. Finally, every tuple on stream 0 (data) is read and either emitted or buffered. GO THROUGH CONTROL STREAM UPDATING MAX_KEYS. GO THROUGH BUFFERS LOOKING FOR TUPLES TO EMIT. * Now must try every valid key (from 0 to max_key inclusive) to look for every list. TODO: THIS ASSUMES KEYS ARE NEVER NEGATIVE (FIX: Get all keys from the hash): for ( int ii = 0; ii <= max_key; ++ii ) { TuplesList &tuples = keyed_tuples[ ii ]; // The list of tuples for this key TuplesList::iterator j = tuples.begin(); // Go through the list (if any) - all these are matches, output them! while ( j != tuples.end() ) { outputted = true; emitTuple((*j)->getTuple()); j = tuples.erase( j ); } } for ( MinuteHash::iterator i = _max_keys.begin(); i != _max_keys.end(); ++i ) { string key = (*i).first; // key is a groupby string int max_key = (*i).second; // this is the max minute for that group by TuplesList& tuples = _buffers[ key ]; // the data tuples held corresponding to the current groupby TuplesList::iterator j = tuples.begin(); while ( j != tuples.end() ) { ctxt.reset(); ctxt.setTuple((*j)->getTuple()); int data_key = (*j)->getKey(); // _data_key_field->eval<int32>(ctxt); Data if ( data_key <= max_key ) { outputted = true; emitTuple((*j)->getTuple()); j = tuples.erase( j ); } else { ++j; } } } GO THROUGH ALL NEW DATA TUPLES AND EMIT OR BUFFER: di = deq( 0 ); max_tuples = inv.getMaxTuplesToDequeue( 0 ); while ( max_tuples-- && di.avail() ) { char* data_tuple = (char*) di.tuple(); DEBUG << "LRWaitForQBox DATA IN: " << getInputDescription(0).tupleToString( data_tuple ); ctxt.reset(); ctxt.setTuple( data_tuple ); string group_by = groupByForTupleData( ctxt ); int32 data_key = _data_key_field->eval<int32>( ctxt ); If a control value is already known for this group by: if ( _max_keys.find( group_by ) != _max_keys.end() ) { if ( data_key <= _max_keys[ group_by ]) // and the 
key val <= control val, pass { DEBUG << "DATA tuple has key value " << data_key << " <= known control value " << _max_keys[group_by] << " for group by " << group_by; outputted = true; emitTuple( data_tuple ); } else // else buffer { DEBUG << "DATA tuple key value " << data_key << " > known control value " << _max_keys[group_by] << " for group by " << group_by; DEBUG << "Buffering DATA tuple"; Buffer the tuple: ptr<StoredTuple> p( new StoredTuple( data_tuple, _data_tuple_size, data_key )); _buffers[ group_by ].push_back( p ); } ++di; continue; } If the data is allowed to provide the first control value, implicitly pass the tuple and set the control value from it: if ( _use_data_as_first_keyval ) { DEBUG << "Taking first control value of " << data_key << " from DATA tuple, and passing, for group by " << group_by; _max_keys[ group_by ] = data_key; outputted = true; emitTuple( data_tuple ); ++di; continue; } Otherwise, if a default start key value is in use and the tuple's key is <= that default, emit: if ( _use_start_keyval && data_key <= _start_keyval ) { DEBUG << "Passing DATA tuple due to key value " << data_key << " <= default start key-value (for all groups) of " << _start_keyval; outputted = true; emitTuple( data_tuple ); ++di; continue; } Otherwise, buffer the tuple: DEBUG << "Buffering DATA tuple"; Buffer the tuple: ptr<StoredTuple> p( new StoredTuple( data_tuple, _data_tuple_size, data_key )); _buffers[ group_by ].push_back( p ); ++di; } Reimplemented from QBox.
|
|
Implements QBox.
|