亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 編程 > C# > 正文

C#中一個高性能異步socket封裝庫的實現思路分享

2019-10-29 21:07:56
字體:
來源:轉載
供稿:網友

前言

socket是軟件之間通訊最常用的一種方式。c#實現socket通訊有很多中方法,其中效率最高就是異步通訊。

異步通訊實際是利用windows完成端口(IOCP)來處理的,關于完成端口實現原理,大家可以參考網上文章。

我這里想強調的是采用完成端口機制的異步通訊是windows下效率最高的通訊方式,沒有之一!

 

異步通訊比同步通訊處理要難很多,代碼編寫中會遇到許多“坑“。如果沒有經驗,很難完成。

我搜集了大量資料,完成了對異步socket的封裝。此庫已用穩定高效的運行幾個月。

 

縱觀網上的資料,我還沒有遇到一個滿意的封裝庫。許多文章把數據收發和協議處理雜糅在一塊,代碼非常難懂,也無法擴展。

在編寫該庫時,避免以上缺陷。將邏輯處理層次化,模塊化!同時實現了高可用性與高性能。

 

為了使大家對通訊效率有初步了解,先看測試圖。

C#,socket封裝庫

主機配置情況

C#,socket封裝庫

百兆帶寬基本占滿,cpu占用40%,我的電腦在空閑時,cpu占用大概20%,也就是說程序占用cpu 20%左右。

這個庫是可擴展的,就是說即使10萬個連接,收發同樣的數據,cpu占用基本相同。

 

庫的結構圖

C#,socket封裝庫

目標

即可作為服務端(監聽)也可以作為客戶端(主動連接)使用。

可以適應任何網絡協議。收發的數據針對字節流或一個完整的包。對協議內容不做處理。

高可用性。將復雜的底層處理封裝,對外接口非常友好。

高性能。最大限度優化處理。單機可支持數萬連接,收發速度可達幾百兆bit。

實現思路

網絡處理邏輯可以分為以下幾個部分:

網絡監聽 可以在多個端口實現監聽。負責生成socket,生成的socket供后續處理。監聽模塊功能比較單一,如有必要,可對監聽模塊做進一步優化。

主動連接 可以異步或同步的連接對方。連接成功后,對socket的后續處理,與監聽得到的socket完全一樣。注:無論是監聽得到的socket,還是連接得到的socket,后續處理完全一樣。

Socket收發處理 每個socket對應一個收發實例,socket收發只針對字節流處理。收發時,做了優化。比如發送時,對數據做了沾包,提高發送性能;接收時,一次投遞1K的數據。

組包處理 一般數據包都有包長度指示;比如 報頭的前倆個字節表示長度,根據這個值就可以組成一個完整的包。

NetListener 監聽

using System;using System.Net;using System.Net.Sockets;using System.Threading; namespace IocpCore{ class NetListener {  private Socket listenSocket;  public ListenParam _listenParam { get; set; }  public event Action<ListenParam, AsyncSocketClient> OnAcceptSocket;   bool start;   NetServer _netServer;  public NetListener(NetServer netServer)  {   _netServer = netServer;  }   public int _acceptAsyncCount = 0;  public bool StartListen()  {   try   {    start = true;    IPEndPoint listenPoint = new IPEndPoint(IPAddress.Parse("0.0.0.0"), _listenParam._port);    listenSocket = new Socket(listenPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);    listenSocket.Bind(listenPoint);    listenSocket.Listen(200);     Thread thread1 = new Thread(new ThreadStart(NetProcess));    thread1.Start();        StartAccept();    return true;   }   catch (Exception ex)   {    NetLogger.Log(string.Format("**監聽異常!{0}", ex.Message));    return false;   }  }   AutoResetEvent _acceptEvent = new AutoResetEvent(false);  private void NetProcess()  {   while (start)   {    DealNewAccept();    _acceptEvent.WaitOne(1000 * 10);   }  }   private void DealNewAccept()  {   try   {    if(_acceptAsyncCount <= 10)    {     StartAccept();    }     while (true)    {     AsyncSocketClient client = _newSocketClientList.GetObj();     if (client == null)      break;      DealNewAccept(client);    }   }   catch (Exception ex)   {    NetLogger.Log(string.Format("DealNewAccept 異常 {0}***{1}", ex.Message, ex.StackTrace));   }  }   private void DealNewAccept(AsyncSocketClient client)  {   client.SendBufferByteCount = _netServer.SendBufferBytePerClient;   OnAcceptSocket?.Invoke(_listenParam, client);  }   private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs acceptEventArgs)  {   try   {    Interlocked.Decrement(ref _acceptAsyncCount);    _acceptEvent.Set();    acceptEventArgs.Completed -= AcceptEventArg_Completed;    ProcessAccept(acceptEventArgs);   }   catch (Exception ex)   {    NetLogger.Log(string.Format("AcceptEventArg_Completed {0}***{1}", ex.Message, ex.StackTrace));   }  }   public bool StartAccept()  {   SocketAsyncEventArgs acceptEventArgs = new SocketAsyncEventArgs();   acceptEventArgs.Completed += AcceptEventArg_Completed;    bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArgs);   Interlocked.Increment(ref _acceptAsyncCount);    if (!willRaiseEvent)   {    Interlocked.Decrement(ref _acceptAsyncCount);    _acceptEvent.Set();    acceptEventArgs.Completed -= AcceptEventArg_Completed;    ProcessAccept(acceptEventArgs);   }   return true;  }   ObjectPool<AsyncSocketClient> _newSocketClientList = new ObjectPool<AsyncSocketClient>();  private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs)  {   try   {    using (acceptEventArgs)    {     if (acceptEventArgs.AcceptSocket != null)     {      AsyncSocketClient client = new AsyncSocketClient(acceptEventArgs.AcceptSocket);      client.CreateClientInfo(this);       _newSocketClientList.PutObj(client);      _acceptEvent.Set();     }    }   }   catch (Exception ex)   {    NetLogger.Log(string.Format("ProcessAccept {0}***{1}", ex.Message, ex.StackTrace));   }  } }}

NetConnectManage連接處理

using System;using System.Net;using System.Net.Sockets;namespace IocpCore{ class NetConnectManage {  public event Action<SocketEventParam, AsyncSocketClient> OnSocketConnectEvent;  public bool ConnectAsyn(string peerIp, int peerPort, object tag)  {   try   {    Socket socket = new Socket(SocketType.Stream, ProtocolType.Tcp);    SocketAsyncEventArgs socketEventArgs = new SocketAsyncEventArgs();    socketEventArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(peerIp), peerPort);    socketEventArgs.Completed += SocketConnect_Completed;    SocketClientInfo clientInfo = new SocketClientInfo();    socketEventArgs.UserToken = clientInfo;    clientInfo.PeerIp = peerIp;    clientInfo.PeerPort = peerPort;    clientInfo.Tag = tag;    bool willRaiseEvent = socket.ConnectAsync(socketEventArgs);    if (!willRaiseEvent)    {     ProcessConnect(socketEventArgs);     socketEventArgs.Completed -= SocketConnect_Completed;     socketEventArgs.Dispose();    }    return true;   }   catch (Exception ex)   {    NetLogger.Log("ConnectAsyn",ex);    return false;   }  }  private void SocketConnect_Completed(object sender, SocketAsyncEventArgs socketEventArgs)  {   ProcessConnect(socketEventArgs);   socketEventArgs.Completed -= SocketConnect_Completed;   socketEventArgs.Dispose();  }  private void ProcessConnect(SocketAsyncEventArgs socketEventArgs)  {   SocketClientInfo clientInfo = socketEventArgs.UserToken as SocketClientInfo;   if (socketEventArgs.SocketError == SocketError.Success)   {    DealConnectSocket(socketEventArgs.ConnectSocket, clientInfo);   }   else   {    SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, null);    socketParam.ClientInfo = clientInfo;    OnSocketConnectEvent?.Invoke(socketParam, null);   }  }  void DealConnectSocket(Socket socket, SocketClientInfo clientInfo)  {   clientInfo.SetClientInfo(socket);   AsyncSocketClient client = new AsyncSocketClient(socket);   client.SetClientInfo(clientInfo);   //觸發事件   SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, socket);   socketParam.ClientInfo = clientInfo;   OnSocketConnectEvent?.Invoke(socketParam, client);  }  public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)  {   socket = null;   try   {    Socket socketTmp = new Socket(SocketType.Stream, ProtocolType.Tcp);    SocketClientInfo clientInfo = new SocketClientInfo();    clientInfo.PeerIp = peerIp;    clientInfo.PeerPort = peerPort;    clientInfo.Tag = tag;    EndPoint remoteEP = new IPEndPoint(IPAddress.Parse(peerIp), peerPort);    socketTmp.Connect(remoteEP);    if (!socketTmp.Connected)     return false;    DealConnectSocket(socketTmp, clientInfo);    socket = socketTmp;    return true;   }   catch (Exception ex)   {    NetLogger.Log(string.Format("連接對方:({0}:{1})出錯!", peerIp, peerPort), ex);    return false;   }  } }}

AsyncSocketClient socket收發處理

using System;using System.Collections.Generic;using System.Diagnostics;using System.Net;using System.Net.Sockets;namespace IocpCore{ public class AsyncSocketClient {  public static int IocpReadLen = 1024;  public readonly Socket ConnectSocket;  protected SocketAsyncEventArgs m_receiveEventArgs;  public SocketAsyncEventArgs ReceiveEventArgs { get { return m_receiveEventArgs; } set { m_receiveEventArgs = value; } }  protected byte[] m_asyncReceiveBuffer;  protected SocketAsyncEventArgs m_sendEventArgs;  public SocketAsyncEventArgs SendEventArgs { get { return m_sendEventArgs; } set { m_sendEventArgs = value; } }  protected byte[] m_asyncSendBuffer;  public event Action<AsyncSocketClient, byte[]> OnReadData;  public event Action<AsyncSocketClient, int> OnSendData;  public event Action<AsyncSocketClient> OnSocketClose;  static object releaseLock = new object();  public static int createCount = 0;  public static int releaseCount = 0;  ~AsyncSocketClient()  {   lock (releaseLock)   {    releaseCount++;   }  }  public AsyncSocketClient(Socket socket)  {   lock (releaseLock)   {    createCount++;   }   ConnectSocket = socket;   m_receiveEventArgs = new SocketAsyncEventArgs();   m_asyncReceiveBuffer = new byte[IocpReadLen];   m_receiveEventArgs.AcceptSocket = ConnectSocket;   m_receiveEventArgs.Completed += ReceiveEventArgs_Completed;   m_sendEventArgs = new SocketAsyncEventArgs();   m_asyncSendBuffer = new byte[IocpReadLen * 2];   m_sendEventArgs.AcceptSocket = ConnectSocket;   m_sendEventArgs.Completed += SendEventArgs_Completed;  }  SocketClientInfo _clientInfo;  public SocketClientInfo ClientInfo  {   get   {    return _clientInfo;   }  }  internal void CreateClientInfo(NetListener netListener)  {   _clientInfo = new SocketClientInfo();   try   {    _clientInfo.Tag = netListener._listenParam._tag;    IPEndPoint ip = ConnectSocket.LocalEndPoint as IPEndPoint;    Debug.Assert(netListener._listenParam._port == ip.Port);    _clientInfo.LocalIp = ip.Address.ToString();    _clientInfo.LocalPort = netListener._listenParam._port;    ip = ConnectSocket.RemoteEndPoint as IPEndPoint;    _clientInfo.PeerIp = ip.Address.ToString();    _clientInfo.PeerPort = ip.Port;   }   catch (Exception ex)   {    NetLogger.Log("CreateClientInfo", ex);   }  }  internal void SetClientInfo(SocketClientInfo clientInfo)  {   _clientInfo = clientInfo;  }  #region read process  bool _inReadPending = false;  public EN_SocketReadResult ReadNextData()  {   lock (this)   {    if (_socketError)     return EN_SocketReadResult.ReadError;    if (_inReadPending)     return EN_SocketReadResult.InAsyn;    if(!ConnectSocket.Connected)    {     OnReadError();     return EN_SocketReadResult.ReadError;    }    try    {     m_receiveEventArgs.SetBuffer(m_asyncReceiveBuffer, 0, m_asyncReceiveBuffer.Length);     _inReadPending = true;     bool willRaiseEvent = ConnectSocket.ReceiveAsync(ReceiveEventArgs); //投遞接收請求     if (!willRaiseEvent)     {      _inReadPending = false;      ProcessReceive();      if (_socketError)      {       OnReadError();       return EN_SocketReadResult.ReadError;      }      return EN_SocketReadResult.HaveRead;     }     else     {      return EN_SocketReadResult.InAsyn;     }    }    catch (Exception ex)    {     NetLogger.Log("ReadNextData", ex);     _inReadPending = false;     OnReadError();     return EN_SocketReadResult.ReadError;    }   }  }  private void ProcessReceive()  {   if (ReceiveEventArgs.BytesTransferred > 0    && ReceiveEventArgs.SocketError == SocketError.Success)   {    int offset = ReceiveEventArgs.Offset;    int count = ReceiveEventArgs.BytesTransferred;    byte[] readData = new byte[count];    Array.Copy(m_asyncReceiveBuffer, offset, readData, 0, count);    _inReadPending = false;    if (!_socketError)     OnReadData?.Invoke(this, readData);   }   else   {    _inReadPending = false;    OnReadError();   }  }  private void ReceiveEventArgs_Completed(object sender, SocketAsyncEventArgs e)  {   lock (this)   {    _inReadPending = false;    ProcessReceive();    if (_socketError)    {     OnReadError();    }   }  }  bool _socketError = false;  private void OnReadError()  {   lock (this)   {    if (_socketError == false)    {     _socketError = true;     OnSocketClose?.Invoke(this);    }    CloseClient();   }  }  #endregion  #region send process  int _sendBufferByteCount = 102400;  public int SendBufferByteCount  {   get   {    return _sendBufferByteCount;   }   set   {    if (value < 1024)    {     _sendBufferByteCount = 1024;    }    else    {     _sendBufferByteCount = value;    }   }  }  SendBufferPool _sendDataPool = new SendBufferPool();  internal EN_SendDataResult PutSendData(byte[] data)  {   if (_socketError)    return EN_SendDataResult.no_client;   if (_sendDataPool._bufferByteCount >= _sendBufferByteCount)   {    return EN_SendDataResult.buffer_overflow;   }   if (data.Length <= IocpReadLen)   {    _sendDataPool.PutObj(data);   }   else   {    List<byte[]> dataItems = SplitData(data, IocpReadLen);    foreach (byte[] item in dataItems)    {     _sendDataPool.PutObj(item);    }   }   return EN_SendDataResult.ok;  }  bool _inSendPending = false;  public EN_SocketSendResult SendNextData()  {   lock (this)   {    if (_socketError)    {     return EN_SocketSendResult.SendError;    }    if (_inSendPending)    {     return EN_SocketSendResult.InAsyn;    }    int sendByteCount = GetSendData();    if (sendByteCount == 0)    {     return EN_SocketSendResult.NoSendData;    }    //防止拋出異常,否則影響性能    if (!ConnectSocket.Connected)    {     OnSendError();     return EN_SocketSendResult.SendError;    }    try    {     m_sendEventArgs.SetBuffer(m_asyncSendBuffer, 0, sendByteCount);     _inSendPending = true;     bool willRaiseEvent = ConnectSocket.SendAsync(m_sendEventArgs);     if (!willRaiseEvent)     {      _inSendPending = false;      ProcessSend(m_sendEventArgs);      if (_socketError)      {       OnSendError();       return EN_SocketSendResult.SendError;      }      else      {       OnSendData?.Invoke(this, sendByteCount);       //繼續發下一條       return EN_SocketSendResult.HaveSend;      }     }     else     {      return EN_SocketSendResult.InAsyn;     }    }    catch (Exception ex)    {     NetLogger.Log("SendNextData", ex);     _inSendPending = false;     OnSendError();     return EN_SocketSendResult.SendError;    }   }  }  private void SendEventArgs_Completed(object sender, SocketAsyncEventArgs sendEventArgs)  {   lock (this)   {    try    {     _inSendPending = false;     ProcessSend(m_sendEventArgs);     int sendCount = 0;     if (sendEventArgs.SocketError == SocketError.Success)     {      sendCount = sendEventArgs.BytesTransferred;     }     OnSendData?.Invoke(this, sendCount);     if (_socketError)     {      OnSendError();     }    }    catch (Exception ex)    {     NetLogger.Log("SendEventArgs_Completed", ex);    }   }  }  private bool ProcessSend(SocketAsyncEventArgs sendEventArgs)  {   if (sendEventArgs.SocketError == SocketError.Success)   {    return true;   }   else   {    OnSendError();    return false;   }  }  private int GetSendData()  {   int dataLen = 0;   while (true)   {    byte[] data = _sendDataPool.GetObj();    if (data == null)     return dataLen;    Array.Copy(data, 0, m_asyncSendBuffer, dataLen, data.Length);    dataLen += data.Length;    if (dataLen > IocpReadLen)     break;   }   return dataLen;  }  private void OnSendError()  {   lock (this)   {    if (_socketError == false)    {     _socketError = true;     OnSocketClose?.Invoke(this);    }    CloseClient();   }  }  #endregion  internal void CloseSocket()  {   try   {    ConnectSocket.Close();   }   catch (Exception ex)   {    NetLogger.Log("CloseSocket", ex);   }  }  static object socketCloseLock = new object();  public static int closeSendCount = 0;  public static int closeReadCount = 0;  bool _disposeSend = false;  void CloseSend()  {   if (!_disposeSend && !_inSendPending)   {    lock (socketCloseLock)     closeSendCount++;    _disposeSend = true;    m_sendEventArgs.SetBuffer(null, 0, 0);    m_sendEventArgs.Completed -= SendEventArgs_Completed;    m_sendEventArgs.Dispose();   }  }  bool _disposeRead = false;  void CloseRead()  {   if (!_disposeRead && !_inReadPending)   {    lock (socketCloseLock)     closeReadCount++;    _disposeRead = true;    m_receiveEventArgs.SetBuffer(null, 0, 0);    m_receiveEventArgs.Completed -= ReceiveEventArgs_Completed;    m_receiveEventArgs.Dispose();   }  }  private void CloseClient()  {   try   {    CloseSend();    CloseRead();    ConnectSocket.Close();   }   catch (Exception ex)   {    NetLogger.Log("CloseClient", ex);   }  }  //發送緩沖大小  private List<byte[]> SplitData(byte[] data, int maxLen)  {   List<byte[]> items = new List<byte[]>();   int start = 0;   while (true)   {    int itemLen = Math.Min(maxLen, data.Length - start);    if (itemLen == 0)     break;    byte[] item = new byte[itemLen];    Array.Copy(data, start, item, 0, itemLen);    items.Add(item);    start += itemLen;   }   return items;  } } public enum EN_SocketReadResult {  InAsyn,  HaveRead,  ReadError } public enum EN_SocketSendResult {  InAsyn,  HaveSend,  NoSendData,  SendError } class SendBufferPool {  ObjectPool<byte[]> _bufferPool = new ObjectPool<byte[]>();  public Int64 _bufferByteCount = 0;  public bool PutObj(byte[] obj)  {   if (_bufferPool.PutObj(obj))   {    lock (this)    {     _bufferByteCount += obj.Length;    }    return true;   }   else   {    return false;   }  }  public byte[] GetObj()  {   byte[] result = _bufferPool.GetObj();   if (result != null)   {    lock (this)    {     _bufferByteCount -= result.Length;    }   }   return result;  } }}

NetServer 聚合其他類

using System;using System.Collections.Generic;using System.Diagnostics;using System.Linq;using System.Net.Sockets;using System.Threading;namespace IocpCore{ public class NetServer {  public Action<SocketEventParam> OnSocketPacketEvent;  //每個連接發送緩沖大小  public int SendBufferBytePerClient { get; set; } = 1024 * 100;  bool _serverStart = false;  List<NetListener> _listListener = new List<NetListener>();  //負責對收到的字節流 組成完成的包  ClientPacketManage _clientPacketManage;  public Int64 SendByteCount { get; set; }  public Int64 ReadByteCount { get; set; }  List<ListenParam> _listListenPort = new List<ListenParam>();  public void AddListenPort(int port, object tag)  {   _listListenPort.Add(new ListenParam(port, tag));  }  /// <summary>  ///   /// </summary>  /// <param name="listenFault">監聽失敗的端口</param>  /// <returns></returns>  public bool StartListen(out List<int> listenFault)  {   _serverStart = true;   _clientPacketManage = new ClientPacketManage(this);   _clientPacketManage.OnSocketPacketEvent += PutClientPacket;   _netConnectManage.OnSocketConnectEvent += SocketConnectEvent;   _listListener.Clear();   Thread thread1 = new Thread(new ThreadStart(NetPacketProcess));   thread1.Start();   Thread thread2 = new Thread(new ThreadStart(NetSendProcess));   thread2.Start();   Thread thread3 = new Thread(new ThreadStart(NetReadProcess));   thread3.Start();   listenFault = new List<int>();   foreach (ListenParam param in _listListenPort)   {    NetListener listener = new NetListener(this);    listener._listenParam = param;    listener.OnAcceptSocket += Listener_OnAcceptSocket;    if (!listener.StartListen())    {     listenFault.Add(param._port);    }    else    {     _listListener.Add(listener);     NetLogger.Log(string.Format("監聽成功!端口:{0}", param._port));    }   }   return listenFault.Count == 0;  }  public void PutClientPacket(SocketEventParam param)  {   OnSocketPacketEvent?.Invoke(param);  }  //獲取包的最小長度  int _packetMinLen;  int _packetMaxLen;  public int PacketMinLen  {   get { return _packetMinLen; }  }  public int PacketMaxLen  {   get { return _packetMaxLen; }  }  /// <summary>  /// 設置包的最小和最大長度  /// 當minLen=0時,認為是接收字節流  /// </summary>  /// <param name="minLen"></param>  /// <param name="maxLen"></param>  public void SetPacketParam(int minLen, int maxLen)  {   Debug.Assert(minLen >= 0);   Debug.Assert(maxLen > minLen);   _packetMinLen = minLen;   _packetMaxLen = maxLen;  }  //獲取包的總長度  public delegate int delegate_GetPacketTotalLen(byte[] data, int offset);  public delegate_GetPacketTotalLen GetPacketTotalLen_Callback;  ObjectPoolWithEvent<SocketEventParam> _socketEventPool = new ObjectPoolWithEvent<SocketEventParam>();  private void NetPacketProcess()  {   while (_serverStart)   {    try    {     DealEventPool();    }    catch (Exception ex)    {     NetLogger.Log(string.Format("DealEventPool 異常 {0}***{1}", ex.Message, ex.StackTrace));    }    _socketEventPool.WaitOne(1000);   }  }  Dictionary<Socket, AsyncSocketClient> _clientGroup = new Dictionary<Socket, AsyncSocketClient>();  public int ClientCount  {   get   {    lock (_clientGroup)    {     return _clientGroup.Count;    }   }  }  public List<Socket> ClientList  {   get   {    lock (_clientGroup)    {     return _clientGroup.Keys.ToList();    }   }  }  private void DealEventPool()  {   while (true)   {    SocketEventParam param = _socketEventPool.GetObj();    if (param == null)     return;    if (param.SocketEvent == EN_SocketEvent.close)    {     lock (_clientGroup)     {      _clientGroup.Remove(param.Socket);     }    }    if (_packetMinLen == 0)//字節流處理    {     OnSocketPacketEvent?.Invoke(param);    }    else    {     //組成一個完整的包 邏輯     _clientPacketManage.PutSocketParam(param);    }   }  }  private void SocketConnectEvent(SocketEventParam param, AsyncSocketClient client)  {   try   {    if (param.Socket == null || client == null) //連接失敗    {         }    else    {     lock (_clientGroup)     {      bool remove = _clientGroup.Remove(client.ConnectSocket);      Debug.Assert(!remove);      _clientGroup.Add(client.ConnectSocket, client);     }     client.OnSocketClose += Client_OnSocketClose;     client.OnReadData += Client_OnReadData;     client.OnSendData += Client_OnSendData;     _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));    }    _socketEventPool.PutObj(param);   }   catch (Exception ex)   {    NetLogger.Log(string.Format("SocketConnectEvent 異常 {0}***{1}", ex.Message, ex.StackTrace));   }  }  internal void OnRcvPacketLenError(Socket socket, byte[] buffer, int offset, int packetLen)  {   try   {    lock (_clientGroup)    {     if (!_clientGroup.ContainsKey(socket))     {      Debug.Assert(false);      return;     }     NetLogger.Log(string.Format("報長度異常!包長:{0}", packetLen));     AsyncSocketClient client = _clientGroup[socket];     client.CloseSocket();    }   }   catch (Exception ex)   {    NetLogger.Log(string.Format("OnRcvPacketLenError 異常 {0}***{1}", ex.Message, ex.StackTrace));   }  }  #region listen port  private void Listener_OnAcceptSocket(ListenParam listenPatam, AsyncSocketClient client)  {   try   {    lock (_clientGroup)    {     bool remove = _clientGroup.Remove(client.ConnectSocket);     Debug.Assert(!remove);     _clientGroup.Add(client.ConnectSocket, client);    }    client.OnSocketClose += Client_OnSocketClose;    client.OnReadData += Client_OnReadData;    client.OnSendData += Client_OnSendData;    _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));    SocketEventParam param = new SocketEventParam(EN_SocketEvent.accept, client.ConnectSocket);    param.ClientInfo = client.ClientInfo;    _socketEventPool.PutObj(param);   }   catch (Exception ex)   {    NetLogger.Log(string.Format("Listener_OnAcceptSocket 異常 {0}***{1}", ex.Message, ex.StackTrace));   }  }  ObjectPoolWithEvent<SocketEventDeal> _listSendEvent = new ObjectPoolWithEvent<SocketEventDeal>();  private void NetSendProcess()  {   while (true)   {    DealSendEvent();    _listSendEvent.WaitOne(1000);   }  }  ObjectPoolWithEvent<SocketEventDeal> _listReadEvent = new ObjectPoolWithEvent<SocketEventDeal>();  private void NetReadProcess()  {   while (true)   {    DealReadEvent();    _listReadEvent.WaitOne(1000);   }  }    private void DealSendEvent()  {   while (true)   {    SocketEventDeal item = _listSendEvent.GetObj();    if (item == null)     break;    switch (item.SocketEvent)    {     case EN_SocketDealEvent.send:      {       while (true)       {        EN_SocketSendResult result = item.Client.SendNextData();        if (result == EN_SocketSendResult.HaveSend)         continue;        else         break;       }      }      break;     case EN_SocketDealEvent.read:      {       Debug.Assert(false);      }      break;         }   }  }  private void DealReadEvent()  {   while (true)   {    SocketEventDeal item = _listReadEvent.GetObj();    if (item == null)     break;    switch (item.SocketEvent)    {     case EN_SocketDealEvent.read:      {       while (true)       {        EN_SocketReadResult result = item.Client.ReadNextData();        if (result == EN_SocketReadResult.HaveRead)         continue;        else         break;       }      }      break;     case EN_SocketDealEvent.send:      {       Debug.Assert(false);      }      break;    }   }  }  private void Client_OnReadData(AsyncSocketClient client, byte[] readData)  {   //讀下一條   _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));   try   {    SocketEventParam param = new SocketEventParam(EN_SocketEvent.read, client.ConnectSocket);    param.ClientInfo = client.ClientInfo;    param.Data = readData;    _socketEventPool.PutObj(param);    lock (this)    {     ReadByteCount += readData.Length;    }   }   catch (Exception ex)   {    NetLogger.Log(string.Format("Client_OnReadData 異常 {0}***{1}", ex.Message, ex.StackTrace));   }  }#endregion  private void Client_OnSendData(AsyncSocketClient client, int sendCount)  {   //發送下一條   _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));   lock (this)   {    SendByteCount += sendCount;   }  }  private void Client_OnSocketClose(AsyncSocketClient client)  {   try   {    SocketEventParam param = new SocketEventParam(EN_SocketEvent.close, client.ConnectSocket);    param.ClientInfo = client.ClientInfo;    _socketEventPool.PutObj(param);   }   catch (Exception ex)   {    NetLogger.Log(string.Format("Client_OnSocketClose 異常 {0}***{1}", ex.Message, ex.StackTrace));   }  }  /// <summary>  /// 放到發送緩沖  /// </summary>  /// <param name="socket"></param>  /// <param name="data"></param>  /// <returns></returns>  public EN_SendDataResult SendData(Socket socket, byte[] data)  {   if (socket == null)    return EN_SendDataResult.no_client;   lock (_clientGroup)   {    if (!_clientGroup.ContainsKey(socket))     return EN_SendDataResult.no_client;    AsyncSocketClient client = _clientGroup[socket];    EN_SendDataResult result = client.PutSendData(data);    if (result == EN_SendDataResult.ok)    {     //發送下一條     _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));         }    return result;   }  }  /// <summary>  /// 設置某個連接的發送緩沖大小  /// </summary>  /// <param name="socket"></param>  /// <param name="byteCount"></param>  /// <returns></returns>  public bool SetClientSendBuffer(Socket socket, int byteCount)  {   lock (_clientGroup)   {    if (!_clientGroup.ContainsKey(socket))     return false;    AsyncSocketClient client = _clientGroup[socket];    client.SendBufferByteCount = byteCount;    return true;   }  }  #region connect process  NetConnectManage _netConnectManage = new NetConnectManage();  /// <summary>  /// 異步連接一個客戶端  /// </summary>  /// <param name="peerIp"></param>  /// <param name="peerPort"></param>  /// <param name="tag"></param>  /// <returns></returns>  public bool ConnectAsyn(string peerIp, int peerPort, object tag)  {   return _netConnectManage.ConnectAsyn(peerIp, peerPort, tag);  }  /// <summary>  /// 同步連接一個客戶端  /// </summary>  /// <param name="peerIp"></param>  /// <param name="peerPort"></param>  /// <param name="tag"></param>  /// <param name="socket"></param>  /// <returns></returns>  public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)  {   return _netConnectManage.Connect(peerIp, peerPort, tag, out socket);  }  #endregion } enum EN_SocketDealEvent {  read,  send, } class SocketEventDeal {  public AsyncSocketClient Client { get; set; }  public EN_SocketDealEvent SocketEvent { get; set; }  public SocketEventDeal(AsyncSocketClient client, EN_SocketDealEvent socketEvent)  {   Client = client;   SocketEvent = socketEvent;  } }}

庫的使用

使用起來非常簡單,示例如下

using IocpCore;using System;using System.Collections.Generic;using System.Linq;using System.Net.Sockets;using System.Text;using System.Threading.Tasks;using System.Windows;namespace WarningClient{ public class SocketServer {  public Action<SocketEventParam> OnSocketEvent;  public Int64 SendByteCount  {   get   {    if (_netServer == null)     return 0;    return _netServer.SendByteCount;   }  }  public Int64 ReadByteCount  {   get   {    if (_netServer == null)     return 0;    return _netServer.ReadByteCount;   }  }  NetServer _netServer;  EN_PacketType _packetType = EN_PacketType.byteStream;  public void SetPacktType(EN_PacketType packetType)  {   _packetType = packetType;   if (_netServer == null)    return;   if (packetType == EN_PacketType.byteStream)   {    _netServer.SetPacketParam(0, 1024);   }   else   {    _netServer.SetPacketParam(9, 1024);   }  }  public bool Init(List<int> listenPort)  {   NetLogger.OnLogEvent += NetLogger_OnLogEvent;   _netServer = new NetServer();   SetPacktType(_packetType);   _netServer.GetPacketTotalLen_Callback += GetPacketTotalLen;   _netServer.OnSocketPacketEvent += SocketPacketDeal;   foreach (int n in listenPort)   {    _netServer.AddListenPort(n, n);   }   List<int> listenFault;   bool start = _netServer.StartListen(out listenFault);   return start;  }  int GetPacketTotalLen(byte[] data, int offset)  {   if (MainWindow._packetType == EN_PacketType.znss)    return GetPacketZnss(data, offset);   else    return GetPacketAnzhiyuan(data, offset);  }  int GetPacketAnzhiyuan(byte[] data, int offset)  {   int n = data[offset + 5] + 6;   return n;  }  int GetPacketZnss(byte[] data, int offset)  {   int packetLen = (int)(data[4]) + 5;   return packetLen;  }  public bool ConnectAsyn(string peerIp, int peerPort, object tag)  {   return _netServer.ConnectAsyn(peerIp, peerPort, tag);  }  public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)  {   return _netServer.Connect(peerIp, peerPort, tag, out socket);  }  private void NetLogger_OnLogEvent(string message)  {   AppLog.Log(message);  }  Dictionary<Socket, SocketEventParam> _clientGroup = new Dictionary<Socket, SocketEventParam>();  public int ClientCount  {   get   {    lock (_clientGroup)    {     return _clientGroup.Count;    }   }  }  public List<Socket> ClientList  {   get   {    if (_netServer != null)     return _netServer.ClientList;    return new List<Socket>();   }  }  void AddClient(SocketEventParam socketParam)  {   lock (_clientGroup)   {    _clientGroup.Remove(socketParam.Socket);    _clientGroup.Add(socketParam.Socket, socketParam);   }  }  void RemoveClient(SocketEventParam socketParam)  {   lock (_clientGroup)   {    _clientGroup.Remove(socketParam.Socket);   }  }  ObjectPool<SocketEventParam> _readDataPool = new ObjectPool<SocketEventParam>();  public ObjectPool<SocketEventParam> ReadDataPool  {   get   {    return _readDataPool;   }  }  private void SocketPacketDeal(SocketEventParam socketParam)  {   OnSocketEvent?.Invoke(socketParam);   if (socketParam.SocketEvent == EN_SocketEvent.read)   {    if (MainWindow._isShowReadPacket)     _readDataPool.PutObj(socketParam);   }   else if (socketParam.SocketEvent == EN_SocketEvent.accept)   {    AddClient(socketParam);    string peerIp = socketParam.ClientInfo.PeerIpPort;    AppLog.Log(string.Format("客戶端鏈接!本地端口:{0},對端:{1}",     socketParam.ClientInfo.LocalPort, peerIp));   }   else if (socketParam.SocketEvent == EN_SocketEvent.connect)   {    string peerIp = socketParam.ClientInfo.PeerIpPort;    if (socketParam.Socket != null)    {     AddClient(socketParam);     AppLog.Log(string.Format("連接對端成功!本地端口:{0},對端:{1}",      socketParam.ClientInfo.LocalPort, peerIp));    }    else    {     AppLog.Log(string.Format("連接對端失敗!本地端口:{0},對端:{1}",      socketParam.ClientInfo.LocalPort, peerIp));    }   }   else if (socketParam.SocketEvent == EN_SocketEvent.close)   {    MainWindow.MainWnd.OnSocketDisconnect(socketParam.Socket);    RemoveClient(socketParam);    string peerIp = socketParam.ClientInfo.PeerIpPort;    AppLog.Log(string.Format("客戶端斷開!本地端口:{0},對端:{1},",     socketParam.ClientInfo.LocalPort, peerIp));   }  }  public EN_SendDataResult SendData(Socket socket, byte[] data)  {   if(socket == null)   {    MessageBox.Show("還沒連接!");    return EN_SendDataResult.no_client;   }   return _netServer.SendData(socket, data);  }  internal void SendToAll(byte[] data)  {   lock (_clientGroup)   {    foreach (Socket socket in _clientGroup.Keys)    {     SendData(socket, data);    }   }  } }}

以上這篇C#中一個高性能異步socket封裝庫的實現思路分享就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持VEVB武林網。


注:相關教程知識閱讀請移步到c#教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
九九热视频这里只有精品| 亚洲精品狠狠操| 欧美在线播放视频| 欧美极品少妇与黑人| 久久777国产线看观看精品| 欧美老女人性视频| 日韩免费av一区二区| 亚洲日韩中文字幕| 久久精品国产91精品亚洲| 国产精品视频自拍| 2018日韩中文字幕| 国产精品人成电影在线观看| 欧美丝袜一区二区三区| 国内精久久久久久久久久人| 国产成人aa精品一区在线播放| 欧美另类99xxxxx| 欧美成人一二三| 成人亚洲欧美一区二区三区| 日韩亚洲国产中文字幕| 日韩视频在线观看免费| 久久久久久久一区二区| 亚洲欧美在线x视频| 欧美性做爰毛片| 2021久久精品国产99国产精品| 成人福利视频在线观看| 日韩欧美a级成人黄色| 亚洲偷熟乱区亚洲香蕉av| 国产精品v片在线观看不卡| 不卡av日日日| 欧美小视频在线| 欧美一区二区三区艳史| 懂色av影视一区二区三区| 岛国视频午夜一区免费在线观看| 中文字幕欧美日韩| 91夜夜揉人人捏人人添红杏| 26uuu亚洲国产精品| 亚洲伊人久久综合| 在线播放精品一区二区三区| 亚洲国产精品成人一区二区| 国产精品久久久精品| 欧美成人免费全部观看天天性色| 亚洲图片在线综合| 成人h视频在线观看播放| 日韩在线观看视频免费| 一本色道久久88综合亚洲精品ⅰ| 国产精品视频精品| 国产精品高清在线观看| 亚洲最大的成人网| 亚洲精品网址在线观看| 日韩精品中文字幕在线播放| 国产精品ⅴa在线观看h| 成人精品视频99在线观看免费| 91精品啪在线观看麻豆免费| 亚洲在线观看视频网站| 亚洲aa在线观看| 日韩在线观看免费高清完整版| 欧美精品久久久久久久久| 美女视频黄免费的亚洲男人天堂| 国产精品免费电影| 国产精品扒开腿做爽爽爽视频| 日韩高清av一区二区三区| 亚洲女同性videos| 欧美激情视频一区二区| 欧美综合在线观看| 欧美裸身视频免费观看| 精品久久久在线观看| 久久成人精品电影| 欧美黑人一区二区三区| 国产91精品久| 国产91亚洲精品| 亚洲色图av在线| 欧美精品18videosex性欧美| 中文国产成人精品| 色老头一区二区三区在线观看| 精品无人区太爽高潮在线播放| 久久综合免费视频影院| 亚洲一区二区免费| 亚洲午夜av电影| 亚洲qvod图片区电影| 欧美中在线观看| 欧美专区福利在线| 成人免费视频网址| 成人精品久久久| 在线观看免费高清视频97| 最新国产成人av网站网址麻豆| 超碰日本道色综合久久综合| 伊人一区二区三区久久精品| 欧美裸体xxxxx| 亚洲白拍色综合图区| 中文国产成人精品| www.亚洲免费视频| 亚洲精品一区av在线播放| 国产一区二区日韩| 欧美一区深夜视频| 日韩精品久久久久久福利| 欧美性猛交99久久久久99按摩| 欧美日韩在线视频观看| 国产美女91呻吟求| 97成人在线视频| 亚洲男人天堂手机在线| 欧美日本精品在线| 国产成人在线精品| 欧美黄色片免费观看| 国产一区二区三区丝袜| 亚洲精品99999| 久久久亚洲国产| 久久视频在线直播| 91久久久久久国产精品| 97在线观看视频国产| 欧美性猛交xxxx富婆| 久久天天躁日日躁| 欧美日韩亚洲高清| 日韩中文字幕在线播放| 亚洲一区二区三区在线视频| 日本一本a高清免费不卡| 日本高清不卡的在线| 国产成人a亚洲精品| 日韩精品在线观看一区| 91高清免费在线观看| 中文字幕久热精品在线视频| 日韩精品中文字| 亚洲精品电影网站| 丰满岳妇乱一区二区三区| 久久精品视频免费播放| 亚洲欧美成人一区二区在线电影| 欧美国产精品日韩| 久热99视频在线观看| 成人高清视频观看www| 欧美电影免费观看网站| 亚洲图片欧洲图片av| 国产日产欧美a一级在线| 亚洲xxxx妇黄裸体| 91视频国产一区| 亚洲另类激情图| 性色av一区二区三区在线观看| 欧美怡春院一区二区三区| 欧美在线日韩在线| 亚洲精品小视频在线观看| 日韩中文字幕在线播放| 亚洲美女av黄| 国色天香2019中文字幕在线观看| 亚洲无亚洲人成网站77777| 久久网福利资源网站| 亚洲成人激情在线观看| 亚洲男人的天堂在线播放| 色妞在线综合亚洲欧美| 91精品视频大全| 91精品久久久久久久久| 欧美激情视频一区二区三区不卡| 一区二区日韩精品| 欧美日韩国产成人高清视频| 91青草视频久久| 久久久久久久影视| 久久久999精品视频| 色悠久久久久综合先锋影音下载| 激情久久av一区av二区av三区| 国产999精品久久久| 岛国视频午夜一区免费在线观看| 国产精品女主播视频| 国产精品成人v| 亚洲аv电影天堂网| 国产精品入口日韩视频大尺度| 中文字幕欧美日韩va免费视频| 裸体女人亚洲精品一区|