// TCPSession.cs
  1. using System;
  2. using System.Net.Sockets;
  3. using System.Threading.Tasks;
  4. namespace XGame.Framework.Network
  5. {
  6. public class TCPSession : IRemoteSession, IDisposable
  7. {
  8. readonly int TIMEOUT = 4000;
  9. private IAsyncResult _asyncResultRead;
  10. private IAsyncResult _asyncResultWrite;
  11. private Socket _socket;
  12. private NetworkStream _tcpStream;
  13. private byte[] _buffers;
  14. private volatile SessionStatus _status = SessionStatus.FREE;
  15. public SessionStatus Status => _status;
  16. public SessionAddress Address { get; private set; }
  17. private ISessionContext _context;
  18. internal TCPSession(int bufferSize, int timeout, ISessionContext context)
  19. {
  20. _buffers = new byte[bufferSize];
  21. TIMEOUT = timeout;
  22. _context = context;
  23. }
  24. #region Connect Method
  25. private void OnDnsCompleted(SessionAddress address)
  26. {
  27. if (string.IsNullOrEmpty(address.IP))
  28. {
  29. OnConnectFail(new Exception("[Session] 域名解析失败或IP为空"));
  30. return;
  31. }
  32. Task.Factory.StartNew(() =>
  33. {
  34. try
  35. {
  36. _socket = new Socket(address.IsIpv6 ? AddressFamily.InterNetworkV6 : AddressFamily.InterNetwork, SocketType.Stream, System.Net.Sockets.ProtocolType.Tcp);
  37. IAsyncResult connectAsync = _socket.BeginConnect(address.IP, address.PORT, null, _socket);
  38. _status = SessionStatus.CONNECTING;
  39. bool success = connectAsync.AsyncWaitHandle.WaitOne(TIMEOUT, true);
  40. if (success && _socket.Connected)
  41. {
  42. _socket.EndConnect(connectAsync);
  43. _tcpStream = new NetworkStream(_socket);
  44. _socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, true); //设置无延迟发送
  45. OnConnected();
  46. }
  47. else
  48. {
  49. OnConnectTimeout();
  50. }
  51. }
  52. catch (Exception e)
  53. {
  54. OnConnectFail(e);
  55. }
  56. });
  57. }
  58. public void Connect(AddressInfo info)
  59. {
  60. if (Status != SessionStatus.FREE)
  61. {
  62. Log.Warn($"[Session]SessionStatus:{Status} 已连接或者正在连接,又尝试连接");
  63. return;
  64. }
  65. OnStartConnect();
  66. if (Address != null && Address.Domain == info.Address && Address.PORT == info.Port)
  67. {
  68. OnDnsCompleted(Address);
  69. }
  70. else
  71. {
  72. Address = new SessionAddress(info);
  73. Address.ParseDnsAsync(OnDnsCompleted);
  74. }
  75. }
  76. public bool IsConnected(bool bPrecise)
  77. {
  78. if (null != _socket)
  79. {
  80. if (Status == SessionStatus.CONNECTED && _socket.Connected)
  81. {
  82. if (!bPrecise)
  83. {
  84. return true;
  85. }
  86. try
  87. {
  88. return !(_socket.Poll(1000, SelectMode.SelectRead) && _socket.Available == 0);
  89. }
  90. catch (SocketException e)
  91. {
  92. Log.Warn($"[Session]Error:SocketException --socket poll error: {e.Message} errorcode:{e.SocketErrorCode}");
  93. return false;
  94. }
  95. catch (Exception e)
  96. {
  97. Log.Warn("[Session]Error:Exception --socket poll error: {0}", e.Message);
  98. return false;
  99. }
  100. }
  101. }
  102. return false;
  103. }
  104. private void OnConnectFail(Exception e)
  105. {
  106. _status = SessionStatus.CONNECT_FAIL;
  107. _context.Synchronizer.Enqueue(ESessionCode.ConnectFail, e);
  108. }
  109. private void OnConnectTimeout()
  110. {
  111. _status = SessionStatus.CONNECT_TIMEOUT;
  112. _context.Synchronizer.Enqueue(ESessionCode.ConnectTimeout, null);
  113. }
  114. private void OnConnected()
  115. {
  116. _status = SessionStatus.CONNECTED;
  117. _context.Synchronizer.Enqueue(ESessionCode.Connected, null);
  118. StartReceive();
  119. }
  120. private void OnStartConnect()
  121. {
  122. _context.Synchronizer.Enqueue(ESessionCode.StartConnect, null);
  123. }
  124. #endregion
  125. #region Disconnect Method
  126. public void Disconnect()
  127. {
  128. if (Status != SessionStatus.FREE)
  129. {
  130. try
  131. {
  132. if (_asyncResultRead != null && !_asyncResultRead.IsCompleted)
  133. {
  134. _asyncResultRead.AsyncWaitHandle.Close();
  135. _socket.Shutdown(SocketShutdown.Receive);
  136. }
  137. if (_asyncResultWrite != null && !_asyncResultWrite.IsCompleted)
  138. {
  139. _asyncResultWrite.AsyncWaitHandle.Close();
  140. _socket.Shutdown(SocketShutdown.Send);
  141. }
  142. if (_tcpStream != null)
  143. {
  144. _tcpStream.Close();
  145. _tcpStream.Dispose();
  146. }
  147. _socket.Close();
  148. _socket.Dispose();
  149. }
  150. catch (Exception e)
  151. {
  152. OnDisconnectError(e);
  153. }
  154. _status = SessionStatus.FREE;
  155. OnDisconnect();
  156. }
  157. }
  158. private void OnDisconnect()
  159. {
  160. _context.Synchronizer.Enqueue(ESessionCode.Disconnect, null);
  161. }
  162. private void OnDisconnectError(Exception e)
  163. {
  164. _context.Synchronizer.Enqueue(ESessionCode.DisconnectError, e);
  165. }
  166. #endregion
  167. #region Send Method
  168. public bool Send(byte[] bytes, int offset, int length)
  169. {
  170. try
  171. {
  172. _asyncResultWrite = _tcpStream.BeginWrite(bytes, offset, length, EndSend, bytes);
  173. return true;
  174. }
  175. catch (Exception e)
  176. {
  177. SessionBufferPool.Recycle(bytes);
  178. OnSendError(e);
  179. }
  180. return false;
  181. }
  182. private void EndSend(IAsyncResult ar)
  183. {
  184. try
  185. {
  186. _tcpStream.EndWrite(ar);
  187. }
  188. catch (Exception e)
  189. {
  190. OnSendError(e);
  191. }
  192. finally
  193. {
  194. var buffer = ar.AsyncState as byte[];
  195. SessionBufferPool.Recycle(buffer);
  196. }
  197. }
  198. private void OnSendError(Exception e)
  199. {
  200. _status = SessionStatus.SEND_ERROR;
  201. _context.Synchronizer.Enqueue(ESessionCode.SendException, e);
  202. }
  203. #endregion
  204. #region Receive Method
  205. public void StartReceive()
  206. {
  207. try
  208. {
  209. _asyncResultRead = _tcpStream.BeginRead(_buffers, 0, _buffers.Length, EndReceive, _socket);
  210. }
  211. catch (Exception e)
  212. {
  213. OnReceiveError(e);
  214. }
  215. }
  216. private void EndReceive(IAsyncResult asyncReceive)
  217. {
  218. try
  219. {
  220. int length = _tcpStream.EndRead(asyncReceive);
  221. if (length > 0)
  222. {
  223. OnRead(_buffers, 0, length);//处理
  224. StartReceive();
  225. }
  226. else
  227. {
  228. OnRemoteClose();
  229. }
  230. }
  231. catch (Exception e)
  232. {
  233. OnReceiveError(e);
  234. }
  235. }
  236. private void OnReceiveError(Exception e)
  237. {
  238. _status = SessionStatus.RECV_ERROR;
  239. _context.Synchronizer.Enqueue(ESessionCode.RecvError, e);
  240. }
  241. private void OnRead(byte[] bytes, int offset, int length)
  242. {
  243. var buffer = SessionBufferPool.Acquire();
  244. Array.Copy(bytes, offset, buffer, 0, length);
  245. _context.Synchronizer.Enqueue(buffer, 0, length);
  246. }
  247. private void OnRemoteClose()
  248. {
  249. _status = SessionStatus.CLOSED;
  250. _context.Synchronizer.Enqueue(ESessionCode.SessionClose, null);
  251. }
  252. #endregion
  253. void IDisposable.Dispose()
  254. {
  255. Disconnect();
  256. _context = null;
  257. }
  258. }
  259. }