473,387 Members | 1,516 Online
Bytes | Software Development & Data Engineering Community
Post Job

Home Posts Topics Members FAQ

Join Bytes to post your question to a community of 473,387 software developers and data experts.

Multithreaded class with queues

Hi!

I wrote a little class to make multihreading easier. It's based on one
of aahz's threading example scripts. What it does:

It spawns up number of CollectorThreads and one ProcessThread. The
CollectorThreads listen on one queue (inputqueue), read, process the
data (with colfunc), put the result onto the outputqueue. The
ProcessThread listens on the outputqueue, reads, processes (with
prfunc). end. (more details in the attached file)

it seems to work with test functions but when I use a network-intensive
function (snmp-queries) it just gets slower with maxThreads set to more
than 1.

Any help?
Thanks.
see the class attached.
ps. Maybe I basically don't understand something...

--
--arutz

#!/usr/local/bin/python

"""
Multithreaded class for the task: multiple collector - one dataprocessor

Usage:

collector = Collector(data, colfunc, prfunc, maxThreads)
collector.run()

Internals:

Collector spawns up the CollectorThreads and a ProcessThread and puts
the data onto the inputQueue. The CollectorThread reads the data from
inputQueue and processes it through 'colfunc()'. Then puts
the result onto the outputQueue. The ProcessThread only listens on the
outputQueue (blocks on it) and feeds the data to `prfunc()`.

Thread shutdown: collectorthreads: inputQueue.put(shutdown=True)
processthread: outputQueue.put(shutdown=True)
"""

import threading
import Queue
#from operator import truth as _truth

#def _xor(a,b):
# return _truth(a) ^ _truth(b)

class _Token:
def __init__(self, data=None, shutdown=None):
#if not _xor(data, shutdown):
# raise "Tsk, tsk, need to set either URL or shutdown (not both)"
self.data = data
self.shutdown = shutdown

class _CollectorThread(threading.Thread):
"""Worker thread blocking on inputQueue.
The result goes to outputQueue after processed by self.func.
"""

def __init__(self, inQueue, outQueue, func):
threading.Thread.__init__(self)
self.inQ = inQueue
self.outQ = outQueue
self.func = func

def run(self):
while True:
token = self.inQ.get()
if token.shutdown is not None:
break
else:
#collect data from the routers
#print token.data
result = self.func(token.data)
self.outQ.put_nowait(_Token(data=result))

class _ProcessThread(threading.Thread):
"""'Reader-only' thread processing outputQueue."""

def __init__(self, outQueue, func):
threading.Thread.__init__(self)
self.outQ = outQueue
self.func = func

def run(self):
while True:
token = self.outQ.get()
if token.shutdown is not None:
break
else:
#insert into db or do anything
self.func(token.data)

class Collector:
"""Spawns up the threadpool (worker and processthreads)
and puts tha data onto the inputQueue of the worker threads.
Then shuts them down."""

def __init__(self, data, colfunc, prfunc, maxThreads=5):
"""Parameters:
- data: data for collectfunc (type of sequence)
- colfunc: function to process inputQueue into outputQueue
- prfunc: function to process outputQueue
- maxThreads: MAX_THREADS
"""

self.data = data
self.inputQueue = Queue.Queue()
self.outputQueue = Queue.Queue()
self.threadPool = []

#Start the worker threads
for i in range(maxThreads):
collector = _CollectorThread(self.inputQueue,
self.outputQueue,
colfunc)
collector.start()
self.threadPool.append(collector)

#Start the db thread
self.processthread = _ProcessThread(self.outputQueue, prfunc)
self.processthread.start()

def run(self):
"""Queue the data and shutdown the threads."""
self._queueData()
self._shutdown()

def _queueData(self):
"""Put data onto the inputQueue."""
for d in self.data:
self.inputQueue.put_nowait(_Token(data=d))

def _shutdown(self):
for i in self.threadPool:
self.inputQueue.put(_Token(shutdown=True))
for thread in self.threadPool:
thread.join()
self.outputQueue.put(_Token(shutdown=True))
self.processthread.join()

if __name__ == '__main__':
def myprint(s):
print s

def hashdata(a):
return a + ': OK'

MAX_THREADS = 5

data = ['1', '2', 'asd', 'qwe']
collect = Collector(data=data, colfunc=hashdata, prfunc=myprint, maxThreads=MAX_THREADS)
collect.run()
Jul 22 '05 #1
1 1629
In article <ma***************************************@python. org>,
Antal Rutz <ar***@mimoza.pantel.net> wrote:

I wrote a little class to make multihreading easier. It's based on one
of aahz's threading example scripts. What it does:
Thanks!
It spawns up number of CollectorThreads and one ProcessThread. The
CollectorThreads listen on one queue (inputqueue), read, process the
data (with colfunc), put the result onto the outputqueue. The
ProcessThread listens on the outputqueue, reads, processes (with
prfunc). end. (more details in the attached file)

it seems to work with test functions but when I use a network-intensive
function (snmp-queries) it just gets slower with maxThreads set to more
than 1.


Hrm. There's nothing obviously wrong. What happens if you use it as a
spider?
--
Aahz (aa**@pythoncraft.com) <*> http://www.pythoncraft.com/

The way to build large Python applications is to componentize and
loosely-couple the hell out of everything.
Jul 29 '05 #2

This thread has been closed and replies have been disabled. Please start a new discussion.

Similar topics

7
by: Anand Pillai | last post by:
The standard Python Queue module, allows to generate queues that have no size limit, by passing the size argument as <= 0. q = Queue(0) In a multithreaded application, these queues could be...
8
by: Matthew Bell | last post by:
Hi, I've got a question about whether there are any issues with directly calling attributes and/or methods of a threaded class instance. I wonder if someone could give me some advice on this. ...
6
by: Dan Kelley | last post by:
We have a multithreaded app that responds to events, and writes these events to a text file. This text file is used by an external system for further processing. We want to be able to write...
7
by: Sidd | last post by:
Hi, I tried finding and example of multithreaded client-serve program in python. Can any one please tell me how to write a multithreaded client-server programn in python such that 1.It can handle...
4
by: Jason | last post by:
Hi, I have this old class that was used for logging error messages and I thought it would be a good exercise (for me) to convert it to .Net specifically c#. This class uses two typed pointer...
9
by: Brian Henry | last post by:
If i inherite a queue class into my class, and do an override of the enqueue member, how would i then go about actually doing an enqueue of an item? I am a little confused on this one... does over...
2
by: Alan Kemp | last post by:
Hi, I have a problem that is half python, half design. I have a multithreaded network server working, each client request spawns a new thread which deals with that client for as long as it is...
3
by: Jake K | last post by:
I have a multithreaded application that I now want to convert into a Windows Service. Does application.run work in a windows service? Are there things to take into consideration when creating a...
5
by: John Henry | last post by:
I am back against the wall trying to migrate my multithreaded application from Python 2.3 to 2.5. The part of the code that's failing has to do with queues (2.3 queues and 2.5 queues are not the...
0
by: taylorcarr | last post by:
A Canon printer is a smart device known for being advanced, efficient, and reliable. It is designed for home, office, and hybrid workspace use and can also be used for a variety of purposes. However,...
0
by: Charles Arthur | last post by:
How do i turn on java script on a villaon, callus and itel keypad mobile phone
0
by: aa123db | last post by:
Variable and constants Use var or let for variables and const fror constants. Var foo ='bar'; Let foo ='bar';const baz ='bar'; Functions function $name$ ($parameters$) { } ...
0
by: ryjfgjl | last post by:
If we have dozens or hundreds of excel to import into the database, if we use the excel import function provided by database editors such as navicat, it will be extremely tedious and time-consuming...
0
by: emmanuelkatto | last post by:
Hi All, I am Emmanuel katto from Uganda. I want to ask what challenges you've faced while migrating a website to cloud. Please let me know. Thanks! Emmanuel
0
BarryA
by: BarryA | last post by:
What are the essential steps and strategies outlined in the Data Structures and Algorithms (DSA) roadmap for aspiring data scientists? How can individuals effectively utilize this roadmap to progress...
1
by: Sonnysonu | last post by:
This is the data of csv file 1 2 3 1 2 3 1 2 3 1 2 3 2 3 2 3 3 the lengths should be different i have to store the data by column-wise with in the specific length. suppose the i have to...
0
by: Hystou | last post by:
Most computers default to English, but sometimes we require a different language, especially when relocating. Forgot to request a specific language before your computer shipped? No problem! You can...
0
jinu1996
by: jinu1996 | last post by:
In today's digital age, having a compelling online presence is paramount for businesses aiming to thrive in a competitive landscape. At the heart of this digital strategy lies an intricately woven...

By using Bytes.com and it's services, you agree to our Privacy Policy and Terms of Use.

To disable or enable advertisements and analytics tracking please visit the manage ads & tracking page.