Bytes IT Community

Threads, async_chat and asyncore


I'm using the asyncore and asynchat modules to create a network server. I
also have, running in separate thread(s), a "producer" which needs to
"push" data onto the network connection(s). (This is all for a
multi-player game server, so the threads would be individual games
that need to update the players when new events happen in the game.)

I currently have a module level list of async_chat instances, which the
thread then reads and uses the 'push' method of async_chat.

I've done some very quick tests, but wanted to know more formally: is
the async_chat.push() method thread safe?

i.e. can I be sure that the data a thread pushes onto the
connection will always arrive complete, and never be "over-written" or
interleaved with data pushed by another thread?

Here is an example, which creates a whack of threads that push their
messages down the first connection made to the server. I just use a
telnet client to test it. I haven't seen any over-writing of values,
but this is a very basic test/example of what I'm doing. I just want to
make sure that this _is_ ok, or whether I should be using a Queue...

import asynchat
import asyncore
import socket
from threading import Thread
import time
import random

clients = []

class pulse_thread(Thread):
    def __init__(self, string):
        Thread.__init__(self)
        self.msg = string

    def run(self):
        while True:
            time.sleep(random.random())
            # push our message down every open connection
            for client in clients:
                client.push(self.msg)

class Chat(asynchat.async_chat):

    def __init__(self, sock, addr, terminator="\r\n"):

        asynchat.async_chat.__init__(self, conn=sock)

        self.rmq = []
        self.set_terminator(terminator)
        # push self onto module client list, so threads can access me
        clients.append(self)

    def collect_incoming_data(self, data):
        """Buffer the data"""
        self.rmq.append(data)

    def found_terminator(self):
        print "".join(self.rmq)
        self.rmq = []

class cServer(asyncore.dispatcher):

    # constructor method
    def __init__(self, addr, terminator="\r\n"):

        # initialize the asyncore dispatcher
        asyncore.dispatcher.__init__(self)

        self.terminator = terminator

        # create a socket to listen on
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()

        # bind the socket to the ip/port
        self.bind(addr)

        # listen - the 5 is the backlog: how many pending connections
        # the OS will queue before refusing new ones
        self.listen(5)

    # EVENT handle_accept - fired when server accepts a new connection
    def handle_accept(self):
        # fire the accept method > returns a connection object
        conn, addr = self.accept()

        # wrap the connection in the Chat class (another asyncore handler)
        Chat(conn, addr, terminator=self.terminator)

if __name__ == '__main__':

    for i in range(1, 99):
        x = pulse_thread(":%02d:\n\r" % (i,))
        x.setDaemon(True)
        x.start()

    myserver = cServer(('', 7000))
    asyncore.loop()


Please let me know if this is totally the _wrong_ way to do it!


Oct 4 '05 #1

Gaaa, I just did a run with 999 threads pushing data, and I'm getting
"IndexError: deque" problems, so I'm guessing it's _not_ thread safe...



Oct 4 '05 #2

Hi Jos,

Have you looked at Kamaelia? It's a project I'm involved in to create a
framework for highly concurrent systems, geared particularly towards
network server/client applications.

A system is built out of many small components, each with their own
pseudo-thread of execution (implemented as python generators). They
communicate with each other by reading from and sending to local inboxes
and outboxes. These are then linked together, causing messages to be
passed from component to component.
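The inbox/outbox idea can be illustrated with plain generators. This is a simplified toy, not Kamaelia's actual API (real components inherit from Axon.Component.component); it just shows pseudo-threads cooperating via shared message lists:

```python
def producer(outbox):
    # a component's "main loop" is a generator; each yield hands
    # control back to the scheduler (cooperative multitasking)
    for i in range(3):
        outbox.append("tick %d" % i)
        yield

def consumer(inbox, received):
    while True:
        while inbox:
            received.append(inbox.pop(0))
        yield

# linkage: the producer's outbox IS the consumer's inbox
mailbox = []
received = []
tasks = [producer(mailbox), consumer(mailbox, received)]

# trivial round-robin scheduler
for _ in range(5):
    for t in list(tasks):
        try:
            next(t)
        except StopIteration:
            tasks.remove(t)

print(received)  # ['tick 0', 'tick 1', 'tick 2']
```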

We're hopefully going to be making a new release soon, with some more
interesting facilities. A Kamaelia system doing something similar could
look something like this:

----code snippet starts----

#!/usr/bin/env python

from Axon.Component import component
import time
import random

class PulseGenerator(component):
    def __init__(self, msg):
        super(PulseGenerator, self).__init__()
        self.msg = msg

    def main(self): # microprocess (generator)
        t = time.time()
        while 1:
            while time.time() < t:
                yield 1 # hand back control to scheduler
            t += random.random()
            self.send( self.msg, "outbox")

if __name__ == "__main__":
    from Kamaelia.Util.Splitter import Plug, PlugSplitter
    from Kamaelia.Chassis.ConnectedServer import SimpleServer
    from Kamaelia.Util.passThrough import passThrough
    from Axon.Scheduler import scheduler

    producer = PlugSplitter(PulseGenerator("hello!\n")).activate()

    def newClientHandler():
        return Plug(producer, passThrough()).activate()

    SimpleServer( protocol = newClientHandler, port = 1601 ).activate()

    # start the scheduler, which runs all activated components
    scheduler.run.runThreads()

----code snippet ends----

The SimpleServer component uses the factory function 'newClientHandler' to
create a component to handle the new client connection. In this case, it's
a 'plug' component that connects to a splitter component. The purpose of
the splitter is to copy output to multiple destinations. Here, the
output comes from a PulseGenerator component.
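A toy model of the splitter idea (hypothetical names, not the real PlugSplitter interface): everything sent to the splitter is copied to every plugged-in sink:

```python
class ToySplitter:
    """Simplified stand-in for a PlugSplitter-style component:
    output is copied to every registered destination."""
    def __init__(self):
        self.sinks = []

    def plug(self, sink):
        # analogous to connecting a Plug component
        self.sinks.append(sink)

    def send(self, msg):
        # fan the message out to all connected clients
        for sink in self.sinks:
            sink.append(msg)

splitter = ToySplitter()
client_a, client_b = [], []
splitter.plug(client_a)
splitter.plug(client_b)

splitter.send("hello!\n")
print(client_a == client_b == ["hello!\n"])  # True
```

Each new client connection just plugs another sink into the shared splitter, which is exactly what the Plug/PlugSplitter pair does in the snippet above.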

In the above system there's only one producer. To deal with multiple
games, I guess you'd have to write a component that handles the initial
'conversation' with the client to establish which game they want to
connect to, before then creating the 'plug' component to connect to the
appropriate game server.

This is of course only one way to do it, and the 'plug' arrangement might
not be the best eventual solution. We've also got a lookup service, called
the Co-ordinating Assistant Tracker, which translates names into references
to inboxes or outboxes on components. You could, for example, use this
service to look up the components handling specific games.

I'd encourage you to take a look at our project site on sourceforge
( ). There's some more introductory material and a
tutorial or two which should give you a better idea about the system.

We'd be interested to know what you think, and whether you think you could
build your application using it.


| Matt Hammond
| R&D Engineer, BBC Research & Development, Tadworth, Surrey, UK.
Oct 4 '05 #3

I will take a look at it, thanks!

Oct 6 '05 #4
