NetModule.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. using System;
  2. using XGame.Framework.Interfaces;
  3. using XGame.Framework.Json;
  4. using XGame.Framework.Time;
  5. namespace XGame.Framework.Network
  6. {
  7. public partial class NetModule : INetModule, IDisposable, IHeartbeatListener, IMsgReceiverListener, ISessionContext
  8. {
  9. public IMsgSender Sender { get; private set; }
  10. public IMsgGenerator Generator { get; private set; }
  11. public IMsgProcesser Processer { get; private set; }
  12. public IFrameSynchronizer Synchronizer { get; private set; }
  13. public ISerializer Serializer { get; private set; }
  14. public ITimeModule Time { get; private set; }
  15. private IHeartbeat _heartBeat;
  16. private MsgContextCache _contextCache;
  17. private MsgContextCache ContextCache => _contextCache ??= new MsgContextCache();
  18. /// <summary>网络消息打印开关</summary>
  19. public bool IsDebug { get; set;}
  20. /// <summary>加密开关,SetKey时默认设置</summary>
  21. public bool IsOpenEncrypt { get; private set;}
  22. private IMsgEncryptor _encryptor;
  23. public IMsgEncryptor Encryptor => _encryptor ??= new RC4Encryptor();
  24. private IMsgCompressor _compressor;
  25. public IMsgCompressor Compressor => _compressor ??= new ZstdCompressor();
  26. #region 网络会话变量
  27. private TCPSession _tcpRemote;
  28. private WSSession _wsRemote;
  29. private IRemoteSession _curRemote;
  30. private AddressInfo _addressInfo;
  31. #endregion
  32. private IBytesReader _bytesReader;
  33. private INetModuleListener _listener;
  34. public NetModule(ITimeModule time, IMsgGenerator generator, INetModuleListener listener)
  35. {
  36. Time = time;
  37. Generator = generator;
  38. _listener = listener;
  39. Sender = new MsgSender(this);
  40. var receiver = new MsgReceiver(this, this);
  41. _bytesReader = new BytesReader(receiver);
  42. Processer = new MsgProcesser(this, this);
  43. Serializer = new JsonSerializer(generator);
  44. Synchronizer = new FrameSynchronizer(_bytesReader, Processer, this);
  45. }
  46. #region 接口实现
  47. public void SetEncryptKey(string key)
  48. {
  49. IsOpenEncrypt = !string.IsNullOrEmpty(key);
  50. if (IsOpenEncrypt)
  51. {
  52. Encryptor.SetKey(key);
  53. }
  54. }
  55. public void Connect(AddressInfo address)
  56. {
  57. Assert.IsNotNull(address, $"[NetModule] 连接失败, AddressInfo 不能为空.");
  58. _addressInfo = address;
  59. InnerConnect(address);
  60. }
  61. public void Disconnect()
  62. {
  63. _heartBeat?.Stop();
  64. Synchronizer.Reset();
  65. _bytesReader.Reset();
  66. _curRemote?.Disconnect();
  67. _curRemote = null;
  68. }
  69. public void Reconnect()
  70. {
  71. Disconnect();
  72. Assert.IsNotNull(_addressInfo, $"[NetModule] 重连失败, AddressInfo 不能为空.");
  73. InnerConnect(_addressInfo);
  74. }
  75. void INetModule.Send(IMsgRequest msg, bool isFilter)
  76. {
  77. var args = new RequestEventArgs()
  78. {
  79. protoId = msg.ProtocolID,
  80. isFilter = isFilter,
  81. };
  82. if (!IsConnected)
  83. {
  84. args.code = ESessionCode.SendNoConnect;
  85. }
  86. else if (!Sender.IsCanSend(msg, isFilter))
  87. {
  88. args.code = ESessionCode.SendFilted;
  89. }
  90. else
  91. {
  92. if (IsDebug)
  93. {
  94. Log.Info("[NetModule] Send <{1}> {2}: Seq:{4} Timestamp:{0}\n{3}",
  95. Time.GetNowTime(ClockType.Client), msg.ProtocolID, msg.GetType().Name, XJson.ToJson(msg), msg.InstanceID);
  96. }
  97. ContextCache.Push(msg.InstanceID, msg.ProtocolID, msg.Context);
  98. args.seqId = msg.InstanceID;
  99. args.code = Sender.Send(msg, IsOpenEncrypt);
  100. }
  101. _listener?.OnRequest(args);
  102. }
  103. void INetModule.StartHeartbeat()
  104. {
  105. if (IsConnected)
  106. {
  107. var impl = _heartBeat ??= new Heartbeat(this, this);
  108. impl.Start();
  109. }
  110. else
  111. {
  112. Log.Error($"[NetModule] 心跳启动失败, 网络还没连接或初始化.");
  113. }
  114. }
  115. void INetModule.StopHeartbeat()
  116. {
  117. _heartBeat?.Stop();
  118. }
  119. void IDisposable.Dispose()
  120. {
  121. _listener = null;
  122. _bytesReader = null;
  123. (Synchronizer as IDisposable)?.Dispose();
  124. (Processer as IDisposable)?.Dispose();
  125. (Sender as IDisposable)?.Dispose();
  126. (_heartBeat as IDisposable)?.Dispose();
  127. _heartBeat = null;
  128. _contextCache?.Dispose();
  129. _contextCache = null;
  130. _compressor = null;
  131. _encryptor = null;
  132. (_tcpRemote as IDisposable)?.Dispose();
  133. (_wsRemote as IDisposable)?.Dispose();
  134. _curRemote = null;
  135. _tcpRemote = null;
  136. _wsRemote = null;
  137. _addressInfo = null;
  138. }
  139. public IRemoteSession Session
  140. {
  141. get
  142. {
  143. Assert.IsNotNull(_curRemote, $"[NetModule] 网络还没连接或初始化.");
  144. return _curRemote;
  145. }
  146. }
  147. void IHeartbeatListener.OnTimeout()
  148. {
  149. //TODO 广播
  150. _listener?.OnHeartbeatTimeout();
  151. }
  152. #endregion
  153. #region IMsgReceiverListener 实现
  154. void IMsgReceiverListener.OnHeartbeatReceived()
  155. {
  156. _heartBeat?.Receive();
  157. }
  158. void IMsgReceiverListener.OnPreMessage(IMessage message)
  159. {
  160. Sender.VerifyInstanceID(message.InstanceID);
  161. if (message is IMsgResponse)
  162. {
  163. var seqID = message.InstanceID;
  164. var protoID = message.ProtocolID;
  165. if (ContextCache.Pop(seqID, out var reqProtocol, out var reqContext))
  166. {
  167. if (protoID != reqProtocol)
  168. {
  169. if (protoID != NetDefine.GATEWAY_ERROR_ID)//protoId=1为网关异常消息,由业务处理,框架屏蔽此项的提示
  170. Log.Error($"[NetModule] [Response] ProtoID和Seq没有对应 recv-ProtoID:{protoID} seq:{seqID} seq-ProtoID:{reqProtocol}");
  171. _listener?.OnResponseSeqError(seqID, reqProtocol, protoID);
  172. }
  173. else
  174. { // 将缓存的Context赋值给Response
  175. message.Context = reqContext;
  176. }
  177. }
  178. else
  179. {
  180. //收到 没有context的消息
  181. if (protoID != NetDefine.GATEWAY_ERROR_ID)//protoId=1为网关异常消息,由业务处理,框架屏蔽此项的提示
  182. Log.Error($"[NetModule] [Response] seq:{seqID} protoid:{protoID} seq没有对应的contextInfo");
  183. _listener?.OnResponseSeqError(seqID, -1, protoID);
  184. }
  185. }
  186. if (IsDebug)
  187. {
  188. Log.Info("[NetModule] Receive <{1}> {2}: Seq:{4} Timestamp:{0}\n{3}",
  189. Time.GetNowTime(ClockType.Client), message.ProtocolID, message.GetType().Name, XJson.ToJson(message), message.InstanceID);
  190. }
  191. }
  192. void IMsgReceiverListener.OnResponseFinish(ResponseEventArgs args)
  193. {
  194. _listener?.OnResponseFinish(args);
  195. }
  196. void IMsgReceiverListener.OnEvent(ESessionCode code, Exception exception)
  197. {
  198. LogException(code, exception);
  199. SessionEventArgs args = default;
  200. args.code = code;
  201. switch (code)
  202. {
  203. case ESessionCode.StartConnect:
  204. _listener?.OnConnectStart();
  205. return;
  206. case ESessionCode.Connected:
  207. (Sender as IReset)?.Reset();//seq、lastid重置
  208. ContextCache.Reset();//清空缓存
  209. _listener?.OnConnected();
  210. return;
  211. case ESessionCode.Disconnect:
  212. _listener?.OnDisconnected();
  213. return;
  214. case ESessionCode.ConnectFail:
  215. //args.errorMsg = exception.Message;
  216. //break;
  217. case ESessionCode.DisconnectError:
  218. case ESessionCode.SendException:
  219. case ESessionCode.RecvError:
  220. args.errorMsg= exception.Message;
  221. if (exception is System.Net.Sockets.SocketException se)
  222. {
  223. args.isSocketError = true;
  224. args.socketError = se.SocketErrorCode;
  225. }
  226. else if (exception is Web.WebSocketException webEx)
  227. {
  228. args.websocketCode = webEx.code;
  229. }
  230. break;
  231. }
  232. _listener?.OnSessionError(args);
  233. }
  234. private void LogException(ESessionCode sessionCode, Exception ex)
  235. {
  236. if (ex != null)
  237. {
  238. string socketCode;
  239. if (ex is System.Net.Sockets.SocketException se)
  240. {
  241. socketCode = se.SocketErrorCode.ToString();
  242. }
  243. else
  244. {
  245. socketCode = "NONE";
  246. }
  247. Log.Exception($"[NetModule] Exception: Code: {sessionCode}, SocketError: {socketCode}\n", ex);
  248. }
  249. }
  250. #endregion
  251. private void InnerConnect(AddressInfo address)
  252. {
  253. if (_curRemote != null && _curRemote.Status != SessionStatus.FREE)
  254. {
  255. Log.Error($"[NetModule] SessionStatus:{_curRemote.Status} 已连接或者正在连接,又尝试连接");
  256. return;
  257. }
  258. switch (address.ProtocolType)
  259. {
  260. case ProtocolType.TCP:
  261. _curRemote = _tcpRemote ??= new TCPSession(NetDefine.SESSION_BUFFER_SIZE, NetDefine.SESSION_TIMEOUT, this);
  262. break;
  263. case ProtocolType.WS:
  264. case ProtocolType.WSS:
  265. #if !UNITY_EDITOR && UNITY_WEBGL
  266. _curRemote = new Web.WebSocket(this);
  267. #else
  268. _curRemote = _wsRemote ??= new WSSession(NetDefine.SESSION_BUFFER_SIZE, NetDefine.SESSION_TIMEOUT, this);
  269. #endif
  270. break;
  271. default:
  272. Log.Error($"[NetModule] 连接失败,协议类型错误{address.ProtocolType}");
  273. return;
  274. }
  275. Log.Debug($"[NetModule] StartConnect: Type:{address.ProtocolType}, Address:{address.Address}, Port:{address.Port}");
  276. _curRemote.Connect(address);
  277. }
  278. private bool IsConnected => _curRemote != null && _curRemote.Status == SessionStatus.CONNECTED;
  279. }
  280. }