
RFC: my iterthreader module

I have an iterthreader module that I've been working on for a while
now. It is similar to itertools.imap, but it runs the function calls in
a pool of worker threads (numthreads, two by default) and uses Queues to
move the data around. A better name for it would probably be ithreadmap,
but anyway...
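For readers on Python 3, the core mechanism (worker threads fed from one Queue, with results coming back on a second Queue) can be sketched as a plain generator. This is a minimal illustration, not the iterthreader code itself: the name `ithreadmap` is borrowed from the suggestion above, and instead of a separate cleanup thread, each worker posts its own end marker. Like Threader, it yields (item, result) pairs in the order the calls finish, not in input order.

```python
import queue
import threading

# Sentinel: "no more work" on the work queue, "worker finished" on the result queue.
_DONE = object()


def ithreadmap(func, iterable, numthreads=2):
    """Yield (item, func(item)) pairs computed by `numthreads` worker threads.

    Pairs come back in completion order, not input order.
    """
    if numthreads < 1:
        raise ValueError("numthreads should be greater than 0")

    work_q = queue.Queue()    # items waiting to be processed
    result_q = queue.Queue()  # (item, result) pairs ready for the caller

    def worker():
        try:
            while True:
                item = work_q.get()
                if item is _DONE:
                    break
                result_q.put((item, func(item)))
        finally:
            # Always announce that this worker is finished, even if func
            # raised, so the consuming loop below can never hang.
            result_q.put(_DONE)

    threads = [threading.Thread(target=worker) for _ in range(numthreads)]
    for t in threads:
        t.start()

    try:
        for item in iterable:
            work_q.put(item)
    finally:
        # One stop marker per worker, so every thread eventually exits.
        for _ in threads:
            work_q.put(_DONE)

    # Keep yielding results until every worker has reported that it is done.
    finished = 0
    while finished < numthreads:
        entry = result_q.get()
        if entry is _DONE:
            finished += 1
        else:
            yield entry


if __name__ == "__main__":
    import time

    def slowfunc(item):
        time.sleep(0.1)  # stands in for an I/O-bound call
        return item * 10

    for item, val in ithreadmap(slowfunc, range(6), numthreads=3):
        print("The value for %s is %s" % (item, val))
```

If `func` raises, that worker stops and the failing item's result is lost (the traceback goes to stderr), but the remaining workers keep draining the queue and the loop still terminates.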

In short, if you have a loop like
for item in biglist:
    print "The value for %s is %s" % (item, slowfunc(item))
or
for item, val in ((item, slowfunc(item)) for item in biglist):
    print "The value for %s is %s" % (item, val)

you can simply rewrite it as

for item, val in iterthreader.Threader(slowfunc, biglist):
    print "The value for %s is %s" % (item, val)

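and it will hopefully run faster. The usual GIL caveats still apply, of
course: you only gain when slowfunc spends its time waiting (on I/O, or
in C code that releases the GIL), not when it is busy running Python
bytecode. You can also subclass it in various ways, but I almost always
just call it as shown above.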
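On Python 3.2 and later, the standard library's `concurrent.futures` module covers the same pattern. It is a different technique from iterthreader and is shown here only for comparison. The sketch below (the helper name `threaded_pairs` is hypothetical) rewrites the `slowfunc` loop with `ThreadPoolExecutor` and `as_completed`, again yielding (item, value) pairs in completion order. Because `time.sleep` releases the GIL, the eight 0.1 s calls with four workers should finish in roughly 0.2 s rather than the 0.8 s a plain loop would take; a CPU-bound `slowfunc` written in pure Python would see little or no speedup.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slowfunc(item):
    # Stand-in for an I/O-bound call; time.sleep releases the GIL,
    # so several calls can overlap.
    time.sleep(0.1)
    return item * 2


def threaded_pairs(func, items, numthreads=2):
    """Yield (item, func(item)) in completion order using a thread pool."""
    with ThreadPoolExecutor(max_workers=numthreads) as pool:
        # Map each future back to the item it was submitted with.
        futures = {pool.submit(func, item): item for item in items}
        for fut in as_completed(futures):
            # result() re-raises any exception func raised for this item.
            yield futures[fut], fut.result()


if __name__ == "__main__":
    biglist = list(range(8))
    start = time.perf_counter()
    for item, val in threaded_pairs(slowfunc, biglist, numthreads=4):
        print("The value for %s is %s" % (item, val))
    print("elapsed: %.2fs" % (time.perf_counter() - start))
```

One design difference worth noting: since `fut.result()` re-raises in the consuming loop, a failing call surfaces where you iterate instead of only as a traceback from a worker thread.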

So, can anyone find any obvious problems with it? I've been meaning to
re-post it [1] to the Python Cookbook, but I'd like to hear what
others think first. I'm not aware of any other module that makes this
particular use of threading this simple.

[1] I _think_ I posted it before, but it may only have been in a
comment.

import threading
import Queue

class Threader:
    def __init__(self, func=None, data=None, numthreads=2):
        if not numthreads > 0:
            raise AssertionError("numthreads should be greater than 0")

        # 'is not None' so that an empty data list is still accepted
        # instead of falling through to the NotImplementedError default
        if func is not None:
            self.handle_input = func
        if data is not None:
            self.get_input = lambda: data

        self._numthreads = numthreads
        self.threads = []
        self.run()

    def __iter__(self):
        return self

    def next(self):
        # block until a worker (or cleanup) posts something to DQ
        still_running, input, output = self.DQ.get()
        if not still_running:
            raise StopIteration
        return input, output

    def get_input(self):
        raise NotImplementedError("You must implement get_input as a "
                                  "function that returns an iterable")

    def handle_input(self, input):
        raise NotImplementedError("You must implement handle_input as "
                                  "a function that returns anything")

    def _handle_input(self):
        # worker loop: process items until a (False, None) stop marker arrives
        while 1:
            work_todo, input = self.Q.get()
            if not work_todo:
                break
            self.DQ.put((True, input, self.handle_input(input)))

    def cleanup(self):
        """wait for all threads to stop and tell the main iter to
        stop"""
        for t in self.threads:
            t.join()
        self.DQ.put((False, None, None))

    def run(self):
        self.Q = Queue.Queue()    # work queue: (work_todo, input)
        self.DQ = Queue.Queue()   # done queue: (still_running, input, output)
        for x in range(self._numthreads):
            t = threading.Thread(target=self._handle_input)
            t.start()
            self.threads.append(t)

        try:
            try:
                for x in self.get_input():
                    self.Q.put((True, x))
            except NotImplementedError, e:
                print e
        finally:
            # always send one stop marker per worker, even if get_input()
            # raised, so the worker threads can exit
            for x in range(self._numthreads):
                self.Q.put((False, None))

        threading.Thread(target=self.cleanup).start()
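One problem worth pointing out in `_handle_input`: if `handle_input` raises, that worker thread dies and its item never reaches `DQ`, so the caller only sees a traceback on stderr. The Python 3 sketch below shows one possible alternative; the names `run_workers` and `_STOP` and the (item, ok, value) result shape are illustrative assumptions, not part of iterthreader. Each worker catches the exception and forwards it as data. For brevity it joins the workers and returns a list instead of streaming results like the iterator does.

```python
import queue
import threading

_STOP = object()  # per-worker stop marker (hypothetical name)


def run_workers(func, items, numthreads=2):
    """Apply func to every item on worker threads.

    Returns a list of (item, ok, value) tuples: value is func's result when
    ok is True, or the exception it raised when ok is False.
    """
    work_q = queue.Queue()
    result_q = queue.Queue()

    def worker():
        while True:
            item = work_q.get()
            if item is _STOP:
                break
            try:
                result_q.put((item, True, func(item)))
            except Exception as exc:
                # Forward the failure instead of letting the thread die.
                result_q.put((item, False, exc))

    threads = [threading.Thread(target=worker) for _ in range(numthreads)]
    for t in threads:
        t.start()
    try:
        for item in items:
            work_q.put(item)
    finally:
        for _ in threads:
            work_q.put(_STOP)
    for t in threads:
        t.join()  # workers never die early, so this always returns

    # All workers have exited, so nothing else will be added to result_q.
    results = []
    while not result_q.empty():
        results.append(result_q.get())
    return results


if __name__ == "__main__":
    for item, ok, value in run_workers(lambda x: 10 // x, [1, 2, 0, 5]):
        if ok:
            print("The value for %s is %s" % (item, value))
        else:
            print("%s failed: %r" % (item, value))
```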
--
- Justin

Jul 18 '06 #1