FrameSynchronizer.cs 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Diagnostics;
  5. using XGame.Framework.Interfaces;
  6. using XGame.Framework.Json;
  7. using XGame.Framework.ThreadScheduler;
  8. namespace XGame.Framework.Network
  9. {
  /// <summary>
  /// Buffers incoming network frames (raw bytes, session events and decoded messages)
  /// and processes them from <see cref="Update"/> under a per-call time budget
  /// (<c>NetDefine.MsgFrameTimeLimit</c>, in milliseconds).
  /// </summary>
  /// <remarks>
  /// <see cref="Update"/> is subscribed to <c>MainThreadUpdateRegister.update</c> in the
  /// constructor and unsubscribed in <see cref="Dispose"/>.
  /// NOTE(review): the concurrent collections suggest that the byte/event <c>Enqueue</c>
  /// overloads are called from a network thread while <see cref="Update"/> runs on the
  /// main thread - confirm against the callers.
  /// </remarks>
  public class FrameSynchronizer : IFrameSynchronizer, IUpdate, IDisposable
  {
      private readonly IMsgProcesser _processer;
      private readonly IMsgReceiverListener _listener;
      private readonly IBytesReader _reader;

      // Frames handed in through the byte/event Enqueue overloads; drained by Update.
      private readonly ConcurrentQueue<IFrameData> _queue = new();

      // Frames waiting to be dispatched to the processer/listener.
      // This is a plain Queue<T>, so it must not be touched by two threads at once.
      private readonly Queue<IFrameData> _realQueue = new();

      // Pools of reusable frame wrappers, one per concrete frame type.
      private readonly ConcurrentStack<FrameDataBytes> _bytesDataPool = new();
      private readonly ConcurrentStack<FrameDataEvent> _eventDataPool = new();
      private readonly ConcurrentStack<FrameDataMessage> _msgDataPool = new();

      // Measures the cost of each individual frame inside Update.
      private readonly Stopwatch _stopwatch = new();

      /// <summary>
      /// Creates the synchronizer and subscribes <see cref="Update"/> to
      /// <c>MainThreadUpdateRegister.update</c>.
      /// </summary>
      /// <param name="reader">Receives the raw byte frames during <see cref="Update"/>.</param>
      /// <param name="processer">Receives every dequeued message frame.</param>
      /// <param name="listener">Receives session events; may be null, in which case events are dropped.</param>
      public FrameSynchronizer(IBytesReader reader, IMsgProcesser processer, IMsgReceiverListener listener)
      {
          _reader = reader;
          _processer = processer;
          _listener = listener;
          MainThreadUpdateRegister.update += Update;
      }

      /// <summary>
      /// Drains the incoming queue, then dispatches pending messages and events.
      /// Stops early once the accumulated processing time of this call exceeds
      /// <c>NetDefine.MsgFrameTimeLimit</c>; remaining frames stay queued for the next call.
      /// Any exception is logged and ends the current call.
      /// </summary>
      /// <param name="millisecond">Not used by this implementation.</param>
      public void Update(int millisecond)
      {
          // Accumulated in stopwatch ticks: summing per-frame ElapsedMilliseconds would
          // truncate every sub-millisecond frame to 0 and the budget would never trigger
          // for bursts of cheap frames.
          long spentTicks = 0;
          try
          {
              if (!DrainIncoming(ref spentTicks))
              {
                  return;
              }

              DispatchPending(ref spentTicks);
          }
          catch (Exception e)
          {
              Log.Exception("[FrameSynchronizer]", e);
          }
      }

      /// <summary>
      /// Queues a session event (with an optional exception) for dispatch to the listener.
      /// </summary>
      void IFrameSynchronizer.Enqueue(ESessionCode code, Exception exception)
      {
          Log.Info($"[FrameSynchronizer] Enqueue ESessionCode:{code} Exception:{exception?.Message ?? "None"}");
          if (!_eventDataPool.TryPop(out var data))
          {
              data = new FrameDataEvent();
          }

          data.code = code;
          data.exception = exception;
          _queue.Enqueue(data);
      }

      /// <summary>
      /// Queues a raw byte segment; it is passed to the reader on the next <see cref="Update"/>.
      /// The array is stored by reference, not copied.
      /// </summary>
      void IFrameSynchronizer.Enqueue(byte[] bytes, int offset, int length)
      {
          if (!_bytesDataPool.TryPop(out var data))
          {
              data = new FrameDataBytes();
          }

          data.bytes = bytes;
          data.offset = offset;
          data.length = length;
          _queue.Enqueue(data);
      }

      /// <summary>
      /// Queues a decoded message directly for dispatch to the processer.
      /// </summary>
      /// <remarks>
      /// NOTE(review): this writes to the non-concurrent dispatch queue, so it is only safe
      /// on the thread that runs <see cref="Update"/> - presumably it is invoked from within
      /// the reader's <c>Read</c> call; confirm against the callers.
      /// </remarks>
      void IFrameSynchronizer.Enqueue(IMessage message)
      {
          if (!_msgDataPool.TryPop(out var data))
          {
              data = new FrameDataMessage();
          }

          data.message = message;
          _realQueue.Enqueue(data);
      }

      /// <summary>
      /// Discards every queued frame without processing it and returns the wrappers to their pools.
      /// </summary>
      public void Reset()
      {
          Log.Info("[FrameSynchronizer] Reset.");
          while (_queue.TryDequeue(out var result))
          {
              Recycle(result);
          }

          while (_realQueue.Count > 0)
          {
              Recycle(_realQueue.Dequeue());
          }
      }

      /// <summary>
      /// Unsubscribes from the update register and drops all queued frames and pooled wrappers.
      /// </summary>
      public void Dispose()
      {
          MainThreadUpdateRegister.update -= Update;
          _queue.Clear();
          _realQueue.Clear();
          _bytesDataPool.Clear();
          _msgDataPool.Clear();
          _eventDataPool.Clear();
      }

      /// <summary>
      /// Moves frames out of the incoming queue: byte frames are fed to the reader,
      /// everything else is appended to the dispatch queue.
      /// </summary>
      /// <returns><c>false</c> when the time budget was exceeded and the caller should stop.</returns>
      private bool DrainIncoming(ref long spentTicks)
      {
          while (_queue.TryDequeue(out var data))
          {
              _stopwatch.Restart();
              if (data is FrameDataBytes dataBytes)
              {
                  try
                  {
                      NetLog.LogHexString("[FrameSynchronizer] FrameDataBytes:\n", dataBytes.bytes, dataBytes.offset, dataBytes.length);
                      _reader.Read(dataBytes.bytes, dataBytes.offset, dataBytes.length);
                  }
                  finally
                  {
                      // Return the wrapper to its pool even when the reader throws.
                      Recycle(dataBytes);
                  }
              }
              else
              {
                  NetLog.LogVerbose($"[FrameSynchronizer] FrameDataType:({data.GetType()})");
                  _realQueue.Enqueue(data);
              }

              _stopwatch.Stop();
              spentTicks += _stopwatch.ElapsedTicks;
              if (IsOverBudget(spentTicks))
              {
                  return false;
              }
          }

          return true;
      }

      /// <summary>
      /// Dispatches pending frames: messages go to the processer, events go to the listener.
      /// Returns as soon as the time budget is exceeded.
      /// </summary>
      private void DispatchPending(ref long spentTicks)
      {
          while (_realQueue.Count > 0)
          {
              var realData = _realQueue.Dequeue();
              Type processType = null;
              _stopwatch.Restart();
              try
              {
                  if (realData is FrameDataMessage dataMsg)
                  {
                      processType = dataMsg.message.GetType();
                      NetLog.LogVerbose($"[FrameSynchronizer] FrameDataMessage\n [{processType.Name}]{XJson.ToJson(dataMsg.message)}");
                      _processer.Process(dataMsg.message);
                  }
                  else if (realData is FrameDataEvent dataEvent)
                  {
                      processType = dataEvent.exception?.GetType();
                      NetLog.LogVerbose($"[FrameSynchronizer] FrameDataEvent Code: {dataEvent.code}");
                      _listener?.OnEvent(dataEvent.code, dataEvent.exception);
                  }
              }
              finally
              {
                  _stopwatch.Stop();
                  // Return the wrapper to its pool even when a handler throws.
                  Recycle(realData);
              }

              spentTicks += _stopwatch.ElapsedTicks;
              DebugCostTime(_stopwatch.ElapsedMilliseconds, processType?.Name);
              if (IsOverBudget(spentTicks))
              {
                  return;
              }
          }
      }

      /// <summary>
      /// Returns true when the accumulated stopwatch ticks, converted to whole milliseconds,
      /// exceed <c>NetDefine.MsgFrameTimeLimit</c>.
      /// </summary>
      private static bool IsOverBudget(long spentTicks)
      {
          long spentMilliseconds = spentTicks * 1000 / Stopwatch.Frequency;
          return spentMilliseconds > NetDefine.MsgFrameTimeLimit;
      }

      /// <summary>
      /// Clears a frame wrapper and pushes it onto the pool matching its concrete type.
      /// Wrappers of any other type are cleared and dropped.
      /// </summary>
      private void Recycle(IFrameData data)
      {
          data.Clear();
          switch (data)
          {
              case FrameDataBytes bytes:
                  _bytesDataPool.Push(bytes);
                  break;
              case FrameDataEvent @event:
                  _eventDataPool.Push(@event);
                  break;
              case FrameDataMessage message:
                  _msgDataPool.Push(message);
                  break;
          }
      }

      /// <summary>
      /// Logs a warning when handling a single frame took longer than the whole per-call budget.
      /// Calls are compiled in only when the <c>MacroDefine.DEBUG</c> symbol is defined.
      /// </summary>
      [Conditional(MacroDefine.DEBUG)]
      private void DebugCostTime(long time, string typeName)
      {
          if (time > NetDefine.MsgFrameTimeLimit)
          {
              Log.Warn($"[Net] process {typeName} 耗时过长: {time}(ms)");
          }
      }
  }
  168. }