TaskQueue

I would like to get feedback on an idea I had for simplifying the use
of queues with daemon consumer threads.

Sometimes, I launch one or more consumer threads that wait for a task
to enter a queue and then work on the task. A recurring problem is that
I sometimes need to know if all of the tasks have been completed so I
can exit or do something with the result.

If each thread only does a single task, I can use t.join() to wait
until the task is done. However, if the thread stays alive and waits
for more Queue entries, then there doesn't seem to be a good way to
tell when all the processing is done.

So, the idea is to create a subclass of Queue that increments a counter
when objects are enqueued, that provides a method for worker threads to
decrement the counter when the work is done, and that offers a blocking
join() method that waits until the counter is zero.
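
As a rough illustration of that idea (a sketch only, not the linked recipe), the counter can sit next to an ordinary queue and be guarded by a condition variable. The class name CountingQueue and its attributes are made up for this example, and it wraps queue.Queue from current Python rather than subclassing it.

```python
import queue
import threading


class CountingQueue:
    """Hypothetical illustration: a queue that tracks unfinished tasks."""

    def __init__(self):
        self._queue = queue.Queue()
        self._cond = threading.Condition()
        self._unfinished = 0

    def put(self, item):
        # Count the task before it becomes visible to any consumer.
        with self._cond:
            self._unfinished += 1
        self._queue.put(item)

    def get(self):
        return self._queue.get()

    def task_done(self):
        # Called by a worker once it has finished one task.
        with self._cond:
            if self._unfinished <= 0:
                raise ValueError("task_done() called too many times")
            self._unfinished -= 1
            if self._unfinished == 0:
                self._cond.notify_all()

    def join(self):
        # Block until every task that was put() has been marked done.
        with self._cond:
            while self._unfinished:
                self._cond.wait()
```

Incrementing inside put() and decrementing only in task_done() means the count covers both queued tasks and tasks a worker is still processing, which is exactly the "queue empty and all consumers idle" condition.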

There's a working implementation at:

http://aspn.activestate.com/ASPN/Coo.../Recipe/475160

Here is an example:

    def worker():
        while 1:
            task = q.get()
            do_work(task)
            q.task_done()

    # setup queue and launch worker threads
    q = Queue()
    for i in range(num_worker_threads):
        Thread(target=worker).start()

    # load the tasks
    for elem in tasklist:
        q.put(elem)

    # block until they are done
    q.join()

    # process with other stuff
    . . .

There are some competing approaches. One is to attach sentinel objects
to the end of the line as a way for consumer threads to know that they
should shut down. Then a regular t.join() can be used to block until
the consumer threads have shut down. This approach is
straightforward, but it depends on 1) complicating the consumer logic
to include sentinel detection and thread shut-down, 2) complicating the
producer logic to append one sentinel for each consumer when the data
stream is done, and 3) actually knowing when the data stream is done. The
last point is a problem in one of my programs because the consumer
sometimes uses a divide-and-conquer approach, resulting in two new
subtasks being loaded to the queue. IOW, the only way to know when all
the inputs have been handled is to have the queue empty and all
consumers inactive (i.e. processing complete).

    def worker():
        while 1:
            task = q.get()
            if task is None:    # check for sentinel
                return
            do_work(task)

    # setup queue and launch worker threads
    q = Queue()
    threads = []
    for i in range(num_worker_threads):
        t = Thread(target=worker)
        threads.append(t)
        t.start()

    # load the tasks
    for elem in tasklist:
        q.put(elem)
    for i in range(num_worker_threads):
        q.put(None)    # load sentinels

    # block until they are done
    for t in threads:
        t.join()

    # process with other stuff
    . . .
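
For the divide-and-conquer case mentioned above, a counting join() copes with workers that add subtasks, provided each subtask is enqueued before the parent task is marked done. A minimal sketch, assuming the task_done()/join() methods that queue.Queue provides in current Python; parallel_sum, chunk and the (lo, hi) task format are invented for the illustration.

```python
import queue
import threading


def parallel_sum(values, num_workers=4, chunk=8):
    """Sum values by recursively splitting index ranges across worker threads."""
    q = queue.Queue()
    lock = threading.Lock()
    total = 0

    def worker():
        nonlocal total
        while True:
            lo, hi = q.get()
            try:
                if hi - lo > chunk:
                    # Too big: split into two subtasks and enqueue both.
                    mid = (lo + hi) // 2
                    q.put((lo, mid))
                    q.put((mid, hi))
                else:
                    part = sum(values[lo:hi])
                    with lock:
                        total += part
            finally:
                # Marked done only after the subtasks are already counted,
                # so the unfinished-task count cannot reach zero too early.
                q.task_done()

    for _ in range(num_workers):
        threading.Thread(target=worker, daemon=True).start()

    q.put((0, len(values)))
    q.join()    # returns once the queue is empty and every worker is idle
    return total
```

No sentinel is needed here because the producer never has to know how many subtasks will eventually exist; the daemon workers are simply left blocked in get() when the program moves on.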

Another approach is to set up a second queue for results. Each
consumer loads a result when it is done with processing an input. The
producer or main thread then becomes responsible for matching all
inputs to the corresponding results. This isn't complicated
in practice but it isn't pretty either:

    def worker():
        while 1:
            task = tasks_in.get()
            do_work(task)
            tasks_out.put(None)    # enqueue a None result when done with task

    # setup queues and launch worker threads
    tasks_in = Queue()
    tasks_out = Queue()
    for i in range(num_worker_threads):
        Thread(target=worker).start()

    # load the tasks
    n = len(tasklist)
    for elem in tasklist:
        tasks_in.put(elem)

    # block until they are done
    for i in range(n):
        tasks_out.get()

    # process with other stuff
    . . .

This approach becomes messier if the task loading occurs at multiple
points inside a more complex producer function. Also, it doesn't work
well with the worker thread divide-and-conquer situation discussed
above.

Mar 21 '06 #1
Raymond Hettinger:
There are some competing approaches. One is to attach sentinel objects
to the end of the line as a way for consumer threads to know that they
should shut down. Then a regular t.join() can be used to block until
the consumers threads have shut-down. This approach is
straight-forward, but it depends on 1) complicating the consumer logic
to include sentinel detection and thread shut-down,
Writing this observation was way more complicated than writing the code
that's required to implement it :-)

    if task is None:
        break

I use 'None' as the sentinel. This is a larger snippet from my own
threadpool with the context of this code:

    def run(self):
        while True:
            task = self.workQueue.get()
            if task is None:
                break
            try:
                task.do()
            except:
                task.unhandledException()
            self.resultQueue.put(task)

2) complicating the producer logic to append one sentinel for each consumer
when the data stream is done
    for i in range(self.numberOfThreads):
        self.workQueue.put(None)

Again, more characters in your observation than in the code.
3) actually knowing when the data stream is done.


    def doingWork(self):
        return self.numberOfTasks > 0

Which is possible because of:

    def putTask(self,task):
        self.workQueue.put(task)
        self.numberOfTasks += 1

    def getTask(self):
        task = self.resultQueue.get()
        self.numberOfTasks -= 1
        return task
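
Put together, the fragments above might fit into a class along these lines. This is only a guess at the surrounding structure, not the actual threadpool: the Task class, the constructor and shutdown() are assumptions added so the sketch runs on its own, and it catches Exception rather than using a bare except.

```python
import queue
import sys
import threading


class Task:
    """Hypothetical task type offering the do()/unhandledException() interface."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args
        self.result = None
        self.error = None

    def do(self):
        self.result = self.func(*self.args)

    def unhandledException(self):
        # Called from inside an except block, so exc_info() is populated.
        self.error = sys.exc_info()[1]


class ThreadPool:
    def __init__(self, numberOfThreads):
        self.numberOfThreads = numberOfThreads
        self.numberOfTasks = 0    # assumed to be touched by one producer thread only
        self.workQueue = queue.Queue()
        self.resultQueue = queue.Queue()
        self.threads = [threading.Thread(target=self.run)
                        for _ in range(numberOfThreads)]
        for t in self.threads:
            t.start()

    def run(self):
        while True:
            task = self.workQueue.get()
            if task is None:    # sentinel: shut this worker down
                break
            try:
                task.do()
            except Exception:
                task.unhandledException()
            self.resultQueue.put(task)

    def putTask(self, task):
        self.workQueue.put(task)
        self.numberOfTasks += 1

    def getTask(self):
        task = self.resultQueue.get()
        self.numberOfTasks -= 1
        return task

    def doingWork(self):
        return self.numberOfTasks > 0

    def shutdown(self):
        for _ in range(self.numberOfThreads):
            self.workQueue.put(None)    # one sentinel per worker
        for t in self.threads:
            t.join()
```

The counter stays consistent without a lock only because putTask() and getTask() are both called from the same thread; if workers themselves added tasks, the increment and decrement would need protection.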

--
René Pijlman

What do you want to learn? http://www.leren.nl
Mar 21 '06 #2
Raymond Hettinger wrote:
I would like to get feedback on an idea I had for simplifying the use
of queues with daemon consumer threads

Sometimes, I launch one or more consumer threads that wait for a task
to enter a queue and then work on the task. A recurring problem is that
I sometimes need to know if all of the tasks have been completed so I
can exit or do something with the result.

If each thread only does a single task, I can use t.join() to wait
until the task is done. However, if the thread stays alive and waits
for more Queue entries, then there doesn't seem to be a good way to
tell when all the processing is done.


Hmm. How about this: the Producer can cause an exception to be raised
in any Consumers waiting on the Queue, or vice-versa.

I remember encountering a similar problem when writing a slide viewer
that prefetched slides. I wanted to have the ability to terminate
the slide show early. But what you have is five slides sitting in the
Queue when the user quits (the UI was in the consumer thread, of
course). Now how do you signal the Producer thread to exit? It seemed
to me that allowing one thread to raise an exception in another was the
most straightforward way to do this. (In my case it was the Consumer
raising it in the Producer.)

I briefly considered a subclass of Queue to implement it, but it was
more difficult than anything I wanted to work on at that point
in time. I implemented some workaround that I don't remember, and
forgot about the issue until your post.

But yeah, something like an InterruptableQueue might be a nice thing to
have.
Carl Banks

Mar 21 '06 #3
Carl Banks wrote:
But yeah, something like an InterruptableQueue might be a nice thing to
have.


Ok, I see now that InterruptableQueue wouldn't help the OP, though it
would have helped me in my situation, so it'd still be a good idea.

Carl Banks

Mar 21 '06 #4

Rene Pijlman wrote:
2) complicating the producer logic to append one sentinel for each consumer
when the data stream is done


    for i in range(self.numberOfThreads):
        self.workQueue.put(None)


Or, you could just put one sentinel in the Queue, and subclass the
Queue's _get method not to take the sentinel out. It might help keep
bookkeeping down (and it seems Raymond was in a situation where keeping
track of threads wasn't so easy).

BTW, for sentinels, I recommend creating one using:

sentinel = object()

Because, although it's not applicable to your example, sometimes None
is an object you want to pass through. (I think Alex Martelli came up
with that one.)
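
A related variant that stays within the public Queue API, shown here only as a sketch: instead of overriding _get, each consumer that sees the sentinel puts it back before exiting, so a single sentinel still stops every consumer. Note that this is a different technique from the _get override suggested above. The names SENTINEL, consume and run are invented for the example, which uses queue.Queue from current Python.

```python
import queue
import threading

SENTINEL = object()    # unique marker, so None remains usable as real data


def consume(q, results):
    while True:
        item = q.get()
        if item is SENTINEL:
            q.put(SENTINEL)    # leave the marker for the next consumer
            return
        results.append(item)


def run(items, num_workers=3):
    """Feed items to the workers, then stop all of them with one sentinel."""
    q = queue.Queue()
    results = []
    threads = [threading.Thread(target=consume, args=(q, results))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)
    q.put(SENTINEL)    # a single sentinel, regardless of the worker count
    for t in threads:
        t.join()
    return results
```

Because the comparison is by identity against a private object, None can travel through the queue like any other value, and the producer no longer needs to know how many consumers exist.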

Carl Banks

Mar 21 '06 #5
Carl Banks:
Rene Pijlman:
    for i in range(self.numberOfThreads):
        self.workQueue.put(None)
Or, you could just put one sentinel in the Queue, and subclass the
Queue's _get method not to take the sentinel out.


Ah yes, clever trick. But you'd have to worry about thread-safety of your
subclass though.
BTW, for sentinels, I recommend creating one using:

sentinel = object()

Because, although it's not applicable to your example, sometimes None
is an object you want to pass through. (I think Alex Martelli came up
with that one.)


You mean this post, I guess:
http://mail.python.org/pipermail/pyt...er/244750.html

I dunno, but I cannot think of a better way to say "this has no possible
use" than to pass None. Place yourself in the position of someone who
doesn't know exactly what object() is and who encounters it in the code.
I've just spent 5 minutes trying to find some reference information and I
gave up ("object" is a lousy search word).

--
René Pijlman

What do you want to learn? http://www.leren.nl
Mar 21 '06 #6
Raymond Hettinger wrote:
I would like to get feedback on an idea I had for simplifying the use
of queues with daemon consumer threads

Sometimes, I launch one or more consumer threads that wait for a task
to enter a queue and then work on the task. A recurring problem is that
I sometimes need to know if all of the tasks have been completed so I
can exit or do something with the result.

If each thread only does a single task, I can use t.join() to wait
until the task is done. However, if the thread stays alive and waits
for more Queue entries, then there doesn't seem to be a good way to
tell when all the processing is done.

So, the idea is to create a subclass of Queue that increments a counter
when objects are enqueued, that provides a method for worker threads to
decrement the counter when the work is done, and that offers a blocking
join() method that waits until the counter is zero.

Hi Raymond, your approach seems like it would be something I would use.

I'm wondering if threads could be implemented as a type of generator object
where it runs, and the yield waits for next() to be called, instead of
next() waiting for yield. Then you can do...

    thread Worker(args):
        while 1:
            # do stuff
            ...
            yield result    # wait for next() to be called.
            if done: break

    my_thread = Worker(args)
    results = list(my_thread)    # get all values as they are produced

It would be easy to put these in a list and iterate it.

    thread Worker(args):
        # Do something with args
        ...
        yield None

    # Start threads
    active_threads = [Worker(args1), Worker(args2), Worker(args3)]

    for T in active_threads:
        for _ in T: pass    # Make sure they are finished.

If this can be done, then all the messy parts possibly could be handled
within python C code, and not by python source code. If you know how to
make a generator, then you would know how to do "simple" threads. And
maybe anything needing more than this would be better off being an
external task anyway?
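
Something close to this can be approximated in today's Python without new syntax, at least as a rough sketch: run an ordinary generator in a background thread and hand its values over through a one-slot queue. The helper name run_in_thread and the _DONE marker are invented here, and the behaviour is only an approximation of the proposal, since the producer runs at most one value ahead rather than waiting exactly for next().

```python
import queue
import threading

_DONE = object()    # private end-of-stream marker


def run_in_thread(gen_func, *args):
    """Iterate gen_func(*args) in a background thread, yielding its values here."""
    q = queue.Queue(maxsize=1)    # producer blocks in put() until the consumer catches up

    def produce():
        try:
            for value in gen_func(*args):
                q.put((value, None))
            q.put((_DONE, None))
        except Exception as exc:
            # Forward the failure so it is re-raised in the consuming thread.
            q.put((_DONE, exc))

    threading.Thread(target=produce, daemon=True).start()
    while True:
        value, exc = q.get()
        if value is _DONE:
            if exc is not None:
                raise exc
            return
        yield value
```

Exhausting the iterator, for example with list(run_in_thread(worker_func, args)), doubles as the "make sure it is finished" step. One of the issues: if the consumer stops iterating early, the background thread stays blocked in put() until the process exits.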

I'm sure there are lots of um... issues. ;-)

Cheers,
Ron

Mar 21 '06 #7
Rene Pijlman wrote:
Carl Banks:
Rene Pijlman:
    for i in range(self.numberOfThreads):
        self.workQueue.put(None)
Or, you could just put one sentinel in the Queue, and subclass the
Queue's _get method not to take the sentinel out.


Ah yes, clever trick. But you'd have to worry about thread-safety of your
subclass though.


Queue worries about this for you. The Queue class only calls _get when
it has the queue's mutex, so you can assume thread safety when
subclassing it.

BTW, for sentinels, I recommend creating one using:

sentinel = object()

Because, although it's not applicable to your example, sometimes None
is an object you want to pass through. (I think Alex Martelli came up
with that one.)


You mean this post, I guess:
http://mail.python.org/pipermail/pyt...er/244750.html

I dunno, but I cannot think of a better way to say "this has no possible
use" than to pass None.


The problem is, sometimes None has a possible use. In your example I
don't think it's a big deal; you're using None more like a "null
pointer" than a "sentinel" anyways. But it's not a good idea for a
generic implementation.

Say you were writing a FiniteQueue class that implemented the permanent
sentinel trick to indicate to all consumer threads that the queue is
done. For your own code, you may know that you'll never need to pass
None through the queue, but you can't assume that for all code.
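
One way such a FiniteQueue might look, as a sketch only. It relies on internals of CPython's queue.Queue (the _put/_get hooks, the underlying deque in self.queue, and the not_empty condition), which are implementation details rather than documented API; DONE and close() are names made up here. Because put() wakes only one waiting consumer, the sketch wakes all of them when the sentinel arrives.

```python
import queue
import threading


class FiniteQueue(queue.Queue):
    """Hypothetical sketch: after close(), every get() returns DONE."""

    DONE = object()    # private sentinel, so None can still be queued as data

    def close(self):
        self.put(self.DONE)

    def _put(self, item):
        # Called by put() with the queue's mutex already held.
        super()._put(item)
        if item is self.DONE:
            # put() notifies a single waiter; wake every blocked consumer.
            self.not_empty.notify_all()

    def _get(self):
        # Called by get() with the mutex held and at least one item present.
        if self.queue[0] is self.DONE:
            return self.DONE    # peek only: the sentinel stays in the queue
        return super()._get()
```

Anything put after close() is never delivered, and the sentinel counts as one unfinished task, so this sketch should not be combined with join() as it stands.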

Place yourself in the position of someone who
doesn't know exactly what object() is and who encounters it in the code.
I've just spent 5 minutes trying to find some reference information and I
gave up ("object" is a lousy search word).


I would hope the word "sentinel" in "sentinel = object()" would give
that person some clue as to what it is. :)
Carl Banks

Mar 22 '06 #8
Carl Banks:
Rene Pijlman:
Ah yes, clever trick. But you'd have to worry about thread-safety of your
subclass though.
Queue worries about this for you. The Queue class only calls _get when
it has the queue's mutex, so you can assume thread safety when
subclassing it.


Ah yes, I overlooked the underscore. But you'd have to worry about
everything get() is doing before it calls _get() though :-)
Say you were writing a FiniteQueue class that implemented the permanent
sentinel trick to indicate to all consumer threads that the queue is
done. For your own code, you may know that you'll never need to pass
None through the queue, but you can't assume that for all code.


Ah, this is where assert proves its value.

    class FiniteQueue(object):
        def putTask(self,task):
            assert task is not None
            self.workQueue.put(task)
            self.numberOfTasks += 1
        # ...

--
René Pijlman

What do you want to learn? http://www.leren.nl
Mar 22 '06 #9
