Alex Martelli <al***@mac.comw rote:
DarkBlue <no****@nixmail .comwrote:
try for 10 seconds
if database.connec ted :
do your remote thing
except raise after 10 seconds
abort any connection attempt
do something else
Sure, see function setdefaulttimeo ut in module socket. Just call it as
you wish before trying the DB connection and catch the resulting
exception.
It would be nice to have language support for the general case, ie a
general purpose timeout.
In python 2.5 wouldn't it be nice to write
with timeout(seconds =10) as time_exceeded:
do potentially time consuming stuff not necessarily involving sockets
if time_exceeded:
report an error or whatever
Here is some code I came up with to implement this idea for
python2.3+. It should be cross platform but I only tested it on
linux. I think there may still be a threading bug in there (see
FIXME), but it seems to work. It requires ctypes for access to
PyThreadState_S etAsyncExc).
It is mostly test code as it took absolutely ages to debug! Threading
code is hard ;-)
............... ............... ............... ............... .
"""
General purpose timeout mechanism not using alarm(), ie cross platform
Eg
from timeout import Timeout, TimeoutError
def might_infinite_ loop(arg):
while 1:
pass
try:
Timeout(10, might_infinite_ loop, "some arg")
except TimeoutError:
print "Oops took too long"
else:
print "Ran just fine"
"""
import threading
import time
import sys
import ctypes
import os
class TimeoutError(Ex ception):
"""Thrown on a timeout"""
PyThreadState_S etAsyncExc = ctypes.pythonap i.PyThreadState _SetAsyncExc
_c_TimeoutError = ctypes.py_objec t(TimeoutError)
class Timeout(threadi ng.Thread):
"""
A General purpose timeout class
timeout is int/float in seconds
action is a callable
*args, **kwargs are passed to the callable
"""
def __init__(self, timeout, action, *args, **kwargs):
threading.Threa d.__init__(self )
self.action = action
self.args = args
self.kwargs = kwargs
self.stopped = False
self.exc_value = None
self.end_lock = threading.Lock( )
# start subtask
self.setDaemon( True) # FIXME this shouldn't be needed but is, indicating sub tasks aren't ending
self.start()
# Wait for subtask to end naturally
self.join(timeo ut)
# Use end_lock to kill the thread in a non-racy
# fashion. (Using isAlive is racy). Poking exceptions into
# the Thread cleanup code isn't a good idea either
if self.end_lock.a cquire(False):
# gained end_lock =sub thread is still running
# sub thread is still running so kill it with a TimeoutError
self.exc_value = TimeoutError()
PyThreadState_S etAsyncExc(self .id, _c_TimeoutError )
# release the lock so it can progress into thread cleanup
self.end_lock.r elease()
# shouldn't block since we've killed the thread
self.join()
# re-raise any exception
if self.exc_value:
raise self.exc_value
def run(self):
self.id = threading._get_ ident()
try:
self.action(*se lf.args, **self.kwargs)
except:
self.exc_value = sys.exc_value
# only end if we can acquire the end_lock
self.end_lock.a cquire()
if __name__ == "__main__":
def _spin(t):
"""Spins for t seconds"""
start = time.time()
end = start + t
while time.time() < end:
pass
def _test_time_limi t(name, expecting_time_ out, t_limit, fn, *args, **kwargs):
"""Test Timeout"""
start = time.time()
if expecting_time_ out:
print "Test",name,"sh ould timeout"
else:
print "Test",name,"sh ouldn't timeout"
try:
Timeout(t_limit , fn, *args, **kwargs)
except TimeoutError, e:
if expecting_time_ out:
print "Timeout generated OK"
else:
raise RuntimeError("W asn't expecting TimeoutError Here")
else:
if expecting_time_ out:
raise RuntimeError("W as expecting TimeoutError Here")
else:
print "No TimeoutError generated OK"
elapsed = time.time() - start
print "That took",elapsed," seconds for timeout of",t_limit
def test():
"""Test code"""
# NB the commented out bits of code don't work with this type
# of timeout nesting.
# no nesting
_test_time_limi t("simple #1", True, 5, _spin, 10)
_test_time_limi t("simple #2", False, 10, _spin, 5)
# 1 level of nesting
_test_time_limi t("nested #1", True, 4, _test_time_limi t,
"nested #1a", True, 5, _spin, 10)
_test_time_limi t("nested #2", False, 6, _test_time_limi t,
"nested #2a", True, 5, _spin, 10)
#_test_time_lim it("nested #3", True, 4, _test_time_limi t,
# "nested #3a", False, 10, _spin, 5)
_test_time_limi t("nested #4", False, 6, _test_time_limi t,
"nested #4a", False, 10, _spin, 5)
# 2 level of nesting
_test_time_limi t("nested #5", True, 3, _test_time_limi t,
"nested #5a", True, 4, _test_time_limi t,
"nested #5b", True, 5, _spin, 10)
#_test_time_lim it("nested #6", True, 3, _test_time_limi t,
# "nested #6a", False, 6, _test_time_limi t,
# "nested #6b", True, 5, _spin, 10)
#_test_time_lim it("nested #7", True, 3, _test_time_limi t,
# "nested #7a", True, 4, _test_time_limi t,
# "nested #7b", False, 10, _spin, 5)
#_test_time_lim it("nested #8", True, 3, _test_time_limi t,
# "nested #8a", False, 6, _test_time_limi t,
# "nested #8b", False, 10, _spin, 5)
_test_time_limi t("nested #9", False, 7, _test_time_limi t,
"nested #9a", True, 4, _test_time_limi t,
"nested #9b", True, 5, _spin, 10)
_test_time_limi t("nested #10", False, 7, _test_time_limi t,
"nested #10a",False, 6, _test_time_limi t,
"nested #10b",True, 5, _spin, 10)
#_test_time_lim it("nested #11", False, 7, _test_time_limi t,
# "nested #11a",True, 4, _test_time_limi t,
# "nested #11b",False, 10, _spin, 5)
_test_time_limi t("nested #12", False, 7, _test_time_limi t,
"nested #12a",False, 6, _test_time_limi t,
"nested #12b",False, 10, _spin, 5)
print "All tests OK"
test()
--
Nick Craig-Wood <ni**@craig-wood.com--
http://www.craig-wood.com/nick