Bytes | Software Development & Data Engineering Community

Producer-consumer threading problem

I'd like some feedback on a solution to a variant of the producer-
consumer problem. My first few attempts turned out to deadlock
occasionally; this one seems to be deadlock-free so far but I can't
tell if it's provably correct, and if so, whether it can be
simplified.

The generic producer-consumer situation with unlimited buffer capacity
is illustrated at http://docs.python.org/lib/condition-objects.html.
That approach assumes that the producer will keep producing items
indefinitely, otherwise the consumer ends up waiting forever. The
extension to the problem I am considering requires the consumer to be
notified not only when there is a new produced item, but also when
there is not going to be a new item so that it stops waiting. More
specifically, I want a generator (or iterator class) with the
following generic signature:

def iter_consumed(items, produce, consume):
    '''Return an iterator over the consumed items.

    :param items: An iterable of objects to be `produce`()d and
        `consume`()d.

    :param produce: A callable `f(item)` that produces a single item;
        the return value is ignored. What "produce" exactly means is
        application-specific.

    :param consume: A callable `f()` that consumes a previously produced
        item and returns the consumed item. What "consume" exactly means
        is application-specific. The only assumption is that if `produce`
        is called `N` times, then the next `N` calls to `consume` will
        (eventually, though not necessarily immediately) return, i.e.
        they will not block indefinitely.
    '''

One straightforward approach would be to serialize the problem: first
produce all `N` items and then call consume() exactly N times.
Although this makes the solution trivial, there are at least two
shortcomings. First, the client may have to wait too long for the
first item to arrive. Second, each call to produce() typically
requires resources for the produced task, so the maximum resource
requirement can be arbitrarily high as `N` increases. Therefore
produce() and consume() should run concurrently, with the invariant
that the number of calls to consume never exceeds the number of calls
to produce. Also, after `N` calls to produce and consume, neither
should be left waiting.

I pasted my current solution at http://codepad.org/FXF2SWmg. Any
feedback, especially if it has to do with proving or disproving its
correctness, will be appreciated.

George
Jun 27 '08 #1
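
One way the requirements in the post above could be met is sketched below. This is not the solution pasted on codepad (which is not reproduced in this thread); it is a minimal sketch assuming Python 3, a single background producer thread, and a `threading.Condition` guarding a counter of completed `produce` calls. The `state` dictionary and the inner `producer` function are illustrative names only.

```python
import queue
import threading


def iter_consumed(items, produce, consume):
    """Yield consumed items while a background thread keeps producing."""
    cond = threading.Condition()
    # Shared bookkeeping, always read and written while holding `cond`.
    state = {"produced": 0, "done": False, "error": None}

    def producer():
        try:
            for item in items:
                produce(item)
                with cond:
                    state["produced"] += 1
                    cond.notify()
        except BaseException as exc:
            with cond:
                state["error"] = exc
        finally:
            # Always announce completion so the consumer never waits forever.
            with cond:
                state["done"] = True
                cond.notify()

    thread = threading.Thread(target=producer, daemon=True)
    thread.start()

    consumed = 0
    while True:
        with cond:
            # Sleep until an item is pending or the producer has finished.
            while consumed == state["produced"] and not state["done"]:
                cond.wait()
            if consumed == state["produced"]:
                break  # producer finished and nothing is pending
        # consume() runs outside the lock, so it never blocks the producer.
        yield consume()
        consumed += 1

    thread.join()
    if state["error"] is not None:
        raise state["error"]


# Usage: a plain Queue stands in for the application-specific callables.
q = queue.Queue()
result = list(iter_consumed(range(100), q.put, q.get))
```

Because the counter is incremented only after `produce(item)` returns, `consume()` is never called more often than `produce()` has completed, and the `done` flag is what lets the consumer stop waiting once the input is exhausted.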
I had a little trouble understanding what exact problem it is that you are
trying to solve but I'm pretty sure that you can do it with one of two methods:

1) Write the producer as a generator, using yield, that yields a result
every time it is called (something like os.walk does). I guess you could yield
None if there wasn't anything to consume, to prevent blocking.

2) Use something like Twisted instead, which uses callbacks to handle
multiple asynchronous calls to produce. You could have callbacks that don't do
anything if there is nothing to consume (sort of null objects, I guess).

I don't know if either of these help or not.

-Larry
Jun 27 '08 #2
On Jun 10, 11:47 pm, Larry Bates <larry.ba...@websafe.com> wrote:

> I had a little trouble understanding what exact problem it is that you are
> trying to solve but I'm pretty sure that you can do it with one of two methods:

Ok, let me try again with a different example: I want to do what can
be easily done with 2.5 Queues using Queue.task_done()/Queue.join()
(see example at http://docs.python.org/lib/QueueObjects.html), but
instead of having to first put all items and then wait until all are
done, get each item as soon as it is done.

> 1) Write the producer as a generator, using yield, that yields a result
> every time it is called (something like os.walk does). I guess you could yield
> None if there wasn't anything to consume, to prevent blocking.

Actually the way items are generated is not part of the problem; it
can be abstracted away as an arbitrary iterable input. As with all
iterables, "there are no more items" is communicated simply by a
StopIteration.

> 2) Use something like Twisted instead, which uses callbacks to handle
> multiple asynchronous calls to produce. You could have callbacks that don't do
> anything if there is nothing to consume (sort of null objects, I guess).

Twisted is interesting and very powerful but requires a different way
of thinking about the problem and designing a solution. More to the
point, callbacks often provide a less flexible and less simple API than
an iterator that yields results (consumed items). For example, say that
you want to store the results to a dictionary. Using callbacks, you
would have to explicitly synchronize each access to the dictionary
since they may fire independently. OTOH an iterator by definition
yields items sequentially, so the client doesn't have to bother with
synchronization. Note that by "client" I mean the user of an API/
framework/library; the implementation of the library itself may of
course use callbacks under the hood (e.g. to put incoming results to a
Queue) but expose the API as a simple iterator.

George
Jun 27 '08 #3
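
The last point in the post above (callbacks under the hood, a plain iterator for the client) might look roughly like the following. It is only a sketch, assuming Python 3; `iter_results` and `start_async` are hypothetical names, and `start_async(task, callback)` stands in for whatever callback-based library is actually in use. For brevity it starts every task up front, so it illustrates the API shape rather than the resource-limiting requirement.

```python
import queue
import threading


def iter_results(tasks, start_async):
    """Expose a callback-style API as an iterator of results.

    `start_async(task, callback)` is assumed to arrange for
    `callback(result)` to be called later, possibly from another thread.
    """
    tasks = list(tasks)
    results = queue.Queue()
    for task in tasks:
        # The callback only enqueues; Queue.put is safe to call from any thread.
        start_async(task, results.put)
    for _ in range(len(tasks)):
        yield results.get()  # blocks until the next result arrives


def start_async(task, callback):
    # Stand-in for a real asynchronous library: one thread per task.
    threading.Thread(target=lambda: callback((task, task * task))).start()


squares = {}
for key, value in iter_results(range(5), start_async):
    squares[key] = value  # only the iterating thread touches the dict
```

Since the dictionary is written only by the thread that drives the iterator, the client code needs no lock of its own.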
Why not use a normal Queue, put a dummy value (such as None) in when
your producer has finished, and have the main thread use the normal
Thread.join() method on all your child threads?
Jun 27 '08 #4
On Jun 10, 11:33 pm, George Sakkis <george.sak...@gmail.com> wrote:
> I'd like some feedback on a solution to a variant of the producer-
> consumer problem. My first few attempts turned out to deadlock
> occasionally; this one seems to be deadlock-free so far but I can't
> tell if it's provably correct, and if so, whether it can be
> simplified.
>
> The generic producer-consumer situation with unlimited buffer capacity
> is illustrated at http://docs.python.org/lib/condition-objects.html.
> That approach assumes that the producer will keep producing items
> indefinitely, otherwise the consumer ends up waiting forever. The
> extension to the problem I am considering requires the consumer to be
> notified not only when there is a new produced item, but also when
> there is not going to be a new item so that it stops waiting.

Sounds like a sentinel would work for this. The producer puts a
specific object (say, None) in the queue and the consumer checks for
this object and stops consuming when it sees it. But that seems so
obvious I suspect there's something else up.
Carl Banks
Jun 27 '08 #5
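
The sentinel idea suggested in the two posts above can be sketched as follows, assuming Python 3 (where the module is named `queue`) and a single producer and consumer. The names `_SENTINEL`, `producer` and `consume_all` are illustrative; a dedicated `object()` is used instead of None so that None remains a legal item.

```python
import queue
import threading

_SENTINEL = object()  # unique marker that cannot collide with a real item


def producer(items, q):
    for item in items:
        q.put(item)
    q.put(_SENTINEL)  # tell the consumer that nothing more is coming


def consume_all(q):
    """Yield items from q until the sentinel shows up."""
    while True:
        item = q.get()
        if item is _SENTINEL:
            return
        yield item


q = queue.Queue()
threading.Thread(target=producer, args=(range(5), q)).start()
received = list(consume_all(q))  # items arrive while the producer is running
```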
On Jun 11, 1:59 am, Rhamphoryncus <rha...@gmail.com> wrote:
> Why not use a normal Queue, put a dummy value (such as None) in when
> your producer has finished, and have the main thread use the normal
> Thread.join() method on all your child threads?

I just gave two reasons:
- Concurrency / interactivity. The main thread shouldn't wait for all
one million items to be produced to get to see even one of them.
- Limiting resources. Just like iterating over the lines of a file is
more memory efficient than reading the whole file in memory, getting
each consumed item as it becomes available is more memory efficient
than waiting for all of them to finish.

George
Jun 27 '08 #6
> Sounds like a sentinel would work for this. The producer puts a
> specific object (say, None) in the queue and the consumer checks for
> this object and stops consuming when it sees it. But that seems so
> obvious I suspect there's something else up.

There's a decent implementation of this in the Python Cookbook,
Second Edition (9.4: Working with a Thread Pool), available from
Safari as a preview:
http://my.safaribooksonline.com/0596...2-CHP-9-SECT-4

Basically, there's a request_work function that adds (command,
data) pairs to the input Queue. The command 'stop' is used to
terminate each worker thread (there's the sentinel).
stop_and_free_thread_pool() just puts N ('stop', None) pairs and
join()s each thread.

The threadpool put()s the consumed items in an output Queue; they
can be retrieved concurrently using get(). You don't call join()
until you want to stop producing; you can get() at any time.

Geoff Gilmour-Taylor

(I ended up using this recipe in my own code, but with a completely
different stopping mechanism---I'm using the worker threads to control
subprocesses; I want to terminate the subprocesses but keep the worker
threads running---and a callback rather than an output queue.)
Jun 27 '08 #7
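
The general shape described in the post above (an input Queue of (command, data) pairs, a 'stop' command as the sentinel, and an output Queue for results) might look like this. It is a sketch assuming Python 3 and is not the Cookbook recipe's code; `worker`, `start_pool`, `stop_pool` and the doubling "work" are all placeholders.

```python
import queue
import threading

requests = queue.Queue()  # (command, data) pairs for the workers
results = queue.Queue()   # consumed items, retrievable at any time


def worker():
    while True:
        command, data = requests.get()
        if command == "stop":
            break  # sentinel command: this worker exits
        results.put(data * 2)  # placeholder for the real work


def start_pool(n):
    threads = [threading.Thread(target=worker) for _ in range(n)]
    for t in threads:
        t.start()
    return threads


def stop_pool(threads):
    # One 'stop' per worker, then wait for all of them to exit.
    for _ in threads:
        requests.put(("stop", None))
    for t in threads:
        t.join()


pool = start_pool(3)
for n in range(10):
    requests.put(("process", n))
# Results can be fetched while the workers are still running.
outputs = sorted(results.get() for _ in range(10))
stop_pool(pool)
```

With several workers the results come back in completion order, not submission order, which is why the usage above sorts them.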
If you use a queue and the producer/collector are running in different threads,
you don't have "to first put all items and then wait until all are done". The
producer can push items onto the queue while the collector asynchronously pops
them off.

I'm virtually certain that I read on this forum that dictionary access is atomic,
if that helps.

-Larry
Jun 27 '08 #8
On Jun 11, 6:00 am, George Sakkis <george.sak...@gmail.com> wrote:
> On Jun 11, 1:59 am, Rhamphoryncus <rha...@gmail.com> wrote:
>> Why not use a normal Queue, put a dummy value (such as None) in when
>> your producer has finished, and have the main thread use the normal
>> Thread.join() method on all your child threads?
>
> I just gave two reasons:
> - Concurrency / interactivity. The main thread shouldn't wait for all
> one million items to be produced to get to see even one of them.

Then don't wait. The main thread can easily do other work while the
producer and consumer threads go about their business.

> - Limiting resources. Just like iterating over the lines of a file is
> more memory efficient than reading the whole file in memory, getting
> each consumed item as it becomes available is more memory efficient
> than waiting for all of them to finish.

That's why you give Queue a maxsize. Put it at maybe 5 or 10. Enough
that the producer can operate in a burst (usually more efficient than
switching threads after each item), then the consumer can grab them
all in a burst.

Then again, you may find it easier to use an event-driven architecture
(like Twisted, as others have suggested).
Jun 27 '08 #9
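
The maxsize suggestion in the post above can be sketched like this, assuming Python 3. The bound of 5 comes from the post; the sentinel, the item count and the `peak` bookkeeping are illustrative additions.

```python
import queue
import threading

q = queue.Queue(maxsize=5)  # at most 5 items are buffered at any time
_SENTINEL = object()


def producer(n):
    for i in range(n):
        q.put(i)  # blocks while the queue is full
    q.put(_SENTINEL)


threading.Thread(target=producer, args=(1000,)).start()

total = 0
peak = 0  # largest backlog observed by the consumer
while True:
    peak = max(peak, q.qsize())
    item = q.get()
    if item is _SENTINEL:
        break
    total += item
```

The producer can run ahead by at most `maxsize` items, so memory use stays bounded no matter how many items there are in total.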
In article <1f**********************************@k37g2000hsf.googlegroups.com>,
George Sakkis <ge***********@gmail.com> wrote:
>
> I'd like some feedback on a solution to a variant of the producer-
> consumer problem. My first few attempts turned out to deadlock
> occasionally; this one seems to be deadlock-free so far but I can't
> tell if it's provably correct, and if so, whether it can be
> simplified.

Take a look at the threading tutorial on my web page, specifically the
threadpool spider.
--
Aahz (aa**@pythoncraft.com) <*> http://www.pythoncraft.com/

"as long as we like the same operating system, things are cool." --piranha
Jun 27 '08 #10
