Logo Search packages:      
Sourcecode: darkice version File versions

MultiThreadedConnector.cpp

/*------------------------------------------------------------------------------

   Copyright (c) 2000 Tyrell Corporation. All rights reserved.

   Tyrell DarkIce

   File     : MultiThreadedConnector.cpp
   Version  : $Revision: 1.4 $
   Author   : $Author: darkeye $
   Location : $Source: /cvsroot/darkice/darkice/src/MultiThreadedConnector.cpp,v $
   
   Copyright notice:

    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License  
    as published by the Free Software Foundation; either version 2
    of the License, or (at your option) any later version.
   
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of 
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
    GNU General Public License for more details.
   
    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.

------------------------------------------------------------------------------*/

/* ============================================================ include files */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#else
#error need sys/types.h
#endif


#include "Exception.h"
#include "MultiThreadedConnector.h"


/* ===================================================  local data structures */


/* ================================================  local constants & macros */

/*------------------------------------------------------------------------------
 *  File identity
 *----------------------------------------------------------------------------*/
static const char fileid[] = "$Id: MultiThreadedConnector.cpp,v 1.4 2004/01/07 13:18:17 darkeye Exp $";


/* ===============================================  local function prototypes */


/* =============================================================  module code */

/*------------------------------------------------------------------------------
 *  Initialize the object
 *----------------------------------------------------------------------------*/
void
00067 MultiThreadedConnector :: init ( void )                 throw ( Exception )
{
    pthread_mutex_init( &mutexProduce, 0);
    pthread_cond_init( &condProduce, 0);
    threads = 0;
}


/*------------------------------------------------------------------------------
 *  De-initialize the object
 *----------------------------------------------------------------------------*/
void
00079 MultiThreadedConnector :: strip ( void )                throw ( Exception )
{
    if ( threads ) {
        delete[] threads;
        threads = 0;
    }

    pthread_cond_destroy( &condProduce);
    pthread_mutex_destroy( &mutexProduce);
}


/*------------------------------------------------------------------------------
 *  Constructor
 *----------------------------------------------------------------------------*/
00094 MultiThreadedConnector :: MultiThreadedConnector (
                                const MultiThreadedConnector &   connector )
                                                            throw ( Exception )
            : Connector( connector)
{
    mutexProduce    = connector.mutexProduce;
    condProduce     = connector.condProduce;

    if ( threads ) {
        delete[] threads;
    }
    threads = new ThreadData[numSinks];
    for ( unsigned int  i = 0; i < numSinks; ++i ) {
        threads[i] = connector.threads[i];
    }
}


/*------------------------------------------------------------------------------
 *  Assignment operator
 *----------------------------------------------------------------------------*/
MultiThreadedConnector &
00116 MultiThreadedConnector :: operator= ( const MultiThreadedConnector & connector )
                                                            throw ( Exception )
{
    if ( this != &connector ) {
        Connector::operator=( connector);

        mutexProduce    = connector.mutexProduce;
        condProduce     = connector.condProduce;

        if ( threads ) {
            delete[] threads;
        }
        threads = new ThreadData[numSinks];
        for ( unsigned int  i = 0; i < numSinks; ++i ) {
            threads[i] = connector.threads[i];
        }
    }

    return *this;
}


/*------------------------------------------------------------------------------
 *  Open the source and all the sinks if needed
 *  Create the sink threads
 *----------------------------------------------------------------------------*/
bool
00143 MultiThreadedConnector :: open ( void )                     throw ( Exception )
{
    unsigned int        i;
    size_t              st;

    if ( !Connector::open() ) {
        return false;
    }

    running = true;

    pthread_attr_init( &threadAttr);
    pthread_attr_getstacksize(&threadAttr, &st);
    if (st < 128 * 1024) {
        reportEvent( 5, "MultiThreadedConnector :: open, stack size ",
                        (long)st);
        st = 128 * 1024;
        pthread_attr_setstacksize(&threadAttr, st);
    }
    pthread_attr_setdetachstate( &threadAttr, PTHREAD_CREATE_JOINABLE);

    threads = new ThreadData[numSinks];
    for ( i = 0; i < numSinks; ++i ) {
        ThreadData    * threadData = threads + i;

        threadData->connector = this;
        threadData->ixSink    = i;
        threadData->accepting = true;
        threadData->isDone    = true;
        if ( pthread_create( &(threadData->thread),
                             &threadAttr,
                             ThreadData::threadFunction,
                             threadData ) ) {
            break;
        }
    }

    // if could not create all, delete the ones created
    if ( i < numSinks ) {
        unsigned int    j;

        // signal to stop for all running threads
        pthread_mutex_lock( &mutexProduce);
        running = false;
        pthread_cond_broadcast( &condProduce);
        pthread_mutex_unlock( &mutexProduce);

        for ( j = 0; j < i; ++j ) {
            pthread_join( threads[j].thread, 0);
        }

        delete[] threads;
        threads = 0;

        return false;
    }

    return true;
}


/*------------------------------------------------------------------------------
 *  Transfer some data from the source to the sink
 *----------------------------------------------------------------------------*/
unsigned int
00208 MultiThreadedConnector :: transfer ( unsigned long       bytes,
                                     unsigned int        bufSize,
                                     unsigned int        sec,
                                     unsigned int        usec )
                                                            throw ( Exception )
{   
    unsigned int        b;

    if ( numSinks == 0 ) {
        return 0;
    }

    if ( bufSize == 0 ) {
        return 0;
    }

    dataBuffer   = new unsigned char[bufSize];
    dataSize     = 0;

    reportEvent( 6, "MultiThreadedConnector :: tranfer, bytes", bytes);

    for ( b = 0; !bytes || b < bytes; ) {
        if ( source->canRead( sec, usec) ) {
            unsigned int        i;

            pthread_mutex_lock( &mutexProduce);
            dataSize = source->read( dataBuffer, bufSize);
            b       += dataSize;

            // check for EOF
            if ( dataSize == 0 ) {
                reportEvent( 3, "MultiThreadedConnector :: transfer, EOF");
                pthread_mutex_unlock( &mutexProduce);
                break;
            }

            for ( i = 0; i < numSinks; ++i ) {
                threads[i].isDone = false;
            }

            // tell sink threads that there is some data available
            pthread_cond_broadcast( &condProduce);

            // wait for all sink threads to get done with this data
            while ( true ) {
                for ( i = 0; i < numSinks && threads[i].isDone; ++i );
                if ( i == numSinks ) {
                    break;
                }
                pthread_cond_wait( &condProduce, &mutexProduce);
            }
            pthread_mutex_unlock( &mutexProduce);
        } else {
            reportEvent( 3, "MultiThreadedConnector :: transfer, can't read");
            break;
        }
    }

    delete[] dataBuffer;
    return b;
}


/*------------------------------------------------------------------------------
 *  The function for each thread.
 *  Read the presented data
 *----------------------------------------------------------------------------*/
void
00276 MultiThreadedConnector :: sinkThread( int       ixSink )
{
    ThreadData    * threadData = &threads[ixSink];
    Sink          * sink       = sinks[ixSink].get();

    while ( running ) {
        // wait for some data to become available
        pthread_mutex_lock( &mutexProduce);
        while ( running && threadData->isDone ) {
            pthread_cond_wait( &condProduce, &mutexProduce);
        }
        if ( !running ) {
            pthread_mutex_unlock( &mutexProduce);
            break;
        }

        if ( threadData->accepting ) {
            if ( sink->canWrite( 0, 0) ) {
                try {
                    sink->write( dataBuffer, dataSize);
                } catch ( Exception     & e ) {
                    // something wrong. don't accept more data, try to
                    // reopen the sink next time around
                    threadData->accepting = false;
                }
            } else {
                reportEvent( 4,
                            "MultiThreadedConnector :: sinkThread can't write ",
                             ixSink);
                // don't care if we can't write
            }
        }
        threadData->isDone = true;
        pthread_cond_broadcast( &condProduce);
        pthread_mutex_unlock( &mutexProduce);

        if ( !threadData->accepting ) {
            // if we're not accepting, try to reopen the sink
            try {
                sink->close();
                sink->open();
                threadData->accepting = sink->isOpen();
            } catch ( Exception   & e ) {
                // don't care, just try and try again
            }
        }
    }
}


/*------------------------------------------------------------------------------
 *  Stop the treads
 *  Close the source and all the sinks if needed
 *----------------------------------------------------------------------------*/
void
00331 MultiThreadedConnector :: close ( void )                    throw ( Exception )
{
    unsigned int    i;

    // signal to stop for all threads
    pthread_mutex_lock( &mutexProduce);
    running = false;
    pthread_cond_broadcast( &condProduce);
    pthread_mutex_unlock( &mutexProduce);

    // wait for all the threads to finish
    for ( i = 0; i < numSinks; ++i ) {
        pthread_join( threads[i].thread, 0);
    }
    pthread_attr_destroy( &threadAttr);

    Connector::close();
}


/*------------------------------------------------------------------------------
 *  The thread function
 *----------------------------------------------------------------------------*/
void *
00355 MultiThreadedConnector :: ThreadData :: threadFunction( void  * param )
{
    ThreadData     * threadData = (ThreadData*) param;
    threadData->connector->sinkThread( threadData->ixSink);

    return 0;
}


/*------------------------------------------------------------------------------
 
  $Source: /cvsroot/darkice/darkice/src/MultiThreadedConnector.cpp,v $

  $Log: MultiThreadedConnector.cpp,v $
  Revision 1.4  2004/01/07 13:18:17  darkeye
  commited patch sent by John Hay, fixing FreeBSD problems

  Revision 1.3  2002/10/20 20:43:17  darkeye
  more graceful reconnect

  Revision 1.2  2002/10/19 13:35:21  darkeye
  when a connection is dropped, DarkIce tries to reconnect, indefinitely
  removed extreme event reporting for thread-related events

  Revision 1.1  2002/10/19 12:25:47  darkeye
  changed internals so that now each encoding/server connection is
  a separate thread


  
------------------------------------------------------------------------------*/


Generated by  Doxygen 1.6.0   Back to index