Bytes IT Community

Queue.Queue-like class without the busy-wait

Has anyone written a Queue.Queue replacement that avoids busy-waiting?
It doesn't matter if it uses os-specific APIs (eg
WaitForMultipleObjects). I did some googling around and haven't found
anything so far.

Because I know someone will ask: no, the busy-waiting hasn't been a
problem in my app. I'm just interested in reading the code.

p

Jul 18 '05 #1
29 Replies


Paul L. Du Bois wrote:
Has anyone written a Queue.Queue replacement that avoids busy-waiting?
It doesn't matter if it uses os-specific APIs (eg
WaitForMultipleObjects). I did some googling around and haven't found
anything so far.

Because I know someone will ask: no, the busy-waiting hasn't been a
problem in my app. I'm just interested in reading the code.


I don't believe the term "busy-wait" applies here.
Busy-waiting is (in my experience) where you run in
tight loops without releasing the CPU, consuming all
CPU time available.

I'm fairly certain that while Queue (and other things
that wait in Python) does have a loop, you'll find that
it's not a busy-waiting loop and you definitely don't
get 100% CPU usage during the wait. It goes to sleep
for increasingly large periods of time while waiting,
if the code in threading._Condition is any indication.
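[Editor's note: for readers curious what that loop looks like, here is a rough sketch, modeled on the balanced backoff loop in CPython 2.x's threading._Condition.wait(). The function name timed_wait and the predicate argument are illustrative, not the stdlib API; the real code repeatedly tries a non-blocking acquire of an internal waiter lock rather than calling a predicate.]

```python
import time

def timed_wait(predicate, timeout):
    # Sketch of the backoff loop in CPython 2.x threading._Condition.wait().
    # Poll, sleeping for exponentially growing intervals capped at 50 ms,
    # instead of spinning flat out at 100% CPU.
    endtime = time.time() + timeout
    delay = 0.0005                    # initial sleep: 500 microseconds
    while not predicate():
        remaining = endtime - time.time()
        if remaining <= 0:
            return False              # timed out
        # Double the delay each pass, but never oversleep the deadline
        # and never sleep more than 50 ms at a stretch.
        delay = min(delay * 2, remaining, 0.05)
        time.sleep(delay)
    return True
```

This is why the wait consumes almost no CPU, yet can be up to 50 ms late noticing that the condition became true.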

-Peter
Jul 18 '05 #2

Peter Hansen wrote:
I don't believe the term "busy-wait" applies here.
[Explanation]


Yes, well, you're right. I was thinking of calling it
"slacker-waiting" but didn't want to come off too cute.

p

Jul 18 '05 #3

On 2005-03-24, Paul L. Du Bois wrote <po******@gmail.com>:
Has anyone written a Queue.Queue replacement that avoids busy-waiting?
It doesn't matter if it uses os-specific APIs (eg
WaitForMultipleObjects). I did some googling around and haven't found
anything so far.


I started once, using the Timer class in the threading module to
break the lock. However, the Timer class uses the same kind of
sleep-polling loop to delay the execution and allow an intermediate
cancel as the loop used in Queue.Queue, so that was no
gain.

I have still played with the idea, but haven't worked anything out
since then.

--
Antoon Pardon
Jul 18 '05 #4

Antoon Pardon <ap*****@forel.vub.ac.be> writes:
I started once, using the Timer class in the threading module to
break the lock. However, the Timer class uses the same kind of
sleep-polling loop to delay the execution and allow an intermediate
cancel as the loop used in Queue.Queue, so that was no
gain.


I've never checked this code but it wouldn't have occurred to me that
Queue uses any kind of timeout loop. Can't it work the obvious way
with a semaphore?
Jul 18 '05 #5

On 2005-03-25, Paul Rubin wrote <http>:
Antoon Pardon <ap*****@forel.vub.ac.be> writes:
I started once, using the Timer class in the threading module to
break the lock. However, the Timer class uses the same kind of
sleep-polling loop to delay the execution and allow an intermediate
cancel as the loop used in Queue.Queue, so that was no
gain.


I've never checked this code but it wouldn't have occurred to me that
Queue uses any kind of timeout loop. Can't it work the obvious way
with a semaphore?


And how is this semaphore going to be released if the timeout is
reached?

--
Antoon Pardon
Jul 18 '05 #6

Antoon Pardon <ap*****@forel.vub.ac.be> writes:
I've never checked this code but it wouldn't have occurred to me that
Queue uses any kind of timeout loop. Can't it work the obvious way
with a semaphore?


And how is this semaphore going to be released if the timeout is
reached?


I meant a semaphore to synchronize the queue when adding or removing
objects. Timeout would be handled with sigalarm or select.

Jul 18 '05 #7

On 2005-03-25, Paul Rubin wrote <http>:
Antoon Pardon <ap*****@forel.vub.ac.be> writes:
> I've never checked this code but it wouldn't have occurred to me that
> Queue uses any kind of timeout loop. Can't it work the obvious way
> with a semaphore?
And how is this semaphore going to be released if the timeout is
reached?


I meant a semaphore to synchronize the queue when adding or removing
objects.


Last I looked there was a lock used for that.

The loop is only for when you can't remove or add an element immediately
and there is a timeout.
Timeout would be handled with sigalarm or select.


How is select going to help? IMO you can't put a Queue in a select call.
And it is doubtful whether working with sigalarm will do the trick.

The first problem is that the signal module in Python is very limited.
IIRC all signals are routed to the main thread, so breaking a lock
by having the thread signalled is impossible in Python.

You may provide your own signal module, but that may not be enough.
The last time I experimented with pthreads in C, locks didn't
break when the thread was signalled. That might be a bug, but I wouldn't
know, since I'm not familiar with the pthread specification.

--
Antoon Pardon

Jul 18 '05 #8

Antoon Pardon <ap*****@forel.vub.ac.be> writes:
I meant a semaphore to synchronize the queue when adding or removing
objects.
Last I looked there was a lock used for that.


OK, that amounts to the same thing.
The loop is only for when you can't remove or add an element immediately
and there is a timeout.
Timeout would be handled with sigalarm or select.


How is select going to help? IMO you can't put a Queue in a select call.
And it is doubtful whether working with sigalarm will do the trick.


You could open a socket to your own loopback port and then select on
it, or something like that. The select call takes a timeout parameter.
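[Editor's note: the timeout half of that idea can be sketched in a few lines. socket.socketpair() stands in for the loopback connection here (it is historically Unix-only; a portable version along the lines Paul suggests would build a real loopback TCP connection instead):]

```python
import select
import socket

# A pair of connected sockets stands in for the "loopback port" idea:
# a writer would send a byte to wake the reader; meanwhile the reader
# blocks inside select() with a timeout instead of polling in a loop.
rd, wr = socket.socketpair()

# Nothing written yet, so select() sleeps for the full 0.2 s timeout
# and then returns three empty lists.
readable, _, _ = select.select([rd], [], [], 0.2)
assert readable == []

# Once a byte is written, select() returns immediately.
wr.send(b'x')
readable, _, _ = select.select([rd], [], [], 0.2)
assert readable == [rd]
rd.close(); wr.close()
```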
The first problem is that the signal module in Python is very limited.
IIRC all signals are routed to the main thread, so breaking a lock
by having the thread signalled is impossible in Python.


A signal handler in the main thread could release a lock that the
thread is waiting on.
Jul 18 '05 #9

On 2005-03-25, Paul Rubin wrote <http>:
Antoon Pardon <ap*****@forel.vub.ac.be> writes:
> I meant a semaphore to synchronize the queue when adding or removing
> objects.


Last I looked there was a lock used for that.


OK, that amounts to the same thing.
The loop is only for when you can't remove or add an element immediately
and there is a timeout.

> Timeout would be handled with sigalarm or select.


How is select going to help? IMO you can't put a Queue in a select call.
And it is doubtful whether working with sigalarm will do the trick.


You could open a socket to your own loopback port and then select on
it, or something like that. The select call takes a timeout parameter.


Well, maybe you could use an os.pipe as a timeout lock then. When the lock is
instantiated you put one byte in it. Acquiring the lock is implemented by
reading one byte, releasing the lock is implemented by writing a byte.
Acquiring the lock with a timeout would make use of select.

It would require careful coding, since you want to prevent the thread
blocking because select returned indicating the pipe could be read, but
between the select and the actual read another thread already consumed
the byte.
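[Editor's note: a toy single-process version of that scheme might look like the sketch below. The class name PipeLock is invented for illustration; it is Unix-only, since it leans on fcntl to make the read non-blocking, which is what guards against exactly the race described above.]

```python
import os
import select
import fcntl
import time

class PipeLock:
    # Sketch of the os.pipe()-based lock described above (a toy, not
    # production code). The pipe holds one byte when the lock is free;
    # acquire() consumes it, release() puts it back, and a timed acquire
    # waits inside select() instead of a sleep-polling loop.

    def __init__(self):
        self._rd, self._wr = os.pipe()
        # Non-blocking reads guard against the race where select() says
        # "readable" but another thread grabs the byte first.
        flags = fcntl.fcntl(self._rd, fcntl.F_GETFL)
        fcntl.fcntl(self._rd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        os.write(self._wr, b'1')        # start out unlocked

    def acquire(self, timeout=None):
        deadline = None if timeout is None else time.time() + timeout
        while True:
            remaining = (None if deadline is None
                         else max(0, deadline - time.time()))
            r, _, _ = select.select([self._rd], [], [], remaining)
            if not r:
                return False            # timed out
            try:
                os.read(self._rd, 1)    # got the byte: lock is ours
                return True
            except OSError:             # lost the race: wait again
                if deadline is not None and time.time() >= deadline:
                    return False

    def release(self):
        os.write(self._wr, b'1')
```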
The first problem is that the signal module in Python is very limited.
IIRC all signals are routed to the main thread, so breaking a lock
by having the thread signalled is impossible in Python.


A signal handler in the main thread could release a lock that the
thread is waiting on.


This wouldn't work. A thread would have no way of knowing for what
purpose the lock was released: whether it was released
by the thread holding it or by the signal handler.
Both would look the same to the thread
acquiring the lock.

There is also the problem that you can't direct which thread will
acquire the lock. Suppose you have two threads waiting on a
lock, one plain, one with a timeout. Your signal handler
kicks in and releases the lock. There is a good chance
the first thread will now acquire the lock and the thread
that used a timeout will continue to be blocked.
Jul 18 '05 #10

Antoon Pardon <ap*****@forel.vub.ac.be> writes:
Well, maybe you could use an os.pipe as a timeout lock then. When the lock is
instantiated you put one byte in it. Acquiring the lock is implemented by
reading one byte, releasing the lock is implemented by writing a byte.
Acquiring the lock with a timeout would make use of select.


Hmm, if I understand what you're saying, you'd need a separate pipe
for every lock. Interesting idea but I think it would burn too many
file descriptors if you used a lot of them. My mentioning select also
is showing my age. That was the way of doing short sleeps before
usleep became widespread.
A signal handler in the main thread could release a lock that the
thread is waiting on.


This wouldn't work. A thread would have no way of knowing for what
purpose the lock was released: whether it was released
by the thread holding it or by the signal handler.
Both would look the same to the thread
acquiring the lock.


Yes, you'd need a separate lock for each blocked thread. There would
be a list of locks waiting for timeouts and the sigalarm handler would
release any for which a wakeup was due. You could use a priority
queue to maintain the timeout list, so that adding or servicing a
timeout would be O(log n).
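[Editor's note: the bookkeeping Paul describes can be sketched as below. The class name TimeoutList and its methods are invented for illustration; the counter is a tie-breaker so that equal deadlines never force the heap to compare the wakeup callables.]

```python
import heapq
import itertools
import time

class TimeoutList:
    # Sketch of the priority-queue bookkeeping described above: each
    # blocked waiter registers a deadline plus a wakeup action (e.g.
    # releasing that waiter's per-thread lock), and a service pass
    # fires everything that is due. Both add() and each pop inside
    # service() are O(log n).

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()   # tie-breaker for equal deadlines

    def add(self, deadline, wakeup):
        heapq.heappush(self._heap, (deadline, next(self._counter), wakeup))

    def next_deadline(self):
        # What the sigalarm (or timer thread) should be armed for next.
        return self._heap[0][0] if self._heap else None

    def service(self, now=None):
        # Fire every wakeup whose deadline has passed; return how many.
        now = time.time() if now is None else now
        fired = 0
        while self._heap and self._heap[0][0] <= now:
            _, _, wakeup = heapq.heappop(self._heap)
            wakeup()
            fired += 1
        return fired
```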

The current implementation appears to wake up every few msec (50 msec
maximum if the thread stays blocked a long time) and check if the
timeout has expired. If you have 1000 threads doing that, it can
burn a fair amount of cycles.
Jul 18 '05 #11

On 2005-03-25, Paul Rubin wrote <http>:
Antoon Pardon <ap*****@forel.vub.ac.be> writes:
Well, maybe you could use an os.pipe as a timeout lock then. When the lock is
instantiated you put one byte in it. Acquiring the lock is implemented by
reading one byte, releasing the lock is implemented by writing a byte.
Acquiring the lock with a timeout would make use of select.


Hmm, if I understand what you're saying, you'd need a separate pipe
for every lock. Interesting idea but I think it would burn too many
file descriptors if you used a lot of them. My mentioning select also
is showing my age. That was the way of doing short sleeps before
usleep became widespread.
> A signal handler in the main thread could release a lock that the
> thread is waiting on.


This wouldn't work. A thread would have no way of knowing for what
purpose the lock was released: whether it was released
by the thread holding it or by the signal handler.
Both would look the same to the thread
acquiring the lock.


Yes, you'd need a separate lock for each blocked thread. There would
be a list of locks waiting for timeouts and the sigalarm handler would
release any for which a wakeup was due. You could use a priority
queue to maintain the timeout list, so that adding or servicing a
timeout would be O(log n).


Well, have a look at what I have written over the weekend. It uses
a separate thread with one pipe as a wakeup mechanism. I didn't
like using a signal handler because you never know what other
modules might have a use for signals and how they might interfere.
Try it out and let me know what you think.
Not thoroughly tested:

------------------------------- tlock.py --------------------------------------

import threading
import os

import time

from heapq import heappush, heappop
from weakref import ref

from select import select

heapmutex = threading.Lock()
heap = []
heappipe = os.pipe()
sentinel = 365.25 * 50 * 24 * 3600 # 50 years
heappush(heap, (time.time() + sentinel, None, None, None))

class _Plock:

    def __init__(self):
        self.mutex = threading.Lock()
        self.broken = False

    def acquire(self):
        self.mutex.acquire()

    def release(self):
        self.mutex.release()

class TimeOut(Exception):
    pass

class Tlock:

    def __init__(self):
        self.mutex = threading.Lock()
        self.locktable = [_Plock()]

    def acquire(self, timeout=None):
        self.mutex.acquire()
        newlock = _Plock()
        newlock.acquire()
        self.locktable.append(newlock)
        prevlock = self.locktable[-2]
        if len(self.locktable) > 2 and timeout is not None:
            heapmutex.acquire()
            heappush(heap, (time.time() + timeout, ref(prevlock), self.mutex, self.locktable))
            os.write(heappipe[1], '-')
            heapmutex.release()
        self.mutex.release()
        prevlock.acquire()
        if prevlock.broken:
            raise TimeOut, "lock timed out"

    def release(self):
        self.mutex.acquire()
        self.locktable[0].release()
        del self.locktable[0]
        self.locktable[0].release()
        self.mutex.release()

def lock_breaker():

    heapfd = heappipe[0]
    while True:
        heapmutex.acquire()
        waketime, pl, mutex, table = heap[0]
        timeout = waketime - time.time()
        while timeout <= 0.0:
            lck = pl()
            if lck is not None:
                mutex.acquire()
                try:
                    try:
                        i = table.index(lck, 1)
                        del table[i]
                        lck.broken = True
                        lck.release()
                        # lck.release()
                    except ValueError:
                        pass
                finally:
                    mutex.release()
            heappop(heap)
            waketime, pl, mutex, table = heap[0]
            timeout = waketime - time.time()
        heapmutex.release()
        rdlst, wrlst, erlst = select([heapfd], [], [], timeout)
        if rdlst:
            os.read(rdlst[0], 1)

breaker = threading.Thread(target=lock_breaker)
breaker.setDaemon(True)
breaker.start()

if __name__ == "__main__":

    from time import sleep
    from random import randint

    T = Tlock()

    rg = 5

    def thrd(Id):

        for Nr in xrange(20):
            try:
                print "Trying %d (loop %d)" % (Id, Nr)
                T.acquire(randint(0, rg))
                print "Entering %d (loop %d)" % (Id, Nr)
                sleep(randint(0, rg))
                print "Leaving %d (loop %d)" % (Id, Nr)
                T.release()
            except TimeOut, ErrId:
                print "Failed %d (loop %d)" % (Id, Nr)
                sleep(randint(0, rg))

    for i in xrange(rg):
        th = threading.Thread(target=thrd, args=(i,))
        th.start()
Jul 18 '05 #12
Jul 18 '05 #12

Antoon Pardon <ap*****@forel.vub.ac.be> writes:
Well, have a look at what I have written over the weekend. It uses
a separate thread with one pipe as a wakeup mechanism.


Thanks, I'll look at it. Why don't you use usleep instead of a pipe?
I decided over the weekend that using a separate thread with usleep is
the simplest thing to do in pure Python. If you want to use sigalarm,
that should be put in the low-level C sigalarm handler so it gets
taken care of separately, before the Python interpreter does anything with
the signal. Locks should also be low-level primitives.

I don't know how this stuff maps onto PyPy but I sincerely hope the
looping stuff goes away. Having a series of processing steps
connected by queues is a perfectly good way to organize a program, but
if there's even just 20 steps, then waiting for all those 50 msec
wakeups adds a whole second to the processing latency if the system
has been sleeping for a while. The latency really only needs to be in
the microseconds on a system with efficient enough threading.
Jul 18 '05 #13

On 2005-03-29, Paul Rubin wrote <http>:
Antoon Pardon <ap*****@forel.vub.ac.be> writes:
Well, have a look at what I have written over the weekend. It uses
a separate thread with one pipe as a wakeup mechanism.
Thanks, I'll look at it. Why don't you use usleep instead of a pipe?


Because with the pipe the "sleep" can be indeterminate.

The select makes the thread sleep until either of the following
happens:

1) A timeout, which means one of the locks has to be broken

2) A byte was received. This means someone tried to acquire a lock
and it was inserted in the heap, so the timeout may
need to be recalculated. (Acquiring a lock sends a
byte over the pipe.)
I decided over the weekend that using a separate thread with usleep is
the simplest thing to do in pure Python.


I'm not going to call my solution simple, but it wastes very few
cycles. If no thread is blocked on a lock, the select will just
block until that changes. No need for some kind of polling loop.

--
Antoon Pardon
Jul 18 '05 #14

On 2005-03-29, Antoon Pardon wrote <ap*****@forel.vub.ac.be>:
On 2005-03-29, Paul Rubin wrote <http>:
Antoon Pardon <ap*****@forel.vub.ac.be> writes:
Well, have a look at what I have written over the weekend. It uses
a separate thread with one pipe as a wakeup mechanism.


Thanks, I'll look at it. Why don't you use usleep instead of a pipe?


Because with the pipe the "sleep" can be indeterminate.

The select makes the thread sleep until either of the following
happens:

1) A timeout, which means one of the locks has to be broken

2) A byte was received. This means someone tried to acquire a lock
and it was inserted in the heap, so the timeout may
need to be recalculated. (Acquiring a lock sends a
byte over the pipe.)
I decided over the weekend that using a separate thread with usleep is
the simplest thing to do in pure Python.


I'm not going to call my solution simple, but it wastes very few
cycles. If no thread is blocked on a lock, the select will just
block until that changes. No need for some kind of polling loop.


And here is a small patch for it. It corrects the acquiring and
releasing of the heapmutex.

--- tlock.py	2005-03-29 14:25:09.000000000 +0200
+++ src/python/tlock.py	2005-03-29 14:25:43.000000000 +0200
@@ -67,6 +67,7 @@
         heapmutex.acquire()
         waketime, pl, mutex, table = heap[0]
         timeout = waketime - time.time()
+        heapmutex.release()
         while timeout <= 0.0:
             lck = pl()
             if lck is not None:
@@ -82,10 +83,11 @@
                         pass
                     finally:
                         mutex.release()
+            heapmutex.acquire()
             heappop(heap)
             waketime, pl, mutex, table = heap[0]
             timeout = waketime - time.time()
-        heapmutex.release()
+            heapmutex.release()
         rdlst, wrlst, erlst = select([heapfd],[],[],timeout)
         if rdlst:
             os.read(rdlst[0],1)
@@ -107,17 +109,17 @@
 
         for Nr in xrange(20):
             try:
-                print "Trying %d (loop %d)" % (Id, Nr)
+                print "Trying %2d (loop %2d)" % (Id, Nr)
                 T.acquire(randint(0,rg))
-                print "Entering %d (loop %d)" % (Id, Nr)
+                print "Entering %2d (loop %2d)" % (Id, Nr)
                 sleep(randint(0,rg))
-                print "Leaving %d (loop %d)" % (Id, Nr)
+                print "Leaving %2d (loop %2d)" % (Id, Nr)
                 T.release()
             except TimeOut, ErrId:
-                print "Failed %d (loop %d)" % (Id, Nr)
+                print "Failed %2d (loop %2d)" % (Id, Nr)
                 sleep(randint(0,rg))
-    for i in xrange(rg):
+    for i in xrange(5 * rg):
         th = threading.Thread(target=thrd, args=(i,))
         th.start()
Jul 18 '05 #15

Antoon Pardon <ap*****@forel.vub.ac.be> writes:
I'm not going to call my solution simple, but it wastes very few
cycles. If no thread is blocked on a lock, the select will just
block until that changes. No need for some kind of polling loop.


I think I understand. My original idea was to use a heapq to be able
to know exactly when the next pending timeout is due, and usleep for
long enough to wake up at just the right time. Then you service the
timeout, pop the heap to find when the next timeout after that is, and
usleep again. No polling loops and no pipe. But it could be that
someone inserts a new timeout while you're sleeping, that's due before
you're scheduled to wake up. Your pipe scheme takes care of that,
since any thread can write to the pipe and wake up the blocked thread
at any time.
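[Editor's note: that "any thread can write to the pipe and wake up the blocked thread" trick can be demonstrated in isolation. This is a toy sketch, not Antoon's code; the 5-second select timeout plays the role of the next known deadline, and the Timer plays the thread that registers an earlier one.]

```python
import os
import select
import threading
import time

rd, wr = os.pipe()

start = time.time()
# Pretend the next known timeout is 5 seconds away, but after 0.1 s
# another thread registers an earlier deadline and pokes the pipe.
threading.Timer(0.1, os.write, args=(wr, b'x')).start()
readable, _, _ = select.select([rd], [], [], 5.0)
elapsed = time.time() - start

assert readable == [rd]   # woken by the pipe, not by the 5 s timeout
assert elapsed < 4.0      # returned early, so the timeout can be recomputed
os.read(rd, 1)
```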

Really, the culprit here is the weak signalling scheme in Python.
There needs to be a way to send signals to threads, or raise
asynchronous exceptions in them. There's been some discussion on
SourceForge about that, but the issues involved are complex.

I think the best bet for the short term is handling it at the C level,
with sigalarm. Another way is to have chained sigalarm handlers in
the main thread.
Jul 18 '05 #16

On 2005-03-29, Paul Rubin wrote <http>:
Antoon Pardon <ap*****@forel.vub.ac.be> writes:
I'm not going to call my solution simple, but it wastes very few
cycles. If no thread is blocked on a lock, the select will just
block until that changes. No need for some kind of polling loop.
I think I understand. My original idea was to use a heapq to be able
to know exactly when the next pending timeout is due, and usleep for
long enough to wake up at just the right time. Then you service the
timeout, pop the heap to find when the next timeout after that is, and
usleep again. No polling loops and no pipe. But it could be that
someone inserts a new timeout while you're sleeping, that's due before
you're scheduled to wake up. Your pipe scheme takes care of that,
since any thread can write to the pipe and wake up the blocked thread
at any time.


Right, that is the idea.
Really, the culprit here is the weak signalling scheme in Python.
There needs to be a way to send signals to threads, or raise
asynchronous exceptions in them. There's been some discussion on
SourceForge about that, but the issues involved are complex.
Well, I have raised this issue before, and as far as I understand,
the big problem seems to be the various kinds of behaviour you
can get depending on what platform you are working on, so writing
a module such that Python programs behave the same on various
platforms seems a hell of a job.

So I decided not to pester the Python people for this kind
of functionality, although I would very much like to have
it.

I have been playing with the C API and have some kind of
class that allows one thread to raise an exception in
another, but that wouldn't be a solution here, since the
raised exception will not manifest itself while the
thread is in a C function.
I think the best bet for the short term is handling it at the C level,
with sigalarm. Another way is to have chained sigalarm handlers in
the main thread.


Possible, but I don't have the time to investigate that
possibility now.

--
Antoon Pardon
Jul 18 '05 #17

Antoon Pardon <ap*****@forel.vub.ac.be> writes:
There needs to be a way to send signals to threads, or raise
asynchronous exceptions in them. There's been some discussion in
sourceforge about that, but the issues involved are complex.
Well, I have raised this issue before, and as far as I understand, the
big problem seems to be the various kinds of behaviour you can get
depending on what platform you are working on, so writing a module such
that Python programs behave the same on various platforms seems a
hell of a job.


I think there are some issues that are even more fundamentally
confusing. It's hard to figure out exactly what behavior is
desirable, let alone how to implement it.
So I decided not to pester the Python people for this kind of
functionality, although I would very much like to have it.
Yes, I hope something happens sometime, but I don't have any immediate
concrete suggestions. I'm not enough of a concurrency whiz to know how
other languages handle it.
I have been playing with the C API and have some kind of class that
allows one thread to raise an exception in another, but that wouldn't
be a solution here, since the raised exception will not manifest
itself while the thread is in a C function.


Yes, that sounds hairy if done in a general way.
I think the best bet for the short term is handling it at the C level,
with sigalarm. Another way is to have chained sigalarm handlers in
the main thread.


Possible, but I don't have the time to investigate that possibility now.


Actually there's a simple and obvious approach: Linux and Windows both
already implement semaphore objects with timeouts (see "man semop"
under Linux). Other modern Unixes probably also have them. So I'd
think it would be straightforward to just make a C module that wraps
these semaphores with the C API.
Jul 18 '05 #18

On 2005-03-30, Paul Rubin wrote <http>:
> I think the best bet for the short term is handling it at the C level,
> with sigalarm. Another way is to have chained sigalarm handlers in
> the main thread.


Possible, but I don't have the time to investigate that possibility now.


Actually there's a simple and obvious approach: Linux and Windows both
already implement semaphore objects with timeouts (see "man semop"
under Linux). Other modern Unixes probably also have them. So I'd
think it would be straightforward to just make a C module that wraps
these semaphores with the C API.


I'm not sure that this would be an acceptable approach. I did the man
semop and it indicates this is part of System V IPC. This makes me
fear that these semaphores will use file descriptors or other resources
that are only available in a limited amount. Not useful if you are
talking about thousands of threads.

--
Antoon Pardon
Jul 18 '05 #19

Antoon Pardon <ap*****@forel.vub.ac.be> writes:
I'm not sure that this would be an acceptable approach. I did the man
semop and it indicates this is part of System V IPC. This makes me
fear that these semaphores will use file descriptors or other resources
that are only available in a limited amount. Not useful if you are
talking about thousands of threads.


That would be terrible, if semaphores are as heavy as file descriptors.
I'd like to hope the OSes are better designed than that.

So, if we have to do this at user level, the two best choices seem to
be either your pipe method, or an asynchronous sigalarm handler that
goes and releases any timed out locks. The sigalarm method is
conceptually cleaner, but it involves hacking C code, so it's not so
easy. Plus I think using sigalarm results in more total system calls
if there's lots of timeouts. I do believe that the current scheme
with the slow-motion spinlocks is pretty revolting and that any of the
alternatives we've discussed are better.

I wonder what the Pypy folks are doing. It would be great if they
have some general treatment of asynchronous exceptions.
Jul 18 '05 #20

Cool Code!

One possible sticking point is that I think select only works on
network sockets on Windows. This would make the code not cross-platform.

john

Jul 18 '05 #21

Thinking about cross-platform issues, I found this, from the venerable
Tim Peters, to be enlightening regarding Python's choice of design:

"It's possible to build a better Queue implementation that runs only on
POSIX systems, or only on Windows systems, or only on one of a dozen
other less-popular target platforms. The current implementation works
fine on all of them, although is suboptimal compared to what could be
done in platform-specific Queue implementations. "

Here is a link:
http://groups-beta.google.com/group/...1f680b2dac320c

The whole thread (oops a pun) is worth a read.

john

Jul 18 '05 #22

On 2005-03-31, py****@gmail.com wrote <py****@gmail.com>:
Cool Code!

One possible sticking point is that I think select only works on
network sockets on Windows. This would make the code not cross-platform.


As far as I understand, what I did with pipes can be done just as
well with network sockets. If someone wants to rewrite the code
to do so and make it more cross-platform, I'll happily
incorporate it. I'm just not familiar enough with sockets to do
it myself.

--
Antoon Pardon
Jul 18 '05 #23

"Paul L. Du Bois" <po******@gmail.com> writes:
Has anyone written a Queue.Queue replacement that avoids busy-waiting?
It doesn't matter if it uses os-specific APIs (eg
WaitForMultipleObjects). I did some googling around and haven't found
anything so far.


This isn't a Queue.Queue replacement, but it implements a buffer
intended for inter-thread transmission, so it could be adjusted to
mimic Queue semantics fairly easily. In fact, internally it actually
keeps write chunks in a list until read for better performance, so
just removing the coalesce process would be the first step.

It was written specifically to minimize latency (which is a
significant issue with the polling loop in the normal Python Queue
implementation) and CPU usage in support of a higher level
Win32-specific serial I/O class, so it uses Win32 events to handle the
signaling for the key events when waiting.

The fundamental issue with the native Python lock is that to be
minimalistic in what it requires from each OS, it doesn't impose a
model of being able to wait on an event signal - that's the key thing
you need to have (a timed blocking wait on some signalable construct)
to be most efficient for these operations - which is what I use the
Win32 Event for.

-- David

- - - - - - - - - - - - - - - - - - - - - - - - -

import thread
import win32event as we

class Buffer:
    """A thread safe unidirectional data buffer used to represent data
    traveling to or from the application and serial port handling threads.

    This class is used as an underlying implementation mechanism by SerialIO.
    Application code should not typically need to access this directly, but
    can handle I/O through SerialIO.

    Note that we use Windows event objects rather than Python's because
    Python's OS-independent versions are not very efficient with timed waits,
    imposing internal latencies and CPU usage due to looping around a basic
    non-blocking construct.  We also use the lower layer thread lock rather
    than threading's to minimize overhead.
    """

    def __init__(self, notify=None):
        self.lock = thread.allocate_lock()
        self.has_data = we.CreateEvent(None, 1, 0, None)
        self.clear()
        self.notify = notify

    def _coalesce(self):
        if self.buflist:
            self.buffer += ''.join(self.buflist)
            self.buflist = []

    def __len__(self):
        self.lock.acquire()
        self._coalesce()
        result = len(self.buffer)
        self.lock.release()
        return result

    def clear(self):
        self.lock.acquire()
        self.buffer = ''
        self.buflist = []
        self.lock.release()

    def get(self, size=0, timeout=None):
        """Retrieve data from the buffer, up to 'size' bytes (unlimited if
        0), but potentially less based on what is available.  If no
        data is currently available, it will wait up to 'timeout' seconds
        (forever if None, no blocking if 0) for some data to arrive"""

        self.lock.acquire()
        self._coalesce()

        if not self.buffer:
            # Nothing buffered, wait until something shows up (timeout
            # rules match that of threading.Event)
            self.lock.release()
            if timeout is None:
                win_timeout = we.INFINITE
            else:
                win_timeout = int(timeout * 1000)
            rc = we.WaitForSingleObject(self.has_data, win_timeout)
            self.lock.acquire()
            self._coalesce()

        if not size:
            size = len(self.buffer)

        result_len = min(size, len(self.buffer))
        result = self.buffer[:result_len]
        self.buffer = self.buffer[result_len:]
        we.ResetEvent(self.has_data)
        self.lock.release()
        return result

    def put_back(self, data):
        self.lock.acquire()
        self.buffer = data + self.buffer
        self.lock.release()
        we.SetEvent(self.has_data)
        if self.notify:
            self.notify()

    def put(self, data):
        self.lock.acquire()
        self.buflist.append(data)
        self.lock.release()
        we.SetEvent(self.has_data)
        if self.notify:
            self.notify()
Jul 18 '05 #24

Have you looked at this? A paper about adding asynchronous exceptions
to Python.

http://www.cs.williams.edu/~freund/papers/02-lwl2.ps
Jul 18 '05 #25

Paul Rubin <http> wrote:
Antoon Pardon <ap*****@forel.vub.ac.be> writes:
I'm not sure that this would be an acceptable approach. I did the man
semop and it indicates this is part of System V IPC. This makes me
fear that these semaphores will use file descriptors or other resources
that are only available in a limited amount. Not useful if you are
talking about thousands of threads.


That would be terrible, if semaphores are as heavy as file descriptors.
I'd like to hope the OS's are better designed than that.


I believe futex is the thing you want on modern Linux. Not
very portable, though.

From futex(4)

The Linux kernel provides futexes ('Fast Userspace muTexes') as a
building block for fast userspace locking and semaphores. Futexes are
very basic and lend themselves well for building higher level locking
abstractions such as POSIX mutexes.

This page does not set out to document all design decisions but
restricts itself to issues relevant for application and library devel-
opment. Most programmers will in fact not be using futexes directly but
instead rely on system libraries built on them, such as the NPTL
pthreads implementation.

A futex is identified by a piece of memory which can be shared between
different processes. In these different processes, it need not have
identical addresses. In its bare form, a futex has semaphore semantics;
it is a counter that can be incremented and decremented atomically;
processes can wait for the value to become positive.

Futex operation is entirely userspace for the non-contended case. The
kernel is only involved to arbitrate the contended case. As any sane
design will strive for non-contension, futexes are also optimised for
this situation.

In its bare form, a futex is an aligned integer which is only touched
by atomic assembler instructions. Processes can share this integer over
mmap, via shared segments or because they share memory space, in which
case the application is commonly called multithreaded.

--
Nick Craig-Wood <ni**@craig-wood.com> -- http://www.craig-wood.com/nick
Jul 18 '05 #26

Nick Craig-Wood <ni**@craig-wood.com> writes:
I believe futex is the thing you want on modern Linux. Not
very portable, though.


That's really cool, but I don't see how it can be a pure userspace
operation if the futex has a timeout. The kernel must need to keep
track of the timeouts. However, since futexes can be woken by any
thread, the whole thing can be done with just one futex. In fact the
doc mentions something about using a file descriptor to support
asynchronous wakeups, but it's confusing whether that applies here.
Jul 18 '05 #27

Paul Rubin <http> wrote:
Nick Craig-Wood <ni**@craig-wood.com> writes:
I believe futex is the thing you want on modern Linux. Not
very portable, though.


That's really cool, but I don't see how it can be a pure userspace
operation if the futex has a timeout. The kernel must need to keep
track of the timeouts. However, since futexes can be woken by any
thread, the whole thing can be done with just one futex. In fact the
doc mentions something about using a file descriptor to support
asynchronous wakeups, but it's confusing whether that applies here.


No it isn't pure user space, only for the non-contended case which for
most locks is the most frequent operation.

Futex operation is entirely userspace for the non-contended
case. The kernel is only involved to arbitrate the contended
case. As any sane design will strive for non-contension,
futexes are also optimised for this situation.

--
Nick Craig-Wood <ni**@craig-wood.com> -- http://www.craig-wood.com/nick
Jul 18 '05 #28

py****@gmail.com <py****@gmail.com> wrote:
Thinking about cross-platform issues. I found this, from the venerable
Tim Peters to be enlightening for python's choice of design:

"It's possible to build a better Queue implementation that runs only on
POSIX systems, or only on Windows systems, or only on one of a dozen
other less-popular target platforms. The current implementation works
fine on all of them, although is suboptimal compared to what could be
done in platform-specific Queue implementations. "

Here is a link:
http://groups-beta.google.com/group/...1f680b2dac320c


Interesting thread.

How about leaving the current threading alone, but adding a pthreads
module for those OSes which can use or emulate POSIX threads? Which is
Windows and most Unixes?
--
Nick Craig-Wood <ni**@craig-wood.com> -- http://www.craig-wood.com/nick
Jul 18 '05 #29

On 2005-04-02, Paul Rubin wrote <http>:
Have you looked at this? A paper about adding asynchronous exceptions
to Python.

http://www.cs.williams.edu/~freund/papers/02-lwl2.ps


Looks interesting, but I doubt Python will have it in the near
future. I'm very pessimistic about Python development in this
area.

--
Antoon Pardon
Jul 18 '05 #30
