123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380 |
- using System;
- using System.Collections.Concurrent;
- using System.Net.WebSockets;
- using System.Threading;
- using System.Threading.Tasks;
- namespace XGame.Framework.Network
- {
- public class WSSession : IRemoteSession, IDisposable
- {
// Timeout in milliseconds; handed to the wait calls used for the connect and close handshakes.
private readonly int TIMEOUT;

// Current session state. Volatile because background tasks write it while callers read it.
private volatile SessionStatus _status = SessionStatus.FREE;

/// <summary>Current state of the session.</summary>
public SessionStatus Status
{
    get { return _status; }
}

/// <summary>Address built by the most recent call to <c>Connect</c>.</summary>
public SessionAddress Address { get; private set; }

// Socket of the active connection; null while disconnected.
private ClientWebSocket _webSocket;

// Scratch buffer that the receive loop reads incoming frames into.
private readonly byte[] _buffer;

// Cancellation sources for the send and receive loops; replaced on every successful connect.
private CancellationTokenSource _sendSource;
private CancellationTokenSource _receiveSource;

// Outgoing commands waiting to be written by the send loop.
private readonly ConcurrentQueue<PendingCommand> _pendings;

// Receives session events and incoming data; cleared on Dispose.
private ISessionContext _context;

/// <summary>A queued outgoing payload: the slice [offset, offset + length) of buffer.</summary>
private struct PendingCommand
{
    public byte[] buffer;
    public int offset;
    public int length;
}
/// <summary>
/// Creates a disconnected session.
/// </summary>
/// <param name="bufferSize">Size in bytes of the receive buffer.</param>
/// <param name="timeout">Timeout in milliseconds used for the connect and close waits.</param>
/// <param name="context">Context whose synchronizer receives session events and data.</param>
internal WSSession(int bufferSize, int timeout, ISessionContext context)
{
    _context = context;
    TIMEOUT = timeout;

    _buffer = new byte[bufferSize];
    _pendings = new ConcurrentQueue<PendingCommand>();

    // Placeholder sources so Disconnect can always call Cancel; a successful connect swaps them.
    _receiveSource = new CancellationTokenSource();
    _sendSource = new CancellationTokenSource();
}
- #region Connect Method
/// <summary>
/// Starts an asynchronous connection attempt to the given address.
/// Returns immediately; the outcome is delivered through the session events
/// (Connected / ConnectFail / ConnectTimeout).
/// </summary>
/// <param name="info">Target address and protocol.</param>
public void Connect(AddressInfo info)
{
    if (Status != SessionStatus.FREE)
    {
        Log.Warn($"[WSSession] SessionStatus:{Status} 已连接或者正在连接,又尝试连接");
        return;
    }

    OnStartConnect();

    // The previous "reuse the old address" check compared PORT and ProtocolType with
    // '==' instead of '!=', so a stale address survived when both port and protocol
    // changed. Rebuilding it on every connect is cheap and always reflects `info`.
    Address = new SessionAddress(info);

    Uri uri;
    try
    {
        uri = GetUri(Address);
    }
    catch (UriFormatException e)
    {
        // StartConnect has already been signalled, so a malformed address has to be
        // reported as a failed connect instead of escaping to the caller.
        OnConnectFail(e);
        return;
    }

    if (uri == null)
    {
        OnConnectFail(new Exception($"[WSSession] ProtocolType {Address.ProtocolType} is not find"));
        return;
    }

    // Fire and forget: StartConnect reports its own result through the session events.
    _ = StartConnect(uri);
}
/// <summary>
/// Builds the WebSocket endpoint for an address.
/// </summary>
/// <param name="info">Address to convert.</param>
/// <returns>
/// The explicit URI when one is set, otherwise ws://domain:port or wss://domain:port
/// depending on the protocol; null when the protocol is neither WS nor WSS.
/// </returns>
Uri GetUri(SessionAddress info)
{
    var protocol = info.ProtocolType;
    bool secure;
    if (protocol == ProtocolType.WS)
    {
        secure = false;
    }
    else if (protocol == ProtocolType.WSS)
    {
        secure = true;
    }
    else
    {
        // Unsupported protocol: the caller turns null into a connect failure.
        return null;
    }

    // An explicitly configured URI takes precedence over domain and port.
    if (!string.IsNullOrEmpty(info.URI))
    {
        return new Uri(info.URI);
    }

    string endpoint = secure
        ? $"wss://{info.Domain}:{info.PORT}"
        : $"ws://{info.Domain}:{info.PORT}";
    return new Uri(endpoint);
}
/// <summary>
/// Performs the connection attempt on a pool thread and reports the outcome through
/// OnConnected, OnConnectTimeout or OnConnectFail. On success the pending queue is
/// cleared, fresh cancellation sources are installed and the send/receive loops start.
/// </summary>
/// <param name="uri">Endpoint to connect to.</param>
async Task StartConnect(Uri uri)
{
    // Flip the state before hopping to the pool thread so that a second Connect()
    // issued right after the first one no longer passes the FREE check.
    _status = SessionStatus.CONNECTING;

    await Task.Run(async () =>
    {
        var webSocket = new ClientWebSocket();
        try
        {
            // Option setup lives inside the try so that a platform that rejects one of
            // these settings is reported as a connect failure instead of leaving the
            // session stuck in CONNECTING.
            webSocket.Options.KeepAliveInterval = TimeSpan.FromSeconds(30);
            webSocket.Options.Proxy = null;
            webSocket.Options.UseDefaultCredentials = true;

            var connectTask = webSocket.ConnectAsync(uri, CancellationToken.None);
            bool finishedInTime;
            using (var delayCancel = new CancellationTokenSource())
            {
                var winner = await Task.WhenAny(connectTask, Task.Delay(TIMEOUT, delayCancel.Token)).ConfigureAwait(false);
                finishedInTime = winner == connectTask;
                delayCancel.Cancel(); // release the timer behind the delay task
            }

            if (!finishedInTime)
            {
                // Disposing the socket aborts the handshake; observe the resulting fault
                // so it does not surface later as an unobserved task exception.
                _ = connectTask.ContinueWith(t => { var ignored = t.Exception; }, TaskContinuationOptions.OnlyOnFaulted);
                webSocket.Dispose();
                OnConnectTimeout();
                return;
            }

            // Rethrows the real connect error (refused, DNS, TLS, ...). Waiting on the
            // task's wait handle, as before, returned "success" for a faulted task and
            // the failure was then misreported as a timeout.
            await connectTask.ConfigureAwait(false);

            if (webSocket.State != WebSocketState.Open)
            {
                var state = webSocket.State;
                webSocket.Dispose();
                OnConnectFail(new WebSocketException($"[WSSession] Connect completed but socket state is {state}"));
                return;
            }

            while (_pendings.TryDequeue(out _)) { } // drop commands queued for a previous connection

            _webSocket = webSocket;

            try { _sendSource?.Dispose(); } catch { }
            try { _receiveSource?.Dispose(); } catch { }
            _sendSource = new CancellationTokenSource();
            _receiveSource = new CancellationTokenSource();

            // The loops handle their own errors, so they are intentionally not awaited.
            _ = Task.Run(() => LoopSendAsync());
            _ = Task.Run(() => LoopReceiveAsync());

            OnConnected();
        }
        catch (Exception e)
        {
            webSocket.Dispose();
            OnConnectFail(e);
        }
    }).ConfigureAwait(false);
}
- #endregion
- #region Disconnect Method
/// <summary>
/// Closes the connection if the session is not already FREE: cancels the send loop,
/// attempts a close handshake bounded by the timeout, cancels the receive loop,
/// disposes the socket and resets the state to FREE. Raises DisconnectError (when the
/// close handshake failed) followed by Disconnect.
/// </summary>
public void Disconnect()
{
    if (Status == SessionStatus.FREE)
    {
        return;
    }

    Exception exception = null;
    _sendSource.Cancel();
    try
    {
        var socket = _webSocket;
        if (socket != null && socket.State == WebSocketState.Open)
        {
            var closeTask = socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);

            // Task.Wait rethrows a failed close as AggregateException. Waiting on the
            // task's AsyncWaitHandle, as before, never threw, so close failures were
            // silently lost and DisconnectError was never raised for them.
            if (!closeTask.Wait(TIMEOUT))
            {
                // Timed out: the socket is disposed below, so observe the late fault.
                _ = closeTask.ContinueWith(t => { var ignored = t.Exception; }, TaskContinuationOptions.OnlyOnFaulted);
            }
        }
        else
        {
            Log.Debug($"[WSSession] CloseOutputAsync else!!!: Status: {Status}, Time: {DateTime.Now.ToLongTimeString()}:{DateTime.Now.Millisecond}, State: {_webSocket?.State}, IsNull: {_webSocket == null}");
        }
    }
    catch (AggregateException ae)
    {
        // Report the underlying error rather than the wrapper when there is only one.
        var flat = ae.Flatten();
        exception = flat.InnerExceptions.Count == 1 ? flat.InnerExceptions[0] : flat;
    }
    catch (Exception e)
    {
        exception = e;
    }
    finally
    {
        _receiveSource.Cancel();
        try { _webSocket?.Dispose(); } catch { }
        _webSocket = null;
        _status = SessionStatus.FREE;
        if (exception != null)
        {
            OnDisconnectError(exception);
        }
        OnDisconnect();
    }
}
- #endregion
/// <summary>
/// Tells whether the session currently holds a socket and is in the CONNECTED state.
/// </summary>
/// <param name="bPrecice">Not used by this implementation.</param>
/// <returns>True when a socket exists and the status is CONNECTED.</returns>
public bool IsConnected(bool bPrecice)
{
    if (_webSocket == null)
    {
        return false;
    }

    return Status == SessionStatus.CONNECTED;
}
/// <summary>
/// Queues a payload for the send loop. The bytes are written later, on a background task.
/// </summary>
/// <param name="bytes">Buffer holding the payload.</param>
/// <param name="offset">Index of the first payload byte in <paramref name="bytes"/>.</param>
/// <param name="length">Number of payload bytes.</param>
/// <returns>
/// True when the payload was queued; false when the buffer is null or the range does
/// not fit inside it (nothing is queued in that case).
/// </returns>
public bool Send(byte[] bytes, int offset, int length)
{
    // Reject bad input here: once queued, an invalid range only blows up inside the
    // send loop, where it is reported as a send error and stops all further sending.
    if (bytes == null || offset < 0 || length < 0 || offset > bytes.Length - length)
    {
        return false;
    }

    _pendings.Enqueue(new PendingCommand
    {
        buffer = bytes,
        offset = offset,
        length = length
    });
    return true;
}
/// <summary>
/// Background loop that drains the pending queue and writes each command to the socket
/// as a single binary message, until the socket closes or the send token is cancelled.
/// Failures that are not caused by cancellation are reported through OnSendError.
/// </summary>
async Task LoopSendAsync()
{
    var cancelToken = _sendSource.Token;
    // Work on the socket this loop was started for. Disconnect() sets the field to
    // null, and re-reading it here used to raise a NullReferenceException that was
    // reported as a send error and overwrote the FREE status.
    var socket = _webSocket;
    Log.Debug($"[WSSession] LoopSendAsync Start: Status: {Status}, Time: {DateTime.Now.ToLongTimeString()}:{DateTime.Now.Millisecond}, State: {_webSocket?.State}, IsNull: {_webSocket == null}, SourceIsCanceled: {_sendSource.IsCancellationRequested}, TokenIsCanceled: {cancelToken.IsCancellationRequested}");
    try
    {
        while (socket != null && !cancelToken.IsCancellationRequested && socket.State != WebSocketState.Closed)
        {
            while (_pendings.TryDequeue(out var cmd))
            {
                cancelToken.ThrowIfCancellationRequested();
                var segment = new ArraySegment<byte>(cmd.buffer, cmd.offset, cmd.length);
                await socket.SendAsync(segment, WebSocketMessageType.Binary, true, cancelToken).ConfigureAwait(false);

                // The write has completed, so hand the buffer back before the
                // cancellation check; otherwise a cancel at this point loses it.
                SessionBufferPool.Recycle(cmd.buffer);
                cancelToken.ThrowIfCancellationRequested();
            }

            // Idle pause between polls of the queue, without blocking a pool thread.
            await Task.Delay(1).ConfigureAwait(false);
        }
    }
    catch (OperationCanceledException oce)
    {
        Log.WarnException("[WSSession] Normal upon task/token cancellation, disregard", oce);
    }
    catch (Exception e)
    {
        if (cancelToken.IsCancellationRequested)
        {
            // Shutting down: the socket was cancelled or disposed under us, not a send failure.
            Log.WarnException("[WSSession] Send loop stopped after cancellation, disregard", e);
        }
        else
        {
            OnSendError(e);
        }
    }
    finally
    {
        Log.Debug($"[WSSession] LoopSendAsync Exit: Status: {Status}, Time: {DateTime.Now.ToLongTimeString()}:{DateTime.Now.Millisecond}, State: {_webSocket?.State}, IsNull: {_webSocket == null}, SourceIsCanceled: {_sendSource.IsCancellationRequested}, TokenIsCanceled: {cancelToken.IsCancellationRequested}");
    }
}
/// <summary>
/// Background loop that reads frames from the socket and forwards their payload to
/// OnRead, acknowledges a remote close frame, and stops when the socket closes, the
/// receive token is cancelled, or the session becomes FREE. On exit it cancels the
/// send loop of the same connection; failures that are not caused by cancellation
/// are reported through OnReceiveError.
/// </summary>
async Task LoopReceiveAsync()
{
    // Pin the objects belonging to this connection. Re-reading the fields later could
    // hit the null left by Disconnect(), or cancel the send loop of a newer connection.
    var socket = _webSocket;
    var sendSource = _sendSource;
    var cancelToken = _receiveSource.Token;
    Log.Debug($"[WSSession] LoopReceiveAsync Start: Status: {Status}, Time: {DateTime.Now.ToLongTimeString()}:{DateTime.Now.Millisecond}, State: {_webSocket?.State}, IsNull: {_webSocket == null}, SourceIsCanceled: {_receiveSource.IsCancellationRequested}, TokenIsCanceled: {cancelToken.IsCancellationRequested}");
    Exception exception = null;
    try
    {
        var buffer = new ArraySegment<byte>(_buffer, 0, _buffer.Length);
        while (socket != null && !cancelToken.IsCancellationRequested && socket.State != WebSocketState.Closed)
        {
            // ReceiveAsync waits for data by itself, so no extra sleep is needed between reads.
            var receiveResult = await socket.ReceiveAsync(buffer, cancelToken).ConfigureAwait(false);
            if (!cancelToken.IsCancellationRequested)
            {
                if (socket.State == WebSocketState.CloseReceived && receiveResult.MessageType == WebSocketMessageType.Close)
                {
                    // Remote side started the close handshake: stop sending and acknowledge.
                    sendSource.Cancel();
                    await socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Acknowledge Close frame", CancellationToken.None).ConfigureAwait(false);
                    OnRemoteClose();
                }
                if (socket.State == WebSocketState.Open && receiveResult.MessageType != WebSocketMessageType.Close)
                {
                    // NOTE(review): EndOfMessage is not inspected, so a message larger than
                    // the buffer is forwarded in pieces - confirm the consumer expects that.
                    int length = receiveResult.Count;
                    if (length > 0)
                    {
                        OnRead(buffer.Array, buffer.Offset, length);
                    }
                }
            }

            if (Status == SessionStatus.FREE)
            {
                throw new OperationCanceledException("No cancel after disconnected...");
            }
        }
    }
    catch (OperationCanceledException oce)
    {
        Log.WarnException("[WSSession] Normal upon task/token cancellation, disregard", oce);
    }
    catch (Exception e)
    {
        if (cancelToken.IsCancellationRequested)
        {
            // Shutting down: the socket was cancelled or disposed under us, not a receive failure.
            Log.WarnException("[WSSession] Receive loop stopped after cancellation, disregard", e);
        }
        else
        {
            exception = e;
        }
    }
    finally
    {
        try
        {
            sendSource.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // A reconnect already disposed this connection's source; nothing left to cancel.
        }

        if (exception != null)
        {
            OnReceiveError(exception);
        }
    }
    Log.Debug($"[WSSession] LoopReceiveAsync Exit: Status: {Status}, Time: {DateTime.Now.ToLongTimeString()}:{DateTime.Now.Millisecond}, State: {_webSocket?.State}, IsNull: {_webSocket == null}, SourceIsCanceled: {_receiveSource.IsCancellationRequested}, TokenIsCanceled: {cancelToken.IsCancellationRequested}");
}
/// <summary>
/// Copies a received payload into a pooled buffer and hands it to the context's synchronizer.
/// </summary>
/// <param name="bytes">Buffer containing the received data.</param>
/// <param name="offset">Index of the first received byte in <paramref name="bytes"/>.</param>
/// <param name="length">Number of received bytes.</param>
void OnRead(byte[] bytes, int offset, int length)
{
    // Dispose() clears the context while the receive loop may still be delivering data.
    // Check before taking a pooled buffer so nothing is acquired and then lost.
    var context = _context;
    if (context == null)
    {
        return;
    }

    // NOTE(review): assumes pooled buffers are at least as large as the receive buffer - confirm.
    var buffer = SessionBufferPool.Acquire();
    Array.Copy(bytes, offset, buffer, 0, length);
    context.Synchronizer.Enqueue(buffer, 0, length);
}
/// <summary>
/// Disconnects the session and releases its context. Safe to call more than once.
/// </summary>
void IDisposable.Dispose()
{
    // Already disposed. Without this guard a second call could reach Disconnect() with
    // a non-FREE status (set by a background loop after the first call) and throw when
    // it tries to raise events on the cleared context.
    if (_context == null)
    {
        return;
    }

    Disconnect();
    _context = null;
}
- #region Event
// Event helpers. Each one updates the status where applicable and then queues the
// matching session code on the context's synchronizer. The context is accessed with
// '?.' because Dispose() clears it while background loops may still raise events;
// a late event is then dropped instead of throwing NullReferenceException.

/// <summary>Marks the session CONNECT_FAIL and raises ConnectFail with the error.</summary>
private void OnConnectFail(Exception e)
{
    _status = SessionStatus.CONNECT_FAIL;
    _context?.Synchronizer.Enqueue(ESessionCode.ConnectFail, e);
}

/// <summary>Marks the session CONNECT_TIMEOUT and raises ConnectTimeout.</summary>
private void OnConnectTimeout()
{
    _status = SessionStatus.CONNECT_TIMEOUT;
    _context?.Synchronizer.Enqueue(ESessionCode.ConnectTimeout, null);
}

/// <summary>Marks the session CONNECTED and raises Connected.</summary>
private void OnConnected()
{
    _status = SessionStatus.CONNECTED;
    _context?.Synchronizer.Enqueue(ESessionCode.Connected, null);
}

/// <summary>Raises StartConnect; the status is left unchanged.</summary>
private void OnStartConnect()
{
    _context?.Synchronizer.Enqueue(ESessionCode.StartConnect, null);
}

/// <summary>Raises Disconnect; the status is left unchanged.</summary>
private void OnDisconnect()
{
    _context?.Synchronizer.Enqueue(ESessionCode.Disconnect, null);
}

/// <summary>Raises DisconnectError with the error; the status is left unchanged.</summary>
private void OnDisconnectError(Exception e)
{
    _context?.Synchronizer.Enqueue(ESessionCode.DisconnectError, e);
}

/// <summary>Marks the session SEND_ERROR and raises SendException with the error.</summary>
private void OnSendError(Exception e)
{
    _status = SessionStatus.SEND_ERROR;
    _context?.Synchronizer.Enqueue(ESessionCode.SendException, e);
}

/// <summary>Marks the session RECV_ERROR and raises RecvError with the error.</summary>
private void OnReceiveError(Exception e)
{
    _status = SessionStatus.RECV_ERROR;
    _context?.Synchronizer.Enqueue(ESessionCode.RecvError, e);
}

/// <summary>Marks the session CLOSED and raises SessionClose.</summary>
private void OnRemoteClose()
{
    _status = SessionStatus.CLOSED;
    _context?.Synchronizer.Enqueue(ESessionCode.SessionClose, null);
}
- #endregion
- }
- }
|