468,556 Members | 2,403 Online
Bytes | Developer Community
New Post

Home Posts Topics Members FAQ

Post your question to a community of 468,556 developers. It's quick & easy.

Question regarding Queue object

Hello!

I'm trying to implement a message queue among threads using Queue. The
message queue has two operations:
PutMsg(id, msg) # this is simple, just combine the id and msg as one
and put it into the Queue.
WaitMsg(ids, msg) # this is the hard part

WaitMsg will get only msg with certain ids, but this is not possible
in Queue object, because Queue provides no method to peek into the
message queue and fetch only matched item.

Now I'm using an ugly solution, fetch all the messages and put the not
used ones back to the queue. But I want a better performance. Is there
any alternative out there?

This is my current solution:

def _get_with_ids(self,wait, timeout, ids):
to = timeout
msg = None
saved = []
while True:
start = time.clock()
msg =self.q.get(wait, to)
if msg and msg['id'] in ids:
break;
# not the expecting message, save it.
saved.append(msg)
to = to - (time.clock()-start)
if to <= 0:
break
# put the saved messages back to the queue
for m in saved:
self.q.put(m, True)
return msg

br, Terry
Jun 27 '08 #1
11 2425
On Apr 27, 6:27 pm, Terry <terry.yin...@gmail.comwrote:
Hello!

I'm trying to implement a message queue among threads using Queue. The
message queue has two operations:
PutMsg(id, msg) # this is simple, just combine the id and msg as one
and put it into the Queue.
WaitMsg(ids, msg) # this is the hard part

WaitMsg will get only msg with certain ids, but this is not possible
in Queue object, because Queue provides no method to peek into the
message queue and fetch only matched item.

Now I'm using an ugly solution, fetch all the messages and put the not
used ones back to the queue. But I want a better performance. Is there
any alternative out there?

This is my current solution:

def _get_with_ids(self,wait, timeout, ids):
to = timeout
msg = None
saved = []
while True:
start = time.clock()
msg =self.q.get(wait, to)
if msg and msg['id'] in ids:
break;
# not the expecting message, save it.
saved.append(msg)
to = to - (time.clock()-start)
if to <= 0:
break
# put the saved messages back to the queue
for m in saved:
self.q.put(m, True)
return msg

br, Terry
I just found that Queue is written in Python, maybe I can override it.
Jun 27 '08 #2
WaitMsg will get only msg with certain ids, but this is not possible
in Queue object, because Queue provides no method to peek into the
message queue and fetch only matched item.

Now I'm using an ugly solution, fetch all the messages and put the not
used ones back to the queue. But I want a better performance. Is there
any alternative out there?
You could try a defaultdict containing queues, one queue per message ID.

Or you could implement your own thread-safe LookAheadQueue class.

David
Jun 27 '08 #3
(re-cc-ing the list)

On Sun, Apr 27, 2008 at 4:40 PM, Terry Yin <te**********@gmail.comwrote:
Defaultdict is not an option because there will be a lot of message IDs (and
increasing). I will implement LookAheadQueue class by overriding the Queue
class.

Thanks for your kind advice.

BTW, I have been in old-fashion telecommunication R&D for years, where
messages and state machines are heavily used in software development. And
this makes me automatically resort to messages between task-specific
processes/threads when designing any software, even in python. I'm wondering
if this is the right choice, or it's already not a modern way of design.
There are a lot of ways you could go about it, those 2 were the first
that came to mind.

Another idea would be to have multiple queues, one per thread or per
message type "group". The producer thread pushes into the appropriate
queues (through an intelligent PutMsg function), and the consumer
threads pull from the queues they're interested in and ignore the
others.

If your apps are heavily threaded you might take a look at Stackless
Python: http://www.stackless.com/

David.
Jun 27 '08 #4
David <wi******@gmail.comwrote:
Another idea would be to have multiple queues, one per thread or per
message type "group". The producer thread pushes into the appropriate
queues (through an intelligent PutMsg function), and the consumer
threads pull from the queues they're interested in and ignore the
others.
Unfortunately a thread can only wait on one Queue at once (without
polling). So really the only efficient solution is one Queue per
thread.

Make an intelligent PutMsg function which knows which Queue (or
Queues) each message needs to be put in and all the threads will have
to do is Queue.get() and be sure they've got a message they can deal
with.

--
Nick Craig-Wood <ni**@craig-wood.com-- http://www.craig-wood.com/nick
Jun 27 '08 #5
I've never used it myself but you may find candygram interesting;
http://candygram.sourceforge.net, which AFAIK implements Erlang-style
message queues in Python.
Jun 27 '08 #6
On Apr 28, 5:30 pm, Nick Craig-Wood <n...@craig-wood.comwrote:
David <wizza...@gmail.comwrote:
Another idea would be to have multiple queues, one per thread or per
message type "group". The producer thread pushes into the appropriate
queues (through an intelligent PutMsg function), and the consumer
threads pull from the queues they're interested in and ignore the
others.

Unfortunately a thread can only wait on one Queue at once (without
polling). So really the only efficient solution is one Queue per
thread.

Make an intelligent PutMsg function which knows which Queue (or
Queues) each message needs to be put in and all the threads will have
to do is Queue.get() and be sure they've got a message they can deal
with.

--
Nick Craig-Wood <n...@craig-wood.com--http://www.craig-wood.com/nick

I do have one Queue per thread. The problem is the thread can not peek
into the Queue and select msg with certain ID first.
Jun 27 '08 #7
On Apr 28, 10:48 pm, "dcof...@gmail.com" <dcof...@gmail.comwrote:
I've never used it myself but you may find candygram interesting;http://candygram.sourceforge.net, which AFAIK implements Erlang-style
message queues in Python.
Thank you. I will look at candygram and stackless. I believe my
solution lies in either of them.
Jun 27 '08 #8
On 27 Apr, 12:27, Terry <terry.yin...@gmail.comwrote:
Hello!

I'm trying to implement a message queue among threads using Queue. The
message queue has two operations:
PutMsg(id, msg) # *this is simple, just combine the id and msg as one
and put it into the Queue.
WaitMsg(ids, msg) # this is the hard part

WaitMsg will get only msg with certain ids, but this is not possible
in Queue object, because Queue provides no method to peek into the
message queue and fetch only matched item.

Now I'm using an ugly solution, fetch all the messages and put the not
used ones back to the queue. But I want a better performance. Is there
any alternative out there?

This is my current solution:

* * def _get_with_ids(self,wait, timeout, ids):
* * * * to = timeout
* * * * msg = None
* * * * saved = []
* * * * while True:
* * * * * * start = time.clock()
* * * * * * msg =self.q.get(wait, to)
* * * * * * if msg and msg['id'] in ids:
* * * * * * * * break;
* * * * * * # not the expecting message, save it.
* * * * * * saved.append(msg)
* * * * * * to = to - (time.clock()-start)
* * * * * * if to <= 0:
* * * * * * * * break
* * * * # put the saved messages back to the queue
* * * * for m in saved:
* * * * * * self.q.put(m, True)
* * * * return msg

br, Terry
Wy put them back in the queue?
You could have a defaultdict with the id as key and a list of
unprocessed messages with that id as items.
Your _get_by_ids function could first look into the unprocessed
messages for items with that ids and then
look into the queue, putting any unprocessed item in the dictionary,
for later processing.
This should improve the performances, with a little complication of
the method code (but way simpler
that implementing your own priority-based queue).

Ciao
-----
FB
Jun 27 '08 #9
Terry <te**********@gmail.comwrote:
On Apr 28, 5:30 pm, Nick Craig-Wood <n...@craig-wood.comwrote:
David <wizza...@gmail.comwrote:
Another idea would be to have multiple queues, one per thread or per
message type "group". The producer thread pushes into the appropriate
queues (through an intelligent PutMsg function), and the consumer
threads pull from the queues they're interested in and ignore the
others.
Unfortunately a thread can only wait on one Queue at once (without
polling). So really the only efficient solution is one Queue per
thread.

Make an intelligent PutMsg function which knows which Queue (or
Queues) each message needs to be put in and all the threads will have
to do is Queue.get() and be sure they've got a message they can deal
with.

I do have one Queue per thread. The problem is the thread can not peek
into the Queue and select msg with certain ID first.
My point is don't put messages that the thread doesn't need in the
queue in the first place. Ie move that logic into PutMsg.

--
Nick Craig-Wood <ni**@craig-wood.com-- http://www.craig-wood.com/nick
Jun 27 '08 #10
On Apr 29, 3:01 pm, Dennis Lee Bieber <wlfr...@ix.netcom.comwrote:
On Sun, 27 Apr 2008 03:27:59 -0700 (PDT), Terry <terry.yin...@gmail.com>
declaimed the following in comp.lang.python:
I'm trying to implement a message queue among threads using Queue. The
message queue has two operations:
PutMsg(id, msg) # this is simple, just combine the id and msg as one
and put it into the Queue.
WaitMsg(ids, msg) # this is the hard part
WaitMsg will get only msg with certain ids, but this is not possible
in Queue object, because Queue provides no method to peek into the
message queue and fetch only matched item.
Now I'm using an ugly solution, fetch all the messages and put the not
used ones back to the queue. But I want a better performance. Is there
any alternative out there?

Create your own queue class -- including locking objects.

Implement the queue itself (I've not looked at how Queue.Queue is
really done) as a priority queue (that is, a simple list ordered by your
ID -- new items are inserted after all existing items with the same or
lower ID number).

Surround list manipulations with a lock based on a Condition.

Now, the trick -- the .get(ID) sequence being something like (this
is pseudo-code):

while True:
self.condition.acquire()
scan self.qlist for first entry with ID
if found:
remove entry from self.qlist
self.condition.release()
return entry
self.condition.wait()

-=-=-=-=- the .put(ID, data) looks like

self.condition.acquire()
scan self.qlist for position to insert (ID, data)
self.condition.notifyAll()
self.condition.release()

-=-=-=-=-

Essentially, if the first pass over the list does not find an entry
to return, it waits for a notify to occur... and notification will only
occur when some other thread puts new data into the list.
--
Wulfraed Dennis Lee Bieber KD6MOG
wlfr...@ix.netcom.com wulfr...@bestiaria.com
HTTP://wlfraed.home.netcom.com/
(Bestiaria Support Staff: web-a...@bestiaria.com)
HTTP://www.bestiaria.com/
Yes, now I have a similar solution in my code. But after read the
stackless python, I'm thinking if I can move to stackless, which might
improve the performance of my thread. Because I'm trying to simulate
some behavior of the real world (trading), I believe there will be a
lot of threads in the future in my program.
Jun 27 '08 #11
On Apr 29, 5:30 pm, Nick Craig-Wood <n...@craig-wood.comwrote:
Terry <terry.yin...@gmail.comwrote:
On Apr 28, 5:30 pm, Nick Craig-Wood <n...@craig-wood.comwrote:
David <wizza...@gmail.comwrote:
Another idea would be to have multiple queues, one per thread or per
message type "group". The producer thread pushes into the appropriate
queues (through an intelligent PutMsg function), and the consumer
threads pull from the queues they're interested in and ignore the
others.
Unfortunately a thread can only wait on one Queue at once (without
polling). So really the only efficient solution is one Queue per
thread.
Make an intelligent PutMsg function which knows which Queue (or
Queues) each message needs to be put in and all the threads will have
to do is Queue.get() and be sure they've got a message they can deal
with.
I do have one Queue per thread. The problem is the thread can not peek
into the Queue and select msg with certain ID first.

My point is don't put messages that the thread doesn't need in the
queue in the first place. Ie move that logic into PutMsg.

--
Nick Craig-Wood <n...@craig-wood.com--http://www.craig-wood.com/nick
Well, I'm simulating the real world. It's like that you wouldn't drop
or proceed a task when you already started your lunch, just save it
and process it later when you finish your lunch.
Of course the task sender can send the task again and again if he got
not ack from you. But that's just one possible situation in the real
world, and not an efficient one.
Jun 27 '08 #12

This discussion thread is closed

Replies have been disabled for this discussion.

Similar topics

reply views Thread by Jason Evans | last post: by
5 posts views Thread by Juri | last post: by
18 posts views Thread by Nick Z. | last post: by
2 posts views Thread by www.brook | last post: by
6 posts views Thread by Joe Van Dyk | last post: by
15 posts views Thread by sat | last post: by
reply views Thread by NPC403 | last post: by
1 post views Thread by UniDue | last post: by
By using this site, you agree to our Privacy Policy and Terms of Use.