
Producer-consumer threading problem

I'd like some feedback on a solution to a variant of the producer-
consumer problem. My first few attempts turned out to deadlock
occasionally; this one seems to be deadlock-free so far but I can't
tell if it's provably correct, and if so, whether it can be
simplified.

The generic producer-consumer situation with unlimited buffer capacity
is illustrated at http://docs.python.org/lib/condition-objects.html.
That approach assumes that the producer will keep producing items
indefinitely; otherwise the consumer ends up waiting forever. The
extension to the problem I am considering requires the consumer to be
notified not only when there is a new produced item, but also when
there is not going to be a new item so that it stops waiting. More
specifically, I want a generator (or iterator class) with the
following generic signature:

def iter_consumed(items, produce, consume):
    '''Return an iterator over the consumed items.

    :param items: An iterable of objects to be `produce`()d and `consume`()d.

    :param produce: A callable `f(item)` that produces a single item; the
        return value is ignored. What "produce" exactly means is
        application-specific.

    :param consume: A callable `f()` that consumes a previously produced item
        and returns the consumed item. What "consume" exactly means is
        application-specific. The only assumption is that if `produce` is
        called `N` times, then the next `N` calls to `consume` will
        (eventually, though not necessarily immediately) return, i.e. they
        will not block indefinitely.
    '''

One straightforward approach would be to serialize the problem: first
produce all `N` items and then call consume() exactly N times.
Although this makes the solution trivial, there are at least two
shortcomings. First, the client may have to wait too long for the
first item to arrive. Second, each call to produce() typically
requires resources for the produced task, so the maximum resource
requirement can be arbitrarily high as `N` increases. Therefore
produce() and consume() should run concurrently, with the invariant
that the number of calls to consume never exceeds the number of calls to
produce. Also, after `N` calls to produce and consume, neither should be
left waiting.
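In code, the serialized baseline is essentially this (a rough sketch;
produce and consume are the abstract callables from the signature above):

def iter_consumed_serial(items, produce, consume):
    # Serialized baseline: produce everything first, then consume everything.
    # The first result only becomes available after all N produce() calls,
    # and peak resource usage grows with N.
    n = 0
    for item in items:
        produce(item)
        n += 1
    for _ in range(n):
        yield consume()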

I pasted my current solution at http://codepad.org/FXF2SWmg. Any
feedback, especially if it has to do with proving or disproving its
correctness, will be appreciated.

George
Jun 27 '08 #1
George Sakkis wrote:
> I'd like some feedback on a solution to a variant of the producer-consumer
> problem. My first few attempts turned out to deadlock occasionally; this one
> seems to be deadlock-free so far but I can't tell if it's provably correct,
> and if so, whether it can be simplified.
> [...]
> I pasted my current solution at http://codepad.org/FXF2SWmg. Any feedback,
> especially if it has to do with proving or disproving its correctness, will
> be appreciated.

I had a little trouble understanding what exact problem it is that you are
trying to solve, but I'm pretty sure that you can do it with one of two methods:

1) Write the producer as a generator that yields a result every time it is
called (something like os.walk does). I guess you could yield None if there
wasn't anything to consume, to prevent blocking.

2) Use something like Twisted that uses callbacks to handle multiple
asynchronous calls to produce. You could have callbacks that don't do
anything if there is nothing to consume (sort of null objects, I guess).

I don't know if either of these help or not.

-Larry
Jun 27 '08 #2
On Jun 10, 11:47 pm, Larry Bates <larry.ba...@websafe.com> wrote:
> I had a little trouble understanding what exact problem it is that you are
> trying to solve, but I'm pretty sure that you can do it with one of two methods:
Ok, let me try again with a different example: I want to do what can
be easily done with 2.5 Queues using Queue.task_done()/Queue.join()
(see example at http://docs.python.org/lib/QueueObjects.html), but
instead of having to first put all items and then wait until all are
done, get each item as soon as it is done.
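Roughly, the pattern I mean is this (a stripped-down sketch along the lines
of the Queue docs example; do_work() and the input range are placeholders):

import Queue, threading

def do_work(item):
    return item                      # placeholder for the application-specific work

def worker(q, results):
    while True:
        item = q.get()
        results.append(do_work(item))
        q.task_done()

q = Queue.Queue()
results = []
t = threading.Thread(target=worker, args=(q, results))
t.setDaemon(True)
t.start()

for item in range(100):              # stand-in for the input iterable
    q.put(item)
q.join()                             # blocks until every item has been task_done()
# only now does the caller get to see any of the results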
> 1) Write the producer as a generator that yields a result every time it is
> called (something like os.walk does). I guess you could yield None if there
> wasn't anything to consume, to prevent blocking.
Actually the way items are generated is not part of the problem; it
can be abstracted away as an arbitrary iterable input. As with all
iterables, "there are no more items" is communicated simply by a
StopIteration.
> 2) Use something like Twisted that uses callbacks to handle multiple
> asynchronous calls to produce. You could have callbacks that don't do
> anything if there is nothing to consume (sort of null objects, I guess).
Twisted is interesting and very powerful but requires a different way
of thinking about the problem and designing a solution. More to the
point, callbacks often provide a less flexible and less convenient API
than an iterator that yields results (consumed items). For example, say
that you want to store the results in a dictionary. Using callbacks, you
would have to explicitly synchronize each access to the dictionary
since they may fire independently. OTOH an iterator by definition
yields items sequentially, so the client doesn't have to bother with
synchronization. Note that by "client" I mean the user of an API/
framework/library; the implementation of the library itself may of
course use callbacks under the hood (e.g. to put incoming results into a
Queue) but expose the API as a simple iterator.
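A toy illustration of the difference (the callback signature is made up,
and consume() is assumed to return (key, value) pairs):

import threading

# With callbacks that may fire on arbitrary worker threads, the client has
# to lock around the shared dictionary:
results = {}
results_lock = threading.Lock()

def on_consumed(key, value):         # hypothetical callback
    results_lock.acquire()
    try:
        results[key] = value
    finally:
        results_lock.release()

# With an iterator, items arrive sequentially in the caller's own thread,
# so no explicit locking is needed:
#
#   for key, value in iter_consumed(items, produce, consume):
#       results[key] = value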

George
Jun 27 '08 #3
Why not use a normal Queue, put a dummy value (such as None) in when
your producer has finished, and have the main thread use the normal
Thread.join() method on all your child threads?
Jun 27 '08 #4
On Jun 10, 11:33 pm, George Sakkis <george.sak...@gmail.com> wrote:
> I'd like some feedback on a solution to a variant of the producer-consumer
> problem. [...] The extension to the problem I am considering requires the
> consumer to be notified not only when there is a new produced item, but also
> when there is not going to be a new item so that it stops waiting.

Sounds like a sentinel would work for this. The producer puts a
specific object (say, None) in the queue and the consumer checks for
this object and stops consuming when it sees it. But that seems so
obvious I suspect there's something else up.
Carl Banks
Jun 27 '08 #5
On Jun 11, 1:59 am, Rhamphoryncus <rha...@gmail.com> wrote:
> Why not use a normal Queue, put a dummy value (such as None) in when
> your producer has finished, and have the main thread use the normal
> Thread.join() method on all your child threads?
I just gave two reasons:
- Concurrency / interactivity. The main thread shouldn't wait for all
one million items to be produced to get to see even one of them.
- Limiting resources. Just like iterating over the lines of a file is
more memory efficient than reading the whole file in memory, getting
each consumed item as it becomes available is more memory efficient
than waiting for all of them to finish.

George
Jun 27 '08 #6
> Sounds like a sentinel would work for this. The producer puts a
> specific object (say, None) in the queue and the consumer checks for
> this object and stops consuming when it sees it. But that seems so
> obvious I suspect there's something else up.
There's a decent implementation of this in the Python Cookbook,
Second Edition (9.4: Working with a Thread Pool), available from
Safari as a preview:
http://my.safaribooksonline.com/0596...2-CHP-9-SECT-4

Basically, there's a request_work function that adds (command,
data) pairs to the input Queue. The command 'stop' is used to
terminate each worker thread (there's the sentinel).
stop_and_free_thread_pool() just puts N ('stop', None) pairs and
join()s each thread.

The threadpool put()s the consumed items in an output Queue; they
can be retrieved concurrently using get(). You don't call join()
until you want to stop producing; you can get() at any time.
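Roughly, the shape of it is this (a from-memory sketch of the structure
described above, not the recipe's actual code; handle() stands in for the
real per-request work):

import Queue, threading

def handle(data):                    # placeholder for the real work
    return data

def worker(in_q, out_q):
    while True:
        command, data = in_q.get()
        if command == 'stop':        # the sentinel command
            break
        out_q.put(handle(data))

def start_pool(n, in_q, out_q):
    threads = [threading.Thread(target=worker, args=(in_q, out_q))
               for _ in range(n)]
    for t in threads:
        t.start()
    return threads

def stop_and_free_thread_pool(threads, in_q):
    for _ in threads:
        in_q.put(('stop', None))     # one sentinel per worker
    for t in threads:
        t.join()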

Geoff Gilmour-Taylor

(I ended up using this recipe in my own code, but with a completely
different stopping mechanism---I'm using the worker threads to control
subprocesses; I want to terminate the subprocesses but keep the worker
threads running---and a callback rather than an output queue.)
Jun 27 '08 #7
George Sakkis wrote:
> Ok, let me try again with a different example: I want to do what can
> be easily done with 2.5 Queues using Queue.task_done()/Queue.join()
> (see example at http://docs.python.org/lib/QueueObjects.html), but
> instead of having to first put all items and then wait until all are
> done, get each item as soon as it is done.
> [...]
> For example, say that you want to store the results in a dictionary.
> Using callbacks, you would have to explicitly synchronize each access
> to the dictionary since they may fire independently.
> [...]

If you use a queue and the producer/collector are running in different threads,
you don't have to "first put all items and then wait until all are done". The
producer can push items onto the queue while the collector asynchronously pops
them off.

I'm virtually certain that I read on this forum that dictionary access is atomic
if that helps.

-Larry
Jun 27 '08 #8
On Jun 11, 6:00 am, George Sakkis <george.sak...@gmail.com> wrote:
> On Jun 11, 1:59 am, Rhamphoryncus <rha...@gmail.com> wrote:
>> Why not use a normal Queue, put a dummy value (such as None) in when
>> your producer has finished, and have the main thread use the normal
>> Thread.join() method on all your child threads?

> I just gave two reasons:
> - Concurrency / interactivity. The main thread shouldn't wait for all
> one million items to be produced to get to see even one of them.
Then don't wait. The main thread can easily do other work while the
producer and consumer threads go about their business.

> - Limiting resources. Just like iterating over the lines of a file is
> more memory efficient than reading the whole file in memory, getting
> each consumed item as it becomes available is more memory efficient
> than waiting for all of them to finish.
That's why you give Queue a maxsize. Put it at maybe 5 or 10. Enough
that the producer can operate in a burst (usually more efficient than
switching threads after each item), then the consumer can grab them
all in a burst.
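Something along these lines (a sketch; handle() is a placeholder):

import Queue, threading

def handle(item):                    # placeholder for per-item work
    print(item)

q = Queue.Queue(maxsize=10)          # bounds how far the producer can get ahead
sentinel = object()

def producer(items):
    for item in items:
        q.put(item)                  # blocks once there are 10 unconsumed items
    q.put(sentinel)

threading.Thread(target=producer, args=(xrange(1000000),)).start()

while True:
    item = q.get()
    if item is sentinel:
        break
    handle(item)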

Then again, you may find it easier to use an event-driven architecture
(like Twisted, as others have suggested.)
Jun 27 '08 #9
In article <1f**********************************@k37g2000hsf.googlegroups.com>,
George Sakkis <ge***********@gmail.com> wrote:
> I'd like some feedback on a solution to a variant of the producer-consumer
> problem. [...]
Take a look at the threading tutorial on my web page, specifically the
threadpool spider.
--
Aahz (aa**@pythoncraft.com) <*> http://www.pythoncraft.com/

"as long as we like the same operating system, things are cool." --piranha
Jun 27 '08 #10
On Jun 11, 3:07 pm, Carl Banks <pavlovevide...@gmail.com> wrote:
> On Jun 10, 11:33 pm, George Sakkis <george.sak...@gmail.com> wrote:
>> I pasted my current solution at http://codepad.org/FXF2SWmg. Any
>> feedback, especially if it has to do with proving or disproving its
>> correctness, will be appreciated.

> It seems like you're reinventing the wheel. The Queue class does all
> this, and it's been thoroughly battle-tested.
Synchronized queues are an extremely useful data structure in many
situations. The producer/consumer paradigm however is a more general
model and doesn't depend on any specific data structure.
> So first of all, can you tell us why the following wouldn't work? It
> might help us understand the issue you're facing (never mind the
> produce and consume arguments for now--I'll cover that below).

import Queue, threading

def iter_consumed(items):
    q = Queue.Queue()
    sentinel = object()              # internal marker: "no more items"
    def produce_all():
        for item in items:
            q.put(item)
        q.put(sentinel)
    producer = threading.Thread(target=produce_all)
    producer.start()
    try:
        while True:
            item = q.get()
            if item is sentinel:
                return
            yield item
    finally:
        # for robustness, notify producer to wrap things up
        # left as exercise
        producer.join()
As it is, all this does is yield the original items, but slower, which
is pretty much useless. The whole idea is to transform some inputs to
some outputs. How exactly each input is mapped to an output is
irrelevant at this point; this is the power of the producer/consumer
model. Produce might mean "send an email to address X" and consume
might mean "wait for an automated email response, parse it and return
a value Y". No queue has to be involved; it *may* be involved of
course, but that's an implementation detail, it shouldn't make a
difference to iter_consumed().

If you replace "q.put"/"q.get" with "produce"/"consume" respectively
and make them the last two parameters, you're much closer to my idea.
What's left is getting rid of the sentinel, since the producer and the
consumer may have been written independently, not aware of
iter_consumed. E.g. for the email example, all producers and consumers
(there may be more than one) must agree in advance on a "sentinel
email". For a given situation that might not be a big issue, but then
again, if iter_consumed() is written once without assuming a sentinel,
it makes life easier for all future producers and consumers.
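To make this concrete, here is one way iter_consumed() could keep that
bookkeeping internal (a rough sketch of the idea, not my codepad code): the
producer thread puts a token on a private queue after each produce() call,
so the application-level produce/consume pair never has to agree on a
sentinel value.

import Queue, threading

def iter_consumed(items, produce, consume):
    control = Queue.Queue()          # internal bookkeeping only
    NO_MORE = object()               # never passes through produce/consume

    def produce_all():
        try:
            for item in items:
                produce(item)
                control.put(True)    # one more consume() call is now allowed
        finally:
            control.put(NO_MORE)     # input exhausted (or the producer died)

    producer = threading.Thread(target=produce_all)
    producer.start()
    try:
        while True:
            token = control.get()
            if token is NO_MORE:
                break
            # invariant: consume() is never called more times than produce()
            yield consume()
    finally:
        producer.join()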
> If you want to customize the effect of getting and putting, you can
> subclass Queue and override the _get and _put methods (however, last
> time I checked, the Queue class expects _put to always add an item to
> the internal sequence representing the queue--not necessarily to the
> top--and _get to always remove an item--not necessarily from the
> bottom).
Assuming that you're talking about the stdlib Queue.Queue class and
not some abstract Queue interface, extending it may help for limited
customization but can only get you so far. For instance the producer
and the consumer may not live in the same address space, or even on
the same machine.
> One issue from your function. This line:
>
>     done_remaining[1] += 1
>
> is not atomic, but your code depends on it being so.
No, it doesn't; it is protected by the condition object.
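In other words, the increment only ever happens while the condition's lock
is held, along these lines (a sketch of the pattern, not the actual paste):

import threading

cond = threading.Condition()
done_remaining = [0, 0]

def one_item_done():
    cond.acquire()
    try:
        done_remaining[1] += 1       # safe: the condition's lock is held
        cond.notify()
    finally:
        cond.release()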

George
Jun 27 '08 #11
