Subj : Re: multi-threaded server, pthreads & sleep To : comp.unix.programmer,comp.programming.threads,comp.lang.c From : parahat Date : Wed Mar 16 2005 11:25 am Pascal Bourguignon wrote in message news:<873buvrbb6.fsf@thalassa.informatimago.com>... > parahat@gmail.com (Parahat Melayev) writes: > > > I am trying to writa a multi-client & multi-threaded TCP server. > > There is a thread pool. Each thread in the pool will handle requests > > of multiple clients. > > > > But here I have a problem. I find a solution but it is not how it must > > be... i think. When threads working without sleep(1) I can't get > > response from server but when I put sleep(1) in thread function as you > > will see in code, everything works fine and server can make echo > > nearly for 40000 clients between 1 or 2 seconds. What am I doing wrong > > here? > > > I don't know, but instead of: > > > for(i = 0; i < MAX_CLIENT_PER_THREAD; i++) > > { > > if(threads[tid].clients[i] != 0) > > { > > n = recv(threads[tid].clients[i], buffer, MAX_CLIENT_BUFFER, 0); > > I'd use select(2) or poll(2). That would probably avoid the problem > you're hiding with the sleep call. yes I programmed using select firstly, actually. but it is not doing what it is supposed to do, when I connect 1000 client at once. here is the code. ------ selectserver.c ----------------------- $ cc selectserver.c -o selectserver -lpthread --------------------------------------------- #include #include #include #include #include #include #include #include #include #include #include #include #define MAX_CLIENT_PER_THREAD 100 #define MAX_THREAD 100 #define PORT 3355 /*#define DEBUG*/ #define HIGHFD MAX_CLIENT_PER_THREAD * MAX_THREAD int listenfd; int which_thread = 0; /* * FD_ZERO() * * * */ typedef struct { pthread_t tid; int tnumber; long client_count; int clients[MAX_CLIENT_PER_THREAD]; fd_set clientset; } Thread; pthread_cond_t new_connection_cond = PTHREAD_COND_INITIALIZER; pthread_mutex_t new_connection_mutex = PTHREAD_MUTEX_INITIALIZER; Thread threads[MAX_THREAD]; void nonblock(int sockfd) { int opts; opts = fcntl(sockfd, F_GETFL); if(opts < 0) { perror("fcntl(F_GETFL)\n"); exit(1); } opts = (opts | O_NONBLOCK); if(fcntl(sockfd, F_SETFL, opts) < 0) { perror("fcntl(F_SETFL)\n"); exit(1); } } void *thread_init_func(void *arg) { int tid = (int) arg; int readsocks; int i; char buffer[1024]; int n; fd_set readset; int clifd; struct timeval tv; tv.tv_sec = 0; tv.tv_usec = 1; #ifdef DEBUG printf("thread %d created\n", tid); printf("sizeof thread.clients: %d\n", sizeof(threads[tid].clients)); #endif memset( (int *) &threads[tid].clients, 0, sizeof(threads[tid].clients)); while(1) { #ifdef DEBUG printf("thread %d running, client count: %d\n", tid, threads[tid].client_count); sleep(3); #endif sleep(1); readset = threads[tid].clientset; #ifdef DEBUG printf("before select\n"); #endif readsocks = select(HIGHFD + 1, &readset, NULL, NULL, &tv); #ifdef DEBUG printf("after select\n"); #endif for(i = 0; i < threads[tid].client_count; i++) { if( threads[tid].clients[i] == 0) continue; if(FD_ISSET(threads[tid].clients[i], &readset)) { n = Readline(threads[tid].clients[i], buffer, sizeof(buffer)); if(n == 0) { #ifdef DEBUG printf("client %d closed connection 0\n", threads[tid].clients[i]); #endif FD_CLR(threads[tid].clients[i], &threads[tid].clientset); threads[tid].clients[i] = 0; threads[tid].client_count--; } else if(n < 0) { if(errno == EAGAIN) { #ifdef DEBUG printf("errno: EAGAIN\n"); #endif } else { #ifdef DEBUG printf("errno: %d\n", errno); #endif FD_CLR(threads[tid].clients[i], &threads[tid].clientset); threads[tid].clients[i] = 0; threads[tid].client_count--; #ifdef DEBUG printf("client %d closed connection -1\n", threads[tid].clients[i]); #endif } } else { #ifdef DEBUG printf("\n %d bytes received from %d - %s\n", n, threads[tid].clients[i], buffer); #endif send(threads[tid].clients[i], buffer, strlen(buffer), 0); } } } } } int choose_thread() { int i=MAX_THREAD-1; int min = 0; while(i > -1) { if(threads[i].client_count < threads[i-1].client_count) { min = i; break; } i--; } return min; } int main() { char c; struct sockaddr_in srv, cli; int clifd; int tid; int i; int choosen; if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { perror("sockfd\n"); exit(1); } bzero(&srv, sizeof(srv)); srv.sin_family = AF_INET; srv.sin_addr.s_addr = INADDR_ANY; srv.sin_port = htons(PORT); if( bind(listenfd, (struct sockaddr *) &srv, sizeof(srv)) < 0) { perror("bind\n"); exit(1); } listen(listenfd, 1024); /* create threads */ for(i = 0; i < MAX_THREAD; i++) { pthread_create(&threads[i].tid, NULL, &thread_init_func, (void *) i); threads[i].client_count = 0; } for( ; ; ) { clifd = accept(listenfd, NULL, NULL); nonblock(clifd); pthread_mutex_lock(&new_connection_mutex); choosen = choose_thread();; for(i = 0; i < MAX_CLIENT_PER_THREAD; i++) { if(threads[choosen].clients[i] == 0) { threads[choosen].clients[i] = clifd; threads[choosen].client_count++; FD_SET(threads[choosen].clients[i], &threads[choosen].clientset); break; } } #ifdef DEBUG printf("choosen: %d\n", choosen); for(i = 0; i < MAX_THREAD; i++) { printf("threads[%d].client_count:%d\n", i, threads[i].client_count); } #endif pthread_mutex_unlock(&new_connection_mutex); } return 0; } .