Subj : Re: Win32 condition variables redux To : comp.programming.threads From : Chris Thomasson Date : Wed Aug 03 2005 04:08 pm > - The solution should be based on the Windows API without making any > assumption beyond its documentation. > - The solution should not make assumptions about the underlying > hardware. > > So what would such an implementation look like? > > Many implementations out there simply do not work or do not work well. > Others are reputed to work, but they are so complicated that I can't > decide if I can trust them or not. A quick and dirty method for windows is to force the users of your condvar to use SCHED_OTHER. ;( An event object per-thread and linked-lists for waitsets can create a "fairly efficient" and correct condvar for windows. The reason I say its efficient is because it only uses a single critical section per-condvar and a single event per-thread. You could extend it even further by removing the condvar_t::mutex member and just hash the address of a condvar_t into an index of a global array of CRITICAL_SECTION's... Something like this: #define PER_THREAD_WAITING 1 typedef struct per_thread_ { struct per_thread_ *next; struct per_thread_ *prev; LONG refs; int state; HANDLE waiter; } per_thread_t; typedef struct condvar_ { per_thread_t *front; per_thread_t *back; LONG waiters; LONG count; CRITICAL_SECTION mutex; } condvar_t; void condvar_init( condvar_t *_this ) { _this->front = 0; _this->back = 0; _this->waiters = 0; _this->count = 0; InitalizeCriticalSection( &_this->mutex ); } int condvar_destroy( condvar_t *_this ) { int err = 0; EnterCriticalSection( &_this->mutex ); if ( _this->front ) { err = EBUSY; } LeaveCriticalSection( &_this->mutex ); if ( ! err ) { DeleteCriticalSection( &_this->mutex ); if ( _this->count ) { abort(); } } return err; } int condvar_timedwait( condvar_t *_this, mutex_t *mutex, DWORD timeout ) { int err = 0; DWORD wait_ret; per_thread_t *thread = per_thread_self(); InterlockedIncrement( &thread->refs ); EnterCriticalSection( &_this->mutex ); if ( ! _this->front ) { _this->front = thread; } else { _this->back->next = thread; thread->prev = _this->back; } _this->back = thread; ++_this->count; thread->state = PER_THREAD_WAITING; InterlockedIncrement( &_this->waiters ); LeaveCriticalSection( &_this->mutex ); LeaveCriticalSection( mutex ); wait_ret = WaitForSingleObject ( thread->waiter, timeout ); if ( wait_ret != WAIT_OBJECT_0 ) { err = prv_condvar_timeout( _this, thread ); if ( wait_ret != WAIT_TIMEOUT ) { abort(); } } EnterCriticalSection( mutex ); return err; } void condvar_signal( condvar_t *_this ) { register LONG cmp, old = _this->waiters; do { cmp = old; old = InterlockedCompareExchange ( &_this->waiters, ( old > 0 ) ? old - 1 : old, cmp ); } while ( cmp != old ); if ( old ) { /* slow-path */ per_thread_t *thread; EnterCriticalSection( &_this->mutex ); thread = _this->front; if ( thread ) { prv_condvar_pop( _this, thread ); assert( thread->state == PER_THREAD_WAITING ); thread->state = 0; } LeaveCriticalSection( &_this->mutex ); if ( thread ) { if ( ! SetEvent( thread->waiter ) ) { abort(); } per_thread_release( thread ); } } } void condvar_broadcast( condvar_t *_this ) { if ( InterlockedExchange( &_this->waiters, 0 ) ) { /* slow-path */ per_thread_t *thread, *next; EnterCriticalSection( &_this->mutex ); thread = _this->front; if ( thread ) { per_thread_t *temp = thread; _this->front = 0; _this->back = 0; _this->count = 0; while ( thread ) { assert( thread->state == PER_THREAD_WAITING ); thread->state = 0; thread = thread->next; } thread = temp; } LeaveCriticalSection( &_this->mutex ); while ( thread ) { next = thread->next; assert( ! thread->state ); thread->next = 0; thread->prev = 0; if ( ! SetEvent( thread->waiter ) ) { abort(); } per_thread_release( thread ); thread = next; } } } per_thread_t* per_thread_self( void ) { per_thread_t *_this = pthread_getspecific( ... ); if ( ! _this ) { _this = malloc( sizeof( *_this ) ); if ( ! _this ) { abort(); } _this->state = 0; _this->next = 0; _this->prev = 0; _this->refs = 1; _this->waiter = CreateEvent( 0, FALSE, FALSE, 0 ); if ( ! _this->waiter ) { free( _this ); abort(); } pthread_setspecific( ..., _this ); } assert( ! _this->state && ! _this->next && ! _this->prev ); return _this; } void per_thread_release( per_thread_t *_this ) { if ( ! InterlockedDecrement( &_this->refs ) ) { if ( ! CloseHandle( _this->waitset ) ) { abort(); } free( _this ); } } void prv_per_thread_tlsdtor( void *tls ) { if ( tls ) { per_thread_release( tls ); } } void prv_condvar_pop( condvar_t *_this, per_thread_t *thread ) { per_thread_t *next = thread->next; per_thread_t *prev = thread->prev; if ( ! prev && ! next ) { _this->front = 0; _this->back = 0; } else if ( ! prev && next ) { next->prev = 0; _this->front = next; } else if ( prev && ! next ) { prev->next = 0; _this->back = prev; } else { next->prev = next; prev->next = prev; } --_this->count; thread->next = 0; thread->prev = 0; } int prv_condvar_timeout( condvar_t *_this, per_thread_t *thread ) { int err = 0, wait = 0; LONG count = 0; EnterCriticalSection( &_this->mutex ); if ( thread->state ) { prv_condvar_pop( _this, thread ); count = _this->count; thread->state = 0; err = ETIMEDOUT; } else { wait = 1; } if ( err ) { /* process cancels */ InterlockedExchange ( &_this->waiters, count ); } LeaveCriticalSection( &_this->mutex ); if ( wait ) { /* we are signaled for sure, so we need to wait on that */ if ( WaitForSingleObject ( thread->waiter, INFINITE ) != WAIT_OBJECT_0 ) { abort(); } } if ( err ) { per_thread_release( thread ); } return err; } .