On May 31, 9:13 pm, Rhamphoryncus <rha...@gmail.com> wrote:
On May 30, 2:40 pm, pavel.uva...@gmail.com wrote:
Hi, everybody!
I wrote a useful class ThreadPoolingMixIn which can be used to create
fast thread-based servers. This mix-in works much faster than
ThreadingMixIn because it doesn't create a new thread on each request.
Do you have any benchmarks demonstrating the performance difference?
To benchmark this I used a simple TCP server which writes a small (16k) string to the client and closes the connection.
I started 100 remote clients and got 500 replies/s for ThreadingMixIn
and more than 1500 replies/s for ThreadPoolingMixIn. I tested it on
FreeBSD 6.2 amd64.
I'm very curious about the exactness of the number 500 for
ThreadingMixIn. It seems to be the same for various packet sizes.
I suspect there is some OS limit on the thread creation rate.
Below I include a bug-fixed ThreadPoolingMixIn and the benchmarking
utility. The utility can also start the clients on localhost, though
the reply rate will then be lower (around 1000 replies/s).
To start benchmarking server with localhost clients use:
python ./TestServer.py --server=threading --n-clients=100
or
python ./TestServer.py --server=threadpooling --n-clients=100
#------- ThreadPoolingMixIn.py
from __future__ import with_statement
from SocketServer import ThreadingMixIn
import threading
import Queue
class ThreadPoolingMixIn(ThreadingMixIn):
    """Mix-in class to handle requests in a thread pool.

    The pool grows and shrinks depending on load.
    For instance, a threadpooling TCP server class is created as
    follows:

        class ThreadPoolingTCPServer(ThreadPoolingMixIn, TCPServer): pass

    The server must call init_thread_pool() before it starts serving;
    the request queue and the counters are created there.
    """
    __author__ = 'Pavel Uvarov <pa**********@gmail.com>'

    def init_thread_pool(self, min_workers=5,
                         max_workers=100, min_spare_workers=5):
        """Initialize the thread pool and start min_workers workers.

        min_workers       -- the pool never shrinks below this many workers.
        max_workers       -- process_request() never grows the pool beyond
                             this many workers.
        min_spare_workers -- a worker that has just finished a request
                             exits only if more than this many workers
                             are not busy.
        """
        self.q = Queue.Queue()
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.min_spare_workers = min_spare_workers
        self.num_workers = 0
        self.num_busy_workers = 0
        self.workers_mutex = threading.Lock()
        # start_workers() updates num_workers and requires the mutex.
        with self.workers_mutex:
            self.start_workers(self.min_workers)

    def start_workers(self, n):
        """Start n daemon worker threads.

        The caller must hold self.workers_mutex. num_workers is updated
        here, synchronously. If each new thread counted itself once it
        got to run, process_request() could see a stale count and start
        more than max_workers threads.
        """
        for _ in range(n):
            t = threading.Thread(target=self.worker)
            t.setDaemon(True)
            t.start()
            # Count the thread only after it has actually started.
            self.num_workers += 1

    def _may_retire(self):
        """Return True if a worker that just finished may exit.

        The caller must hold self.workers_mutex.
        """
        return (self.num_workers > self.min_workers and
                self.num_workers - self.num_busy_workers >
                self.min_spare_workers)

    def worker(self):
        """Body of a worker thread.

        Gets a request from the queue (blocking while it is empty) and
        processes it. Afterwards, the worker exits if the pool is larger
        than min_workers and more than min_spare_workers workers are not
        busy. Otherwise it loops forever.
        """
        while True:
            request, client_address = self.q.get()
            with self.workers_mutex:
                self.num_busy_workers += 1
            handled = False
            try:
                self.process_request_thread(request, client_address)
                handled = True
            finally:
                # This runs even if the handler raised. join() must not
                # wait forever for a missing task_done(), and the counters
                # must stay exact.
                self.q.task_done()
                with self.workers_mutex:
                    self.num_busy_workers -= 1
                    if not handled:
                        # The exception is about to end this thread. Hand
                        # its slot to a fresh worker so the pool keeps its
                        # size.
                        self.num_workers -= 1
                        self.start_workers(1)
                    retire = handled and self._may_retire()
                    if retire:
                        self.num_workers -= 1
            if retire:
                return

    def process_request(self, request, client_address):
        """Put a request into the queue.

        If more than 3 requests are waiting and the pool has fewer than
        max_workers workers, one extra worker is started.
        """
        self.q.put((request, client_address))
        with self.workers_mutex:
            if self.q.qsize() > 3 and self.num_workers < self.max_workers:
                self.start_workers(1)

    def join(self):
        """Block until every queued request has been processed."""
        self.q.join()
#------- TestServer.py
from __future__ import with_statement
from SocketServer import *
import socket
import sys
import threading
import time
import os
from ThreadPoolingMixIn import *
class ThreadPoolingTCPServer(ThreadPoolingMixIn, TCPServer):
    """TCPServer that serves requests from a ThreadPoolingMixIn thread pool.

    The pool is created with init_thread_pool()'s default sizes right
    after the server socket is set up. Without this call,
    process_request() would fail on the missing request queue.
    """

    def __init__(self, *args, **kwargs):
        TCPServer.__init__(self, *args, **kwargs)
        self.init_thread_pool()
class TestServer(ThreadingTCPServer):
    """Benchmark TCP server that records request latency and throughput.

    Each request is handled in a new thread (ThreadingTCPServer).
    Handlers report their timings through update_stats(); reset_stats()
    starts a new measurement window.
    """
    allow_reuse_address = True
    # Listen backlog (SocketServer passes it to listen()). It is raised
    # above the library default so that bursts of concurrent client
    # connects are absorbed.
    request_queue_size = 128

    def __init__(self, server_address, RequestHandlerClass,
                 packet_size):
        TCPServer.__init__(self, server_address, RequestHandlerClass)
        self.packet_size = packet_size      # bytes written per reply
        self.sum_t = 0                      # summed latency in this window
        self.total_num_requests = 0         # requests in all closed windows
        self.num_requests = 0               # requests in this window
        self.t0 = time.time()               # start of this window
        self.lock = threading.Lock()        # guards the counters above

    def reset_stats(self):
        """Close the current statistics window and start a new one."""
        with self.lock:
            self.total_num_requests += self.num_requests
            self.num_requests = 0
            self.sum_t = 0
            self.t0 = time.time()

    def update_stats(self, t0, t1):
        """Record one finished request and return (n, avg_t, rate).

        t0 and t1 are the request's start and end timestamps. The return
        value holds the number of requests in the current window, their
        mean latency, and the requests per second since the window began.
        """
        with self.lock:
            self.num_requests += 1
            self.sum_t += t1 - t0
            n = self.num_requests
            sum_t = self.sum_t
            window_start = self.t0
        avg_t = sum_t / n
        elapsed = t1 - window_start
        # time.time() may have coarse resolution. Also, another thread may
        # have reset the window after t1 was taken. Either way, avoid
        # dividing by zero or reporting a negative rate.
        if elapsed > 0:
            rate = n / elapsed
        else:
            rate = 0.0
        return (n, avg_t, rate)

    def handle_request(self):
        """Handle one request, possibly blocking.

        KeyboardInterrupt propagates so that the benchmark can be stopped.
        Other errors from process_request() are reported via handle_error()
        and the request is closed.
        """
        try:
            request, client_address = self.get_request()
        except socket.error:
            return
        if not self.verify_request(request, client_address):
            # A rejected connection must still be closed, otherwise its
            # socket leaks.
            self.close_request(request)
            return
        try:
            self.process_request(request, client_address)
        except KeyboardInterrupt:
            raise
        except Exception:
            self.handle_error(request, client_address)
            self.close_request(request)
class TestServerThreadPool(ThreadPoolingMixIn, TestServer):
    """TestServer variant whose requests are served by a thread pool.

    The pool keeps at least 2 and at most 200 workers. A worker that has
    just finished a request exits only when more than 20 workers are idle.
    """

    def __init__(self, server_address, RequestHandlerClass,
                 packet_size):
        TestServer.__init__(self, server_address, RequestHandlerClass,
                            packet_size)
        self.init_thread_pool(min_workers=2, max_workers=200,
                              min_spare_workers=20)
class TestRequestHandler(StreamRequestHandler):
    """Answer every connection with server.packet_size bytes of 'a'.

    The time from handler construction to the end of the write is
    reported to the server. Every 10000th request prints the request rate
    and starts a new statistics window.
    """

    def __init__(self, request, client_address, server):
        # Take the start timestamp before delegating. SocketServer's base
        # constructor is what runs handle(), which reads self.t0.
        self.t0 = time.time()
        StreamRequestHandler.__init__(self, request,
                                      client_address, server)

    def handle(self):
        """Send the payload and report this request's timing."""
        payload = 'a' * self.server.packet_size
        self.wfile.write(payload)
        finished_at = time.time()
        count, _avg, rate = self.server.update_stats(self.t0, finished_at)
        if count % 10000:
            return
        print('rate=%.2f ' % rate)
        self.server.reset_stats()
from optparse import OptionParser
def server(o):
    """Run the benchmark server selected by o.server until it fails.

    o holds the parsed command-line options; this function reads
    o.server, o.port and o.packet_size. An empty o.server runs no server,
    so only the clients run. An unrecognised value is reported on stderr.
    Exceptions from the server are printed and re-raised.
    """
    HandlerClass = TestRequestHandler
    if o.server == "threading":
        ServerClass = TestServer
    elif o.server == "threadpooling":
        ServerClass = TestServerThreadPool
    else:
        if o.server:
            sys.stderr.write("Unknown server type %r (expected 'threading' "
                             "or 'threadpooling')\n" % (o.server,))
        return
    server_address = ('', o.port)
    try:
        srv = ServerClass(server_address, HandlerClass, o.packet_size)
        sa = srv.socket.getsockname()
        print("Serving on %s port %s ..." % (sa[0], sa[1]))
        srv.serve_forever()
    except Exception:
        # sys.exc_info() keeps this valid on both old and new Pythons.
        print("Exception: %s" % str(sys.exc_info()[1]))
        raise
def client(o):
    """Start o.n_clients benchmark clients that hammer localhost:o.port.

    os.fork() returns the child's pid (true) in the parent. So the
    *parent* of each fork becomes a client and never returns, while the
    child carries on forking. The clients form a chain of processes, and
    only the last child returns from this function.
    """
    for _ in range(o.n_clients):
        if os.fork():
            _client_loop(o.port)


def _client_loop(port):
    """Connect to localhost:port, read until EOF, and repeat forever."""
    while True:
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.connect(("localhost", port))
                while sock.recv(4096):
                    pass
            finally:
                # Close the socket on every path, including failed connects.
                sock.close()
        except Exception:
            print(sys.exc_info()[1])
            # Back off, e.g. while the server is not listening yet.
            time.sleep(1)
if __name__ == '__main__':
    # Example: python ./TestServer.py --server=threadpooling --n-clients=100
    args = sys.argv[1:]
    usage = "usage: %prog [options]"
    parser = OptionParser(usage)
    parser.add_option("-p", "--port", help="Server port",
                      type="int", default=8123)
    parser.add_option("--n-clients", help="Number of client forks",
                      type="int", default=0)
    parser.add_option("--server",
                      help="Type of the server (threading or threadpooling)",
                      type="string", default="")
    parser.add_option("--packet-size", help="Packet size",
                      type="int", default=16*1024)
    (o, a) = parser.parse_args(args)
    # Fork once: the child runs the server and the parent runs the clients.
    if os.fork() == 0:
        server(o)
    else:
        client(o)