ThreadPoolingMixIn

Hi, everybody!

I wrote a useful class ThreadPoolingMixIn which can be used to create
fast thread-based servers. This mix-in works much faster than
ThreadingMixIn because it doesn't create a new thread on each request.

Is it worth including in SocketServer.py?

from __future__ import with_statement
from SocketServer import ThreadingMixIn
import threading
import Queue

class ThreadPoolingMixIn(ThreadingMixIn):
    """Mix-in class to handle requests in a thread pool.

    The pool grows and shrinks depending on load.

    For instance, a threading UDP server class is created as follows:

    class ThreadPoolingUDPServer(ThreadPoolingMixIn, UDPServer): pass

    """
    __author__ = 'Pavel Uvarov <pa**********@gmail.com>'

    def init_thread_pool(self, min_workers = 5,
                         max_workers = 100, min_spare_workers = 5):
        """Initialize thread pool."""
        self.q = Queue.Queue()
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.min_spare_workers = min_spare_workers
        self.num_workers = 0
        self.num_busy_workers = 0
        self.workers_mutex = threading.Lock()
        self.start_workers(self.min_workers)

    def start_workers(self, n):
        """Start n workers."""
        for i in xrange(n):
            t = threading.Thread(target = self.worker)
            t.setDaemon(True)
            t.start()

    def worker(self):
        """A function of a working thread.

        It gets a request from queue (blocking if there
        are no requests) and processes it.

        After processing it checks how many spare workers
        are there now and if this value is greater than
        self.min_spare_workers then the worker exits.
        Otherwise it loops infinitely.

        """
        with self.workers_mutex:
            self.num_workers += 1
        while True:
            (request, client_address) = self.q.get()
            with self.workers_mutex:
                self.num_busy_workers += 1
            self.process_request_thread(request, client_address)
            self.q.task_done()
            with self.workers_mutex:
                self.num_busy_workers -= 1
                if self.num_workers - self.num_busy_workers > \
                        self.min_spare_workers:
                    self.num_workers -= 1
                    return

    def process_request(self, request, client_address):
        """Puts a request into queue.

        If the queue size is too large, it adds extra worker.

        """
        self.q.put((request, client_address))
        with self.workers_mutex:
            if (self.q.qsize() > 3 and
                    self.num_workers < self.max_workers):
                self.start_workers(1)

    def join(self):
        """Wait for all busy threads"""
        self.q.join()
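
For readers on Python 3, where the modules are named `socketserver` and `queue`, the class above might be adapted roughly as follows. This is a sketch and not the author's code: `EchoHandler`, the `ask` helper and the pool sizes are illustrative assumptions, and the two comparisons that appear without an operator in the listing are assumed to be `>`.

```python
import queue
import socket
import socketserver
import threading


class ThreadPoolingMixIn(socketserver.ThreadingMixIn):
    """Python 3 adaptation sketch of the pooled mix-in posted above."""

    def init_thread_pool(self, min_workers=5, max_workers=100,
                         min_spare_workers=5):
        self.q = queue.Queue()
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.min_spare_workers = min_spare_workers
        self.num_workers = 0
        self.num_busy_workers = 0
        self.workers_mutex = threading.Lock()
        self.start_workers(self.min_workers)

    def start_workers(self, n):
        for _ in range(n):
            threading.Thread(target=self.worker, daemon=True).start()

    def worker(self):
        with self.workers_mutex:
            self.num_workers += 1
        while True:
            request, client_address = self.q.get()
            with self.workers_mutex:
                self.num_busy_workers += 1
            # Inherited from ThreadingMixIn: runs the handler, then
            # shuts the request down.
            self.process_request_thread(request, client_address)
            self.q.task_done()
            with self.workers_mutex:
                self.num_busy_workers -= 1
                spare = self.num_workers - self.num_busy_workers
                if spare > self.min_spare_workers:  # assumed operator
                    self.num_workers -= 1
                    return

    def process_request(self, request, client_address):
        # Called by the accept loop: queue the request instead of
        # spawning a thread, and grow the pool if a backlog builds up.
        self.q.put((request, client_address))
        with self.workers_mutex:
            if self.q.qsize() > 3 and self.num_workers < self.max_workers:
                self.start_workers(1)


class EchoHandler(socketserver.StreamRequestHandler):
    """Hypothetical handler: replies with the upper-cased request line."""

    def handle(self):
        self.wfile.write(self.rfile.readline().upper())


class ThreadPoolingTCPServer(ThreadPoolingMixIn, socketserver.TCPServer):
    allow_reuse_address = True

    def __init__(self, server_address, handler_class):
        super().__init__(server_address, handler_class)
        self.init_thread_pool(min_workers=2, max_workers=8,
                              min_spare_workers=2)


def ask(address, line):
    """Hypothetical client helper: send one line, return the reply line."""
    with socket.create_connection(address, timeout=5) as sock:
        sock.sendall(line)
        return sock.makefile("rb").readline()
```

To try it, bind `ThreadPoolingTCPServer(("127.0.0.1", 0), EchoHandler)`, run `serve_forever` in a background thread, and call `ask(server.server_address, b"ping\n")`.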
Jun 27 '08 #1
On May 30, 2:40 pm, pavel.uva...@gmail.com wrote:
> Hi, everybody!
>
> I wrote a useful class ThreadPoolingMixIn which can be used to create
> fast thread-based servers. This mix-in works much faster than
> ThreadingMixIn because it doesn't create a new thread on each request.

Do you have any benchmarks demonstrating the performance difference?

> t.setDaemon(True)

Unfortunately, shutdown with daemon threads is fairly buggy. :/
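
One common way to avoid relying on daemon threads is to keep the workers non-daemon and stop them explicitly with a sentinel value. The sketch below is written for Python 3 and is not part of the posted mix-in; `WorkerPool` and `_STOP` are hypothetical names used only for illustration.

```python
import queue
import threading

_STOP = object()  # hypothetical sentinel that tells a worker to exit


class WorkerPool:
    """Fixed-size pool of non-daemon threads with an explicit shutdown."""

    def __init__(self, num_workers, handler):
        self._q = queue.Queue()
        self._handler = handler
        self._threads = [threading.Thread(target=self._run)
                         for _ in range(num_workers)]
        for t in self._threads:
            t.start()

    def submit(self, item):
        self._q.put(item)

    def _run(self):
        while True:
            item = self._q.get()
            if item is _STOP:
                return
            try:
                self._handler(item)
            except Exception:
                # A real server would log the error; the worker keeps going.
                pass

    def shutdown(self):
        # One sentinel per worker; the FIFO queue guarantees that items
        # submitted earlier are handled before the workers exit.
        for _ in self._threads:
            self._q.put(_STOP)
        for t in self._threads:
            t.join()
```

Because `shutdown()` joins every thread, the interpreter never has to tear down workers that are still running, which is the situation daemon threads leave it in.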
Jun 27 '08 #2
On 30 May, 22:40, pavel.uva...@gmail.com wrote:
> Hi, everybody!
>
> I wrote a useful class ThreadPoolingMixIn which can be used to create
> fast thread-based servers. This mix-in works much faster than
> ThreadingMixIn because it doesn't create a new thread on each request.
>
> Is it worth including in SocketServer.py?

This is not the right place to discuss such a thing.
Post the same message on the python-dev mailing list or, even better, open
a new ticket on the bug tracker, attaching the patch and, most importantly,
a benchmark demonstrating the speed improvement.

--- Giampaolo
http://code.google.com/p/pyftpdlib/
Jun 27 '08 #3
On May 31, 1:40 pm, "Giampaolo Rodola'" <gne...@gmail.com> wrote:
> On 30 May, 22:40, pavel.uva...@gmail.com wrote:
>> Hi, everybody!
>> I wrote a useful class ThreadPoolingMixIn which can be used to create
>> fast thread-based servers. This mix-in works much faster than
>> ThreadingMixIn because it doesn't create a new thread on each request.
>> Is it worth including in SocketServer.py?

> This is not the right place to discuss such a thing.
> Post the same message on the python-dev mailing list or, even better, open
> a new ticket on the bug tracker, attaching the patch and, most importantly,
> a benchmark demonstrating the speed improvement.

It's perfectly acceptable to discuss ideas here before bringing them
up on python-ideas, and then python-dev.
Jun 27 '08 #4
On May 31, 9:13 pm, Rhamphoryncus <rha...@gmail.com> wrote:
> On May 30, 2:40 pm, pavel.uva...@gmail.com wrote:
>> Hi, everybody!
>> I wrote a useful class ThreadPoolingMixIn which can be used to create
>> fast thread-based servers. This mix-in works much faster than
>> ThreadingMixIn because it doesn't create a new thread on each request.
>
> Do you have any benchmarks demonstrating the performance difference?

To benchmark this I used a simple TCP server which writes a small (16k)
string to the client and closes the connection.

I started 100 remote clients and got 500 replies/s for ThreadingMixIn
and more than 1500 replies/s for ThreadPoolingMixIn. I tested it on
FreeBSD 6.2 amd64.

I'm very curious about how exact the number 500 is for ThreadingMixIn.
It seems to be the same for various packet sizes.
I suspect there is some OS limit on the thread creation rate.
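
The cost of creating a thread per task can be looked at in isolation, away from the network. The following Python 3 sketch times the same trivial task run once per freshly created thread and once through a small reused pool; it uses the standard `concurrent.futures.ThreadPoolExecutor` as a stand-in for the mix-in's pool, and the function names are illustrative. It can neither confirm nor rule out an OS-level limit, and the numbers will vary by machine.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_thread_per_task(n_tasks, task):
    """Create, start and join one new thread for every task."""
    start = time.perf_counter()
    for _ in range(n_tasks):
        t = threading.Thread(target=task)
        t.start()
        t.join()
    return time.perf_counter() - start


def run_pooled(n_tasks, task, n_workers=4):
    """Run the same tasks on a small pool of reused threads."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        for _ in range(n_tasks):
            pool.submit(task)
    # Leaving the with-block waits for all submitted tasks to finish.
    return time.perf_counter() - start


if __name__ == "__main__":
    def noop():
        pass

    n = 2000
    print("thread per task: %.3f s" % run_thread_per_task(n, noop))
    print("pooled:          %.3f s" % run_pooled(n, noop))
```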

Below I include a bugfixed ThreadPoolingMixIn and the benchmarking
utility. The utility can be used to start clients on localhost, though
the reply rate will be slower (around 1000 replies/s).

To start benchmarking server with localhost clients use:
python ./TestServer.py --server=threading --n-clients=100
or
python ./TestServer.py --server=threadpooling --n-clients=100
#------- ThreadPoolingMixIn.py
from __future__ import with_statement
from SocketServer import ThreadingMixIn
import threading
import Queue

class ThreadPoolingMixIn(ThreadingMixIn):
    """Mix-in class to handle requests in a thread pool.

    The pool grows and shrinks depending on load.

    For instance, a threadpooling TCP server class is created as
    follows:

    class ThreadPoolingTCPServer(ThreadPoolingMixIn, TCPServer): pass
    """
    __author__ = 'Pavel Uvarov <pa**********@gmail.com>'

    def init_thread_pool(self, min_workers = 5,
                         max_workers = 100, min_spare_workers = 5):
        """Initialize thread pool."""
        self.q = Queue.Queue()
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.min_spare_workers = min_spare_workers
        self.num_workers = 0
        self.num_busy_workers = 0
        self.workers_mutex = threading.Lock()
        self.start_workers(self.min_workers)

    def start_workers(self, n):
        """Start n workers."""
        for i in xrange(n):
            t = threading.Thread(target = self.worker)
            t.setDaemon(True)
            t.start()

    def worker(self):
        """A function of a working thread.

        It gets a request from queue (blocking if there
        are no requests) and processes it.

        After processing it checks how many spare workers
        are there now and if this value is greater than
        self.min_spare_workers then the worker exits.
        Otherwise it loops infinitely.

        """
        with self.workers_mutex:
            self.num_workers += 1
        while True:
            (request, client_address) = self.q.get()
            with self.workers_mutex:
                self.num_busy_workers += 1
            self.process_request_thread(request, client_address)
            self.q.task_done()
            with self.workers_mutex:
                self.num_busy_workers -= 1
                # Exit only if the pool stays at or above min_workers and
                # more than min_spare_workers workers are idle.
                if (self.num_workers > self.min_workers and
                    self.num_workers - self.num_busy_workers >
                    self.min_spare_workers):
                    self.num_workers -= 1
                    return

    def process_request(self, request, client_address):
        """Puts a request into queue.

        If the queue size is too large, it adds extra worker.

        """
        self.q.put((request, client_address))
        with self.workers_mutex:
            if (self.q.qsize() > 3 and
                    self.num_workers < self.max_workers):
                self.start_workers(1)

    def join(self):
        """Wait for all busy threads"""
        self.q.join()

#------- TestServer.py
from __future__ import with_statement
from SocketServer import *
import socket
import sys
import threading
import time
import os
from ThreadPoolingMixIn import *

class ThreadPoolingTCPServer(ThreadPoolingMixIn, TCPServer): pass

class TestServer(ThreadingTCPServer):

    allow_reuse_address = True
    request_queue_size = 128

    def __init__(self, server_address, RequestHandlerClass,
                 packet_size):
        TCPServer.__init__(self, server_address, RequestHandlerClass)
        self.packet_size = packet_size
        self.sum_t = 0
        self.total_num_requests = 0
        self.num_requests = 0
        self.t0 = time.time()
        self.lock = threading.Lock()

    def reset_stats(self):
        with self.lock:
            self.total_num_requests += self.num_requests
            self.num_requests = 0
            self.sum_t = 0
            self.t0 = time.time()

    def update_stats(self, t0, t1):
        with self.lock:
            self.num_requests += 1
            self.sum_t += t1 - t0
            n = self.num_requests
            sum_t = self.sum_t
            avg_t = sum_t / n
            rate = n / (t1 - self.t0)
        return (n, avg_t, rate)

    def handle_request(self):
        """Handle one request, possibly blocking."""
        try:
            request, client_address = self.get_request()
        except KeyboardInterrupt:
            raise
        except socket.error:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except KeyboardInterrupt:
                raise
            except:
                self.handle_error(request, client_address)
                self.close_request(request)

class TestServerThreadPool(ThreadPoolingMixIn, TestServer):
    def __init__(self, server_address, RequestHandlerClass,
                 packet_size):
        TestServer.__init__(self, server_address, RequestHandlerClass,
                            packet_size)
        # min_workers=2, max_workers=200, min_spare_workers=20
        self.init_thread_pool(2, 200, 20)

class TestRequestHandler(StreamRequestHandler):

    def __init__(self, request, client_address, server):
        self.t0 = time.time()
        StreamRequestHandler.__init__(self, request,
                                      client_address, server)

    def handle(self):
        self.wfile.write('a'*(self.server.packet_size))

        t1 = time.time()
        (n, avg_t, rate) = self.server.update_stats(self.t0, t1)
        # Report the reply rate every 10000 requests.
        if n % 10000 == 0:
            print('rate=%.2f ' % rate)
            self.server.reset_stats()

from optparse import OptionParser

def server(o):
    HandlerClass = TestRequestHandler

    if o.server == "threading":
        ServerClass = TestServer
    elif o.server == "threadpooling":
        ServerClass = TestServerThreadPool
    else:
        return

    server_address = ('', o.port)
    try:
        srv = ServerClass(server_address, HandlerClass,
                          o.packet_size)

        sa = srv.socket.getsockname()
        print "Serving on", sa[0], "port", sa[1], "..."
        srv.serve_forever()
    except Exception, val:
        print "Exception: %s" % str(val)
        raise

def client(o):
    # Each fork leaves the parent looping as a client; the child goes on
    # to fork the next one.
    for f in xrange(0,o.n_clients):
        if os.fork():
            while True:
                try:
                    sock = socket.socket(socket.AF_INET,
                                         socket.SOCK_STREAM)
                    sock.connect(("localhost",o.port))
                    while len(sock.recv(4096)):
                        pass
                    sock.close()
                except Exception, val:
                    print val
                    time.sleep(1)

if __name__ == '__main__':
    args = sys.argv[1:]
    usage = "usage: %prog [options]"
    parser = OptionParser(usage)
    parser.add_option( "-p", "--port", help="Server port",
                       type="int", default=8123 )
    parser.add_option( "", "--n-clients", help="Number of client forks",
                       type="int", default=0 )
    parser.add_option( "", "--server",
                       help="Type of the server (threading or threadpooling)",
                       type="string", default="" )
    parser.add_option( "", "--packet-size", help="Packet size",
                       type="int", default=16*1024 )
    (o,a) = parser.parse_args(args)

    # The child process runs the server, the parent starts the clients.
    if os.fork() == 0:
        server(o)
    else:
        client(o)
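
Comparing the two ThreadPoolingMixIn listings in this thread, the visible change in the worker is an extra `num_workers > min_workers` guard on the exit condition. The Python 3 sketch below isolates both shrink rules as pure functions, assuming the comparison operators missing from the listings are `>`. The function names and the `settle_idle_pool` helper are hypothetical, and the helper assumes every worker gets to re-check the rule while the pool is idle.

```python
def should_exit_original(num_workers, num_busy, min_spare):
    """Rule from the first listing: exit when spare workers exceed min_spare."""
    return num_workers - num_busy > min_spare


def should_exit_bugfixed(num_workers, num_busy, min_workers, min_spare):
    """Rule from the second listing: also never shrink below min_workers."""
    return (num_workers > min_workers and
            num_workers - num_busy > min_spare)


def settle_idle_pool(num_workers, should_exit):
    """Let idle workers exit one by one until the rule stops them.

    should_exit takes (num_workers, num_busy); returns the final pool size.
    """
    while num_workers > 0 and should_exit(num_workers, 0):
        num_workers -= 1
    return num_workers


if __name__ == "__main__":
    # min_workers=10, min_spare_workers=5, pool currently idle.
    print(settle_idle_pool(
        10, lambda n, busy: should_exit_original(n, busy, 5)))
    print(settle_idle_pool(
        10, lambda n, busy: should_exit_bugfixed(n, busy, 10, 5)))
```

With `min_workers` larger than `min_spare_workers`, the first rule lets an idle pool shrink to `min_spare_workers`, while the second keeps it at `min_workers`.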
Jun 27 '08 #5
On Jun 2, 7:09 pm, pavel.uva...@gmail.com wrote:
> I started 100 remote clients and got 500 replies/s for ThreadingMixIn
> and more than 1500 replies/s for ThreadPoolingMixIn. I tested it on
> FreeBSD 6.2 amd64.

I've just tested it on a Linux box and got 240 replies/s vs 2000
replies/s, which is an 8x performance improvement.
Jun 27 '08 #6
pa**********@gmail.com wrote:
> To benchmark this I used a simple tcp server which writes a small (16k)
> string to the client and closes the connection.

Just a general note: When benchmarking such a network service it would
be valuable to see benchmark results for several data sizes. I'd expect
better numbers for a ThreadPoolingMixIn when there are more requests
with smaller data size.

Ciao, Michael.
Jun 27 '08 #7
On Jun 2, 7:15 pm, Michael Ströder <mich...@stroeder.com> wrote:
> pavel.uva...@gmail.com wrote:
>> To benchmark this I used a simple tcp server which writes a small (16k)
>> string to the client and closes the connection.
>
> Just a general note: When benchmarking such a network service it would
> be valuable to see benchmark results for several data sizes. I'd expect
> better numbers for a ThreadPoolingMixIn when there are more requests
> with smaller data size.
>
> Ciao, Michael.

Here are benchmarks for FreeBSD 6.2, amd64

packet_size         x         y
          0    499.57   1114.54
       1024    499.29   1130.02
       3072    500.09   1119.14
       7168    498.20   1111.76
      15360    499.29   1086.73
      31744    500.04   1036.46
      64512    499.43    939.60
     130048    499.28    737.44
     261120    498.04    499.03
     523264    307.54    312.04
    1047552    173.57    185.32
    2096128     93.61     94.39

x = ThreadingMixIn replies/s
y = ThreadPoolingMixIn replies/s
Jun 27 '08 #8
On Jun 2, 12:41 pm, pavel.uva...@gmail.com wrote:
> Here are benchmarks for FreeBSD 6.2, amd64
>
> packet_size         x         y
>           0    499.57   1114.54
>        1024    499.29   1130.02
>        3072    500.09   1119.14
>        7168    498.20   1111.76
>       15360    499.29   1086.73
>       31744    500.04   1036.46
>       64512    499.43    939.60
>      130048    499.28    737.44
>      261120    498.04    499.03
>      523264    307.54    312.04
>     1047552    173.57    185.32
>     2096128     93.61     94.39
>
> x = ThreadingMixIn replies/s
> y = ThreadPoolingMixIn replies/s

Well, I'd say you've got yourself a winner. Performance (at least on
FreeBSD) seems as good as or better for your ThreadPoolingMixIn than for
ThreadingMixIn. Is this with the default values of min=5 and max=5
worker threads?
Jun 27 '08 #9
On Jun 3, 1:19 am, miller.pau...@gmail.com wrote:
> Well, I'd say you've got yourself a winner. Performance (at least on
> FreeBSD) seems as good or better for your ThreadPoolingMixin than
> ThreadingMixin. Is this with the default values of min=5 and max=5
> worker threads?

No, I initialized the thread pool with min_workers=2, max_workers=200 and
min_spare_workers=20.

For Linux (2.6.22, amd64) I got an even more dramatic improvement:

packet_size         x         y
          0    249.97   2014.63
       1024    249.98   1782.83
       3072    240.09   1859.00
       7168    249.98   1900.61
      15360    249.98   1787.30
      31744    238.96   1808.17
      64512    249.85   1561.47
     130048    237.26   1213.26
     261120    249.98    841.96
     523264    249.97    595.40
    1047552    236.40    351.96
    2096128    216.26    218.15

x = ThreadingMixIn replies/s
y = ThreadPoolingMixIn replies/s
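
The improvement is easier to read as a ratio y/x per packet size. A small Python 3 sketch that computes it from the FreeBSD and Linux figures posted in this thread; the helper names `speedups` and `best` are illustrative.

```python
# (packet_size in bytes, ThreadingMixIn replies/s, ThreadPoolingMixIn replies/s)
FREEBSD = [
    (0, 499.57, 1114.54), (1024, 499.29, 1130.02), (3072, 500.09, 1119.14),
    (7168, 498.20, 1111.76), (15360, 499.29, 1086.73), (31744, 500.04, 1036.46),
    (64512, 499.43, 939.60), (130048, 499.28, 737.44), (261120, 498.04, 499.03),
    (523264, 307.54, 312.04), (1047552, 173.57, 185.32), (2096128, 93.61, 94.39),
]
LINUX = [
    (0, 249.97, 2014.63), (1024, 249.98, 1782.83), (3072, 240.09, 1859.00),
    (7168, 249.98, 1900.61), (15360, 249.98, 1787.30), (31744, 238.96, 1808.17),
    (64512, 249.85, 1561.47), (130048, 237.26, 1213.26), (261120, 249.98, 841.96),
    (523264, 249.97, 595.40), (1047552, 236.40, 351.96), (2096128, 216.26, 218.15),
]


def speedups(rows):
    """Return (packet_size, y / x) for every row of a benchmark table."""
    return [(size, y / x) for size, x, y in rows]


def best(rows):
    """Return the (packet_size, ratio) pair with the largest speedup."""
    return max(speedups(rows), key=lambda pair: pair[1])


if __name__ == "__main__":
    for name, rows in (("FreeBSD 6.2", FREEBSD), ("Linux 2.6.22", LINUX)):
        print(name)
        for size, ratio in speedups(rows):
            print("%11d  %5.2fx" % (size, ratio))
```

On these figures the pooled server is a little over 2x faster on FreeBSD and about 8x faster on Linux for small packets, and the two converge to roughly 1x at the largest packet size.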
Jun 27 '08 #10
