Logo Search packages:      
Sourcecode: darkice version File versions

unsigned int MultiThreadedConnector::transfer ( unsigned long  bytes,
unsigned int  bufSize,
unsigned int  sec,
unsigned int  usec 
) throw ( Exception ) [virtual]

Transfer a given amount of data from the Source to all the Sinks attached. If an attached Sink closes or encounters an error during the process, it is detached and the function carries on with the rest of the Sinks. If no Sinks remain, or an error is encountered with the Source, the function returns prematurely.

Parameters:
bytes the amount of data to transfer, in bytes. If 0, transfer forever.
bufSize the size of the buffer to use for transferring. This amount of data is read from the Source and written to each Sink on each turn.
sec the number of seconds to wait for the Source to have data available in each turn, and the number of seconds to wait for the Sinks to accept data.
usec the number of microseconds to wait for the Source to have data available in each turn, and the number of microseconds to wait for the Sinks to accept data.
Returns:
the number of bytes read from the Source.
Exceptions:
Exception 

Reimplemented from Connector.

Definition at line 208 of file MultiThreadedConnector.cpp.

References condProduce, dataBuffer, dataSize, MultiThreadedConnector::ThreadData::isDone, mutexProduce, Connector::numSinks, Reporter::reportEvent(), Connector::source, and threads.

{
    // Producer loop: repeatedly read up to bufSize bytes from the source into
    // the shared dataBuffer, flag every sink thread as "not done", wake them
    // through condProduce, and wait until all of them have flagged the chunk
    // as done before reading the next one.
    //
    // Returns the number of bytes read from the source (truncated to
    // unsigned int, which is the declared return type).

    // The counter is as wide as `bytes`, so the `transferred < bytes` test
    // cannot be defeated by wrap-around when `bytes` exceeds the range of
    // unsigned int.
    unsigned long       transferred = 0;

    // nothing to do without sinks or without a buffer to read into
    if ( numSinks == 0 ) {
        return 0;
    }

    if ( bufSize == 0 ) {
        return 0;
    }

    dataBuffer   = new unsigned char[bufSize];
    dataSize     = 0;

    // tracks whether this thread currently holds mutexProduce, so that the
    // exception path below knows if it has to release it
    bool                locked = false;

    try {
        reportEvent( 6, "MultiThreadedConnector :: tranfer, bytes", bytes);

        // bytes == 0 means "transfer forever"
        while ( !bytes || transferred < bytes ) {
            if ( !source->canRead( sec, usec) ) {
                reportEvent( 3,
                             "MultiThreadedConnector :: transfer, can't read");
                break;
            }

            unsigned int        i;

            // dataBuffer, dataSize and the isDone flags are only touched
            // while holding mutexProduce
            pthread_mutex_lock( &mutexProduce);
            locked       = true;

            dataSize     = source->read( dataBuffer, bufSize);
            transferred += dataSize;

            // check for EOF
            if ( dataSize == 0 ) {
                reportEvent( 3, "MultiThreadedConnector :: transfer, EOF");
                pthread_mutex_unlock( &mutexProduce);
                locked = false;
                break;
            }

            // mark the freshly read chunk as pending for every sink
            for ( i = 0; i < numSinks; ++i ) {
                threads[i].isDone = false;
            }

            // tell sink threads that there is some data available
            pthread_cond_broadcast( &condProduce);

            // wait for all sink threads to get done with this data;
            // pthread_cond_wait releases mutexProduce while blocked and
            // re-acquires it before returning, so the flags are re-checked
            // under the lock after every wake-up
            while ( true ) {
                for ( i = 0; i < numSinks && threads[i].isDone; ++i );
                if ( i == numSinks ) {
                    break;
                }
                pthread_cond_wait( &condProduce, &mutexProduce);
            }

            pthread_mutex_unlock( &mutexProduce);
            locked = false;
        }
    } catch ( ... ) {
        // An exception from the source or the reporter must not leave
        // mutexProduce locked or leak the transfer buffer; clean up and
        // let the caller see the original exception.
        if ( locked ) {
            pthread_mutex_unlock( &mutexProduce);
        }
        delete[] dataBuffer;
        throw;
    }

    delete[] dataBuffer;
    return static_cast<unsigned int>( transferred);
}


Generated by  Doxygen 1.6.0   Back to index