Subj : lockless low-overhead 'pipes' (w/semaphores)
To   : comp.programming.threads
From : lindahlb
Date : Tue Apr 19 2005 01:46 pm

Below is a pseudo-implementation of lockless pipes that block in the kernel of a given OS. It removes the overhead of a system call on every path and makes a semaphore call only when absolutely necessary.

I assume some implementation of an atomic get+add operation, built from either a set of instructions or a single instruction (ideally the i486+ instruction xadd, with a lock prefix when suitable).

Error checking has been removed for simplicity - the trivial code would be a bit test before 'acquire', and a bit set plus 'release' on error.

Assume a single reader and a single writer, as there are obvious race conditions otherwise (with respect to 'i').

Assume that the kernel limits the semaphore value to semaphore::max_value and will not overflow it (possibly throwing an out-of-range exception/error, which is legitimately ignored in this implementation).

------------------
shared data
------------------
char* buffer;       // ring buffer (read only)
int maxlen;         // size allocated for buffer (read only)
int len;            // amount of data in buffer (read/write)
semaphore consume;  // kernel primitive (implicit membars)
semaphore produce;  // kernel primitive (implicit membars)

------------------
producer function
------------------
produce(char* data, int dsize)
{
    static int i = 0;

    while(len + dsize >= maxlen)
        consume.acquire(semaphore::max_value); // block here

    memcpy(buffer+i, data, dsize); // assume maxlen aligned on dsize
    i = (i + dsize) % maxlen;

    // atomically adds dsize to len and returns the original value of len (xadd for intel)
    if (atomic_get_add(len, dsize) == 0)
        produce.release(semaphore::max_value); // ignore possible out-of-range exception
}

------------------
consumer process
------------------
int i = 0;
while(forever)
{
    while(len == 0)
        produce.acquire(semaphore::max_value); // block here

    int n = len;
    do_something_with_data(buffer+i, n); // handles wrap-around
    i = (i + n) % maxlen;

    // atomically subtracts n from len and returns the original value of len (xadd for intel)
    if (atomic_get_sub(len, n) == maxlen)
        consume.release(semaphore::max_value); // ignore possible out-of-range exception
}

---------------
Conclusion
---------------
At first glance there appears to be a possible deadlock or race condition, but after exhaustively going through the algorithm's paths on paper, I was unable to come up with anything conclusive. Am I correct in assuming that it is free of deadlocks and race conditions? Granted, we could spin once in the while loop (which would happen very rarely and is not critical).
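
For concreteness, here is a minimal sketch of the atomic get+add and get+sub helpers assumed above, written against C++11 std::atomic instead of a raw xadd. The helper bodies and the function producer_saw_empty are illustrative assumptions and not part of the pseudo-implementation; the only property relied on is that each helper returns the value held before the update.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-ins for atomic_get_add / atomic_get_sub from the post.
// fetch_add / fetch_sub return the value held *before* the update, which is
// the same contract as xadd. Default ordering is sequentially consistent.
inline int atomic_get_add(std::atomic<int>& v, int n) {
    return v.fetch_add(n);
}

inline int atomic_get_sub(std::atomic<int>& v, int n) {
    return v.fetch_sub(n);
}

// Illustrative helper (an assumption, not in the post): performs the
// producer's update of len and reports whether the buffer was empty just
// before it. That is the one case in which the producer would call
// produce.release(), so the semaphore is not touched on the common path.
inline bool producer_saw_empty(std::atomic<int>& len, int dsize) {
    return atomic_get_add(len, dsize) == 0;
}

// Small self-check showing the "return the old value" behaviour.
inline void self_check() {
    std::atomic<int> len{0};
    assert(atomic_get_add(len, 4) == 0);  // old value was 0, len is now 4
    assert(atomic_get_sub(len, 4) == 4);  // old value was 4, len is now 0
}
```

On x86 a compiler will typically lower fetch_add on an int to lock xadd when the result is used, but that is a property of the toolchain and not something the sketch depends on.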