Bytes IT Community

Suggested generator to add to threading module.

Found myself needing serialised access to a shared generator from
multiple threads. Came up with the following

def serialise(gen):
    lock = threading.Lock()
    while 1:
        lock.acquire()
        try:
            next = gen.next()
        finally:
            lock.release()
        yield next

I considered suggesting it for itertools, but really it's thread-specific,
so I am suggesting it for the threading module.

Andrae Muys
Jul 18 '05 #1
17 Replies


Andrae Muys wrote:
I considered suggesting it for itertools, but really it's thread-specific,
so I am suggesting it for the threading module.


If you are willing to contribute this function, please submit
a patch to sf.net/projects/python, including documentation changes,
and test cases.

Regards,
Martin

Jul 18 '05 #2

In article <79**************************@posting.google.com >,
Andrae Muys <am***@shortech.com.au> wrote:

Found myself needing serialised access to a shared generator from
multiple threads. Came up with the following

def serialise(gen):
    lock = threading.Lock()
    while 1:
        lock.acquire()
        try:
            next = gen.next()
        finally:
            lock.release()
        yield next


I'm not sure this is generic enough to go in the standard library.
Usually, I'd recommend that someone wanting this functionality consider
other options in addition to this (such as using Queue.Queue()).
--
Aahz (aa**@pythoncraft.com) <*> http://www.pythoncraft.com/

A: No.
Q: Is top-posting okay?
Jul 18 '05 #3

Andrae Muys wrote:
Found myself needing serialised access to a shared generator from
multiple threads. Came up with the following

def serialise(gen):
    lock = threading.Lock()
    while 1:
        lock.acquire()
        try:
            next = gen.next()
        finally:
            lock.release()
        yield next


Is there any reason why the lock is not shared among threads?
From the looks of this, it doesn't synchronize anything
between different threads. Am I missing something?

Kind regards,
Ype

email at xs4all.nl
Jul 18 '05 #4

On Fri, Jan 16, 2004 at 08:42:36PM +0100, Ype Kingma wrote:
Found myself needing serialised access to a shared generator from
multiple threads. Came up with the following

def serialise(gen):
    lock = threading.Lock()
    while 1:
        lock.acquire()
        try:
            next = gen.next()
        finally:
            lock.release()
        yield next


Is there any reason why the lock is not shared among threads?
From the looks of this, it doesn't synchronize anything
between different threads. Am I missing something?


Yes, I think so. You'd use the same "serialise" generator object in
multiple threads, like this:

p = serialise(producer_generator())
threads = [thread.start_new(worker_thread, (p,))
           for t in range(num_workers)]

Jeff

Jul 18 '05 #5

[Andrae Muys]
Found myself needing serialised access to a shared generator from
multiple threads. Came up with the following

def serialise(gen):
    lock = threading.Lock()
    while 1:
        lock.acquire()
        try:
            next = gen.next()
        finally:
            lock.release()
        yield next

[Ype Kingma] Is there any reason why the lock is not shared among threads?
From the looks of this, it doesn't synchronize anything
between different threads. Am I missing something?

[Jeff Epler] Yes, I think so. You'd use the same "serialise" generator object in
multiple threads, like this:

p = serialise(producer_generator())
threads = [thread.start_new(worker_thread, (p,))
           for t in range(num_workers)]


Hmm. I think Ype is right: the above code does not correctly serialise
access to a generator.

The above serialise function is a generator which wraps a generator.
This presumably is in order to prevent the wrapped generator's .next()
method from being called simultaneously from multiple threads (which is
barred: PEP 255: "Restriction: A generator cannot be resumed while it
is actively running")

http://www.python.org/peps/pep-0255.html
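The PEP 255 restriction can be triggered without any threads at all. The minimal Python 3 sketch below is not from the thread (the generator name `reentrant` and helper `try_reenter` are hypothetical): it makes a generator ask itself for its next value while it is still running, which raises the same `ValueError` that the worker threads below count as "clashes".

```python
def reentrant():
    # While this generator's frame is running, it asks itself for a value.
    yield next(gen)

gen = reentrant()

def try_reenter():
    """Return the error message raised by resuming a running generator."""
    try:
        next(gen)
    except ValueError as exc:
        return str(exc)
    return None

message = try_reenter()
print(message)  # generator already executing
```

With threads, the same error appears whenever two threads happen to resume one generator at the same moment, so it shows up only intermittently.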

However, the above implementation re-creates the problem by using an
outer generator to wrap the inner one. The outer's .next() method will
then potentially be called simultaneously by multiple threads. The
following code illustrates the problem:

#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
import time
import thread
import threading

def serialise(gen):
    lock = threading.Lock()
    while 1:
        lock.acquire()
        try:
            next = gen.next()
        finally:
            lock.release()
        yield next

def squares(n):
    i = 1
    while i < n:
        yield i*i
        i = i+1

def worker_thread(iter, markers):
    markers[thread.get_ident()] = 1
    results = [] ; clashes = 0
    while 1:
        try:
            results.append(iter.next())
        except StopIteration:
            break
        except ValueError, ve:
            if str(ve) == "generator already executing":
                clashes = clashes + 1
    del markers[thread.get_ident()]
    print "Thread %5s: %d results: %d clashes." % (thread.get_ident(),\
        len(results), clashes)

numthreads = 10 ; threadmarkers = {}
serp = serialise(squares(100))
threads = [thread.start_new_thread(worker_thread,\
    (serp, threadmarkers)) for t in xrange(numthreads)]
while len(threadmarkers.keys()) > 0:
    time.sleep(0.1)
#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

I believe that the following definition of serialise will correct the
problem (IFF I've understood the problem correctly :-)

#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
import time
import thread
import threading

class serialise:
    "Wrap a generator in an iterator for thread-safe access"

    def __init__(self, gen):
        self.lock = threading.Lock()
        self.gen = gen

    def __iter__(self):
        return self

    def next(self):
        self.lock.acquire()
        try:
            return self.gen.next()
        finally:
            self.lock.release()

def squares(n):
    i = 1
    while i < n:
        yield i*i
        i = i+1

def worker_thread(iter, markers):
    markers[thread.get_ident()] = 1
    results = [] ; clashes = 0
    while 1:
        try:
            results.append(iter.next())
        except StopIteration:
            break
        except ValueError, ve:
            if str(ve) == "generator already executing":
                clashes = clashes + 1
    del markers[thread.get_ident()]
    print "Thread %5s: %d results: %d clashes." % (thread.get_ident(),\
        len(results), clashes)

numthreads = 10 ; threadmarkers = {}
serp = serialise(squares(100))
threads = [thread.start_new_thread(worker_thread,\
    (serp, threadmarkers)) for t in xrange(numthreads)]
while len(threadmarkers.keys()) > 0:
    time.sleep(0.1)
#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
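For readers on Python 3, where the iterator method is `__next__()` and the `thread` module became `_thread`, Alan's class might be ported roughly as below. This is a sketch, not code from the thread: it swaps the explicit acquire/try/finally for a `with` block (which gives the same release guarantee) and replaces the marker-dictionary polling loop with `Thread.join()`.

```python
import threading

class serialise:
    """Wrap an iterator so several threads can share it (Python 3 sketch)."""

    def __init__(self, gen):
        self.lock = threading.Lock()
        self.gen = gen

    def __iter__(self):
        return self

    def __next__(self):
        # 'with' acquires the lock and releases it even if next() raises.
        with self.lock:
            return next(self.gen)

def squares(n):
    i = 1
    while i < n:
        yield i * i
        i += 1

def worker(shared, results):
    # A for loop stops cleanly when the wrapped generator is exhausted.
    for value in shared:
        results.append(value)

shared = serialise(squares(100))
per_thread = [[] for _ in range(10)]
threads = [threading.Thread(target=worker, args=(shared, r)) for r in per_thread]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for every worker instead of polling a marker dict

combined = sorted(v for r in per_thread for v in r)
print(len(combined))  # 99
```

Every value is delivered to exactly one thread, and no thread ever sees "generator already executing", because only one thread at a time can be inside `next(self.gen)`.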

Also, I don't know if I'm happy with relying on the fact that the
generator raises StopIteration for *every* .next() call after the
actual generated sequence has ended. The above code depends on the
exhausted generator raising StopIteration in every thread. This seems
to me the kind of thing that might be python-implementation specific.
For example, the original "Simple Generators" specification, PEP 255,
makes no mention of expected behaviour of generators when multiple
calls are made to its .next() method after the iteration is
exhausted. Not that I can see, anyway. Am I wrong?

http://www.python.org/peps/pep-0255.html

regards,

--
alan kennedy
------------------------------------------------------
check http headers here: http://xhaus.com/headers
email alan: http://xhaus.com/contact/alan
Jul 18 '05 #6

Aahz wrote:
In article <79**************************@posting.google.com >,
Andrae Muys <am***@shortech.com.au> wrote:
Found myself needing serialised access to a shared generator from
multiple threads. Came up with the following

def serialise(gen):
    lock = threading.Lock()
    while 1:
        lock.acquire()
        try:
            next = gen.next()
        finally:
            lock.release()
        yield next

I'm not sure this is generic enough to go in the standard library.
Usually, I'd recommend that someone wanting this functionality consider
other options in addition to this (such as using Queue.Queue()).


While I fully appreciate the importance of a Queue.Queue in implementing
a producer/consumer task relationship, this particular function provides
serialised access to a *passive* data-stream. With the increasing
sophistication of itertools, I feel there may be an argument for
supporting shared access to a generator.

Anyway I thought it was worth offering as a possible bridge between the
itertools and threading modules. If I'm mistaken, then it's no major loss.

Andrae

Jul 18 '05 #7

Alan,

you wrote:
[Andrae Muys]
Found myself needing serialised access to a shared generator from
multiple threads. Came up with the following

def serialise(gen):
    lock = threading.Lock()
    while 1:
        lock.acquire()
        try:
            next = gen.next()
        finally:
            lock.release()
        yield next
[Ype Kingma] Is there any reason why the lock is not shared among threads?
From the looks of this, it doesn't synchronize anything
between different threads. Am I missing something?

[Jeff Epler]
Yes, I think so. You'd use the same "serialise" generator object in
multiple threads, like this:

p = serialise(producer_generator())
threads = [thread.start_new(worker_thread, (p,))
           for t in range(num_workers)]


Hmm. I think Ype is right: the above code does not correctly serialise
access to a generator.


Well, I just reread PEP 255, and I can assure you I was missing something...

The above serialise function is a generator which wraps a generator.
This presumably is in order to prevent the wrapped generator's .next()
method from being called simultaneously from multiple threads (which is
barred: PEP 255: "Restriction: A generator cannot be resumed while it
is actively running")

http://www.python.org/peps/pep-0255.html

However, the above implementation re-creates the problem by using an
outer generator to wrap the inner one. The outer's .next() method will
then potentially be called simultaneously by multiple threads.

I agree (after rereading the PEP.)

The following code illustrates the problem:

#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
import time
import thread
import threading

def serialise(gen):
    lock = threading.Lock()
    while 1:
        lock.acquire()
        try:
            next = gen.next()
        finally:
            lock.release()
        yield next

def squares(n):
    i = 1
    while i < n:
        yield i*i
        i = i+1

def worker_thread(iter, markers):
    markers[thread.get_ident()] = 1
    results = [] ; clashes = 0
    while 1:
        try:
            results.append(iter.next())
        except StopIteration:
            break
        except ValueError, ve:
            if str(ve) == "generator already executing":
                clashes = clashes + 1
    del markers[thread.get_ident()]
    print "Thread %5s: %d results: %d clashes." % (thread.get_ident(),\
        len(results), clashes)

numthreads = 10 ; threadmarkers = {}
serp = serialise(squares(100))
threads = [thread.start_new_thread(worker_thread,\
    (serp, threadmarkers)) for t in xrange(numthreads)]
while len(threadmarkers.keys()) > 0:
    time.sleep(0.1)
#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

I believe that the following definition of serialise will correct the
problem (IFF I've understood the problem correctly :-)

#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
import time
import thread
import threading

class serialise:
    "Wrap a generator in an iterator for thread-safe access"

    def __init__(self, gen):
        self.lock = threading.Lock()
        self.gen = gen

    def __iter__(self):
        return self

    def next(self):
        self.lock.acquire()
        try:
            return self.gen.next()
        finally:
            self.lock.release()

Looks like a candidate for inclusion in a standard library to me.

def squares(n):
    i = 1
    while i < n:
        yield i*i
        i = i+1

def worker_thread(iter, markers):
    markers[thread.get_ident()] = 1
    results = [] ; clashes = 0
    while 1:
        try:
            results.append(iter.next())
        except StopIteration:
            break
        except ValueError, ve:
            if str(ve) == "generator already executing":
                clashes = clashes + 1
    del markers[thread.get_ident()]
    print "Thread %5s: %d results: %d clashes." % (thread.get_ident(),\
        len(results), clashes)

numthreads = 10 ; threadmarkers = {}
serp = serialise(squares(100))
threads = [thread.start_new_thread(worker_thread,\
    (serp, threadmarkers)) for t in xrange(numthreads)]
while len(threadmarkers.keys()) > 0:
    time.sleep(0.1)
#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

Also, I don't know if I'm happy with relying on the fact that the
generator raises StopIteration for *every* .next() call after the
actual generated sequence has ended. The above code depends on the
exhausted generator raising StopIteration in every thread. This seems
to me the kind of thing that might be python-implementation specific.
For example, the original "Simple Generators" specification, PEP 255,
makes no mention of expected behaviour of generators when multiple
calls are made to its .next() method after the iteration is
exhausted. Not that I can see, anyway. Am I wrong?


Quoting from PEP 234:
http://www.python.org/peps/pep-0234.html

"Once a particular iterator object has raised StopIteration, will
it also raise StopIteration on all subsequent next() calls?
....
Resolution: once StopIteration is raised, calling it.next()
continues to raise StopIteration."
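The PEP 234 resolution is easy to check on a modern interpreter. The small Python 3 sketch below (the helper `count_stops` is hypothetical, not from the thread) exhausts a generator and then keeps calling `next()` on it. Every extra call raises `StopIteration`, which is what lets each worker thread sharing the wrapper see the end of the sequence.

```python
def squares(n):
    i = 1
    while i < n:
        yield i * i
        i += 1

def count_stops(it, extra_calls):
    """Drain `it`, then count how many further next() calls raise StopIteration."""
    values = list(it)  # consumes the iterator completely
    stops = 0
    for _ in range(extra_calls):
        try:
            next(it)
        except StopIteration:
            stops += 1
    return values, stops

values, stops = count_stops(squares(4), extra_calls=5)
print(values, stops)  # [1, 4, 9] 5
```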

Thanks to all for the help,

Ype

Jul 18 '05 #8

[Andrae Muys]
> Found myself needing serialised access to a shared generator from
> multiple threads. Came up with the following
>
> def serialise(gen):
>     lock = threading.Lock()
>     while 1:
>         lock.acquire()
>         try:
>             next = gen.next()
>         finally:
>             lock.release()
>         yield next
[Ype Kingma] Is there any reason why the lock is not shared among threads?
From the looks of this, it doesn't synchronize anything
between different threads. Am I missing something?
[Jeff Epler] Yes, I think so. You'd use the same "serialise" generator object in
multiple threads, like this:

p = serialise(producer_generator())
threads = [thread.start_new(worker_thread, (p,))
           for t in range(num_workers)]

[Alan Kennedy] Hmm. I think Ype is right: the above code does not correctly serialise
access to a generator.
[Ype Kingma]
Well, I just reread PEP 255, and I can assure you I was missing
something...
Ype,

Ah: I see now. You thought it didn't work, but for a different reason
than the one I pointed out. You thought that the lock was not shared
between threads, though as Jeff pointed out, it is if you use it the
right way.

But it still doesn't work.

[Alan Kennedy] I believe that the following definition of serialise will correct the
problem (IFF I've understood the problem correctly :-)

#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
import time
import thread
import threading

class serialise:
    "Wrap a generator in an iterator for thread-safe access"

    def __init__(self, gen):
        self.lock = threading.Lock()
        self.gen = gen

    def __iter__(self):
        return self

    def next(self):
        self.lock.acquire()
        try:
            return self.gen.next()
        finally:
            self.lock.release()
[Ype Kingma] Looks like a candidate for inclusion in a standard library to me.
Well, maybe :-)

To be honest, I don't have the time to write test cases, docs and
patches. So I think I'll just leave it for people to find in the
Google Groups archives ...

[Alan Kennedy] Also, I don't know if I'm happy with relying on the fact that the
generator raises StopIteration for *every* .next() call after the
actual generated sequence has ended. The above code depends on the
exhausted generator raising StopIteration in every thread. This seems
to me the kind of thing that might be python-implementation specific.
For example, the original "Simple Generators" specification, PEP 255,
makes no mention of expected behaviour of generators when multiple
calls are made to its .next() method after the iteration is
exhausted. Not that I can see, anyway. Am I wrong?


[Ype Kingma] Quoting from PEP 234:
http://www.python.org/peps/pep-0234.html

"Once a particular iterator object has raised StopIteration, will
it also raise StopIteration on all subsequent next() calls?
...
Resolution: once StopIteration is raised, calling it.next()
continues to raise StopIteration."


Yes, that clears the issue up nicely. Thanks for pointing that out.

So the same code will run correctly in Jython 2.3 and IronPython
(awaited with anticipation).

regards,

--
alan kennedy
Jul 18 '05 #9

[Subject line changed to allow thread to be found more easily in
google-groups]

Alan Kennedy <al****@hotmail.com> wrote in message news:<40***************@hotmail.com>...
[Alan Kennedy]
I believe that the following definition of serialise will correct the
problem (IFF I've understood the problem correctly :-)

It does look like the following version will work. I was too focused
on synchronising the underlying generator and forgot that my code
also needed to be re-entrant. Thanks for catching my mistake.

#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
import time
import thread
import threading

class serialise:
    "Wrap a generator in an iterator for thread-safe access"

    def __init__(self, gen):
        self.lock = threading.Lock()
        self.gen = gen

    def __iter__(self):
        return self

    def next(self):
        self.lock.acquire()
        try:
            return self.gen.next()
        finally:
            self.lock.release()


[Ype Kingma]
Looks like a candidate for inclusion in a standard library to me.


Well, maybe :-)

To be honest, I don't have the time to write test cases, docs and
patches. So I think I'll just leave it for people to find in the
Google Groups archives ...


Andrae Muys
Jul 18 '05 #10

aa**@pythoncraft.com (Aahz) wrote in message news:<bu**********@panix1.panix.com>...
In article <79**************************@posting.google.com >,
Andrae Muys <am***@shortech.com.au> wrote:

Found myself needing serialised access to a shared generator from
multiple threads. Came up with the following

def serialise(gen):
    lock = threading.Lock()
    while 1:
        lock.acquire()
        try:
            next = gen.next()
        finally:
            lock.release()
        yield next


I'm not sure this is generic enough to go in the standard library.
Usually, I'd recommend that someone wanting this functionality consider
other options in addition to this (such as using Queue.Queue()).


I'm curious to know how a Queue.Queue() provides the same
functionality? I have always considered a Queue.Queue() to be an
inter-thread communication primitive. serialise() (at least the
corrected version discussed later in this thread) is strictly a
synchronisation primitive.

Andrae
Jul 18 '05 #11

In article <79**************************@posting.google.com >,
Andrae Muys <am***@shortech.com.au> wrote:
aa**@pythoncraft.com (Aahz) wrote in message news:<bu**********@panix1.panix.com>...
In article <79**************************@posting.google.com >,
Andrae Muys <am***@shortech.com.au> wrote:

Found myself needing serialised access to a shared generator from
multiple threads. Came up with the following

def serialise(gen):
    lock = threading.Lock()
    while 1:
        lock.acquire()
        try:
            next = gen.next()
        finally:
            lock.release()
        yield next


I'm not sure this is generic enough to go in the standard library.
Usually, I'd recommend that someone wanting this functionality consider
other options in addition to this (such as using Queue.Queue()).


I'm curious to know how a Queue.Queue() provides the same
functionality? I have always considered a Queue.Queue() to be an
inter-thread communication primitive. serialise() (at least the
corrected version discussed later in this thread) is strictly a
synchronisation primitive.


Well, yes; Queue.Queue() provides both synchronization *and* data
protection. In some ways, it's overkill for this specific problem, but
my experience is that there are so many different ways to approach this
class of problems and so many ways to screw up threaded applications,
it's best to learn one swiss-army knife that can handle almost everything
you want to throw at it.
--
Aahz (aa**@pythoncraft.com) <*> http://www.pythoncraft.com/

Jul 18 '05 #12

[Andrae Muys]
I'm curious to know how a Queue.Queue() provides the same
functionality? I have always considered a Queue.Queue() to be an
inter-thread communication primitive.
Not exactly.

Queue.Queue is a *thread-safe* communication primitive: you're not
required to have separate threads at both ends of a Queue.Queue, but
it is guaranteed to work correctly if you do have multiple threads.

From the module documentation:

"""
The Queue module implements a multi-producer, multi-consumer FIFO
queue. It is especially useful in threads programming when information
must be exchanged safely between multiple threads. The Queue class in
this module implements all the required locking semantics. It depends
on the availability of thread support in Python.
"""

http://www.python.org/doc/current/lib/module-Queue.html
serialise() (at least the
corrected version discussed later in this thread) is strictly a
synchronisation primitive.


Just as Queue.Queue is a synchronisation primitive: a very flexible
and useful primitive that happens to be usable in a host of different
scenarios.

I think I'm with Aahz on this one: when faced with this kind of
problem, I think it is best to use a tried and tested inter-thread
communication paradigm, such as Queue.Queue. In this case, Queue.Queue
fits the problem (which is just a variation of the producer/consumer
problem) naturally. Also, I doubt very much if there is much excessive
resource overhead when using Queue.Queues.

As you've already seen from your first cut of the code, writing
thread-safe code is an error-prone process, and it's sometimes
difficult to figure out all the possible calling combinations when
multiple threads are involved.

But if you'd used Queue.Queue, well this whole conversation would
never have come up, would it ;-)

regards,

--
alan kennedy
Jul 18 '05 #13

[Andrae Muys]
Moved to email for higher bandwidth. Feel free to quote to usenet if
you desire.
[Alan Kennedy]
I think I'm with Aahz on this one: when faced with this kind of
problem, I think it is best to use a tried and tested inter-thread
communication paradigm, such as Queue.Queue. In this case, Queue.Queue
fits the problem (which is just a variation of the producer/consumer
problem) naturally. Also, I doubt very much if there is much excessive
resource overhead when using Queue.Queues.


[Andrae Muys] Well, I'm puzzled, because I couldn't see an easy way to use Queue.Queue
to achieve this because this isn't a strict producer/consumer problem.
I am trying to synchronise multiple consumers, but I don't have a
producer. So the only way I can see to use Queue.Queue to achieve
this is to spawn a thread specifically to convert the iterator in to
a producer.


Andrae,

I thought it best to continue this discussion on UseNet, to perhaps
get more opinions.

Yes, you're right. Using a Queue in this situation does require the
use of a dedicated thread for the producer. There is no way to "pull"
values from a generator to multiple consumers through a Queue.Queue.
The values have to be "pushed" onto the Queue.Queue by some producing
thread of execution.

The way I see it, the options are:

Option 1. Spawn a separate thread to execute the producing generator.
However, this has problems:

A: How do the threads recognise the end of the generated sequence?
This is not a simple problem: the Queue simply being empty does not
necessarily signify the end of the sequence (e.g., the producer thread
might not be getting its fair share of CPU time).

B: The Queue acts as a (potentially infinite) buffer for the generated
values, thus eliminating one of the primary benefits of generators:
their efficient "generate when required" nature. This can be helped
somewhat by limiting the number of entries in the Queue, but it is
still slightly unsatisfactory.

C: A thread of execution has to be dedicated to the producer, thus
consuming resources.

Option 2. Fill the Queue with values from a main thread which executes
the generator to exhaustion. The consuming threads simply peel values
from the Queue. Although this saves on thread overhead, it is the
least desirable in terms of memory overhead: the number of values
generated by the generator and buffered in the Queue could be very
large.

Option 3. Use the same approach as your original code, i.e. there
is no producer thread and the consuming threads are themselves
responsible for calling the generator.next() method: access to this
method is synchronised on a threading.Lock. I really like this
solution, because values are only generated on demand, with no
temporary storage of values required.
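Problems A and B of Option 1 have well-known partial remedies. The Python 3 sketch below (the names `producer`, `consumer`, `run` and `_DONE` are hypothetical, not from the thread) bounds the queue with `maxsize` so the producer can run at most a couple of values ahead of the consumers, and marks the end of the sequence by pushing one sentinel object per consumer. It still pays cost C: a thread dedicated to the producer.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the generated sequence

def producer(gen, q, n_consumers):
    for value in gen:
        q.put(value)  # blocks while the queue is full, limiting read-ahead (B)
    for _ in range(n_consumers):
        q.put(_DONE)  # one end marker per consumer (A)

def consumer(q, results):
    while True:
        value = q.get()
        if value is _DONE:
            break  # each consumer takes exactly one sentinel, then stops
        results.append(value)

def run(gen, n_consumers=4, maxsize=2):
    q = queue.Queue(maxsize=maxsize)
    results = [[] for _ in range(n_consumers)]
    threads = [threading.Thread(target=producer, args=(gen, q, n_consumers))]
    threads += [threading.Thread(target=consumer, args=(q, r)) for r in results]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results

results = run(iter(range(20)))
print(sorted(v for r in results for v in r) == list(range(20)))  # True
```

Because the queue is FIFO, all real values are taken before any sentinel, so no value is lost when consumers stop.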

I think that an ideal solution would be to create a dedicated class
for synchronising a generator, as my example did, BUT to implement the
same interface as Queue.Queue, so that client code would remain
ignorant that it was dealing with a generator.

Here is my version of such a beast:

# -=-=-=-=-= file GenQueue.py =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
import threading

class Empty(Exception): pass
class Exhausted(StopIteration): pass
class IllegalOperation(Exception): pass

class GenQueue:
    "Simulate a Queue.Queue, with values produced from a generator"

    def __init__(self, gen):
        self.lock = threading.Lock()
        self.gen = gen

    def __iter__(self):
        return self

    def _get(self, block=1):
        if self.lock.acquire(block):
            try:
                try:
                    return self.gen.next()
                except StopIteration:
                    raise Exhausted
            finally:
                self.lock.release()
        else:
            raise Empty

    def next(self):
        return self._get(1)

    def get(self, block=1):
        return self._get(block)

    def get_nowait(self):
        return self._get(0)

    def put(self, item, block=1):
        raise IllegalOperation

    def put_nowait(self, item):
        self.put(item, 0)

    def full(self):
        return False

    def empty(self):
        return False

    def qsize(self):
        return 1j

#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

And here is some code that tests it:

#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

import sys
import time
import thread
import GenQueue

# Raised if a blocking get() ever reports Empty (it never should).
class TestFailure(Exception): pass

def squares(n):
    i = 1
    while i <= n:
        yield i*i
        i = i+1

def test_blockget(results, queue):
    while 1:
        try:
            results.append(queue.get())
        except GenQueue.Empty:
            raise TestFailure
        except GenQueue.Exhausted:
            break

def test_iter(results, queue):
    for v in queue:
        results.append(v)

def test_noblockget(results, queue):
    while 1:
        try:
            results.append(queue.get_nowait())
        except GenQueue.Empty:
            pass
        except GenQueue.Exhausted:
            break

def threadwrap(func, queue, markers):
    markers[thread.get_ident()] = 1
    results = []
    func(results, queue)
    print "Test %s: Thread %5s: %d results." % (func.__name__, \
        thread.get_ident(), len(results))
    del markers[thread.get_ident()]

def test():
    numthreads = 10
    for tfunc in (test_blockget, test_iter, test_noblockget):
        print "Test: %s ------------------------------->" % tfunc.__name__
        threadmarkers = {}
        q = GenQueue.GenQueue(squares(100))
        threads = [thread.start_new_thread(threadwrap,\
                   (tfunc, q, threadmarkers)) for t in
                   xrange(numthreads)]
        while len(threadmarkers.keys()) > 0:
            time.sleep(0.1)

if __name__ == "__main__":
    test()

#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

I find the combination of the iteration protocol and a Queue
intriguing: in this case, Queue.get() and iter.next() mean the same
thing. Or maybe I'm just being sucked in by the syntactic niceties of
something like

def worker(inq, outq):
    for thing in inq: outq.put(thing.work())

I'm interested to hear other opinions about the commonalities and
differences between Queues and iterators.
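Part of what makes the two feel alike is that a queue can be turned into an iterator directly: the two-argument form of the built-in `iter(callable, sentinel)` keeps calling `q.get` until it returns the sentinel. The Python 3 sketch below writes the `worker(inq, outq)` idea that way; the `Job` class and the `STOP` sentinel are illustrative assumptions, not part of the post.

```python
import queue
import threading

STOP = None  # sentinel: iteration ends when inq.get() returns this

class Job:
    def __init__(self, n):
        self.n = n

    def work(self):
        return self.n * self.n

def worker(inq, outq):
    # iter(callable, sentinel) calls inq.get() repeatedly until it returns STOP.
    for thing in iter(inq.get, STOP):
        outq.put(thing.work())

inq, outq = queue.Queue(), queue.Queue()
for n in range(5):
    inq.put(Job(n))
inq.put(STOP)

t = threading.Thread(target=worker, args=(inq, outq))
t.start()
t.join()
out = [outq.get() for _ in range(outq.qsize())]
print(out)  # [0, 1, 4, 9, 16]
```

Unlike a generator, the queue needs an explicit end marker, because an empty queue does not mean the producer has finished.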

One problem that is always in the back of my mind these days is how
one could write a dispatch-based coroutine scheduler that would work
efficiently when in communication (through Queue.Queues?) with
independently executing coroutine schedulers running on other
processors in the box. (And before you jump in shouting "Global
Interpreter Lock!", remember Jython + generators will be able to do
this).

Not that I need such a thing: it's just a fun thing to think about,
like crosswords :-)

cheers,

--
alan kennedy
Jul 18 '05 #14


Even easier:

Q = Queue.Queue()
Q.put(gen)
def thread():
    a = Q.get()
    use = a.next()
    Q.put(a)
    #do whatever you need
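That first suggestion uses the queue as a lock: the generator object is the only item in the queue, so whichever thread holds it has exclusive access. As written, though, a thread whose `next()` call raises `StopIteration` never puts the generator back, and every other thread then waits forever in `Q.get()`. A Python 3 sketch with that hole closed (the `squares`, `take_one` and `thread_body` names are illustrative, not Josiah's):

```python
import queue
import threading

def squares(n):
    for i in range(1, n):
        yield i * i

Q = queue.Queue()
Q.put(squares(50))  # the generator itself is the single token in the queue

def take_one():
    """Borrow the generator from Q, take one value, and always hand it back."""
    g = Q.get()  # blocks while another thread holds the generator
    try:
        return next(g)  # may raise StopIteration once exhausted
    finally:
        Q.put(g)  # return it even on StopIteration, or other threads block forever

def thread_body(results):
    while True:
        try:
            results.append(take_one())
        except StopIteration:
            break

per_thread = [[] for _ in range(4)]
threads = [threading.Thread(target=thread_body, args=(r,)) for r in per_thread]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(sorted(v for r in per_thread for v in r)))  # 49
```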

Of course you could just as easily use a single lock and a class:

class lockedgen:
    def __init__(self, gen):
        self.g = gen
        self.l = threading.Lock()
    def get(self):
        self.l.acquire()
        a = self.g.next()
        self.l.release()
        return a

generator = lockedgen(gen)
def thread():
    use = generator.get()
    #do what you need
- Josiah
Jul 18 '05 #15

In article <20**************************@uci.edu>,
Josiah Carlson <jc******@uci.edu> wrote:

Of course you could just as easily use a single lock and a class:

class lockedgen:
    def __init__(self, gen):
        self.g = gen
        self.l = threading.Lock()
    def get(self):
        self.l.acquire()
        a = self.g.next()
        self.l.release()
        return a


This is one case where you *DEFINITELY* want to use try/finally:

class lockedgen:
    def __init__(self, gen):
        self.g = gen
        self.l = threading.Lock()
    def get(self):
        self.l.acquire()
        try:
            a = self.g.next()
        finally:
            self.l.release()
        return a
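To see what the missing try/finally costs, the Python 3 sketch below (class and function names are hypothetical) feeds both versions a generator that raises partway through. Without the `finally`, the exception skips `release()` and the lock stays held, so every later caller would deadlock. The `with` statement used in the safe version gives the same guarantee as Aahz's try/finally.

```python
import threading

def failing():
    yield 1
    raise RuntimeError("producer failed")

class NaiveLockedGen:
    """Josiah's version: release() is skipped if next() raises."""
    def __init__(self, gen):
        self.g = gen
        self.l = threading.Lock()

    def get(self):
        self.l.acquire()
        a = next(self.g)
        self.l.release()
        return a

class SafeLockedGen(NaiveLockedGen):
    """Aahz's fix, written with 'with', which releases on any exception."""
    def get(self):
        with self.l:
            return next(self.g)

def lock_held_after_error(cls):
    wrapped = cls(failing())
    wrapped.get()  # first value succeeds
    try:
        wrapped.get()  # the generator raises RuntimeError here
    except RuntimeError:
        pass
    return wrapped.l.locked()

print(lock_held_after_error(NaiveLockedGen))  # True
print(lock_held_after_error(SafeLockedGen))   # False
```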
--
Aahz (aa**@pythoncraft.com) <*> http://www.pythoncraft.com/

Jul 18 '05 #16

In article <40***************@hotmail.com>,
Alan Kennedy <al****@hotmail.com> wrote:

Yes, you're right. Using a Queue in this situation does require the
use of a dedicated thread for the producer. There is no way to "pull"
values from a generator to multiple consumers through a Queue.Queue.
The values have to be "pushed" onto the Queue.Queue by some producing
thread of execution.
Correct.
The way I see it, the options are:

Option 1. Spawn a separate thread to execute the producing generator.
However, this has problems:

A: How do the threads recognise the end of the generated sequence?
This is not a simple problem: the Queue simply being empty does not
necessarily signify the end of the sequence (e.g., the producer thread
might not be getting its fair share of CPU time).

B: The Queue acts as a (potentially infinite) buffer for the generated
values, thus eliminating one of the primary benefits of generators:
their efficient "generate when required" nature. This can be helped
somewhat by limiting the number of entries in the Queue, but it is
still slightly unsatisfactory.

C: A thread of execution has to be dedicated to the producer, thus
consuming resources.


There are a number of ways of mitigating A and B; they mostly involve
using an extra Queue.Queue to send tokens to the generator thread when a
consumer wants data. The generator thread then sends back a token that
(among other things) contains an attribute specifically for notifying
the consumer that the generator is exhausted. See
http://www.pythoncraft.com/OSCON2001...dPoolSpider.py
and
http://www.pythoncraft.com/OSCON2001/FibThreaded.py
for examples that show the technique, though they're not directly
relevant to this case.
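A rough Python 3 sketch of the request-token idea, under the assumption that each consumer keeps one reusable token carrying its own reply queue. The `Token`, `generator_thread`, `consumer` and `run` names are hypothetical, and this is not the code at the URLs above. Values are produced only when a consumer asks for one, which addresses B, and the `exhausted` flag on the token answers A; the dedicated thread from C is still needed.

```python
import queue
import threading

class Token:
    """Hypothetical request token; the answer travels back on the token itself."""
    def __init__(self):
        self.reply = queue.Queue(maxsize=1)
        self.value = None
        self.exhausted = False  # set by the generator thread at end of sequence

def generator_thread(gen, requests):
    # Produce exactly one value per request, so nothing is generated ahead.
    for token in iter(requests.get, None):  # a None request shuts the thread down
        try:
            token.value = next(gen)
        except StopIteration:
            token.exhausted = True
        token.reply.put(token)

def consumer(requests, results):
    token = Token()
    while True:
        requests.put(token)  # ask for one value
        token.reply.get()    # wait for the generator thread to answer
        if token.exhausted:
            break
        results.append(token.value)

def run(gen, n_consumers=4):
    requests = queue.Queue()
    producer = threading.Thread(target=generator_thread, args=(gen, requests))
    producer.start()
    results = [[] for _ in range(n_consumers)]
    workers = [threading.Thread(target=consumer, args=(requests, r)) for r in results]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    requests.put(None)  # all consumers are done; stop the generator thread
    producer.join()
    return results

results = run(iter(range(30)))
print(sorted(v for r in results for v in r) == list(range(30)))  # True
```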

My point is that I haven't (yet) seen many good use cases for sharing a
generator between threads, and I'm guessing that many people will try
using generators inappropriately for problems that really are better
suited to Queue.Queue.
--
Aahz (aa**@pythoncraft.com) <*> http://www.pythoncraft.com/

Jul 18 '05 #17

aa**@pythoncraft.com (Aahz) wrote in message news:<bu**********@panix1.panix.com>...
My point is that I haven't (yet) seen many good use cases for sharing a
generator between threads, and I'm guessing that many people will try
using generators inappropriately for problems that really are better
suited to Queue.Queue.


A globally unique ID, such as:

"What filename should I store this page under?"

The standard library has (several versions of) similar functionality
for temporary filenames. They aren't all threadsafe, they often
enforce the "temporary" aspect, they run into hashing collision
problems eventually, there is no good way to include even approximate
ordering information, etc...

The fact that these are in the standard library suggests that it is a
common use case. The fact that there are several different versions
each with their own problems suggests that the problem is hard enough
to justify putting a good solution in the library.
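The unique-ID case maps neatly onto a shared generator. A minimal Python 3 sketch, assuming a simple sequential naming scheme (the `SerialIter` class, the `grab` helper and the `page-NNNNNN.html` pattern are illustrative, not from the post): each name is generated on demand, and the lock ensures no two threads ever receive the same one. Because the IDs are sequential, they also carry ordering information about when each name was handed out.

```python
import itertools
import threading

class SerialIter:
    """Lock-protected wrapper around any iterator (same idea as serialise)."""
    def __init__(self, it):
        self._it = it
        self._lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        with self._lock:
            return next(self._it)

# Hypothetical naming scheme: page-000000.html, page-000001.html, ...
filenames = SerialIter("page-%06d.html" % n for n in itertools.count())

def grab(n, out):
    for _ in range(n):
        out.append(next(filenames))

buckets = [[] for _ in range(8)]
threads = [threading.Thread(target=grab, args=(100, b)) for b in buckets]
for t in threads:
    t.start()
for t in threads:
    t.join()

names = [name for b in buckets for name in b]
print(len(names), len(set(names)))  # 800 800
```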

--

-jJ Take only memories. Leave not even footprints.
Jul 18 '05 #18
