using System; using System.Collections.Concurrent; using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; namespace XGame.Framework.Network { public class WSSession : IRemoteSession, IDisposable { readonly int TIMEOUT; private volatile SessionStatus _status = SessionStatus.FREE; public SessionStatus Status => _status; public SessionAddress Address { get; private set; } ClientWebSocket _webSocket; byte[] _buffer; CancellationTokenSource _sendSource; CancellationTokenSource _receiveSource; ConcurrentQueue _pendings; private ISessionContext _context; struct PendingCommand { public byte[] buffer; public int offset; public int length; } internal WSSession(int bufferSize, int timeout, ISessionContext context) { _buffer = new byte[bufferSize]; TIMEOUT = timeout; _pendings = new ConcurrentQueue(); _sendSource = new CancellationTokenSource(); _receiveSource = new CancellationTokenSource(); _context = context; } #region Connect Method public void Connect(AddressInfo info) { if (Status != SessionStatus.FREE) { Log.Warn($"[WSSession] SessionStatus:{Status} 已连接或者正在连接,又尝试连接"); return; } OnStartConnect(); if (Address == null || Address.Domain != info.Address || Address.PORT == info.Port || Address.ProtocolType == info.ProtocolType) { Address = new SessionAddress(info); } var uri = GetUri(Address); if (uri == null) { OnConnectFail(new Exception($"[WSSession] ProtocolType {Address.ProtocolType} is not find")); return; } StartConnect(uri).ConfigureAwait(false); } Uri GetUri(SessionAddress info) { string url; switch (info.ProtocolType) { case ProtocolType.WS: if (string.IsNullOrEmpty(info.URI)) url = $"ws://{info.Domain}:{info.PORT}"; else url = info.URI; break; case ProtocolType.WSS: if (string.IsNullOrEmpty(info.URI)) url = $"wss://{info.Domain}:{info.PORT}"; else url = info.URI; break; default: return null; } return new Uri(url); } async Task StartConnect(Uri uri) { await Task.Run(() => { _status = SessionStatus.CONNECTING; var webSocket = new ClientWebSocket(); webSocket.Options.KeepAliveInterval = new TimeSpan(0, 0, 30); webSocket.Options.Proxy = null; webSocket.Options.UseDefaultCredentials = true; try { IAsyncResult asyncResult = webSocket.ConnectAsync(uri, CancellationToken.None); var success = asyncResult.AsyncWaitHandle.WaitOne(TIMEOUT, true); if (success && webSocket.State == WebSocketState.Open) { while (_pendings.TryDequeue(out var _)) ; // clear pendings _webSocket = webSocket; if (_sendSource != null) try { _sendSource.Dispose(); } catch { } if (_receiveSource != null) try { _receiveSource.Dispose(); } catch { } _sendSource = new CancellationTokenSource(); _receiveSource = new CancellationTokenSource(); _ = Task.Run(() => LoopSendAsync().ConfigureAwait(false)); _ = Task.Run(() => LoopReceiveAsync().ConfigureAwait(false)); OnConnected(); } else { webSocket.Dispose(); OnConnectTimeout(); } } catch (Exception e) { webSocket.Dispose(); OnConnectFail(e); } }); } #endregion #region Disconnect Method public void Disconnect() { if (Status != SessionStatus.FREE) { Exception exception = null; _sendSource.Cancel(); try { if (_webSocket != null && _webSocket.State == WebSocketState.Open) { IAsyncResult resultAsync = _webSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None); resultAsync.AsyncWaitHandle.WaitOne(TIMEOUT, true); } else { Log.Debug($"[WSSession] CloseOutputAsync else!!!: Status: {Status}, Time: {DateTime.Now.ToLongTimeString()}:{DateTime.Now.Millisecond}, State: {_webSocket?.State}, IsNull: {_webSocket == null}"); } } catch (Exception e) { exception = e; } finally { _receiveSource.Cancel(); try { _webSocket?.Dispose(); } catch { } _webSocket = null; _status = SessionStatus.FREE; if (exception != null) OnDisconnectError(exception); OnDisconnect(); } } } #endregion public bool IsConnected(bool bPrecice) { return _webSocket != null && Status == SessionStatus.CONNECTED; } public bool Send(byte[] bytes, int offset, int length) { PendingCommand cmd = new PendingCommand() { buffer = bytes, offset = offset, length = length }; _pendings.Enqueue(cmd); return true; } async Task LoopSendAsync() { var cancelToken = _sendSource.Token; Log.Debug($"[WSSession] LoopSendAsync Start: Status: {Status}, Time: {DateTime.Now.ToLongTimeString()}:{DateTime.Now.Millisecond}, State: {_webSocket?.State}, IsNull: {_webSocket == null}, SourceIsCanceled: {_sendSource.IsCancellationRequested}, TokenIsCanceled: {cancelToken.IsCancellationRequested}"); try { while (_webSocket.State != WebSocketState.Closed && !cancelToken.IsCancellationRequested) { cancelToken.ThrowIfCancellationRequested(); while (_pendings.TryDequeue(out var cmd)) { var buffer = new ArraySegment(cmd.buffer, cmd.offset, cmd.length); cancelToken.ThrowIfCancellationRequested(); await _webSocket.SendAsync(buffer, WebSocketMessageType.Binary, true, cancelToken).ConfigureAwait(false); cancelToken.ThrowIfCancellationRequested(); SessionBufferPool.Recycle(cmd.buffer); } Thread.Sleep(1); } } catch (OperationCanceledException oce) { Log.WarnException("[WSSession] Normal upon task/token cancellation, disregard", oce); } catch (Exception e) { OnSendError(e); } finally { Log.Debug($"[WSSession] LoopSendAsync Exit: Status: {Status}, Time: {DateTime.Now.ToLongTimeString()}:{DateTime.Now.Millisecond}, State: {_webSocket?.State}, IsNull: {_webSocket == null}, SourceIsCanceled: {_sendSource.IsCancellationRequested}, TokenIsCanceled: {cancelToken.IsCancellationRequested}"); } } async Task LoopReceiveAsync() { var cancelToken = _receiveSource.Token; Log.Debug($"[WSSession] LoopReceiveAsync Start: Status: {Status}, Time: {DateTime.Now.ToLongTimeString()}:{DateTime.Now.Millisecond}, State: {_webSocket?.State}, IsNull: {_webSocket == null}, SourceIsCanceled: {_receiveSource.IsCancellationRequested}, TokenIsCanceled: {cancelToken.IsCancellationRequested}"); Exception exception = null; try { var buffer = new ArraySegment(_buffer, 0, _buffer.Length); while (_webSocket.State != WebSocketState.Closed && !cancelToken.IsCancellationRequested) { cancelToken.ThrowIfCancellationRequested(); var receiveResult = await _webSocket.ReceiveAsync(buffer, cancelToken).ConfigureAwait(false); if (!cancelToken.IsCancellationRequested) { if (_webSocket.State == WebSocketState.CloseReceived && receiveResult.MessageType == WebSocketMessageType.Close) { _sendSource.Cancel(); await _webSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Acknowledge Close frame", CancellationToken.None).ConfigureAwait(false); OnRemoteClose(); } if (_webSocket.State == WebSocketState.Open && receiveResult.MessageType != WebSocketMessageType.Close) { int length = receiveResult.Count; if (length > 0) OnRead(buffer.Array, buffer.Offset, length); } } Thread.Sleep(1); if (Status == SessionStatus.FREE) throw new OperationCanceledException("No cancel after disconnected..."); } } catch (OperationCanceledException oce) { Log.WarnException("[WSSession] Normal upon task/token cancellation, disregard", oce); } catch (Exception e) { exception = e; } finally { _sendSource.Cancel(); if (exception != null) OnReceiveError(exception); } Log.Debug($"[WSSession] LoopReceiveAsync Exit: Status: {Status}, Time: {DateTime.Now.ToLongTimeString()}:{DateTime.Now.Millisecond}, State: {_webSocket?.State}, IsNull: {_webSocket == null}, SourceIsCanceled: {_receiveSource.IsCancellationRequested}, TokenIsCanceled: {cancelToken.IsCancellationRequested}"); } void OnRead(byte[] bytes, int offset, int length) { var buffer = SessionBufferPool.Acquire(); Array.Copy(bytes, offset, buffer, 0, length); _context.Synchronizer.Enqueue(buffer, 0, length); } void IDisposable.Dispose() { Disconnect(); _context = null; } #region Event private void OnConnectFail(Exception e) { _status = SessionStatus.CONNECT_FAIL; _context.Synchronizer.Enqueue(ESessionCode.ConnectFail, e); } private void OnConnectTimeout() { _status = SessionStatus.CONNECT_TIMEOUT; _context.Synchronizer.Enqueue(ESessionCode.ConnectTimeout, null); } private void OnConnected() { _status = SessionStatus.CONNECTED; _context.Synchronizer.Enqueue(ESessionCode.Connected, null); } private void OnStartConnect() { _context.Synchronizer.Enqueue(ESessionCode.StartConnect, null); } private void OnDisconnect() { _context.Synchronizer.Enqueue(ESessionCode.Disconnect, null); } private void OnDisconnectError(Exception e) { _context.Synchronizer.Enqueue(ESessionCode.DisconnectError, e); } private void OnSendError(Exception e) { _status = SessionStatus.SEND_ERROR; _context.Synchronizer.Enqueue(ESessionCode.SendException, e); } private void OnReceiveError(Exception e) { _status = SessionStatus.RECV_ERROR; _context.Synchronizer.Enqueue(ESessionCode.RecvError, e); } private void OnRemoteClose() { _status = SessionStatus.CLOSED; _context.Synchronizer.Enqueue(ESessionCode.SessionClose, null); } #endregion } }