123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
- using System;
- using XGame.Framework.Interfaces;
- using XGame.Framework.Json;
- using XGame.Framework.Time;
- namespace XGame.Framework.Network
- {
- public partial class NetModule : INetModule, IDisposable, IHeartbeatListener, IMsgReceiverListener, ISessionContext
- {
- public IMsgSender Sender { get; private set; }
- public IMsgGenerator Generator { get; private set; }
- public IMsgProcesser Processer { get; private set; }
- public IFrameSynchronizer Synchronizer { get; private set; }
- public ISerializer Serializer { get; private set; }
- public ITimeModule Time { get; private set; }
- private IHeartbeat _heartBeat;
- private MsgContextCache _contextCache;
- private MsgContextCache ContextCache => _contextCache ??= new MsgContextCache();
- /// <summary>网络消息打印开关</summary>
- public bool IsDebug { get; set;}
- /// <summary>加密开关,SetKey时默认设置</summary>
- public bool IsOpenEncrypt { get; private set;}
- private IMsgEncryptor _encryptor;
- public IMsgEncryptor Encryptor => _encryptor ??= new RC4Encryptor();
- private IMsgCompressor _compressor;
- public IMsgCompressor Compressor => _compressor ??= new ZstdCompressor();
- #region 网络会话变量
- private TCPSession _tcpRemote;
- private WSSession _wsRemote;
- private IRemoteSession _curRemote;
- private AddressInfo _addressInfo;
- #endregion
- private IBytesReader _bytesReader;
- private INetModuleListener _listener;
- public NetModule(ITimeModule time, IMsgGenerator generator, INetModuleListener listener)
- {
- Time = time;
- Generator = generator;
- _listener = listener;
- Sender = new MsgSender(this);
- var receiver = new MsgReceiver(this, this);
- _bytesReader = new BytesReader(receiver);
- Processer = new MsgProcesser(this, this);
- Serializer = new JsonSerializer(generator);
- Synchronizer = new FrameSynchronizer(_bytesReader, Processer, this);
- }
- #region 接口实现
- public void SetEncryptKey(string key)
- {
- IsOpenEncrypt = !string.IsNullOrEmpty(key);
- if (IsOpenEncrypt)
- {
- Encryptor.SetKey(key);
- }
- }
- public void Connect(AddressInfo address)
- {
- Assert.IsNotNull(address, $"[NetModule] 连接失败, AddressInfo 不能为空.");
- _addressInfo = address;
- InnerConnect(address);
- }
- public void Disconnect()
- {
- _heartBeat?.Stop();
- Synchronizer.Reset();
- _bytesReader.Reset();
- _curRemote?.Disconnect();
- _curRemote = null;
- }
- public void Reconnect()
- {
- Disconnect();
- Assert.IsNotNull(_addressInfo, $"[NetModule] 重连失败, AddressInfo 不能为空.");
- InnerConnect(_addressInfo);
- }
- void INetModule.Send(IMsgRequest msg, bool isFilter)
- {
- var args = new RequestEventArgs()
- {
- protoId = msg.ProtocolID,
- isFilter = isFilter,
- };
- if (!IsConnected)
- {
- args.code = ESessionCode.SendNoConnect;
- }
- else if (!Sender.IsCanSend(msg, isFilter))
- {
- args.code = ESessionCode.SendFilted;
- }
- else
- {
- if (IsDebug)
- {
- Log.Info("[NetModule] Send <{1}> {2}: Seq:{4} Timestamp:{0}\n{3}",
- Time.GetNowTime(ClockType.Client), msg.ProtocolID, msg.GetType().Name, XJson.ToJson(msg), msg.InstanceID);
- }
- ContextCache.Push(msg.InstanceID, msg.ProtocolID, msg.Context);
- args.seqId = msg.InstanceID;
- args.code = Sender.Send(msg, IsOpenEncrypt);
- }
- _listener?.OnRequest(args);
- }
- void INetModule.StartHeartbeat()
- {
- if (IsConnected)
- {
- var impl = _heartBeat ??= new Heartbeat(this, this);
- impl.Start();
- }
- else
- {
- Log.Error($"[NetModule] 心跳启动失败, 网络还没连接或初始化.");
- }
- }
- void INetModule.StopHeartbeat()
- {
- _heartBeat?.Stop();
- }
- void IDisposable.Dispose()
- {
- _listener = null;
- _bytesReader = null;
- (Synchronizer as IDisposable)?.Dispose();
- (Processer as IDisposable)?.Dispose();
- (Sender as IDisposable)?.Dispose();
- (_heartBeat as IDisposable)?.Dispose();
- _heartBeat = null;
- _contextCache?.Dispose();
- _contextCache = null;
- _compressor = null;
- _encryptor = null;
- (_tcpRemote as IDisposable)?.Dispose();
- (_wsRemote as IDisposable)?.Dispose();
- _curRemote = null;
- _tcpRemote = null;
- _wsRemote = null;
- _addressInfo = null;
- }
- public IRemoteSession Session
- {
- get
- {
- Assert.IsNotNull(_curRemote, $"[NetModule] 网络还没连接或初始化.");
- return _curRemote;
- }
- }
- void IHeartbeatListener.OnTimeout()
- {
- //TODO 广播
- _listener?.OnHeartbeatTimeout();
- }
- #endregion
- #region IMsgReceiverListener 实现
- void IMsgReceiverListener.OnHeartbeatReceived()
- {
- _heartBeat?.Receive();
- }
- void IMsgReceiverListener.OnPreMessage(IMessage message)
- {
- Sender.VerifyInstanceID(message.InstanceID);
- if (message is IMsgResponse)
- {
- var seqID = message.InstanceID;
- var protoID = message.ProtocolID;
- if (ContextCache.Pop(seqID, out var reqProtocol, out var reqContext))
- {
- if (protoID != reqProtocol)
- {
- if (protoID != NetDefine.GATEWAY_ERROR_ID)//protoId=1为网关异常消息,由业务处理,框架屏蔽此项的提示
- Log.Error($"[NetModule] [Response] ProtoID和Seq没有对应 recv-ProtoID:{protoID} seq:{seqID} seq-ProtoID:{reqProtocol}");
- _listener?.OnResponseSeqError(seqID, reqProtocol, protoID);
- }
- else
- { // 将缓存的Context赋值给Response
- message.Context = reqContext;
- }
- }
- else
- {
- //收到 没有context的消息
- if (protoID != NetDefine.GATEWAY_ERROR_ID)//protoId=1为网关异常消息,由业务处理,框架屏蔽此项的提示
- Log.Error($"[NetModule] [Response] seq:{seqID} protoid:{protoID} seq没有对应的contextInfo");
- _listener?.OnResponseSeqError(seqID, -1, protoID);
- }
- }
- if (IsDebug)
- {
- Log.Info("[NetModule] Receive <{1}> {2}: Seq:{4} Timestamp:{0}\n{3}",
- Time.GetNowTime(ClockType.Client), message.ProtocolID, message.GetType().Name, XJson.ToJson(message), message.InstanceID);
- }
- }
- void IMsgReceiverListener.OnResponseFinish(ResponseEventArgs args)
- {
- _listener?.OnResponseFinish(args);
- }
- void IMsgReceiverListener.OnEvent(ESessionCode code, Exception exception)
- {
- LogException(code, exception);
- SessionEventArgs args = default;
- args.code = code;
- switch (code)
- {
- case ESessionCode.StartConnect:
- _listener?.OnConnectStart();
- return;
- case ESessionCode.Connected:
- (Sender as IReset)?.Reset();//seq、lastid重置
- ContextCache.Reset();//清空缓存
- _listener?.OnConnected();
- return;
- case ESessionCode.Disconnect:
- _listener?.OnDisconnected();
- return;
- case ESessionCode.ConnectFail:
- //args.errorMsg = exception.Message;
- //break;
- case ESessionCode.DisconnectError:
- case ESessionCode.SendException:
- case ESessionCode.RecvError:
- args.errorMsg= exception.Message;
- if (exception is System.Net.Sockets.SocketException se)
- {
- args.isSocketError = true;
- args.socketError = se.SocketErrorCode;
- }
- else if (exception is Web.WebSocketException webEx)
- {
- args.websocketCode = webEx.code;
- }
- break;
- }
- _listener?.OnSessionError(args);
- }
- private void LogException(ESessionCode sessionCode, Exception ex)
- {
- if (ex != null)
- {
- string socketCode;
- if (ex is System.Net.Sockets.SocketException se)
- {
- socketCode = se.SocketErrorCode.ToString();
- }
- else
- {
- socketCode = "NONE";
- }
- Log.Exception($"[NetModule] Exception: Code: {sessionCode}, SocketError: {socketCode}\n", ex);
- }
- }
- #endregion
- private void InnerConnect(AddressInfo address)
- {
- if (_curRemote != null && _curRemote.Status != SessionStatus.FREE)
- {
- Log.Error($"[NetModule] SessionStatus:{_curRemote.Status} 已连接或者正在连接,又尝试连接");
- return;
- }
- switch (address.ProtocolType)
- {
- case ProtocolType.TCP:
- _curRemote = _tcpRemote ??= new TCPSession(NetDefine.SESSION_BUFFER_SIZE, NetDefine.SESSION_TIMEOUT, this);
- break;
- case ProtocolType.WS:
- case ProtocolType.WSS:
- #if !UNITY_EDITOR && UNITY_WEBGL
- _curRemote = new Web.WebSocket(this);
- #else
- _curRemote = _wsRemote ??= new WSSession(NetDefine.SESSION_BUFFER_SIZE, NetDefine.SESSION_TIMEOUT, this);
- #endif
- break;
- default:
- Log.Error($"[NetModule] 连接失败,协议类型错误{address.ProtocolType}");
- return;
- }
- Log.Debug($"[NetModule] StartConnect: Type:{address.ProtocolType}, Address:{address.Address}, Port:{address.Port}");
- _curRemote.Connect(address);
- }
- private bool IsConnected => _curRemote != null && _curRemote.Status == SessionStatus.CONNECTED;
- }
- }
|