Bytes IT Community

Producer-consumer threading problem

I'd like some feedback on a solution to a variant of the producer-
consumer problem. My first few attempts turned out to deadlock
occasionally; this one seems to be deadlock-free so far but I can't
tell if it's provably correct, and if so, whether it can be
simplified.

The generic producer-consumer situation with unlimited buffer capacity
is illustrated at http://docs.python.org/lib/condition-objects.html.
That approach assumes that the producer will keep producing items
indefinitely, otherwise the consumer ends up waiting forever. The
extension to the problem I am considering requires the consumer to be
notified not only when there is a new produced item, but also when
there is not going to be a new item so that it stops waiting. More
specifically, I want a generator (or iterator class) with the
following generic signature:

def iter_consumed(items, produce, consume):
    '''Return an iterator over the consumed items.

    :param items: An iterable of objects to be `produce`()d and
        `consume`()d.

    :param produce: A callable `f(item)` that produces a single item;
        the return value is ignored. What "produce" exactly means is
        application-specific.

    :param consume: A callable `f()` that consumes a previously
        produced item and returns the consumed item. What "consume"
        exactly means is application-specific. The only assumption is
        that if `produce` is called `N` times, then the next `N` calls
        to `consume` will (eventually, though not necessarily
        immediately) return, i.e. they will not block indefinitely.
    '''

One straightforward approach would be to serialize the problem: first
produce all `N` items and then call consume() exactly N times.
Although this makes the solution trivial, there are at least two
shortcomings. First, the client may have to wait too long for the
first item to arrive. Second, each call to produce() typically
requires resources for the produced task, so the maximum resource
requirement can grow arbitrarily high as `N` increases. Therefore
produce() and consume() should run concurrently, with the invariant
that consume() is never called more times than produce(). Also, after
`N` calls each to produce() and consume(), neither should be left
waiting.
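To make the desired behavior concrete, here is a minimal sketch of one way iter_consumed() could be written. This is not the codepad solution linked below, just an illustration: a threading.Condition guards a produced-count and a done flag, and the consumer only calls consume() when it knows an unconsumed item exists, so consume() never blocks indefinitely.

```python
import threading

def iter_consumed(items, produce, consume):
    # Sketch only: cond guards the produced count and the done flag.
    cond = threading.Condition()
    state = {'produced': 0, 'done': False}

    def producer():
        for item in items:
            produce(item)
            with cond:
                state['produced'] += 1
                cond.notify()
        with cond:
            state['done'] = True
            cond.notify()

    t = threading.Thread(target=producer)
    t.start()
    consumed = 0
    while True:
        with cond:
            # Wait until an unconsumed item exists or production is over.
            while consumed == state['produced'] and not state['done']:
                cond.wait()
            if consumed == state['produced'] and state['done']:
                t.join()
                return
        yield consume()   # guaranteed not to block: produced > consumed
        consumed += 1
```

For example, with a Queue as the underlying transport, produce/consume can simply be q.put/q.get.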

I pasted my current solution at http://codepad.org/FXF2SWmg. Any
feedback, especially if it has to do with proving or disproving its
correctness, will be appreciated.

George
Jun 27 '08 #1
10 Replies


George Sakkis wrote:
I'd like some feedback on a solution to a variant of the producer-
consumer problem. [...]
I had a little trouble understanding what exact problem it is that you are
trying to solve but I'm pretty sure that you can do it with one of two methods:

1) Write the producer as a generator (using yield) that yields a result
every time it is called (something like os.walk does). I guess you could yield
None if there wasn't anything to consume, to prevent blocking.

2) Use something like Twisted instead, which uses callbacks to handle
multiple asynchronous calls to produce. You could have callbacks that don't do
anything if there is nothing to consume (sort of null objects, I guess).

I don't know if either of these help or not.

-Larry
Jun 27 '08 #2

On Jun 10, 11:47 pm, Larry Bates <larry.ba...@websafe.com> wrote:
> I had a little trouble understanding what exact problem it is that you are
> trying to solve but I'm pretty sure that you can do it with one of two methods:
Ok, let me try again with a different example: I want to do what can
be easily done with 2.5 Queues using Queue.task_done()/Queue.join()
(see example at http://docs.python.org/lib/QueueObjects.html), but
instead of having to first put all items and then wait until all are
done, get each item as soon as it is done.
> 1) Write the producer as a generator (using yield) that yields a result
> every time it is called (something like os.walk does). I guess you could
> yield None if there wasn't anything to consume, to prevent blocking.
Actually the way items are generated is not part of the problem; it
can be abstracted away as an arbitrary iterable input. As with all
iterables, "there are no more items" is communicated simply by a
StopIteration.
> 2) Use something like Twisted instead, which uses callbacks to handle
> multiple asynchronous calls to produce. You could have callbacks that
> don't do anything if there is nothing to consume (sort of null objects I guess).
Twisted is interesting and very powerful but requires a different way
of thinking about the problem and designing a solution. More to the
point, callbacks often provide a less flexible and simple API than an
iterator that yields results (consumed items). For example, say that
you want to store the results to a dictionary. Using callbacks, you
would have to explicitly synchronize each access to the dictionary
since they may fire independently. OTOH an iterator by definition
yields items sequentially, so the client doesn't have to bother with
synchronization. Note that with "client" I mean the user of an API/
framework/library; the implementation of the library itself may of
course use callbacks under the hood (e.g. to put incoming results to a
Queue) but expose the API as a simple iterator.

George
Jun 27 '08 #3

Why not use a normal Queue, put a dummy value (such as None) in when
your producer has finished, and have the main thread use the normal
Thread.join() method on all your child threads?
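For what it's worth, that pattern is only a few lines. A sketch, assuming a single consumer and that None can never be a real item:

```python
import queue
import threading

SENTINEL = None  # dummy value marking the end of production

def producer(q, items):
    for item in items:
        q.put(item)
    q.put(SENTINEL)   # tell the consumer nothing more is coming

q = queue.Queue()
t = threading.Thread(target=producer, args=(q, range(5)))
t.start()

results = []
while True:
    item = q.get()
    if item is SENTINEL:
        break             # producer is done; stop consuming
    results.append(item)
t.join()
```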
Jun 27 '08 #4

On Jun 10, 11:33 pm, George Sakkis <george.sak...@gmail.com> wrote:
> I'd like some feedback on a solution to a variant of the producer-
> consumer problem. My first few attempts turned out to deadlock
> occasionally; this one seems to be deadlock-free so far but I can't
> tell if it's provably correct, and if so, whether it can be
> simplified.
>
> The generic producer-consumer situation with unlimited buffer capacity
> is illustrated at http://docs.python.org/lib/condition-objects.html.
> That approach assumes that the producer will keep producing items
> indefinitely, otherwise the consumer ends up waiting forever. The
> extension to the problem I am considering requires the consumer to be
> notified not only when there is a new produced item, but also when
> there is not going to be a new item so that it stops waiting.

Sounds like a sentinel would work for this. The producer puts a
specific object (say, None) in the queue and the consumer checks for
this object and stops consuming when it sees it. But that seems so
obvious I suspect there's something else up.
Carl Banks
Jun 27 '08 #5

On Jun 11, 1:59 am, Rhamphoryncus <rha...@gmail.com> wrote:
> Why not use a normal Queue, put a dummy value (such as None) in when
> your producer has finished, and have the main thread use the normal
> Thread.join() method on all your child threads?
I just gave two reasons:
- Concurrency / interactivity. The main thread shouldn't wait for all
one million items to be produced to get to see even one of them.
- Limiting resources. Just like iterating over the lines of a file is
more memory efficient than reading the whole file in memory, getting
each consumed item as it becomes available is more memory efficient
than waiting for all of them to finish.

George
Jun 27 '08 #6

> Sounds like a sentinel would work for this. The producer puts a
> specific object (say, None) in the queue and the consumer checks for
> this object and stops consuming when it sees it. But that seems so
> obvious I suspect there's something else up.
There's a decent implementation of this in the Python Cookbook,
Second Edition (9.4: Working with a Thread Pool), available from
Safari as a preview:
http://my.safaribooksonline.com/0596...2-CHP-9-SECT-4

Basically, there's a request_work function that adds (command,
data) pairs to the input Queue. The command 'stop' is used to
terminate each worker thread (there's the sentinel).
stop_and_free_thread_pool() just puts N ('stop', None) pairs and
join()s each thread.

The threadpool put()s the consumed items in an output Queue; they
can be retrieved concurrently using get(). You don't call join()
until you want to stop producing; you can get() at any time.
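A stripped-down version of that recipe's shape (the worker body here is a hypothetical stand-in; the actual Cookbook recipe differs in detail):

```python
import queue
import threading

def worker(in_q, out_q):
    # Each worker loops over (command, data) pairs from the input queue.
    while True:
        command, data = in_q.get()
        if command == 'stop':        # sentinel: terminate this worker
            break
        out_q.put(data * 2)          # stand-in for real work

in_q, out_q = queue.Queue(), queue.Queue()
threads = [threading.Thread(target=worker, args=(in_q, out_q))
           for _ in range(3)]
for t in threads:
    t.start()
for n in range(6):                   # the request_work() side
    in_q.put(('process', n))
for _ in threads:                    # the stop_and_free_thread_pool() side
    in_q.put(('stop', None))
for t in threads:
    t.join()
results = sorted(out_q.get() for _ in range(6))
```

Note that out_q.get() can be called at any time while the pool is still running; the join()s are only needed at shutdown.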

Geoff Gilmour-Taylor

(I ended up using this recipe in my own code, but with a completely
different stopping mechanism---I'm using the worker threads to control
subprocesses; I want to terminate the subprocesses but keep the worker
threads running---and a callback rather than an output queue.)
Jun 27 '08 #7

George Sakkis wrote:
On Jun 10, 11:47 pm, Larry Bates <larry.ba...@websafe.com> wrote:
>I had a little trouble understanding what exact problem it is that you are
trying to solve but I'm pretty sure that you can do it with one of two methods:

Ok, let me try again with a different example: I want to do what can
be easily done with 2.5 Queues using Queue.task_done()/Queue.join()
(see example at http://docs.python.org/lib/QueueObjects.html), but
instead of having to first put all items and then wait until all are
done, get each item as soon as it is done. [...]
If you use a queue and the producer/collector are running in different
threads you don't have to wait to "first put all items and then wait
until all are done". The producer can push items onto the queue while
the collector asynchronously pops them off.

I'm virtually certain that I read on this forum that dictionary access is atomic
if that helps.

-Larry
Jun 27 '08 #8

On Jun 11, 6:00 am, George Sakkis <george.sak...@gmail.com> wrote:
> On Jun 11, 1:59 am, Rhamphoryncus <rha...@gmail.com> wrote:
> > Why not use a normal Queue, put a dummy value (such as None) in when
> > your producer has finished, and have the main thread use the normal
> > Thread.join() method on all your child threads?
>
> I just gave two reasons:
> - Concurrency / interactivity. The main thread shouldn't wait for all
> one million items to be produced to get to see even one of them.
Then don't wait. The main thread can easily do other work while the
producer and consumer threads go about their business.

> - Limiting resources. Just like iterating over the lines of a file is
> more memory efficient than reading the whole file in memory, getting
> each consumed item as it becomes available is more memory efficient
> than waiting for all of them to finish.
That's why you give Queue a maxsize. Put it at maybe 5 or 10. Enough
that the producer can operate in a burst (usually more efficient than
switching threads after each item), then the consumer can grab them
all in a burst.
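Concretely (an illustrative sketch, not the OP's code, with a smaller run than a million items and None as an end marker):

```python
import queue
import threading

q = queue.Queue(maxsize=10)   # producer blocks once 10 items are pending

def producer():
    for n in range(100000):
        q.put(n)              # blocks while the queue is full
    q.put(None)               # end-of-stream marker

threading.Thread(target=producer).start()

# The consumer gets each item as soon as it is available; at most
# 10 items ever sit in memory at once, regardless of the total count.
total = 0
while True:
    item = q.get()
    if item is None:
        break
    total += item
```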

Then again, you may find it easier to use an event-driven architecture
(like Twisted, as others have suggested.)
Jun 27 '08 #9

In article <1f**********************************@k37g2000hsf.googlegroups.com>,
George Sakkis <ge***********@gmail.com> wrote:
>
I'd like some feedback on a solution to a variant of the producer-
consumer problem. My first few attempts turned out to deadlock
occasionally; this one seems to be deadlock-free so far but I can't
tell if it's provably correct, and if so, whether it can be
simplified.
Take a look at the threading tutorial on my web page, specifically the
threadpool spider.
--
Aahz (aa**@pythoncraft.com) <* http://www.pythoncraft.com/

"as long as we like the same operating system, things are cool." --piranha
Jun 27 '08 #10

On Jun 11, 3:07 pm, Carl Banks <pavlovevide...@gmail.com> wrote:
On Jun 10, 11:33 pm, George Sakkis <george.sak...@gmail.com> wrote:
I pasted my current solution at http://codepad.org/FXF2SWmg. Any
feedback, especially if it has to do with proving or disproving its
correctness, will be appreciated.

It seems like you're reinventing the wheel. The Queue class does all
this, and it's been thorougly battle-tested.
Synchronized queues are an extremely useful data structure in many
situations. The producer/consumer paradigm however is a more general
model and doesn't depend on any specific data structure.
So first of all, can you tell us why the following wouldn't work? It
might help us understand the issue you're facing (never mind the
produce and consume arguments for now--I'll cover that below).

def iter_consumed(items):
    q = Queue.Queue()
    sentinel = object()
    def produce_all():
        for item in items:
            q.put(item)
        q.put(sentinel)
    producer = threading.Thread(target=produce_all)
    producer.start()
    try:
        while True:
            item = q.get()
            if item is sentinel:
                return
            yield item
    finally:
        # for robustness, notify producer to wrap things up
        # left as exercise
        producer.join()
As it is, all this does is yield the original items, but slower, which
is pretty much useless. The whole idea is to transform some inputs to
some outputs. How exactly each input is mapped to an output is
irrelevant at this point; this is the power of the producer/consumer
model. Produce might mean "send an email to address X" and consume
might mean "wait for an automated email response, parse it and return
a value Y". No queue has to be involved; it *may* be involved of
course, but that's an implementation detail, it shouldn't make a
difference to iter_consumed().

If you replace "q.put"/"q.get" with "produce"/"consume" respectively
and make them the last two parameters, you're much closer to my idea.
What's left is getting rid of the sentinel, since the producer and the
consumer may have been written independently, not aware of
iter_consumed. E.g. for the email example, all producers and consumers
(there may be more than one) must agree in advance on a "sentinel
email". For a given situation that might not be a big issue, but then
again, if iter_consumed() is written once without assuming a sentinel,
it makes life easier for all future producers and consumers.
If you want to customize the effect of getting and putting, you can
subclass Queue and override the _get and _put methods (however, last
time I checked, the Queue class expects _put to always add an item to
the internal sequence representing the queue--not necessarily to the
top--and _get to always remove an item--not necessarily from the
bottom).
Assuming that you're talking about the stdlib Queue.Queue class and
not some abstract Queue interface, extending it may help for limited
customization but can only get you so far. For instance the producer
and the consumer may not live in the same address space, or even in
the same machine.
One issue from your function. This line:

done_remaining[1] += 1

is not atomic, but your code depends on it being so.
No, it doesn't; it is protected by the condition object.
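(The guarded-increment pattern at issue, reconstructed here with hypothetical names since the codepad snippet isn't reproduced in this thread, looks like:)

```python
import threading

cond = threading.Condition()
done_remaining = [False, 0]   # hypothetical reconstruction of the state

def note_produced():
    # The += is safe because the list is only ever touched with cond held;
    # atomicity of the bare statement is irrelevant.
    with cond:
        done_remaining[1] += 1
        cond.notify()
```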

George
Jun 27 '08 #11

This discussion thread is closed