
Question regarding Queue object

Hello!

I'm trying to implement a message queue among threads using Queue. The
message queue has two operations:

    PutMsg(id, msg)    # the simple part: combine the id and msg into one
                       # item and put it into the Queue
    WaitMsg(ids, msg)  # the hard part

WaitMsg should return only messages with certain ids, but the Queue
object can't do this: it provides no method to peek into the queue and
fetch only a matching item.

Right now I'm using an ugly solution: fetch all the messages and put
the unused ones back into the queue. But I want better performance. Is
there any alternative out there?

This is my current solution:

    def _get_with_ids(self, wait, timeout, ids):
        to = timeout
        msg = None
        saved = []
        while True:
            start = time.clock()
            msg = self.q.get(wait, to)
            if msg and msg['id'] in ids:
                break
            # not the expected message, save it.
            saved.append(msg)
            to = to - (time.clock() - start)
            if to <= 0:
                break
        # put the saved messages back to the queue
        for m in saved:
            self.q.put(m, True)
        return msg

br, Terry
Jun 27 '08 #1
I just found that Queue is written in Python, maybe I can override it.
Jun 27 '08 #2
> WaitMsg will get only msg with certain ids, but this is not possible
> in Queue object, because Queue provides no method to peek into the
> message queue and fetch only matched item.
>
> Now I'm using an ugly solution, fetch all the messages and put the not
> used ones back to the queue. But I want a better performance. Is there
> any alternative out there?

You could try a defaultdict containing queues, one queue per message ID.

Or you could implement your own thread-safe LookAheadQueue class.

David
Jun 27 '08 #3
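
For illustration, the LookAheadQueue idea from post #3 might look roughly like the sketch below in Python 3. This is not code from the thread: only the class name comes from David's post. The method names, the deque storage, the Condition-based waiting, and the message shape (a dict with an 'id' key, borrowed from Terry's code) are all assumptions.

```python
import threading
import time
from collections import deque


class LookAheadQueue:
    """Hypothetical thread-safe queue whose get() filters by message id."""

    def __init__(self):
        self._items = deque()
        self._cond = threading.Condition()

    def put(self, msg):
        with self._cond:
            self._items.append(msg)
            # Wake every waiter: each one may be waiting for different ids.
            self._cond.notify_all()

    def get(self, ids, timeout=None):
        """Remove and return the oldest message whose 'id' is in ids.

        Returns None if no matching message arrives within timeout seconds.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._cond:
            while True:
                # Scan in arrival order so non-matching messages stay put.
                for i, msg in enumerate(self._items):
                    if msg['id'] in ids:
                        del self._items[i]
                        return msg
                if deadline is None:
                    self._cond.wait()
                else:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        return None
                    self._cond.wait(remaining)
```

The scan is linear in the number of queued messages, so this avoids the fetch-and-put-back round trip but is not free when many unmatched messages pile up.
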
(re-cc-ing the list)

On Sun, Apr 27, 2008 at 4:40 PM, Terry Yin <te**********@gmail.com> wrote:
> Defaultdict is not an option because there will be a lot of message IDs (and
> the number keeps increasing). I will implement a LookAheadQueue class by
> overriding the Queue class.
>
> Thanks for your kind advice.
>
> BTW, I have been in old-fashioned telecommunication R&D for years, where
> messages and state machines are heavily used in software development. And
> this makes me automatically resort to messages between task-specific
> processes/threads when designing any software, even in Python. I'm wondering
> if this is the right choice, or whether it's no longer a modern way to design.

There are a lot of ways you could go about it, those 2 were the first
that came to mind.

Another idea would be to have multiple queues, one per thread or per
message type "group". The producer thread pushes into the appropriate
queues (through an intelligent PutMsg function), and the consumer
threads pull from the queues they're interested in and ignore the
others.

If your apps are heavily threaded you might take a look at Stackless
Python: http://www.stackless.com/

David.
Jun 27 '08 #4
David <wi******@gmail.com> wrote:
> Another idea would be to have multiple queues, one per thread or per
> message type "group". The producer thread pushes into the appropriate
> queues (through an intelligent PutMsg function), and the consumer
> threads pull from the queues they're interested in and ignore the
> others.

Unfortunately a thread can only wait on one Queue at once (without
polling). So really the only efficient solution is one Queue per
thread.

Make an intelligent PutMsg function which knows which Queue (or
Queues) each message needs to be put in and all the threads will have
to do is Queue.get() and be sure they've got a message they can deal
with.

--
Nick Craig-Wood <ni**@craig-wood.com> -- http://www.craig-wood.com/nick
Jun 27 '08 #5
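
As a rough sketch of the "intelligent PutMsg" described in post #5, a small router could own one queue per consumer thread and decide at put time which queues receive a message. Everything here is hypothetical (the MsgRouter class, register(), put_msg(), and the subscription-by-id scheme); the thread does not say how PutMsg should know which thread wants which id. It uses the Python 3 module name `queue` (the module was called `Queue` in Python 2).

```python
import queue
import threading
from collections import defaultdict


class MsgRouter:
    """Hypothetical router that owns one queue.Queue per consumer thread."""

    def __init__(self):
        self._lock = threading.Lock()
        self._subscribers = defaultdict(list)  # message id -> list of queues

    def register(self, ids):
        """Create a queue for one consumer and subscribe it to the given ids."""
        q = queue.Queue()
        with self._lock:
            for msg_id in ids:
                self._subscribers[msg_id].append(q)
        return q

    def put_msg(self, msg_id, msg):
        """Deliver msg to every queue subscribed to msg_id; return the count."""
        with self._lock:
            targets = list(self._subscribers.get(msg_id, ()))
        for q in targets:
            q.put({'id': msg_id, 'msg': msg})
        return len(targets)
```

Each consumer then only calls get() on the queue returned by register(), so every message it receives is one it asked for.
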
I've never used it myself but you may find candygram interesting;
http://candygram.sourceforge.net, which AFAIK implements Erlang-style
message queues in Python.
Jun 27 '08 #6
On Apr 28, 5:30 pm, Nick Craig-Wood <n...@craig-wood.com> wrote:
> David <wizza...@gmail.com> wrote:
>> Another idea would be to have multiple queues, one per thread or per
>> message type "group". The producer thread pushes into the appropriate
>> queues (through an intelligent PutMsg function), and the consumer
>> threads pull from the queues they're interested in and ignore the
>> others.
>
> Unfortunately a thread can only wait on one Queue at once (without
> polling). So really the only efficient solution is one Queue per
> thread.
>
> Make an intelligent PutMsg function which knows which Queue (or
> Queues) each message needs to be put in and all the threads will have
> to do is Queue.get() and be sure they've got a message they can deal
> with.
>
> --
> Nick Craig-Wood <n...@craig-wood.com> -- http://www.craig-wood.com/nick

I do have one Queue per thread. The problem is the thread can not peek
into the Queue and select msg with certain ID first.
Jun 27 '08 #7
On Apr 28, 10:48 pm, "dcof...@gmail.com" <dcof...@gmail.com> wrote:
> I've never used it myself but you may find candygram interesting;
> http://candygram.sourceforge.net, which AFAIK implements Erlang-style
> message queues in Python.

Thank you. I will look at candygram and stackless. I believe my
solution lies in either of them.
Jun 27 '08 #8
Why put them back in the queue?
You could have a defaultdict with the id as key and a list of
unprocessed messages with that id as the value.
Your _get_with_ids function could first look in the unprocessed
messages for items with those ids and then look in the queue,
putting any non-matching item in the dictionary for later processing.
This should improve performance, at the cost of slightly more
complicated method code (but way simpler than implementing your own
priority-based queue).

Ciao
-----
FB
Jun 27 '08 #9
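
The suggestion in post #9 could be sketched as follows, in Python 3. This is an illustration under assumptions rather than anyone's actual code: the Mailbox class and its method names are made up, the mailbox is assumed to be read by a single consumer thread (so the pending dictionary needs no lock), and messages are dicts with an 'id' key as in Terry's code. Unlike the original _get_with_ids, it catches queue.Empty so that a timeout does not lose the messages already set aside.

```python
import queue
import time
from collections import defaultdict, deque


class Mailbox:
    """Hypothetical per-thread mailbox; only its owner calls wait_msg()."""

    def __init__(self):
        self.q = queue.Queue()
        self._pending = defaultdict(deque)  # id -> messages set aside earlier

    def put_msg(self, msg_id, msg):
        self.q.put({'id': msg_id, 'msg': msg})

    def wait_msg(self, ids, timeout):
        """Return a message whose id is in ids, or None on timeout."""
        # 1. Check messages set aside by earlier calls.
        for msg_id in ids:
            bucket = self._pending.get(msg_id)
            if bucket:
                msg = bucket.popleft()
                if not bucket:
                    del self._pending[msg_id]  # keep the dict from growing
                return msg
        # 2. Read the queue, setting aside anything that does not match.
        deadline = time.monotonic() + timeout
        while True:
            remaining = max(0.0, deadline - time.monotonic())
            try:
                msg = self.q.get(True, remaining)
            except queue.Empty:
                return None
            if msg['id'] in ids:
                return msg
            self._pending[msg['id']].append(msg)
```

One trade-off of this sketch: messages with the same id keep their arrival order, but when several of the requested ids have pending messages, the one returned first depends on the iteration order of ids, not on arrival time.
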
Terry <te**********@gmail.com> wrote:
> On Apr 28, 5:30 pm, Nick Craig-Wood <n...@craig-wood.com> wrote:
>> David <wizza...@gmail.com> wrote:
>>> Another idea would be to have multiple queues, one per thread or per
>>> message type "group". The producer thread pushes into the appropriate
>>> queues (through an intelligent PutMsg function), and the consumer
>>> threads pull from the queues they're interested in and ignore the
>>> others.
>>
>> Unfortunately a thread can only wait on one Queue at once (without
>> polling). So really the only efficient solution is one Queue per
>> thread.
>>
>> Make an intelligent PutMsg function which knows which Queue (or
>> Queues) each message needs to be put in and all the threads will have
>> to do is Queue.get() and be sure they've got a message they can deal
>> with.
>
> I do have one Queue per thread. The problem is the thread can not peek
> into the Queue and select msg with certain ID first.

My point is: don't put messages that the thread doesn't need in the
queue in the first place. I.e., move that logic into PutMsg.

--
Nick Craig-Wood <ni**@craig-wood.com> -- http://www.craig-wood.com/nick
Jun 27 '08 #10
