473,397 Members | 1,985 Online
Bytes | Software Development & Data Engineering Community
Post Job

Home Posts Topics Members FAQ

Join Bytes to post your question to a community of 473,397 software developers and data experts.

[Kamaelia] TCPClient: How to sense connection failure?

Hello,

I'm currently trying to implement a simulation program with Kamaelia
and need a reliable TCP connection to a data server.

From Twisted, I know that a method is called if the connection fails
by whatever reason. I tried to get the same results with Kamaelia's
TCPClient component. If I start up the component and try to connect
to a closed TCP port it fails and sends a message out of the signal
box, that's okay.

But if the connection attempt succeeds and, after some time, the
server drops the connection with full TCP handshake (FIN, FIN+ACK,
ACK), the component just hangs and does nothing. Is this by design,
or could there be an error in my setup?

Also, how long does a TCPClient component live -- or how can/should
I terminate it explicitly? I'm afraid that if I create
a "ReconnectingTCPClient" component, it could eat up memory over
long runtime with hanging TCPClients.

Regards,
Björn

--
BOFH excuse #246:

It must have been the lightning storm we had (yesterday) (last week)
(last month)

Jan 11 '08 #1
6 3559
"Bjoern Schliessmann" <usenet-mail,,...mwrote:
>I'm currently trying to implement a simulation program with Kamaelia
and need a reliable TCP connection to a data server.

From Twisted, I know that a method is called if the connection fails
by whatever reason. I tried to get the same results with Kamaelia's
TCPClient component. If I start up the component and try to connect
to a closed TCP port it fails and sends a message out of the signal
box, that's okay.

But if the connection attempt succeeds and, after some time, the
server drops the connection with full TCP handshake (FIN, FIN+ACK,
ACK), the component just hangs and does nothing. Is this by design,
or could there be an error in my setup?
Not sure about Kamelia, but I have found that when a FIN comes along,
a socket.recv() gives back an empty string, just like EOF on a file.

I always have my sockets unblocked and fitted with time outs, but then
I am basically a broken down assembler programmer, so there are probably
better techniques around.

Below is what I use - a sort of netstring, synced on a tilde, with human
readable length implementation and escaping of tildes and the escape character.

It seems to work reliably for me, and detects when the server goes down.

The code for a typical client is below. If anybody is interested I will post
the server
too, but it should be trivial to make, given the example below.

I hope the tabs survive the journey

- Hendrik

#start of code fragment

def sockget_len(s,L,data):
"""
This fills a buffer of given length from the socket s, recursively.

s is the socket
L is the length to receive
data is the buffer
"""

error = 0
req_L = L - len(data)
try:
data = data+s.recv(req_L)
except socket.error,msg: # broken pipes again
if 'timed out' in msg:
rec = '2'*L
return 2,rec # time out
print 'socket error while receiving',msg
rec = '1'*L
return 1,rec # error = 1 is a snafu
if not data:
print 'end of file while receiving'
rec = '0'*L
return 3,rec # This is end of file
if len(data) != L:
error,data = sockget_len(s,L,data)
return error,data
def sockget(s):
"""
Gets a transmission from host.
"""

while True:
tilde = ''
error,tilde = sockget_len(s,1,tilde) # sync up on tilde
if error == 1:
return error,''
elif error == 2:
return error,''
elif error == 3:
return error,''
if tilde == '~':
break

length = ''
error,length = sockget_len(s,4,length) # get the length of the data
if error == 1:
return error,'' # real error
elif error == 2:
return error,'' # Time out
elif error == 3:
return error,'' # End of file
L = int(length)
buf = ''
error,data = sockget_len(s,L,buf) # get the data of length L
return error, data # same errors as above 0 is all right
# client communications program

def comms_thread(qi,qo):
"""This listens for the latest values, and sends requests up."""

while True:

HOST = 'Linuxbox' # The remote host
PORT = 50007 # The same port as used by the server
socket.setdefaulttimeout(10.00)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

while True:
try:
qi.put('Connecting,')
s.connect((HOST, PORT))
break
except socket.error,msg:
print 'error msg is:',msg
time.sleep(10)
continue
print 'Connected - Time out is:',s.gettimeout()
qi.put('Connected,')

last_rx_time = time.time()
last_tx_time = time.time()
while True:

while True:
error,data = sockget(s) # see if a message from host
if error == 0 and data:
msg2 = data.replace('/\x81','~')
msg1 = msg2.replace('/\xd0','/')
qi.put(msg1)
print 'received',msg1
last_rx_time = time.time()
break
elif error == 1:
print 'Error after sockget'
break
if time.time() - last_rx_time 180:
print 'no comms from host for 3 minutes'
error = 1
break # time out ok, unless they are too long
if error == 2:
error = 0 # time outs are all right here
break
if error == 3:
error = 1
break # end of files are a snafu

if error == 1:
break

try:
i_string = qo.get(block=False) # see if stuff to transmit
except Queue.Empty:
if time.time()-last_tx_time 8.5:
i_string = 'Keepalive' # if not for a while, tell server we are alive
print 'sending keepalive'
else:
time.sleep(0.1) # else wait a while and carry on
continue

msg1 = i_string.replace('/','/\xd0')
msg2 = msg1.replace('~','/\x81')
length = str(len(msg2))
L = len(length)
if L == 1:
length = '000'+length
elif L == 2:
length = '00'+length
elif L == 3:
length = '0'+length
try:
s.send('~'+length+msg2)
last_tx_time = time.time()
except socket.error,msg:
print 'Socket error on transmit',msg
break
time.sleep(0.1)

s.close() # Formally close the broken thing
qi.put('Quit,') # Tell main thread its hopeless

sys.exit() # Clobber this thread


Jan 12 '08 #2
Hendrik van Rooyen wrote:
Not sure about Kamelia, but I have found that when a FIN comes
along, a socket.recv() gives back an empty string, just like EOF
on a file.
That Python socket interface can detect it I'm absolutely sure --
Twisted handles it.

I even pdb'ed Kamaelia and control flow ended at a different point
in case of connection closing -- but yielded control to the
scheduler immediately and never continued.
Below is what I use - a sort of netstring, synced on a tilde, with
human readable length implementation and escaping of tildes and
the escape character.
Thank you (though I hope I won't have to mess with socket
functions ;) ).
I hope the tabs survive the journey
Looks like they morphed to spaces.

Regards,
Björn

--
BOFH excuse #77:

Typo in the code

Jan 12 '08 #3
Bjoern Schliessmann wrote:
Hello,

I'm currently trying to implement a simulation program with Kamaelia
and need a reliable TCP connection to a data server.
The behaviour you're seeing sounds odd (which is hopefully encouraging :-),
but it's not clear from the description whether its a bug in your code or
Kamaelia. One question I really have as a result is what version are you
using?

Current release version 0.5.0, the version on /trunk, or the bleeding edge
version on /branches/private_MPS_Scratch. (I'm using the latter to run my
greylisting server - as are a few others).

All that said though, looking at the differences between versions, I'm not
convinced they're large enough to show the problem you're seeing.

I'm not about to rule out a bug I don't know about though :-)
From Twisted, I know that a method is called if the connection fails
by whatever reason. I tried to get the same results with Kamaelia's
TCPClient component. If I start up the component and try to connect
to a closed TCP port it fails and sends a message out of the signal
box, that's okay.
If you'd prefer more information in that message, please let me know.
(all components send out a message when they shutdown. For things that
send data out as one of their primary actions send out a producerFinished
message)
But if the connection attempt succeeds and, after some time, the
server drops the connection with full TCP handshake (FIN, FIN+ACK,
ACK), the component just hangs and does nothing. Is this by design,
or could there be an error in my setup?
It sounds like an error in your setup... but I hate saying that. (Doesn't
tell me or you what it is, and doesn't help change things to discourage or
detect mistakes in usage)

When the server drops the connection in my setups, the client disconnects
cleanly when the server dies, with the client code looking like this:
self.send(producerFinished(self,self.howDied), "signal")

Meaning you get a message telling you how the component shutdown as well as
the fact it shutdown. (If "howDied" is None, it's just a normal shutdown -
ie as a result of being told to shut down)

The function where this is managed is runClient in the class
Kamaelia.Internet.TCPClient.TCPClient
(fully qualified name)

The actual socket connections are handled by a class called
ConnectedSocketAdapter which manages all logic of checking for
errors, remote shutdown etc. That works the same for both servers
and clients so breakage in clients would show up as breakage in servers
as well, which would be particularly bad.
Also, how long does a TCPClient component live -- or how can/should
I terminate it explicitly? I'm afraid that if I create
a "ReconnectingTCPClient" component, it could eat up memory over
long runtime with hanging TCPClients.
That shouldn't be an issue (I hate the word "should"), but you can do this
using a carousel component. (I ought to write that as an example of how to
use the Carousel component)

In the meantime - whilst I check to see if there's a bug I didn't know
about, the following 2 cookbook entries may be of use:
* http://kamaelia.sourceforge.net/Cookbook/TCPSystems
* http://kamaelia.sourceforge.net/Cookbook/Carousels - allows you to make
something that exits reusable. It's a little awkward to get your head
around, but is quite useful when you do. (I've heard of others using
Carousel & TCPClient to make a reconnecting TCPClient in the past)

All that said, I'm not going to rule out a bug and look into it. (if you
have a simple example you find fails, please forward it to me :)

*thinks*

The following code may also be useful when debugging:

from Kamaelia.Chassis.Pipeline import Pipeline

class PeriodicWakeup(Axon.ThreadedComponent.threadedcomp onent):
interval = 300
def main(self):
while 1:
time.sleep(self.interval)
self.send("tick", "outbox")

class WakeableIntrospector(Axon.Component.component):
def main(self):
while 1:
Q = [ q.name for q in self.scheduler.listAllThreads() ]
Q.sort()
print "*debug* THREADS"+ str(Q)
self.scheduler.debuggingon = False
yield 1
while not self.dataReady("inbox"):
self.pause()
yield 1
while self.dataReady("inbox"):
self.recv("inbox")

Pipeline(
PeriodicWakeup(),
WakeableIntrospector(),
).activate()

If you put this code somewhere before your "run" call, you'll get periodic
output to tell you what's running. When debugging manually I'd drop the
interval to 3-10 seconds or so. I use 300 for a server.

Now, off to see if I can reproduce your problem... :)

Regards,
Michael.
--
http://kamaelia.sourceforge.net/Home
http://yeoldclue.com/blog

Jan 12 '08 #4
Michael Sparks wrote:
The behaviour you're seeing sounds odd (which is hopefully
encouraging :-), but it's not clear from the description whether
its a bug in your code or Kamaelia. One question I really have as
a result is what version are you using?
Oh sorry, it's the versions from MegaPack 1.4.0.
In the meantime - whilst I check to see if there's a bug I didn't
know about, the following 2 cookbook entries may be of use:
* http://kamaelia.sourceforge.net/Cookbook/TCPSystems
* http://kamaelia.sourceforge.net/Cookbook/Carousels - allows
you to make
something that exits reusable. It's a little awkward to get
your head around, but is quite useful when you do. (I've
heard of others using Carousel & TCPClient to make a
reconnecting TCPClient in the past)
Thanks for all the information.
All that said, I'm not going to rule out a bug and look into it.
(if you have a simple example you find fails, please forward it to
me :)
Sure, here is my code (but see below ;) ):

----snip----------------------------------------------------------
from Kamaelia.Internet.TCPClient import TCPClient
from Kamaelia.Chassis.Pipeline import Pipeline
from Kamaelia.Util.Console import ConsoleEchoer, ConsoleReader
from Axon.Component import component
from Axon.Ipc import shutdownMicroprocess, producerFinished

class Listener(component):

Inboxes = {"inbox": "Inbox",
"control": "control signals received here",
}

Outboxes = {"outbox": "(not used)",
"signal": "(not used)"
}

def main(self):
while True:
if self.dataReady("inbox"):
print "data from Inbox:", repr(self.recv("inbox"))
if self.dataReady("control"):
control_data = self.recv("control")
print repr(control_data)
if isinstance(control_data, shutdownMicroprocess):
print "Connection could not be established"
break
elif isinstance(control_data, producerFinished):
print "Connection closed"
break
yield 1

k = ConsoleReader(">>")
tcp_client = TCPClient("127.0.0.1", 1850)
listener = Listener()
Pipeline(tcp_client, listener).run()
----snip----------------------------------------------------------

So I'm just using a client and a helper object to display all data
behind it. I usually start the script in one VT, and "nc -l -p
1850" in another. Using wireshark, the packet sequence is almost
identical:

Client closes connection:
C: SYN
S: SYN,ACK
C: ACK
[connection established]
C: FIN,ACK
S: FIN,ACK
C: ACK

Client closes connection:
C: SYN
S: SYN,ACK
C: ACK
[connection established]
S: FIN,ACK
C: ACK
C: FIN,ACK
S: ACK

Looks like a perfectly normal handshake to me.
The following code may also be useful when debugging:
Cool, I've been looking for a code piece like that. :)

Whoops, the TCP client does in fact quit if the server closes
connection :) For some reason, my Listener doesn't quit. I thought
it's sufficient to exit the main method in some way to quit a
component? That's what I do using "break" in the
'if self.dataReady("control")' part of main.

Regards,
Björn

--
BOFH excuse #265:

The mouse escaped.

Jan 13 '08 #5
Bjoern Schliessmann wrote:
Whoops, the TCP client does in fact quit if the server closes
connection :)
Great - so it wasn't a problem with the TCPClient after all :-)
For some reason, my Listener doesn't quit. I thought
it's sufficient to exit the main method in some way to quit a
component? That's what I do using "break" in the
'if self.dataReady("control")' part of main.
It is sufficient, and running with Kamaelia from /trunk, your listener does
indeed shutdown correctly - so it looks like it's a bug in that release of
Kamaelia.

That's my bad really because we haven't done a full release in quite some
time.

Looking at the old code, it appears that in that code the TCPClient wasn't
forwarding shutdown messages correctly, so there *was* a bug, but there
doesn't seem to be now.

My suggestion for the moment would be to use the code on /trunk since this
is stable at present (development happens on branches rather than /trunk)
and that I really ought to sort out the next release - I really hadn't
realised just how long it had been since the last release!

steps:

~svn co https://kamaelia.svn.sourceforge.net...kamaelia/trunk
kamaelia-trunk
~cd kamaelia-trunk
~/kamaelia-trunkcd Code/Python/Axon
~/kamaelia-trunk/Code/Python/Axonsudo python setup.py install
~/kamaelia-trunk/Code/Python/Axoncd ../Kamaelia
~/kamaelia-trunk/Code/Python/Kamaeliasudo python setup.py install

I've tested using this, and your code works as you'd expect.

Regards,
Michael.

Jan 14 '08 #6
Michael Sparks wrote:
It is sufficient, and running with Kamaelia from /trunk, your
listener does indeed shutdown correctly
Great, thanks for validating. :)
My suggestion for the moment would be to use the code on /trunk
since this is stable at present (development happens on branches
rather than /trunk)
I will.

Regards,
Björn

--
BOFH excuse #243:

The computer fleetly, mouse and all.

Jan 14 '08 #7

This thread has been closed and replies have been disabled. Please start a new discussion.

Similar topics

0
by: Michael Sparks | last post by:
Kamaelia 0.1.2 has been released! What is it? =========== Kamaelia is a collection of Axon components designed for network protocol experimentation in a single threaded, select based...
3
by: Michael Sparks | last post by:
Hi, I'm posting a link to this since I hope it's of interest to people here :) I've written up the talk I gave at ACCU Python UK on the Kamaelia Framework, and it's been published as a BBC...
6
by: Michael Kennedy [UB] | last post by:
Hi, I have a project using the TcpClient and its associated NetworkStream. Everything works well except for one condition which I haven't found any information about dealing with: How do I...
0
by: Michael Sparks | last post by:
Hi, Apologies first to those outside the UK... Open Tech 2005* is a follow on from previous years' NotCon events which are community driven low cost events by geeks & developers for geeks &...
3
by: Daniel | last post by:
TcpClient close() method socket leak when i use TcpClient to open a connection, send data and close the TcpClient with myTcpClientInstance.Close(); it takes 60 seconds for the actual socket on...
5
by: Jeff Weber | last post by:
First, my question: Should I use Write or BeginWrite (sync or async) to stream data to my clients over a TCPClient connection. Details: On the server I have a custom circular data buffer that...
4
by: Michael | last post by:
Hi! (OK, slightly silly subject line :) I'm extremely pleased to say - Kamaelia 0.4.0 has been released! What's New & Changed? =====================
0
by: Michael Sparks | last post by:
Hello, I'd like to invite you to our first Kamaelia Open Space event. Our theme is "Making Software like Lego through intuitive useful concurrency". Perhaps you want to learn to use...
0
by: Michael | last post by:
Hi, We're (BBC Research) participating in Google's Summer of Code as a mentor organisation again, and I thought it worth spreading some extra publicity where I think there might be some...
0
by: emmanuelkatto | last post by:
Hi All, I am Emmanuel katto from Uganda. I want to ask what challenges you've faced while migrating a website to cloud. Please let me know. Thanks! Emmanuel
0
BarryA
by: BarryA | last post by:
What are the essential steps and strategies outlined in the Data Structures and Algorithms (DSA) roadmap for aspiring data scientists? How can individuals effectively utilize this roadmap to progress...
0
by: Hystou | last post by:
There are some requirements for setting up RAID: 1. The motherboard and BIOS support RAID configuration. 2. The motherboard has 2 or more available SATA protocol SSD/HDD slots (including MSATA, M.2...
0
marktang
by: marktang | last post by:
ONU (Optical Network Unit) is one of the key components for providing high-speed Internet services. Its primary function is to act as an endpoint device located at the user's premises. However,...
0
Oralloy
by: Oralloy | last post by:
Hello folks, I am unable to find appropriate documentation on the type promotion of bit-fields when using the generalised comparison operator "<=>". The problem is that using the GNU compilers,...
0
jinu1996
by: jinu1996 | last post by:
In today's digital age, having a compelling online presence is paramount for businesses aiming to thrive in a competitive landscape. At the heart of this digital strategy lies an intricately woven...
0
by: Hystou | last post by:
Overview: Windows 11 and 10 have less user interface control over operating system update behaviour than previous versions of Windows. In Windows 11 and 10, there is no way to turn off the Windows...
0
tracyyun
by: tracyyun | last post by:
Dear forum friends, With the development of smart home technology, a variety of wireless communication protocols have appeared on the market, such as Zigbee, Z-Wave, Wi-Fi, Bluetooth, etc. Each...
0
isladogs
by: isladogs | last post by:
The next Access Europe User Group meeting will be on Wednesday 1 May 2024 starting at 18:00 UK time (6PM UTC+1) and finishing by 19:30 (7.30PM). In this session, we are pleased to welcome a new...

By using Bytes.com and it's services, you agree to our Privacy Policy and Terms of Use.

To disable or enable advertisements and analytics tracking please visit the manage ads & tracking page.