Hi,
I try to implement a reusable socket class to send and receive data.
It seems to work but I have 2 problems :
1) I rely on Socket.Available to detect that the connection is closed (no
more data to expect). Sometimes, Socket.Available returns 0 but the other
end of the connection did not close it !
2) This class will be used by many other classes so I have to use the
minimum system resource. I just found that the first parameter for
Socket.Poll is in microseconds ! I thought it was milliseconds.
How can I improve/correct this class ?
Thanks in advance.
Here below, you will find the code for this class (TcpHelper).
It is used as following :
TcpHelper transport = new TcpHelper;
transport.OnConnectionFailed += new EventHandler (...);
transport.CallbackReceivedData += new EventHandler (...);
transport.Open (existingSocket, caller);
or
transport.Open (hostname, port);
transport.StartReading ();
the code :
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace IprRouting
{
public class TcpEventData : System.EventArgs
{
private byte [] _header;
private byte [] _data;
public TcpEventData (byte [] header, byte [] data)
{
_header = header;
_data = data;
}
public byte [] Header
{
get { return _header; }
}
public byte [] Data
{
get { return _data; }
}
}
public class TcpHelper
{
private Socket _socket = null;
private bool _mustRun = true;
private string _hostName = string.Empty;
private IPAddress _ipAddress = IPAddress.IPv6None;
private string _caller = string.Empty;
private const int PollTimeout = 100;
public event EventHandler CallbackReceivedData;
public event EventHandler OnConnectionFailed;
private static string localHostname = string.Empty;
private static IPAddress [] localIpAddresses = null;
public TcpHelper ()
{
}
#region Properties
public string HostName
{
get { return _hostName; }
}
public IPAddress IPAddress
{
get { return _ipAddress; }
}
public string RemoteEndPoint
{
get
{
if (_socket == null)
return "Not connected";
else
{
if (! _socket.Connected)
return "Not connected";
else
return _socket.RemoteEndPoint.ToString
();
}
}
}
#endregion // Properties
// remove domain part in host name (i.e. in "xxx.yyy.zzz",
// remove ".yyy.zzz"
private string StandardizeHostname (string hostname)
{
int dotIndex = hostname.IndexOf ('.');
if (dotIndex != -1)
{
return hostname.Substring (0, dotIndex).ToUpper ();
}
else
return hostname;
}
public void StartReading ()
{
Thread handleTcp = new Thread (new ThreadStart
(ReadTcp));
if (_caller.Length > 0)
{
if (_hostName.Equals ("localhost"))
handleTcp.Name = "TcpHelper " + _caller;
else
handleTcp.Name = String.Format ("TcpHelper {0}
{1}", _caller, _hostName);
}
else
handleTcp.Name = String.Format ("TcpHelper {0}",
_hostName);
if (Log.Current.IsInfoEnabled)
Log.Current.Info ("TcpHelper::StartReading " +
handleTcp.Name);
handleTcp.Priority = ThreadPriority.Normal;
handleTcp.IsBackground = true; // thread will be closed
when application exits
handleTcp.Start ();
}
private void CloseIfActive ()
{
if (_socket != null)
{
if (Log.Current.IsErrorEnabled)
Log.Current.Error ("TcpHelper::CloseIfActive
closing existing socket " +
RemoteEndPoint);
Close (false);
}
}
public void Open (Socket socket, string caller)
{
IPEndPoint ipEndPoint = (IPEndPoint)
socket.RemoteEndPoint;
_ipAddress = ipEndPoint.Address;
IPHostEntry hostInfo = TcpHelper.GetHostByAddress
(_ipAddress);
if (hostInfo != null)
_hostName = StandardizeHostname
(hostInfo.HostName);
else
_hostName = _ipAddress.ToString ();
_caller = caller;
CloseIfActive ();
_socket = socket;
}
public short Open (string hostName, int port)
{
short returnCode = 0;
_hostName = hostName;
returnCode = Open (port);
return returnCode;
}
private short Open (int port)
{
if (Log.Current.IsInfoEnabled)
Log.Current.Info ("TcpHelper::Open open " +
_hostName);
if (_hostName.Length <= 0)
{
if (Log.Current.IsErrorEnabled)
Log.Current.Error ("TcpHelper::Open invalid
host name " + _hostName);
return 4;
}
CloseIfActive ();
IPHostEntry hostInfo = TcpHelper.GetHostInfo (_hostName);
if (hostInfo == null)
{
if (Log.Current.IsErrorEnabled)
Log.Current.Error ("TcpHelper::Open cannot get
host info " + _hostName);
return 5;
}
else
{
_hostName = StandardizeHostname
(hostInfo.HostName);
_ipAddress = hostInfo.AddressList [0];
}
if (TcpHelper.IsLocalHost (hostInfo.AddressList))
{
if (Log.Current.IsInfoEnabled)
Log.Current.Info ("TcpHelper::Open skip local
host " + _hostName);
return 6;
}
if (Log.Current.IsInfoEnabled)
Log.Current.Info ("TcpHelper::Open opening " +
_hostName +
", IP address " + _ipAddress + " port #" +
port.ToString ());
IPEndPoint endPoint = new IPEndPoint (_ipAddress, port);
_socket = new Socket (AddressFamily.InterNetwork,
SocketType.Stream, ProtocolType.Tcp);
_socket.SetSocketOption (SocketOptionLevel.Socket,
SocketOptionName.SendTimeout, TcpServer.TcpSendTimeout);
_socket.SetSocketOption (SocketOptionLevel.Socket,
SocketOptionName.ReceiveTimeout, TcpServer.TcpReceiveTimeout);
_socket.SetSocketOption (SocketOptionLevel.Socket,
SocketOptionName.KeepAlive, 1);
_socket.Connect (endPoint);
return 0;
}
private static bool IsLocalHost (IPAddress [] ipAddresses)
{
if (localIpAddresses == null)
{
localHostname = Dns.GetHostName ();
localIpAddresses = Dns.GetHostByName
(localHostname).AddressList;
}
foreach (IPAddress ipaddr in ipAddresses)
{
foreach (IPAddress ipaddrLocal in localIpAddresses)
{
if (ipaddrLocal.Equals (ipaddr))
return true;
}
}
return false;
}
private IPHostEntry GetHostByName (string hostname)
{
IPHostEntry hostInfo = null;
// try
// {
hostInfo = Dns.GetHostByName (_hostName);
/* }
catch (Exception ex)
{
Util.TraceError ("Util::GetHostByName exception
catched " + ex);
}
*/
return hostInfo;
}
private static IPHostEntry GetHostInfo (string host)
{
IPHostEntry hostInfo = null;
try
{
hostInfo = Dns.Resolve (host);
}
catch (Exception ex)
{
if (Log.Current.IsErrorEnabled)
Log.Current.Error
(String.Format ("TcpHelper::GetHostInfo
{0} exception catched {1}",
host, ex.ToString ()));
}
return hostInfo;
}
public static IPHostEntry GetHostByAddress (IPAddress
ipAddress)
{
IPHostEntry hostInfo = null;
try
{
hostInfo = Dns.GetHostByAddress (ipAddress);
}
catch (Exception ex)
{
if (Log.Current.IsErrorEnabled)
Log.Current.Error
("TcpHelper::GetHostByAddress exception catched " + ex);
}
return hostInfo;
}
public void ReadTcp ()
{
if (Log.Current.IsInfoEnabled)
Log.Current.Info
(String.Format
("TcpHelper::ReadTcp thread started for socket
{0}",
RemoteEndPoint));
int bytesRead = 0;
_mustRun = true;
bool stopReading = false;
try
{
byte [] header = new byte [IprProtocol.HeaderSize];
while ((_mustRun) && (! stopReading))
{ // read header
if (_socket.Poll (PollTimeout,
SelectMode.SelectRead))
{
if (_socket.Available == 0)
{
if (Log.Current.IsInfoEnabled)
Log.Current.Info
("TcpHelper::ReadTcp header no available bytes to read, probably
connection closed by peer");
stopReading = true;
break;
}
if (_socket.Available >=
IprProtocol.HeaderSize)
{
bytesRead = _socket.Receive
(header, IprProtocol.HeaderSize, 0);
if (Log.Current.IsInfoEnabled)
{
Log.Current.Info
("TcpHelper::ReadTcp header " +
bytesRead.ToString () +
" bytes received");
Log.Current.Info
(String.Format ("header 0
= {0} 1 = {1}",
header [0].ToString
("X2"), header [1].ToString ("X2")));
}
if (bytesRead > 0)
{ // read command
if (bytesRead !=
IprProtocol.HeaderSize)
{
if
(Log.Current.IsErrorEnabled)
Log.Current.Error
("TcpHelper::ReadTcp: invalid header, " + bytesRead + " bytes
received");
stopReading = true;
break ;
}
uint commandSize = header [1];
if (commandSize <= 0)
{
if
(Log.Current.IsErrorEnabled)
Log.Current.Error
("TcpHelper::ReadTcp invalid header, commandSize = " + commandSize);
stopReading = true;
break;
}
else
{
//
byte [] command = new byte [commandSize + 1];
//
command [0] = header [0]; // keep SES command
in command buffer
byte [] command = new
byte [commandSize];
bytesRead = 0;
while ((_mustRun) && (!
stopReading))
{
if (_socket.Poll
(PollTimeout, SelectMode.SelectRead))
{
if
(_socket.Available == 0)
{
if
(Log.Current.IsInfoEnabled)
Log.Current.Info ("TcpHelper::ReadTcp command no available bytes to
read, probably connection closed by peer");
stopReading = true;
break;
}
while
((_mustRun) && (_socket.Available >= 0) && (bytesRead < commandSize))
{
//
bytesRead =
_socket.Receive (command, 1, commandSize, 0);
bytesRead
+= _socket.Receive
(command, bytesRead,
(int) (commandSize - bytesRead),
0);
if
(Log.Current.IsInfoEnabled)
Log.Current.Info ("TcpHelper::ReadTcp command " +
bytesRead.ToString () + " bytes received");
if
(bytesRead == 0)
{
if
(Log.Current.IsInfoEnabled)
Log.Current.Info ("TcpHelper::ReadTcp command no available bytes to
read, probably connection closed by peer");
stopReading = true;
break;
}
if
(bytesRead < commandSize)
{
if
(Log.Current.IsInfoEnabled)
Log.Current.Info
("TcpHelper::ReadTcp: not enough bytes available, " +
bytesRead +
" bytes received, " + commandSize + " bytes expected");
}
}
if (bytesRead
== commandSize)
{
if
(CallbackReceivedData != null)
{
CallbackReceivedData (this, new TcpEventData (header, command));
}
else
{
if
(Log.Current.IsErrorEnabled)
Log.Current.Error ("TcpHelper::ReadTcp no callback registered");
}
break;
}
}
}
}
}
else
{
if (Log.Current.IsInfoEnabled)
Log.Current.Info
("TcpHelper::ReadTcp header no more bytes to read, probably connection
closed by peer");
stopReading = true;
}
}
}
}
}
catch (Exception ex)
{
if (ex is SocketException)
{
/*
if (((SocketException)
ex).ErrorCode == (int) SocketErrorCodes.InterruptedFunctionCall)
Util.TraceOther
("TcpHelper::ReadTcp InterruptedFunctionCall, should be a graceful
shutdown");
else
{
*/
if (Log.Current.IsErrorEnabled)
Log.Current.Error ("TcpHelper::ReadTcp
error code = " +
((SocketException) ex).ErrorCode);
// }
}
else
if (Log.Current.IsErrorEnabled)
Log.Current.Error ("TcpHelper::ReadTcp
Exception catched " + ex);
}
if (_mustRun) // not graceful shutdown (not called by
Close () )
{
CloseConnection ();
TcpInfoEventArgs tcpInfo =
new TcpInfoEventArgs (_socket);
if (OnConnectionFailed != null)
OnConnectionFailed (this, tcpInfo);
else
if (Log.Current.IsErrorEnabled)
Log.Current.Error ("TcpHelper::ReadTcp no
callback defined");
}
else
{
if (Log.Current.IsInfoEnabled)
Log.Current.Info ("TcpHelper::ReadTcp graceful
shutdown");
_mustRun = true; // reset flag for new running
}
if (Log.Current.IsInfoEnabled)
Log.Current.Info ("TcpHelper::ReadTcp handling
finished host " + _hostName);
}
private void CloseConnection ()
{
if (Log.Current.IsInfoEnabled)
{
if (_hostName.Length > 0)
Log.Current.Info
(String.Format
("TcpHelper::CloseConnection closing
socket {0} on host {1}",
RemoteEndPoint, _hostName));
else
Log.Current.Info
(String.Format
("TcpHelper::CloseConnection closing
socket {0}",
RemoteEndPoint));
}
if (_socket != null)
{
if (_socket.Connected)
_socket.Shutdown (SocketShutdown.Both);
_socket.Close ();
_socket = null;
}
else
if (Log.Current.IsErrorEnabled)
Log.Current.Error ("TcpHelper::CloseConnection
socket is null");
}
public short Close (bool keepConnected)
{
if (Log.Current.IsInfoEnabled)
{
if (_hostName.Length > 0)
Log.Current.Info ("TcpHelper::Close closing
host " + _hostName);
else
Log.Current.Info ("TcpHelper::Close closing
host " + _socket.RemoteEndPoint);
}
_mustRun = false;
// allow reading thread to terminate
Thread.Sleep (PollTimeout);
// keepConnected: typically happens after OpenCanal is
received,
// the MainCommandHandler closes but the IprSocket that
has been
// given the Tcp socket is using it => keep it opened
if ((! keepConnected) && (_socket != null))
CloseConnection ();
return 0;
}
public int Send (byte [] data)
{
int bytesSent = 0;
try
{
if (Log.Current.IsInfoEnabled)
Log.Current.Info ("TcpHelper::Send sending " +
data.Length +
" bytes to client " + RemoteEndPoint);
if ((_socket != null) && (_socket.Connected))
bytesSent = _socket.Send (data);
else
if (Log.Current.IsErrorEnabled)
Log.Current.Error ("TcpHelper::Send TCP socket
not connected");
if (bytesSent != data.Length)
{
if (Log.Current.IsErrorEnabled)
Log.Current.Error ("TcpHelper::Send not
all bytes sent, " +
bytesSent + " != " + data.Length);
return -1;
}
}
catch (Exception ex)
{
if (Log.Current.IsErrorEnabled)
Log.Current.Error ("TcpHelper::Send Exception
catched: " + ex);
return -1;
}
return bytesSent;
}
}
}