469,646 Members | 1,624 Online
Bytes | Developer Community
New Post

Home Posts Topics Members FAQ

Post your question to a community of 469,646 developers. It's quick & easy.

Calling Queue experts

I have a script which is based on the following code. Unfortunately,
it only works on Python 2.3 and not 2.5 because there is no esema or
fsema attribute in the 2.5 Queue. I am hunting through the Queue.py
code now to try to figure out how to make it work in 2.5, but as I am
a beginner, I am having difficulty and would appreciate your help.

Many thanks

Jon

import os
import Queue
import threading
import time
import cPickle

class PickleQueue(Queue.Queue):
"""A multi-producer, multi-consumer, persistent queue."""
def __init__(self, filename, maxsize=0):
"""Initialize a persistent queue with a filename and maximum
size.
The filename is used as a persistent data store for the
queue.
If maxsize <= 0, the queue size is infinite.
"""
self.filename = filename
Queue.Queue.__init__(self, maxsize)
if self.queue:
self.esema.release()
if self._full():
self.fsema.acquire()
def _init(self, maxsize):
# Implements Queue protocol _init for persistent queue.
# Sets up the pickle files.
self.maxsize = maxsize
try:
self.readfile = file(self.filename, 'r')
self.queue = cPickle.load(self.readfile)
self.readfile.close()
except IOError, err:
if err.errno == 2:
# File doesn't exist, continue ...
self.queue = []
else:
# Some other I/O problem, reraise error
raise err
except EOFError:
# File was null? Continue ...
self.queue = []
# Rewrite file, so it's created if it doesn't exist,
# and raises an exception now if we aren't allowed
self.writefile = file(self.filename, 'w')
cPickle.dump(self.queue, self.writefile, 1)
def __sync(self):
# Writes the queue to the pickle file.
self.writefile.seek(0)
cPickle.dump(self.queue, self.writefile, 1)
self.writefile.flush()
def _put(self, item):
# Implements Queue protocol _put for persistent queue.
self.queue.append(item)
self.__sync()
def _get(self):
# Implements Queue protocol _get for persistent queue.
item = self.queue[0]
del self.queue[0]
self.__sync()
return item

class counterThread(threading.Thread):
numberQueue = PickleQueue('/export/home/jrpf/data.pkl')
exitCounterQueue = Queue.Queue(1)

def run(self):
command = ''
i = 0
while 1:
self.numberQueue.put(i)
if i 10:
print "i 10 so attempting to exit"
wt.exit()
self.exit()
print i
try:
command = self.exitCounterQueue.get(block=False)
except Queue.Empty:
pass
if command == 'exit':
print "Counter thread exited"
break
i = i + 1
time.sleep(1)

def exit(self):
self.exitCounterQueue.put('exit')

def main():

ct = counterThread()
ct.setDaemon(True)
ct.start()
ct.join()

if __name__ == "__main__":
main()

Mar 26 '07 #1
3 1875
Got it. New PickleQueue class should be as follows:

import Queue
import cPickle

class PickleQueue(Queue.Queue):
"""A multi-producer, multi-consumer, persistent queue."""
def __init__(self, filename, maxsize=0):
"""Initialize a persistent queue with a filename and maximum
size.
The filename is used as a persistent data store for the
queue.
If maxsize <= 0, the queue size is infinite.
"""
self.filename = filename
Queue.Queue.__init__(self, maxsize)
print self.queue

def _init(self, maxsize):
# Implements Queue protocol _init for persistent queue.
# Sets up the pickle files.
self.maxsize = maxsize
try:
self.readfile = file(self.filename, 'r')
self.queue = cPickle.load(self.readfile)
self.readfile.close()
except IOError, err:
if err.errno == 2:
# File doesn't exist, continue ...
self.queue = Queue.deque()
else:
# Some other I/O problem, reraise error
raise err
except EOFError:
# File was null? Continue ...
self.queue = Queue.deque()
# Rewrite file, so it's created if it doesn't exist,
# and raises an exception now if we aren't allowed
self.writefile = file(self.filename, 'w')
cPickle.dump(self.queue, self.writefile, 1)
def __sync(self):
# Writes the queue to the pickle file.
self.writefile.seek(0)
cPickle.dump(self.queue, self.writefile, 1)
self.writefile.flush()
def _put(self, item):
# Implements Queue protocol _put for persistent queue.
self.queue.append(item)
self.__sync()
def _get(self):
# Implements Queue protocol _get for persistent queue.
item = self.queue.popleft()
self.__sync()
return item

Mar 26 '07 #2

jrpfinch # Some other I/O problem, reraise error
jrpfinch raise err

I'd just execute a bare raise (without err). That way the caller gets the
stack trace of the actual IOError.

Skip
Mar 26 '07 #3
En Mon, 26 Mar 2007 07:29:32 -0300, jrpfinch <jr******@gmail.comescribió:
Got it. New PickleQueue class should be as follows:
Only a comment:
def _init(self, maxsize):
# Implements Queue protocol _init for persistent queue.
# Sets up the pickle files.
self.maxsize = maxsize
try:
self.readfile = file(self.filename, 'r')
self.queue = cPickle.load(self.readfile)
self.readfile.close()
except IOError, err:
if err.errno == 2:
# File doesn't exist, continue ...
self.queue = Queue.deque()
else:
# Some other I/O problem, reraise error
raise err
except EOFError:
# File was null? Continue ...
self.queue = Queue.deque()
# Rewrite file, so it's created if it doesn't exist,
# and raises an exception now if we aren't allowed
self.writefile = file(self.filename, 'w')
cPickle.dump(self.queue, self.writefile, 1)
self.readfile may be left open in case of error, I'd use a try/finally.
And since it isn't used anywhere, I'd just use a local variable instead of
an instance attribute.
And the final write is not necesary when you have just read it - and
alters the "last-modified-time" (that may not be relevant for you, of
course, but as a general tool it may confuse other users).

--
Gabriel Genellina

Mar 26 '07 #4

This discussion thread is closed

Replies have been disabled for this discussion.

Similar topics

16 posts views Thread by William Stacey [MVP] | last post: by
7 posts views Thread by Jeremy Chaney | last post: by
12 posts views Thread by KIRAN | last post: by
4 posts views Thread by j_depp_99 | last post: by
7 posts views Thread by jomcfall97 | last post: by
By using this site, you agree to our Privacy Policy and Terms of Use.