By using this site, you agree to our updated Privacy Policy and our Terms of Use. Manage your Cookies Settings.
459,282 Members | 1,549 Online
Bytes IT Community
+ Ask a Question
Need help? Post your question and get tips & solutions from a community of 459,282 IT Pros & Developers. It's quick & easy.

multi-threaded server, pthreads & sleep

P: n/a
I am trying to writa a multi-client & multi-threaded TCP server.
There is a thread pool. Each thread in the pool will handle requests
of multiple clients.

But here I have a problem. I find a solution but it is not how it must
be... i think. When threads working without sleep(1) I can't get
response from server but when I put sleep(1) in thread function as you
will see in code, everything works fine and server can make echo
nearly for 40000 clients between 1 or 2 seconds. What am I doing wrong
here?

thanks,
parahat
------------ server.c -------------
$ cc server.c -o server -lpthread
-----------------------------------
#include <pthread.h>
#include <stdio.h>
#include <sys/timeb.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>

#define MAX_CLIENT_PER_THREAD 300
#define MAX_THREAD 200
#define PORT 3355
#define MAX_CLIENT_BUFFER 256
/*#define DEBUG*/

int listenfd;

typedef struct {
pthread_t tid;
int client_count;
int clients[MAX_CLIENT_PER_THREAD];
} Thread;

pthread_cond_t new_connection_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t new_connection_mutex = PTHREAD_MUTEX_INITIALIZER;
Thread threads[MAX_THREAD];

void nonblock(int sockfd)
{
int opts;
opts = fcntl(sockfd, F_GETFL);
if(opts < 0)
{
perror("fcntl(F_GETFL)\n");
exit(1);
}
opts = (opts | O_NONBLOCK);
if(fcntl(sockfd, F_SETFL, opts) < 0)
{
perror("fcntl(F_SETFL)\n");
exit(1);
}
}

void *thread_init_func(void *arg)
{
int tid = (int) arg;

int readsocks;
int i;
char buffer[MAX_CLIENT_BUFFER];
char c;
int n;
#ifdef DEBUG
printf("thread %d created\n", tid);
printf("sizeof thread.clients: %d\n", sizeof(threads[tid].clients));
#endif
memset((int *) &threads[tid].clients, 0,
sizeof(threads[tid].clients));
memset((char *) &buffer, 0, sizeof(buffer));
while(1)
{
#ifdef DEBUG
printf("thread %d running, client count: %d\n", tid,
threads[tid].client_count);
sleep(3);
#endif
sleep(1); /* <-- it works ??? :-| */

for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
{
if(threads[tid].clients[i] != 0)
{
n = recv(threads[tid].clients[i], buffer, MAX_CLIENT_BUFFER, 0);
if(n == 0)
{
#ifdef DEBUG
printf("client %d closed connection 0\n",
threads[tid].clients[i]);
#endif
threads[tid].clients[i] = 0;
threads[tid].client_count--;
memset((char *) &buffer, 0, strlen(buffer));
}
else if(n < 0)
{
if(errno == EAGAIN)
{
#ifdef DEBUG
printf("errno: EAGAIN\n");
#endif
}
else {
#ifdef DEBUG
printf("errno: %d\n", errno);
#endif
threads[tid].clients[i] = 0;
threads[tid].client_count--;
memset( (char *) &buffer, 0, strlen(buffer));
#ifdef DEBUG
printf("client %d closed connection -1\n",
threads[tid].clients[i]);
#endif
}
}
else {
#ifdef DEBUG
printf("%d bytes received from %d - %s\n", n,
threads[tid].clients[i], buffer);
#endif

send(threads[tid].clients[i], buffer, strlen(buffer), 0);
memset((char *) &buffer, 0, strlen(buffer));
}
}
}
}
}

int choose_thread()
{
int i=MAX_THREAD-1;
int min = 0;
while(i > -1)
{
if(threads[i].client_count < threads[i-1].client_count)
{
min = i;
break;
}
i--;
}
return min;
}

int main()
{
char c;
struct sockaddr_in srv, cli;
int clifd;
int tid;
int i;
int choosen;

signal(SIGPIPE, SIG_IGN);

if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
perror("sockfd\n");
exit(1);
}
bzero(&srv, sizeof(srv));
srv.sin_family = AF_INET;
srv.sin_addr.s_addr = INADDR_ANY;
srv.sin_port = htons(PORT);

if( bind(listenfd, (struct sockaddr *) &srv, sizeof(srv)) < 0)
{
perror("bind\n");
exit(1);
}

listen(listenfd, 1024);
/* create threads */
for(i = 0; i < MAX_THREAD; i++)
{
pthread_create(&threads[i].tid, NULL, &thread_init_func, (void *)
i);
threads[i].client_count = 0;
}
for( ; ; )
{
clifd = accept(listenfd, NULL, NULL);
nonblock(clifd);

pthread_mutex_lock(&new_connection_mutex);

choosen = choose_thread();

for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
{
if(threads[choosen].clients[i] == 0)
{
#ifdef DEBUG
printf("before threads clifd\n");
#endif
threads[choosen].clients[i] = clifd;
#ifdef DEBUG
printf("after threads clifd\n");
#endif
threads[choosen].client_count++;
break;
}
}

#ifdef DEBUG
printf("choosen: %d\n", choosen);

for(i = 0; i < MAX_THREAD; i++)
{
printf("threads[%d].client_count:%d\n", i,
threads[i].client_count);
}
#endif

pthread_mutex_unlock(&new_connection_mutex);
}

if(errno)
{
printf("errno: %d", errno);
}

return 0;
}
================================================== =========

--------------- test.c --------------------
$ cc test.c -o test
-------------------------------------------
#include <stdio.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>
#define MAX_CLIENT 1000
/*#define DEBUG*/

int PORT = 3355;
char *IP = "192.168.222.22";

enum {
U_INT_SIZE = sizeof(unsigned int)
};

int list[MAX_CLIENT];
fd_set fdset;
int highfd = MAX_CLIENT;

void nonblock(int sockfd)
{
int opts;
opts = fcntl(sockfd, F_GETFL);
if(opts < 0)
{
perror("fcntl(F_GETFL)\n");
exit(1);
}
opts = (opts | O_NONBLOCK);
if(fcntl(sockfd, F_SETFL, opts) < 0)
{
perror("fcntl(F_SETFL)\n");
exit(1);
}
}
void cli_con()
{
struct sockaddr_in srv;
struct hostent *h_name;
int clifd;
int n;

bzero(&srv, sizeof(srv));
srv.sin_family = AF_INET;
srv.sin_port = htons(PORT);
inet_pton(AF_INET, IP, &srv.sin_addr);

if( (clifd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
printf("ERROR clifd\n");
exit(1);
}

if(connect(clifd, (struct sockaddr *) &srv, sizeof(srv)) < 0)
{
printf("ERROR connect\n");
exit(1);
}

nonblock(clifd);
for(n = 0; (n < MAX_CLIENT) && (clifd != -1); n++)
{
if(list[n] == 0) {
#ifdef DEBUG
printf("connected %d\n", clifd);
#endif
list[n] = clifd;
clifd = -1;
}
}

if(clifd != -1) {
printf("list FULL");
}
}

void set_list() {
int n;
FD_ZERO(&fdset);

for(n = 0; n < MAX_CLIENT; n++)
{
if(list[n] != 0) {
FD_SET(list[n], &fdset);
if(list[n] > highfd)
highfd = list[n];
}
}
}

void recv_data(int num)
{
int n;
char buffer[1024];
n = recv(list[num], buffer, 1023, 0);
if(n > 0)
{
#ifdef DEBUG
printf("%d bytes from %d\n", n, list[num]);
printf("%s", buffer);
#endif
}
else if(n < 0)
{
if(errno != EAGAIN)
printf("client %d disconnected\n", list[num]);
}
else if(n == 0)
{
printf("client %d disconnected\n", list[num]);
}
}
void send_data(int num)
{
int n;
char buffer[1024];
sprintf(buffer, "TEST list[%d]=%d \r\n", num, list[num]);
if((n = send(list[num], buffer, strlen(buffer), 0)) > 0)
{
#ifdef DEBUG
printf("%d bytes sent from %d\n", n, list[num]);
#endif
}
}

void scan_clients()
{
#ifdef DEBUG
printf("scan_clients\n");
#endif
int num;

for(num = 0; num < MAX_CLIENT; num++) {
if(FD_ISSET(list[num], &fdset))
recv_data(num);
}

for(num = 0; num < MAX_CLIENT; num++)
send_data(num);
}

int main()
{
int readsocks;
int i=0;
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 10;

while(i < MAX_CLIENT)
{
cli_con();
i++;
}
i=0;
while(i < MAX_CLIENT)
{
printf("list[%d] = %d\n",i,list[i]);
i++;
}
i = 0;
while(1)
{
set_list();
readsocks = select(highfd+1, &fdset, NULL, NULL, &tv);
if(readsocks < 0) {
perror("select\n");
exit(1);
}

/*printf("else scan_clients\n");*/
scan_clients();
#ifdef DEBUG
sleep(4);
#endif
}
return 0;
}
Nov 14 '05 #1
Share this Question
Share on Google+
5 Replies


P: n/a
pa*****@gmail.com (Parahat Melayev) writes:
I am trying to writa a multi-client & multi-threaded TCP server.
There is a thread pool. Each thread in the pool will handle requests
of multiple clients.

But here I have a problem. I find a solution but it is not how it must
be... i think. When threads working without sleep(1) I can't get
response from server but when I put sleep(1) in thread function as you
will see in code, everything works fine and server can make echo
nearly for 40000 clients between 1 or 2 seconds. What am I doing wrong
here?

I don't know, but instead of:
for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
{
if(threads[tid].clients[i] != 0)
{
n = recv(threads[tid].clients[i], buffer, MAX_CLIENT_BUFFER, 0);


I'd use select(2) or poll(2). That would probably avoid the problem
you're hiding with the sleep call.

--
__Pascal Bourguignon__ http://www.informatimago.com/
Cats meow out of angst
"Thumbs! If only we had thumbs!
We could break so much!"
Nov 14 '05 #2

P: n/a
Pascal Bourguignon <sp**@mouse-potato.com> wrote in message news:<87************@thalassa.informatimago.com>.. .
pa*****@gmail.com (Parahat Melayev) writes:
I am trying to writa a multi-client & multi-threaded TCP server.
There is a thread pool. Each thread in the pool will handle requests
of multiple clients.

But here I have a problem. I find a solution but it is not how it must
be... i think. When threads working without sleep(1) I can't get
response from server but when I put sleep(1) in thread function as you
will see in code, everything works fine and server can make echo
nearly for 40000 clients between 1 or 2 seconds. What am I doing wrong
here?

I don't know, but instead of:
for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
{
if(threads[tid].clients[i] != 0)
{
n = recv(threads[tid].clients[i], buffer, MAX_CLIENT_BUFFER, 0);


I'd use select(2) or poll(2). That would probably avoid the problem
you're hiding with the sleep call.

yes I programmed using select firstly, actually. but it is not doing
what it is supposed to do, when I connect 1000 client at once.

here is the code.

------ selectserver.c -----------------------
$ cc selectserver.c -o selectserver -lpthread
---------------------------------------------

#include <pthread.h>
#include <stdio.h>
#include <sys/timeb.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>

#define MAX_CLIENT_PER_THREAD 100
#define MAX_THREAD 100
#define PORT 3355
/*#define DEBUG*/
#define HIGHFD MAX_CLIENT_PER_THREAD * MAX_THREAD
int listenfd;
int which_thread = 0;
/*
* FD_ZERO()
*
*
*
*/

typedef struct {
pthread_t tid;
int tnumber;
long client_count;
int clients[MAX_CLIENT_PER_THREAD];
fd_set clientset;
} Thread;

pthread_cond_t new_connection_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t new_connection_mutex = PTHREAD_MUTEX_INITIALIZER;
Thread threads[MAX_THREAD];

void nonblock(int sockfd)
{
int opts;
opts = fcntl(sockfd, F_GETFL);
if(opts < 0)
{
perror("fcntl(F_GETFL)\n");
exit(1);
}
opts = (opts | O_NONBLOCK);
if(fcntl(sockfd, F_SETFL, opts) < 0)
{
perror("fcntl(F_SETFL)\n");
exit(1);
}
}

void *thread_init_func(void *arg)
{
int tid = (int) arg;
int readsocks;
int i;
char buffer[1024];
int n;
fd_set readset;
int clifd;

struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 1;
#ifdef DEBUG
printf("thread %d created\n", tid);
printf("sizeof thread.clients: %d\n", sizeof(threads[tid].clients));
#endif
memset( (int *) &threads[tid].clients, 0,
sizeof(threads[tid].clients));

while(1)
{
#ifdef DEBUG
printf("thread %d running, client count: %d\n", tid,
threads[tid].client_count);
sleep(3);
#endif
sleep(1);
readset = threads[tid].clientset;
#ifdef DEBUG
printf("before select\n");
#endif
readsocks = select(HIGHFD + 1, &readset, NULL, NULL, &tv);
#ifdef DEBUG
printf("after select\n");
#endif
for(i = 0; i < threads[tid].client_count; i++)
{
if( threads[tid].clients[i] == 0)
continue;
if(FD_ISSET(threads[tid].clients[i], &readset))
{
n = Readline(threads[tid].clients[i], buffer, sizeof(buffer));
if(n == 0)
{
#ifdef DEBUG
printf("client %d closed connection 0\n",
threads[tid].clients[i]);
#endif
FD_CLR(threads[tid].clients[i], &threads[tid].clientset);
threads[tid].clients[i] = 0;
threads[tid].client_count--;
}
else if(n < 0)
{
if(errno == EAGAIN)
{
#ifdef DEBUG
printf("errno: EAGAIN\n");
#endif
}
else {
#ifdef DEBUG
printf("errno: %d\n", errno);
#endif
FD_CLR(threads[tid].clients[i], &threads[tid].clientset);
threads[tid].clients[i] = 0;
threads[tid].client_count--;
#ifdef DEBUG
printf("client %d closed connection -1\n",
threads[tid].clients[i]);
#endif
}
}
else {
#ifdef DEBUG
printf("\n %d bytes received from %d - %s\n", n,
threads[tid].clients[i], buffer);
#endif
send(threads[tid].clients[i], buffer, strlen(buffer), 0);
}

}
}
}
}

int choose_thread()
{
int i=MAX_THREAD-1;
int min = 0;
while(i > -1)
{
if(threads[i].client_count < threads[i-1].client_count)
{
min = i;
break;
}
i--;
}
return min;
}

int main()
{
char c;
struct sockaddr_in srv, cli;
int clifd;
int tid;
int i;
int choosen;

if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
perror("sockfd\n");
exit(1);
}
bzero(&srv, sizeof(srv));
srv.sin_family = AF_INET;
srv.sin_addr.s_addr = INADDR_ANY;
srv.sin_port = htons(PORT);

if( bind(listenfd, (struct sockaddr *) &srv, sizeof(srv)) < 0)
{
perror("bind\n");
exit(1);
}

listen(listenfd, 1024);
/* create threads */
for(i = 0; i < MAX_THREAD; i++)
{
pthread_create(&threads[i].tid, NULL, &thread_init_func, (void *)
i);
threads[i].client_count = 0;
}
for( ; ; )
{
clifd = accept(listenfd, NULL, NULL);
nonblock(clifd);

pthread_mutex_lock(&new_connection_mutex);

choosen = choose_thread();;

for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
{
if(threads[choosen].clients[i] == 0)
{
threads[choosen].clients[i] = clifd;
threads[choosen].client_count++;
FD_SET(threads[choosen].clients[i], &threads[choosen].clientset);
break;
}
}

#ifdef DEBUG
printf("choosen: %d\n", choosen);

for(i = 0; i < MAX_THREAD; i++)
{
printf("threads[%d].client_count:%d\n", i,
threads[i].client_count);
}
#endif

pthread_mutex_unlock(&new_connection_mutex);
}

return 0;
}
Nov 14 '05 #3

P: n/a

"Parahat Melayev" <pa*****@gmail.com> wrote in message
news:5b**************************@posting.google.c om...
yes I programmed using select firstly, actually. but it is not doing
what it is supposed to do, when I connect 1000 client at once.


You're going to have to get more sophisticated if you want to handle
more than a few hundred concurrent connections. You'll at minimum need to
use 'poll' instead of 'select'. You may need to raise resource limits also.
It's hard to know exactly what your issue is because "it is not doing what
it is supposed to do" is a very vague description of the problem.

DS
Nov 14 '05 #4

P: n/a
Parahat Melayev <pa*****@gmail.com> scribbled the following
on comp.lang.c:
I am trying to writa a multi-client & multi-threaded TCP server.
There is a thread pool. Each thread in the pool will handle requests
of multiple clients.


Your code depends inherently on system-specific extensions and is thus
completely off-topic on comp.lang.c. Please stop crossposting to
comp.lang.c, thanks.

--
/-- Joona Palaste (pa*****@cc.helsinki.fi) ------------- Finland --------\
\-------------------------------------------------------- rules! --------/
"I am looking for myself. Have you seen me somewhere?"
- Anon
Nov 14 '05 #5

P: n/a
Parahat Melayev wrote:
.... snip ...
here is the code.

------ selectserver.c -----------------------
$ cc selectserver.c -o selectserver -lpthread
---------------------------------------------

#include <pthread.h>
#include <stdio.h>
#include <sys/timeb.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>


Of these, only stdio.h, string.h, signal.h and errno.h are
legitimate standard C headers, at least in this newsgroup. So why
are you cluttering c.l.c up with nearly 400 lines of non-standard
and off topic code? Find a newsgroup that deals with your system.
F'ups set.

--
"If you want to post a followup via groups.google.com, don't use
the broken "Reply" link at the bottom of the article. Click on
"show options" at the top of the article, then click on the
"Reply" at the bottom of the article headers." - Keith Thompson
Nov 14 '05 #6

This discussion thread is closed

Replies have been disabled for this discussion.