using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using XGame.Framework.Interfaces;
using XGame.Framework.Json;
using XGame.Framework.ThreadScheduler;

namespace XGame.Framework.Network
{
    /// <summary>
    /// Buffers incoming frame data (raw bytes, session events and decoded messages) and
    /// drains it from <see cref="Update"/>, which is subscribed to
    /// <c>MainThreadUpdateRegister.update</c>. Work per call is bounded by
    /// <c>NetDefine.MsgFrameTimeLimit</c> milliseconds.
    /// </summary>
    /// <remarks>
    /// Bytes and events go through a <see cref="ConcurrentQueue{T}"/>; decoded messages go
    /// straight into a plain <see cref="Queue{T}"/>.
    /// NOTE(review): this presumably means <c>Enqueue(IMessage)</c> is only ever called on
    /// the updating thread (e.g. from inside <c>_reader.Read</c>) — confirm against callers.
    /// </remarks>
    public class FrameSynchronizer : IFrameSynchronizer, IUpdate, IDisposable
    {
        private readonly IMsgProcesser _processer;
        private readonly IMsgReceiverListener _listener;
        private readonly IBytesReader _reader;

        // Incoming bytes/events; drained at the start of every Update.
        private readonly ConcurrentQueue<IFrameData> _queue = new();

        // Items ready to be dispatched to the processer/listener.
        private readonly Queue<IFrameData> _realQueue = new();

        // Object pools for the three wrapper types, to avoid re-allocating them per packet.
        private readonly ConcurrentStack<FrameDataBytes> _bytesDataPool = new();
        private readonly ConcurrentStack<FrameDataEvent> _eventDataPool = new();
        private readonly ConcurrentStack<FrameDataMessage> _msgDataPool = new();

        // Reused to time each individual item.
        private readonly Stopwatch _stopwatch = new();

        /// <summary>
        /// Creates the synchronizer and subscribes <see cref="Update"/> to
        /// <c>MainThreadUpdateRegister.update</c>.
        /// </summary>
        /// <param name="reader">Receives the raw byte segments dequeued during <see cref="Update"/>.</param>
        /// <param name="processer">Receives every dequeued message.</param>
        /// <param name="listener">Receives every dequeued session event; may be null.</param>
        public FrameSynchronizer(IBytesReader reader, IMsgProcesser processer, IMsgReceiverListener listener)
        {
            _reader = reader;
            _processer = processer;
            _listener = listener;
            MainThreadUpdateRegister.update += Update;
        }

        /// <summary>
        /// Drains the pending queues: first feeds raw bytes to the reader (moving any other
        /// item to the dispatch queue), then dispatches messages and events. Returns early
        /// once the accumulated processing time exceeds <c>NetDefine.MsgFrameTimeLimit</c>;
        /// remaining items stay queued for the next call.
        /// </summary>
        /// <param name="millisecond">Not used by this implementation.</param>
        public void Update(int millisecond)
        {
            // Accumulated as a double: summing the truncated ElapsedMilliseconds of many
            // sub-millisecond items would stay at 0 and never hit the budget.
            double totalTime = 0;
            try
            {
                while (_queue.TryDequeue(out var data))
                {
                    _stopwatch.Restart();
                    if (data is FrameDataBytes dataBytes)
                    {
                        try
                        {
                            NetLog.LogHexString("[FrameSynchronizer] FrameDataBytes:\n", dataBytes.bytes, dataBytes.offset, dataBytes.length);
                            _reader.Read(dataBytes.bytes, dataBytes.offset, dataBytes.length);
                        }
                        finally
                        {
                            // Return the wrapper to its pool even when the reader throws.
                            Recycle(dataBytes);
                        }
                    }
                    else
                    {
                        NetLog.LogVerbose($"[FrameSynchronizer] FrameDataType:({data.GetType()})");
                        _realQueue.Enqueue(data);
                    }

                    _stopwatch.Stop();
                    totalTime += _stopwatch.Elapsed.TotalMilliseconds;
                    if (totalTime > NetDefine.MsgFrameTimeLimit)
                    {
                        return;
                    }
                }

                while (_realQueue.Count > 0)
                {
                    var realData = _realQueue.Dequeue();
                    _stopwatch.Restart();
                    Type processType = null;
                    try
                    {
                        if (realData is FrameDataMessage dataMsg)
                        {
                            processType = dataMsg.message.GetType();
                            NetLog.LogVerbose($"[FrameSynchronizer] FrameDataMessage\n [{processType.Name}]{XJson.ToJson(dataMsg.message)}");
                            _processer.Process(dataMsg.message);
                        }
                        else if (realData is FrameDataEvent dataEvent)
                        {
                            processType = dataEvent.exception?.GetType();
                            NetLog.LogVerbose($"[FrameSynchronizer] FrameDataEvent Code: {dataEvent.code}");
                            _listener?.OnEvent(dataEvent.code, dataEvent.exception);
                        }

                        _stopwatch.Stop();
                        totalTime += _stopwatch.Elapsed.TotalMilliseconds;
                        DebugCostTime(_stopwatch.ElapsedMilliseconds, processType?.Name);
                    }
                    finally
                    {
                        // The item is already dequeued; always hand the wrapper back to its pool.
                        Recycle(realData);
                    }

                    if (totalTime > NetDefine.MsgFrameTimeLimit)
                    {
                        return;
                    }
                }
            }
            catch (Exception e)
            {
                // A failing handler aborts this call only; remaining items are handled by the next Update.
                Log.Exception("[FrameSynchronizer]", e);
            }
        }

        /// <summary>Queues a session event (with an optional exception) for dispatch to the listener.</summary>
        /// <param name="code">Session code forwarded to the listener's <c>OnEvent</c>.</param>
        /// <param name="exception">Associated exception, or null.</param>
        void IFrameSynchronizer.Enqueue(ESessionCode code, Exception exception)
        {
            Log.Info($"[FrameSynchronizer] Enqueue ESessionCode:{code} Exception:{exception?.Message ?? "None"}");
            if (!_eventDataPool.TryPop(out var data))
            {
                data = new FrameDataEvent();
            }

            data.code = code;
            data.exception = exception;
            _queue.Enqueue(data);
        }

        /// <summary>Queues a raw byte segment to be handed to the reader during <see cref="Update"/>.</summary>
        /// <param name="bytes">Buffer holding the data; it is stored by reference, not copied.</param>
        /// <param name="offset">Offset passed through to the reader.</param>
        /// <param name="length">Length passed through to the reader.</param>
        void IFrameSynchronizer.Enqueue(byte[] bytes, int offset, int length)
        {
            if (!_bytesDataPool.TryPop(out var data))
            {
                data = new FrameDataBytes();
            }

            data.bytes = bytes;
            data.offset = offset;
            data.length = length;
            _queue.Enqueue(data);
        }

        /// <summary>
        /// Queues a message directly on the dispatch queue. That queue is a plain
        /// <see cref="Queue{T}"/> with no synchronization.
        /// </summary>
        /// <param name="message">Message later passed to the processer.</param>
        void IFrameSynchronizer.Enqueue(IMessage message)
        {
            if (!_msgDataPool.TryPop(out var data))
            {
                data = new FrameDataMessage();
            }

            data.message = message;
            _realQueue.Enqueue(data);
        }

        /// <summary>Discards every pending item, returning the wrappers to their pools.</summary>
        public void Reset()
        {
            Log.Info("[FrameSynchronizer] Reset.");
            while (_queue.TryDequeue(out var pending))
            {
                Recycle(pending);
            }

            while (_realQueue.Count > 0)
            {
                Recycle(_realQueue.Dequeue());
            }
        }

        /// <summary>Unsubscribes from the update register and clears all queues and pools.</summary>
        public void Dispose()
        {
            MainThreadUpdateRegister.update -= Update;
            _queue.Clear();
            _realQueue.Clear();
            _bytesDataPool.Clear();
            _msgDataPool.Clear();
            _eventDataPool.Clear();
        }

        /// <summary>Clears <paramref name="data"/> and pushes it onto the pool matching its concrete type.</summary>
        private void Recycle(IFrameData data)
        {
            data.Clear();
            switch (data)
            {
                case FrameDataBytes bytes:
                    _bytesDataPool.Push(bytes);
                    break;
                case FrameDataEvent @event:
                    _eventDataPool.Push(@event);
                    break;
                case FrameDataMessage message:
                    _msgDataPool.Push(message);
                    break;
            }
        }

        /// <summary>
        /// Warns when a single item took longer than <c>NetDefine.MsgFrameTimeLimit</c>.
        /// Calls are compiled only when the <c>MacroDefine.DEBUG</c> symbol is defined.
        /// </summary>
        [Conditional(MacroDefine.DEBUG)]
        private void DebugCostTime(long time, string typeName)
        {
            if (time > NetDefine.MsgFrameTimeLimit)
            {
                Log.Warn($"[Net] process {typeName} 耗时过长: {time}(ms)");
            }
        }
    }
}