Subj : Unexplained race condition with POSIX threads To : comp.programming.threads,comp.sources.d From : usenet Date : Mon Mar 28 2005 03:14 pm Greetings, Reproduced below is a ludicrously simple example of multi-threading that uses the Active-Object design pattern. I have arrived at this example by adapting the real problem that I am working on and stripping all the unnecessary complexity from it. Fortunately (or unfortunately) I am able to reproduce the problem even with this ridiculous example. As you can see, I am passing the "epoch-count" to an active object, that runs its real business logic in its intrinsic thread. In the calling thread, the active object merely push the requests to an "activation list". The intrinsic thread is waiting on a condition variable. When I add an entry into the "activation list", the condition variable is signaled. At some subsequent point in time, when the active object's intrinsic thread gets scheduled, it processes the queued up requests and converts the epoch count to ASCII form. So far so simple. Now this is what happens. Every once in a while, the sample program gets permanently blocked on the condition variable, and the sample program gets stuck. @@@@ Here's what a "good" output looks like: @@@@ Displaying activation list ************************** Value: 100000, Thu Jan 1 21:46:40 1970 Value: 200000, Sat Jan 3 01:33:20 1970 Value: 400000, Mon Jan 5 09:06:40 1970 Thu Jan 1 21:46:40 1970 Sat Jan 3 01:33:20 1970 Mon Jan 5 09:06:40 1970 @@@@ Here's what a "bad" output looks like: @@@@ Thu Jan 1 21:46:40 1970 Sat Jan 3 01:33:20 1970 Mon Jan 5 09:06:40 1970 Displaying activation list ************************** I am obviously doing something silly that results in the race condition. Some help will be appreciated. The source code follows. Thanks, Bhat ////////////////////////////////////////// ////////////////// HEADER-FILE /////////// ////////////////////////////////////////// #include #include #include #include class ActiveObject { // Simple class that illustrates the active object pattern. // Clients invoke the method displayTime and pass the // "epoch-time" value. The conversion of the "epoch-time" // value to the time string is performed in the intrinsic // thread of the active object. public: ActiveObject(); virtual ~ActiveObject(); void displayTime(const time_t ts); const pthread_t& intrinsicThread(); #ifdef DEBUG void displayActivationList(); #endif // DEBUG protected: void spawn(); static void *run (void *args); virtual void waitForMessages(); void doDisplayTime(const time_t ts); // ** Attributes ** pthread_t m_thread; // Intrinsic thread pthread_mutex_t m_mutex; // Mutex pthread_cond_t m_condition; // Condition variable list m_activationList; // Activation list that queues up the // incoming requests }; ////////////////////////////////////////// ////////////////// SOURCE-FILE /////////// ////////////////////////////////////////// #include using namespace std; #include "hdr.h" ///////////////////////////////////////////////////////////////// // Constructor ///////////////////////////////////////////////////////////////// ActiveObject::ActiveObject() { // Initialize the mutex attribute pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); attr.__mutexkind = PTHREAD_MUTEX_RECURSIVE_NP; // Initialize the mutex pthread_mutex_init(&m_mutex,&attr); // Initialize the condition variable pthread_cond_init(&m_condition, NULL); spawn(); } ///////////////////////////////////////////////////////////////// // Destructor ///////////////////////////////////////////////////////////////// ActiveObject::~ActiveObject() { char *errStr = NULL; // Lock the mutex if(pthread_mutex_lock(&m_mutex)) { cerr << "Error in locking mutex\n"; errStr = strerror(errno); if(errStr) cerr << "ERROR: " << errStr << endl; else cerr << " Errno: " << errno << endl; } // Remove entries from the activation list list::iterator itor; itor = m_activationList.erase(m_activationList.begin(), m_activationList.end()); // Destroy the condition variable if(pthread_cond_destroy(&m_condition)) { cerr << "Error in destroying condition variable\n"; errStr = strerror(errno); if(errStr) cerr << "ERROR: " << errStr << endl; else cerr << " Errno: " << errno << endl; } // Destroy the mutex if(pthread_mutex_destroy(&m_mutex)) { cerr << "Error in destroying mutex\n"; errStr = strerror(errno); if(errStr) cerr << "ERROR: " << errStr << endl; else cerr << " Errno: " << errno << endl; } } ///////////////////////////////////////////////////////////////// // The main "business logic" method. This method does not // actually process the business logic. It merely queues // up the incoming requests for execution within the // intrinsic thread ///////////////////////////////////////////////////////////////// void ActiveObject::displayTime(const time_t ts) { char *errStr = NULL; // Lock the mutex if(pthread_mutex_lock(&m_mutex)) { cerr << "Error in locking mutex\n"; errStr = strerror(errno); if(errStr) cerr << "ERROR: " << errStr << endl; else cerr << " Errno: " << errno << endl; } // Store data in the activation list m_activationList.push_back(ts); // Signal the condition variable, thereby indicating a // pending request if(pthread_cond_signal(&m_condition)) { cerr << "Error signaling condition variable\n"; errStr = strerror(errno); if(errStr) cerr << "ERROR: " << errStr << endl; else cerr << " Errno: " << errno << endl; } // Unock the mutex if(pthread_mutex_unlock(&m_mutex)) { cerr << "Error in unlocking mutex\n"; errStr = strerror(errno); if(errStr) cerr << "ERROR: " << errStr << endl; else cerr << " Errno: " << errno << endl; } } ///////////////////////////////////////////////////////////////// // Returns the reference to the intrinsic thread (for use // in a join operation) ///////////////////////////////////////////////////////////////// const pthread_t& ActiveObject::intrinsicThread() { // Return the intrinsic thread return m_thread; } ///////////////////////////////////////////////////////////////// void ActiveObject::spawn() { char *errStr = NULL; int errcode; // Spawn the intrinsic thread. if((errcode=pthread_create(&m_thread, NULL, run, this))) { if(errcode) { cerr << "Error in spawning active object intrinsic thread\n"; errStr = strerror(errno); if(errStr) cerr << "ERROR: " << errStr << endl; else cerr << " Errno: " << errno << endl; cerr << "\n Exiting ...\n"; exit(1); } } } ///////////////////////////////////////////////////////////////// // Starting function for the intrinsic thread ///////////////////////////////////////////////////////////////// void * ActiveObject::run (void *args) { // Cast the argument (passed as a void pointer) to an object pointer ActiveObject *thisObj = (ActiveObject *) args; // Invoke the wait loop in which the intrinsic thread processes requests thisObj->waitForMessages(); return (void *) NULL; } ///////////////////////////////////////////////////////////////// // Method that executes in the intrinsic thread in an // infinite loop and processes queued up requests ///////////////////////////////////////////////////////////////// void ActiveObject::waitForMessages() { char *errStr = NULL; // Lock the mutex if(pthread_mutex_lock(&m_mutex)) { cerr << "Error in locking mutex\n"; errStr = strerror(errno); if(errStr) cerr << "ERROR: " << errStr << endl; else cerr << " Errno: " << errno << endl; } // Loop for ever for(;;) { // Iterate through the activation list and process the pending requests list::iterator itor, EndOfList = m_activationList.end(); for(itor = m_activationList.begin(); itor != EndOfList; itor ++) { if(pthread_mutex_unlock(&m_mutex)) { cerr << "Error in unlocking mutex\n"; errStr = strerror(errno); if(errStr) cerr << "ERROR: " << errStr << endl; else cerr << " Errno: " << errno << endl; } doDisplayTime(*itor); if(pthread_mutex_lock(&m_mutex)) { cerr << "Error in locking mutex\n"; errStr = strerror(errno); if(errStr) cerr << "ERROR: " << errStr << endl; else cerr << " Errno: " << errno << endl; } // Note:- I tried getting rid of the unlock and lock // calls within this for loop. THAT DOES NOT FIX THE // PROBLEM. } // Remove entries from the activation list list::iterator retItor; retItor = m_activationList.erase(m_activationList.begin(), m_activationList.end()); // Wait on the condition variable if(pthread_cond_wait(&m_condition, &m_mutex)) { cerr << "Error waiting on condition variable\n"; errStr = strerror(errno); if(errStr) cerr << "ERROR: " << errStr << endl; else cerr << " Errno: " << errno << endl; } // When we come out of the condition variable wait, the mutex is locked // for next iteration of the loop } } ///////////////////////////////////////////////////////////////// // Method that actually executes the business logic ///////////////////////////////////////////////////////////////// void ActiveObject::doDisplayTime(const time_t ts) { cout << ctime(&ts) << endl; } #ifdef DEBUG ///////////////////////////////////////////////////////////////// // Displays the activation list in which the incoming // requests are queued ///////////////////////////////////////////////////////////////// void ActiveObject::displayActivationList() { char *errStr = NULL; cout << "\n\nDisplaying activation list\n" << "**************************\n"; // Lock the mutex if(pthread_mutex_lock(&m_mutex)) { cerr << "Error in locking mutex\n"; errStr = strerror(errno); if(errStr) cerr << "ERROR: " << errStr << endl; else cerr << " Errno: " << errno << endl; } // Iterate through the activation list and display the contents list::iterator itor, EndOfList = m_activationList.end(); for(itor = m_activationList.begin(); itor != EndOfList; itor ++) { time_t tstamp = *itor; cout << "Value: " << tstamp << ", " << ctime(&tstamp); } // Unock the mutex if(pthread_mutex_unlock(&m_mutex)) { cerr << "Error in unlocking mutex\n"; errStr = strerror(errno); if(errStr) cerr << "ERROR: " << errStr << endl; else cerr << " Errno: " << errno << endl; } cout << "\n\n" << endl; } #endif // DEBUG ////////////////////////////////////////// ////////////////// DRIVER-FILE /////////// ////////////////////////////////////////// #include using namespace std; #include "hdr.h" /////////////////////////////////////////////////////////////////// // Starting function for the thread in which the active object // is exercised /////////////////////////////////////////////////////////////////// void* testActiveObj(void * args) { ActiveObject *ao = static_cast(args); ao->displayTime(100000); ao->displayTime(200000); ao->displayTime(400000); ao->displayActivationList(); } ///////////////////////////////////////////////////////////////////// // Main function ////////////////////////////////////////////////////////////////////// main() { ActiveObject aobj; pthread_t pt; pthread_create(&pt, NULL, testActiveObj, &aobj); pthread_join(aobj.intrinsicThread(), NULL); pthread_join(pt, NULL); } .