123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Diagnostics;
- using XGame.Framework.Interfaces;
- using XGame.Framework.Json;
- using XGame.Framework.ThreadScheduler;
- namespace XGame.Framework.Network
- {
- public class FrameSynchronizer : IFrameSynchronizer, IUpdate, IDisposable
- {
- private IMsgProcesser _processer;
- private IMsgReceiverListener _listener;
- //SessionNode _node;
- IBytesReader _reader;
- ConcurrentQueue<IFrameData> _queue = new();
- Queue<IFrameData> _realQueue = new();
- private ConcurrentStack<FrameDataBytes> _bytesDataPool = new();
- private ConcurrentStack<FrameDataEvent> _eventDataPool = new();
- private ConcurrentStack<FrameDataMessage> _msgDataPool = new();
- public FrameSynchronizer(IBytesReader reader, IMsgProcesser processer, IMsgReceiverListener listener)
- {
- _reader = reader;
- _processer = processer;
- _listener = listener;
- MainThreadUpdateRegister.update += Update;
- }
- Stopwatch stopwatch = new();
- public void Update(int millisecond)
- {
- long totalTime = 0;
- try
- {
- while (_queue.TryDequeue(out var data))
- {
- stopwatch.Restart();
- if (data is FrameDataBytes dataBytes)
- {
- NetLog.LogHexString("[FrameSynchronizer] FrameDataBytes:\n", dataBytes.bytes, dataBytes.offset, dataBytes.length);
- _reader.Read(dataBytes.bytes, dataBytes.offset, dataBytes.length);
- dataBytes.Clear();
- _bytesDataPool.Push(dataBytes);
- }
- else
- {
- NetLog.LogVerbose($"[FrameSynchronizer] FrameDataType:({data.GetType()})");
- _realQueue.Enqueue(data);
- }
- stopwatch.Stop();
- totalTime += stopwatch.ElapsedMilliseconds;
- if (totalTime > NetDefine.MsgFrameTimeLimit) return;
- }
- while (_realQueue.Count > 0)
- {
- var realData = _realQueue.Dequeue();
- stopwatch.Restart();
- Type processType = null;
- if (realData is FrameDataMessage dataMsg)
- {
- processType = dataMsg.message.GetType();
- NetLog.LogVerbose($"[FrameSynchronizer] FrameDataMessage\n [{processType.Name}]{XJson.ToJson(dataMsg.message)}");
- _processer.Process(dataMsg.message);
- }
- else if (realData is FrameDataEvent dataEvent)
- {
- processType = dataEvent.exception?.GetType();
- NetLog.LogVerbose($"[FrameSynchronizer] FrameDataEvent Code: {dataEvent.code}");
- _listener?.OnEvent(dataEvent.code, dataEvent.exception);
- }
- //switch (realData.EventType)
- //{
- // case ESessionCode.ProcessPush:
- // var push = (IMsgData)realData.Context;
- // NetLog.LogVerbose($"[MsgFrameDistributor] Update-RealQueue(ProcessPush)\n[{push.GetType().Name}]{XJson.ToJson(push)}");
- // _node.MsgProcesser.Process(push, MsgType.PUSH);
- // break;
- // case ESessionCode.ProcessResponse:
- // var resp = (IMsgData)realData.Context;
- // NetLog.LogVerbose($"[MsgFrameDistributor] Update-RealQueue(ProcessResponse)\n[{resp.GetType().Name}]{XJson.ToJson(resp)}");
- // _node.MsgProcesser.Process(resp, MsgType.RESPONSE);
- // break;
- // default:
- // NetLog.LogVerbose($"[MsgFrameDistributor] Update-RealQueue({realData.EventType})");
- // Remote_RemoteEvent(realData.EventType, realData.Context == null ? null : (Exception)realData.Context);
- // break;
- //}
- stopwatch.Stop();
- totalTime += stopwatch.ElapsedMilliseconds;
- DebugCostTime(stopwatch.ElapsedMilliseconds, processType?.Name);
- Recycle(realData);
- if (totalTime > NetDefine.MsgFrameTimeLimit) return;
- }
- }
- catch (Exception e)
- {
- Log.Exception("[FrameSynchronizer]", e);
- }
- }
- void IFrameSynchronizer.Enqueue(ESessionCode code, Exception exception)
- {
- Log.Info($"[FrameSynchronizer] Enqueue ESessionCode:{code} Exception:{exception?.Message ?? "None"}");
- if (!_eventDataPool.TryPop(out var data))
- {
- data = new FrameDataEvent();
- }
- data.code = code;
- data.exception = exception;
- _queue.Enqueue(data);
- }
- void IFrameSynchronizer.Enqueue(byte[] bytes, int offset, int length)
- {
- if (!_bytesDataPool.TryPop(out var data))
- {
- data = new FrameDataBytes();
- }
- data.bytes = bytes;
- data.offset = offset;
- data.length = length;
- _queue.Enqueue(data);
- }
- void IFrameSynchronizer.Enqueue(IMessage message)
- {
- if (!_msgDataPool.TryPop(out var data))
- {
- data = new FrameDataMessage();
- }
- data.message = message;
- _realQueue.Enqueue(data);
- }
- public void Reset()
- {
- Log.Info($"[FrameSynchronizer] Reset.");
- while (_queue.TryDequeue(out var result)) Recycle(result);
- while (_realQueue.Count > 0) { Recycle(_realQueue.Dequeue()); }
- }
- public void Dispose()
- {
- MainThreadUpdateRegister.update -= Update;
- _queue.Clear();
- _realQueue.Clear();
- _bytesDataPool.Clear();
- _msgDataPool.Clear();
- _eventDataPool.Clear();
- }
- private void Recycle(IFrameData data)
- {
- data.Clear();
- if (data is FrameDataBytes bytes)
- {
- _bytesDataPool.Push(bytes);
- }
- else if (data is FrameDataEvent @event)
- {
- _eventDataPool.Push(@event);
- }
- else if (data is FrameDataMessage message)
- {
- _msgDataPool.Push(message);
- }
- }
- [Conditional(MacroDefine.DEBUG)]
- private void DebugCostTime(long time, string typeName)
- {
- if (time > NetDefine.MsgFrameTimeLimit)
- Log.Warn($"[Net] process {typeName} 耗时过长: {time}(ms)");
- }
- }
- }
|