前言
socket是软件之间通讯最常用的一种方式。c#实现socket通讯有很多中方法,其中效率最高就是异步通讯。
异步通讯实际是利用windows完成端口(IOCP)来处理的,关于完成端口实现原理,大家可以参考网上文章。
我这里想强调的是采用完成端口机制的异步通讯是windows下效率最高的通讯方式,没有之一!
异步通讯比同步通讯处理要难很多,代码编写中会遇到许多“坑“。如果没有经验,很难完成。
我搜集了大量资料,完成了对异步socket的封装。此库已用稳定高效的运行几个月。
纵观网上的资料,我还没有遇到一个满意的封装库。许多文章把数据收发和协议处理杂糅在一块,代码非常难懂,也无法扩展。
在编写该库时,避免以上缺陷。将逻辑处理层次化,模块化!同时实现了高可用性与高性能。
为了使大家对通讯效率有初步了解,先看测试图。
百兆带宽基本占满,cpu占用40%,我的电脑在空闲时,cpu占用大概20%,也就是说程序占用cpu 20%左右。
这个库是可扩展的,就是说即使10万个连接,收发同样的数据,cpu占用基本相同。
网络处理逻辑可以分为以下几个部分:
- using System;
- using System.Net;
- using System.Net.Sockets;
- using System.Threading;
- namespace IocpCore
- {
- class NetListener
- {
- private Socket listenSocket;
- public ListenParam _listenParam { get; set; }
- public event Action<ListenParam, AsyncSocketClient> OnAcceptSocket;
- bool start;
- NetServer _netServer;
- public NetListener(NetServer netServer)
- {
- _netServer = netServer;
- }
- public int _acceptAsyncCount = 0;
- public bool StartListen()
- {
- try
- {
- start = true;
- IPEndPoint listenPoint = new IPEndPoint(IPAddress.Parse("0.0.0.0"), _listenParam._port);
- listenSocket = new Socket(listenPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
- listenSocket.Bind(listenPoint);
- listenSocket.Listen(200);
- Thread thread1 = new Thread(new ThreadStart(NetProcess));
- thread1.Start();
- StartAccept();
- return true;
- }
- catch (Exception ex)
- {
- NetLogger.Log(string.Format("**监听异常!{0}", ex.Message));
- return false;
- }
- }
- AutoResetEvent _acceptEvent = new AutoResetEvent(false);
- private void NetProcess()
- {
- while (start)
- {
- DealNewAccept();
- _acceptEvent.WaitOne(1000 * 10);
- }
- }
- private void DealNewAccept()
- {
- try
- {
- if(_acceptAsyncCount <= 10)
- {
- StartAccept();
- }
- while (true)
- {
- AsyncSocketClient client = _newSocketClientList.GetObj();
- if (client == null)
- break;
- DealNewAccept(client);
- }
- }
- catch (Exception ex)
- {
- NetLogger.Log(string.Format("DealNewAccept 异常 {0}***{1}", ex.Message, ex.StackTrace));
- }
- }
- private void DealNewAccept(AsyncSocketClient client)
- {
- client.SendBufferByteCount = _netServer.SendBufferBytePerClient;
- OnAcceptSocket?.Invoke(_listenParam, client);
- }
- private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs acceptEventArgs)
- {
- try
- {
- Interlocked.Decrement(ref _acceptAsyncCount);
- _acceptEvent.Set();
- acceptEventArgs.Completed -= AcceptEventArg_Completed;
- ProcessAccept(acceptEventArgs);
- }
- catch (Exception ex)
- {
- NetLogger.Log(string.Format("AcceptEventArg_Completed {0}***{1}", ex.Message, ex.StackTrace));
- }
- }
- public bool StartAccept()
- {
- SocketAsyncEventArgs acceptEventArgs = new SocketAsyncEventArgs();
- acceptEventArgs.Completed += AcceptEventArg_Completed;
- bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArgs);
- Interlocked.Increment(ref _acceptAsyncCount);
- if (!willRaiseEvent)
- {
- Interlocked.Decrement(ref _acceptAsyncCount);
- _acceptEvent.Set();
- acceptEventArgs.Completed -= AcceptEventArg_Completed;
- ProcessAccept(acceptEventArgs);
- }
- return true;
- }
- ObjectPool<AsyncSocketClient> _newSocketClientList = new ObjectPool<AsyncSocketClient>();
- private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs)
- {
- try
- {
- using (acceptEventArgs)
- {
- if (acceptEventArgs.AcceptSocket != null)
- {
- AsyncSocketClient client = new AsyncSocketClient(acceptEventArgs.AcceptSocket);
- client.CreateClientInfo(this);
- _newSocketClientList.PutObj(client);
- _acceptEvent.Set();
- }
- }
- }
- catch (Exception ex)
- {
- NetLogger.Log(string.Format("ProcessAccept {0}***{1}", ex.Message, ex.StackTrace));
- }
- }
- }
- }
- using System;
- using System.Net;
- using System.Net.Sockets;
- namespace IocpCore
- {
- class NetConnectManage
- {
- public event Action<SocketEventParam, AsyncSocketClient> OnSocketConnectEvent;
- public bool ConnectAsyn(string peerIp, int peerPort, object tag)
- {
- try
- {
- Socket socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
- SocketAsyncEventArgs socketEventArgs = new SocketAsyncEventArgs();
- socketEventArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(peerIp), peerPort);
- socketEventArgs.Completed += SocketConnect_Completed;
- SocketClientInfo clientInfo = new SocketClientInfo();
- socketEventArgs.UserToken = clientInfo;
- clientInfo.PeerIp = peerIp;
- clientInfo.PeerPort = peerPort;
- clientInfo.Tag = tag;
- bool willRaiseEvent = socket.ConnectAsync(socketEventArgs);
- if (!willRaiseEvent)
- {
- ProcessConnect(socketEventArgs);
- socketEventArgs.Completed -= SocketConnect_Completed;
- socketEventArgs.Dispose();
- }
- return true;
- }
- catch (Exception ex)
- {
- NetLogger.Log("ConnectAsyn",ex);
- return false;
- }
- }
- private void SocketConnect_Completed(object sender, SocketAsyncEventArgs socketEventArgs)
- {
- ProcessConnect(socketEventArgs);
- socketEventArgs.Completed -= SocketConnect_Completed;
- socketEventArgs.Dispose();
- }
- private void ProcessConnect(SocketAsyncEventArgs socketEventArgs)
- {
- SocketClientInfo clientInfo = socketEventArgs.UserToken as SocketClientInfo;
- if (socketEventArgs.SocketError == SocketError.Success)
- {
- DealConnectSocket(socketEventArgs.ConnectSocket, clientInfo);
- }
- else
- {
- SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, null);
- socketParam.ClientInfo = clientInfo;
- OnSocketConnectEvent?.Invoke(socketParam, null);
- }
- }
- void DealConnectSocket(Socket socket, SocketClientInfo clientInfo)
- {
- clientInfo.SetClientInfo(socket);
- AsyncSocketClient client = new AsyncSocketClient(socket);
- client.SetClientInfo(clientInfo);
- //触发事件
- SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, socket);
- socketParam.ClientInfo = clientInfo;
- OnSocketConnectEvent?.Invoke(socketParam, client);
- }
- public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
- {
- socket = null;
- try
- {
- Socket socketTmp = new Socket(SocketType.Stream, ProtocolType.Tcp);
- SocketClientInfo clientInfo = new SocketClientInfo();
- clientInfo.PeerIp = peerIp;
- clientInfo.PeerPort = peerPort;
- clientInfo.Tag = tag;
- EndPoint remoteEP = new IPEndPoint(IPAddress.Parse(peerIp), peerPort);
- socketTmp.Connect(remoteEP);
- if (!socketTmp.Connected)
- return false;
- DealConnectSocket(socketTmp, clientInfo);
- socket = socketTmp;
- return true;
- }
- catch (Exception ex)
- {
- NetLogger.Log(string.Format("连接对方:({0}:{1})出错!", peerIp, peerPort), ex);
- return false;
- }
- }
- }
- }
- using System;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Net;
- using System.Net.Sockets;
- namespace IocpCore
- {
- public class AsyncSocketClient
- {
- public static int IocpReadLen = 1024;
- public readonly Socket ConnectSocket;
- protected SocketAsyncEventArgs m_receiveEventArgs;
- public SocketAsyncEventArgs ReceiveEventArgs { get { return m_receiveEventArgs; } set { m_receiveEventArgs = value; } }
- protected byte[] m_asyncReceiveBuffer;
- protected SocketAsyncEventArgs m_sendEventArgs;
- public SocketAsyncEventArgs SendEventArgs { get { return m_sendEventArgs; } set { m_sendEventArgs = value; } }
- protected byte[] m_asyncSendBuffer;
- public event Action<AsyncSocketClient, byte[]> OnReadData;
- public event Action<AsyncSocketClient, int> OnSendData;
- public event Action<AsyncSocketClient> OnSocketClose;
- static object releaseLock = new object();
- public static int createCount = 0;
- public static int releaseCount = 0;
- ~AsyncSocketClient()
- {
- lock (releaseLock)
- {
- releaseCount++;
- }
- }
- public AsyncSocketClient(Socket socket)
- {
- lock (releaseLock)
- {
- createCount++;
- }
- ConnectSocket = socket;
- m_receiveEventArgs = new SocketAsyncEventArgs();
- m_asyncReceiveBuffer = new byte[IocpReadLen];
- m_receiveEventArgs.AcceptSocket = ConnectSocket;
- m_receiveEventArgs.Completed += ReceiveEventArgs_Completed;
- m_sendEventArgs = new SocketAsyncEventArgs();
- m_asyncSendBuffer = new byte[IocpReadLen * 2];
- m_sendEventArgs.AcceptSocket = ConnectSocket;
- m_sendEventArgs.Completed += SendEventArgs_Completed;
- }
- SocketClientInfo _clientInfo;
- public SocketClientInfo ClientInfo
- {
- get
- {
- return _clientInfo;
- }
- }
- internal void CreateClientInfo(NetListener netListener)
- {
- _clientInfo = new SocketClientInfo();
- try
- {
- _clientInfo.Tag = netListener._listenParam._tag;
- IPEndPoint ip = ConnectSocket.LocalEndPoint as IPEndPoint;
- Debug.Assert(netListener._listenParam._port == ip.Port);
- _clientInfo.LocalIp = ip.Address.ToString();
- _clientInfo.LocalPort = netListener._listenParam._port;
- ip = ConnectSocket.RemoteEndPoint as IPEndPoint;
- _clientInfo.PeerIp = ip.Address.ToString();
- _clientInfo.PeerPort = ip.Port;
- }
- catch (Exception ex)
- {
- NetLogger.Log("CreateClientInfo", ex);
- }
- }
- internal void SetClientInfo(SocketClientInfo clientInfo)
- {
- _clientInfo = clientInfo;
- }
- #region read process
- bool _inReadPending = false;
- public EN_SocketReadResult ReadNextData()
- {
- lock (this)
- {
- if (_socketError)
- return EN_SocketReadResult.ReadError;
- if (_inReadPending)
- return EN_SocketReadResult.InAsyn;
- if(!ConnectSocket.Connected)
- {
- OnReadError();
- return EN_SocketReadResult.ReadError;
- }
- try
- {
- m_receiveEventArgs.SetBuffer(m_asyncReceiveBuffer, 0, m_asyncReceiveBuffer.Length);
- _inReadPending = true;
- bool willRaiseEvent = ConnectSocket.ReceiveAsync(ReceiveEventArgs); //投递接收请求
- if (!willRaiseEvent)
- {
- _inReadPending = false;
- ProcessReceive();
- if (_socketError)
- {
- OnReadError();
- return EN_SocketReadResult.ReadError;
- }
- return EN_SocketReadResult.HaveRead;
- }
- else
- {
- return EN_SocketReadResult.InAsyn;
- }
- }
- catch (Exception ex)
- {
- NetLogger.Log("ReadNextData", ex);
- _inReadPending = false;
- OnReadError();
- return EN_SocketReadResult.ReadError;
- }
- }
- }
- private void ProcessReceive()
- {
- if (ReceiveEventArgs.BytesTransferred > 0
- && ReceiveEventArgs.SocketError == SocketError.Success)
- {
- int offset = ReceiveEventArgs.Offset;
- int count = ReceiveEventArgs.BytesTransferred;
- byte[] readData = new byte[count];
- Array.Copy(m_asyncReceiveBuffer, offset, readData, 0, count);
- _inReadPending = false;
- if (!_socketError)
- OnReadData?.Invoke(this, readData);
- }
- else
- {
- _inReadPending = false;
- OnReadError();
- }
- }
- private void ReceiveEventArgs_Completed(object sender, SocketAsyncEventArgs e)
- {
- lock (this)
- {
- _inReadPending = false;
- ProcessReceive();
- if (_socketError)
- {
- OnReadError();
- }
- }
- }
- bool _socketError = false;
- private void OnReadError()
- {
- lock (this)
- {
- if (_socketError == false)
- {
- _socketError = true;
- OnSocketClose?.Invoke(this);
- }
- CloseClient();
- }
- }
- #endregion
- #region send process
- int _sendBufferByteCount = 102400;
- public int SendBufferByteCount
- {
- get
- {
- return _sendBufferByteCount;
- }
- set
- {
- if (value < 1024)
- {
- _sendBufferByteCount = 1024;
- }
- else
- {
- _sendBufferByteCount = value;
- }
- }
- }
- SendBufferPool _sendDataPool = new SendBufferPool();
- internal EN_SendDataResult PutSendData(byte[] data)
- {
- if (_socketError)
- return EN_SendDataResult.no_client;
- if (_sendDataPool._bufferByteCount >= _sendBufferByteCount)
- {
- return EN_SendDataResult.buffer_overflow;
- }
- if (data.Length <= IocpReadLen)
- {
- _sendDataPool.PutObj(data);
- }
- else
- {
- List<byte[]> dataItems = SplitData(data, IocpReadLen);
- foreach (byte[] item in dataItems)
- {
- _sendDataPool.PutObj(item);
- }
- }
- return EN_SendDataResult.ok;
- }
- bool _inSendPending = false;
- public EN_SocketSendResult SendNextData()
- {
- lock (this)
- {
- if (_socketError)
- {
- return EN_SocketSendResult.SendError;
- }
- if (_inSendPending)
- {
- return EN_SocketSendResult.InAsyn;
- }
- int sendByteCount = GetSendData();
- if (sendByteCount == 0)
- {
- return EN_SocketSendResult.NoSendData;
- }
- //防止抛出异常,否则影响性能
- if (!ConnectSocket.Connected)
- {
- OnSendError();
- return EN_SocketSendResult.SendError;
- }
- try
- {
- m_sendEventArgs.SetBuffer(m_asyncSendBuffer, 0, sendByteCount);
- _inSendPending = true;
- bool willRaiseEvent = ConnectSocket.SendAsync(m_sendEventArgs);
- if (!willRaiseEvent)
- {
- _inSendPending = false;
- ProcessSend(m_sendEventArgs);
- if (_socketError)
- {
- OnSendError();
- return EN_SocketSendResult.SendError;
- }
- else
- {
- OnSendData?.Invoke(this, sendByteCount);
- //继续发下一条
- return EN_SocketSendResult.HaveSend;
- }
- }
- else
- {
- return EN_SocketSendResult.InAsyn;
- }
- }
- catch (Exception ex)
- {
- NetLogger.Log("SendNextData", ex);
- _inSendPending = false;
- OnSendError();
- return EN_SocketSendResult.SendError;
- }
- }
- }
- private void SendEventArgs_Completed(object sender, SocketAsyncEventArgs sendEventArgs)
- {
- lock (this)
- {
- try
- {
- _inSendPending = false;
- ProcessSend(m_sendEventArgs);
- int sendCount = 0;
- if (sendEventArgs.SocketError == SocketError.Success)
- {
- sendCount = sendEventArgs.BytesTransferred;
- }
- OnSendData?.Invoke(this, sendCount);
- if (_socketError)
- {
- OnSendError();
- }
- }
- catch (Exception ex)
- {
- NetLogger.Log("SendEventArgs_Completed", ex);
- }
- }
- }
- private bool ProcessSend(SocketAsyncEventArgs sendEventArgs)
- {
- if (sendEventArgs.SocketError == SocketError.Success)
- {
- return true;
- }
- else
- {
- OnSendError();
- return false;
- }
- }
- private int GetSendData()
- {
- int dataLen = 0;
- while (true)
- {
- byte[] data = _sendDataPool.GetObj();
- if (data == null)
- return dataLen;
- Array.Copy(data, 0, m_asyncSendBuffer, dataLen, data.Length);
- dataLen += data.Length;
- if (dataLen > IocpReadLen)
- break;
- }
- return dataLen;
- }
- private void OnSendError()
- {
- lock (this)
- {
- if (_socketError == false)
- {
- _socketError = true;
- OnSocketClose?.Invoke(this);
- }
- CloseClient();
- }
- }
- #endregion
- internal void CloseSocket()
- {
- try
- {
- ConnectSocket.Close();
- }
- catch (Exception ex)
- {
- NetLogger.Log("CloseSocket", ex);
- }
- }
- static object socketCloseLock = new object();
- public static int closeSendCount = 0;
- public static int closeReadCount = 0;
- bool _disposeSend = false;
- void CloseSend()
- {
- if (!_disposeSend && !_inSendPending)
- {
- lock (socketCloseLock)
- closeSendCount++;
- _disposeSend = true;
- m_sendEventArgs.SetBuffer(null, 0, 0);
- m_sendEventArgs.Completed -= SendEventArgs_Completed;
- m_sendEventArgs.Dispose();
- }
- }
- bool _disposeRead = false;
- void CloseRead()
- {
- if (!_disposeRead && !_inReadPending)
- {
- lock (socketCloseLock)
- closeReadCount++;
- _disposeRead = true;
- m_receiveEventArgs.SetBuffer(null, 0, 0);
- m_receiveEventArgs.Completed -= ReceiveEventArgs_Completed;
- m_receiveEventArgs.Dispose();
- }
- }
- private void CloseClient()
- {
- try
- {
- CloseSend();
- CloseRead();
- ConnectSocket.Close();
- }
- catch (Exception ex)
- {
- NetLogger.Log("CloseClient", ex);
- }
- }
- //发送缓冲大小
- private List<byte[]> SplitData(byte[] data, int maxLen)
- {
- List<byte[]> items = new List<byte[]>();
- int start = 0;
- while (true)
- {
- int itemLen = Math.Min(maxLen, data.Length - start);
- if (itemLen == 0)
- break;
- byte[] item = new byte[itemLen];
- Array.Copy(data, start, item, 0, itemLen);
- items.Add(item);
- start += itemLen;
- }
- return items;
- }
- }
- public enum EN_SocketReadResult
- {
- InAsyn,
- HaveRead,
- ReadError
- }
- public enum EN_SocketSendResult
- {
- InAsyn,
- HaveSend,
- NoSendData,
- SendError
- }
- class SendBufferPool
- {
- ObjectPool<byte[]> _bufferPool = new ObjectPool<byte[]>();
- public Int64 _bufferByteCount = 0;
- public bool PutObj(byte[] obj)
- {
- if (_bufferPool.PutObj(obj))
- {
- lock (this)
- {
- _bufferByteCount += obj.Length;
- }
- return true;
- }
- else
- {
- return false;
- }
- }
- public byte[] GetObj()
- {
- byte[] result = _bufferPool.GetObj();
- if (result != null)
- {
- lock (this)
- {
- _bufferByteCount -= result.Length;
- }
- }
- return result;
- }
- }
- }
- using System;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Linq;
- using System.Net.Sockets;
- using System.Threading;
- namespace IocpCore
- {
- public class NetServer
- {
- public Action<SocketEventParam> OnSocketPacketEvent;
- //每个连接发送缓冲大小
- public int SendBufferBytePerClient { get; set; } = 1024 * 100;
- bool _serverStart = false;
- List<NetListener> _listListener = new List<NetListener>();
- //负责对收到的字节流 组成完成的包
- ClientPacketManage _clientPacketManage;
- public Int64 SendByteCount { get; set; }
- public Int64 ReadByteCount { get; set; }
- List<ListenParam> _listListenPort = new List<ListenParam>();
- public void AddListenPort(int port, object tag)
- {
- _listListenPort.Add(new ListenParam(port, tag));
- }
- /// <summary>
- ///
- /// </summary>
- /// <param name="listenFault">监听失败的端口</param>
- /// <returns></returns>
- public bool StartListen(out List<int> listenFault)
- {
- _serverStart = true;
- _clientPacketManage = new ClientPacketManage(this);
- _clientPacketManage.OnSocketPacketEvent += PutClientPacket;
- _netConnectManage.OnSocketConnectEvent += SocketConnectEvent;
- _listListener.Clear();
- Thread thread1 = new Thread(new ThreadStart(NetPacketProcess));
- thread1.Start();
- Thread thread2 = new Thread(new ThreadStart(NetSendProcess));
- thread2.Start();
- Thread thread3 = new Thread(new ThreadStart(NetReadProcess));
- thread3.Start();
- listenFault = new List<int>();
- foreach (ListenParam param in _listListenPort)
- {
- NetListener listener = new NetListener(this);
- listener._listenParam = param;
- listener.OnAcceptSocket += Listener_OnAcceptSocket;
- if (!listener.StartListen())
- {
- listenFault.Add(param._port);
- }
- else
- {
- _listListener.Add(listener);
- NetLogger.Log(string.Format("监听成功!端口:{0}", param._port));
- }
- }
- return listenFault.Count == 0;
- }
- public void PutClientPacket(SocketEventParam param)
- {
- OnSocketPacketEvent?.Invoke(param);
- }
- //获取包的最小长度
- int _packetMinLen;
- int _packetMaxLen;
- public int PacketMinLen
- {
- get { return _packetMinLen; }
- }
- public int PacketMaxLen
- {
- get { return _packetMaxLen; }
- }
- /// <summary>
- /// 设置包的最小和最大长度
- /// 当minLen=0时,认为是接收字节流
- /// </summary>
- /// <param name="minLen"></param>
- /// <param name="maxLen"></param>
- public void SetPacketParam(int minLen, int maxLen)
- {
- Debug.Assert(minLen >= 0);
- Debug.Assert(maxLen > minLen);
- _packetMinLen = minLen;
- _packetMaxLen = maxLen;
- }
- //获取包的总长度
- public delegate int delegate_GetPacketTotalLen(byte[] data, int offset);
- public delegate_GetPacketTotalLen GetPacketTotalLen_Callback;
- ObjectPoolWithEvent<SocketEventParam> _socketEventPool = new ObjectPoolWithEvent<SocketEventParam>();
- private void NetPacketProcess()
- {
- while (_serverStart)
- {
- try
- {
- DealEventPool();
- }
- catch (Exception ex)
- {
- NetLogger.Log(string.Format("DealEventPool 异常 {0}***{1}", ex.Message, ex.StackTrace));
- }
- _socketEventPool.WaitOne(1000);
- }
- }
- Dictionary<Socket, AsyncSocketClient> _clientGroup = new Dictionary<Socket, AsyncSocketClient>();
- public int ClientCount
- {
- get
- {
- lock (_clientGroup)
- {
- return _clientGroup.Count;
- }
- }
- }
- public List<Socket> ClientList
- {
- get
- {
- lock (_clientGroup)
- {
- return _clientGroup.Keys.ToList();
- }
- }
- }
- private void DealEventPool()
- {
- while (true)
- {
- SocketEventParam param = _socketEventPool.GetObj();
- if (param == null)
- return;
- if (param.SocketEvent == EN_SocketEvent.close)
- {
- lock (_clientGroup)
- {
- _clientGroup.Remove(param.Socket);
- }
- }
- if (_packetMinLen == 0)//字节流处理
- {
- OnSocketPacketEvent?.Invoke(param);
- }
- else
- {
- //组成一个完整的包 逻辑
- _clientPacketManage.PutSocketParam(param);
- }
- }
- }
- private void SocketConnectEvent(SocketEventParam param, AsyncSocketClient client)
- {
- try
- {
- if (param.Socket == null || client == null) //连接失败
- {
- }
- else
- {
- lock (_clientGroup)
- {
- bool remove = _clientGroup.Remove(client.ConnectSocket);
- Debug.Assert(!remove);
- _clientGroup.Add(client.ConnectSocket, client);
- }
- client.OnSocketClose += Client_OnSocketClose;
- client.OnReadData += Client_OnReadData;
- client.OnSendData += Client_OnSendData;
- _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));
- }
- _socketEventPool.PutObj(param);
- }
- catch (Exception ex)
- {
- NetLogger.Log(string.Format("SocketConnectEvent 异常 {0}***{1}", ex.Message, ex.StackTrace));
- }
- }
- internal void OnRcvPacketLenError(Socket socket, byte[] buffer, int offset, int packetLen)
- {
- try
- {
- lock (_clientGroup)
- {
- if (!_clientGroup.ContainsKey(socket))
- {
- Debug.Assert(false);
- return;
- }
- NetLogger.Log(string.Format("报长度异常!包长:{0}", packetLen));
- AsyncSocketClient client = _clientGroup[socket];
- client.CloseSocket();
- }
- }
- catch (Exception ex)
- {
- NetLogger.Log(string.Format("OnRcvPacketLenError 异常 {0}***{1}", ex.Message, ex.StackTrace));
- }
- }
- #region listen port
- private void Listener_OnAcceptSocket(ListenParam listenPatam, AsyncSocketClient client)
- {
- try
- {
- lock (_clientGroup)
- {
- bool remove = _clientGroup.Remove(client.ConnectSocket);
- Debug.Assert(!remove);
- _clientGroup.Add(client.ConnectSocket, client);
- }
- client.OnSocketClose += Client_OnSocketClose;
- client.OnReadData += Client_OnReadData;
- client.OnSendData += Client_OnSendData;
- _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));
- SocketEventParam param = new SocketEventParam(EN_SocketEvent.accept, client.ConnectSocket);
- param.ClientInfo = client.ClientInfo;
- _socketEventPool.PutObj(param);
- }
- catch (Exception ex)
- {
- NetLogger.Log(string.Format("Listener_OnAcceptSocket 异常 {0}***{1}", ex.Message, ex.StackTrace));
- }
- }
- ObjectPoolWithEvent<SocketEventDeal> _listSendEvent = new ObjectPoolWithEvent<SocketEventDeal>();
- private void NetSendProcess()
- {
- while (true)
- {
- DealSendEvent();
- _listSendEvent.WaitOne(1000);
- }
- }
- ObjectPoolWithEvent<SocketEventDeal> _listReadEvent = new ObjectPoolWithEvent<SocketEventDeal>();
- private void NetReadProcess()
- {
- while (true)
- {
- DealReadEvent();
- _listReadEvent.WaitOne(1000);
- }
- }
- private void DealSendEvent()
- {
- while (true)
- {
- SocketEventDeal item = _listSendEvent.GetObj();
- if (item == null)
- break;
- switch (item.SocketEvent)
- {
- case EN_SocketDealEvent.send:
- {
- while (true)
- {
- EN_SocketSendResult result = item.Client.SendNextData();
- if (result == EN_SocketSendResult.HaveSend)
- continue;
- else
- break;
- }
- }
- break;
- case EN_SocketDealEvent.read:
- {
- Debug.Assert(false);
- }
- break;
- }
- }
- }
- private void DealReadEvent()
- {
- while (true)
- {
- SocketEventDeal item = _listReadEvent.GetObj();
- if (item == null)
- break;
- switch (item.SocketEvent)
- {
- case EN_SocketDealEvent.read:
- {
- while (true)
- {
- EN_SocketReadResult result = item.Client.ReadNextData();
- if (result == EN_SocketReadResult.HaveRead)
- continue;
- else
- break;
- }
- }
- break;
- case EN_SocketDealEvent.send:
- {
- Debug.Assert(false);
- }
- break;
- }
- }
- }
- private void Client_OnReadData(AsyncSocketClient client, byte[] readData)
- {
- //读下一条
- _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));
- try
- {
- SocketEventParam param = new SocketEventParam(EN_SocketEvent.read, client.ConnectSocket);
- param.ClientInfo = client.ClientInfo;
- param.Data = readData;
- _socketEventPool.PutObj(param);
- lock (this)
- {
- ReadByteCount += readData.Length;
- }
- }
- catch (Exception ex)
- {
- NetLogger.Log(string.Format("Client_OnReadData 异常 {0}***{1}", ex.Message, ex.StackTrace));
- }
- }
- #endregion
- private void Client_OnSendData(AsyncSocketClient client, int sendCount)
- {
- //发送下一条
- _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));
- lock (this)
- {
- SendByteCount += sendCount;
- }
- }
- private void Client_OnSocketClose(AsyncSocketClient client)
- {
- try
- {
- SocketEventParam param = new SocketEventParam(EN_SocketEvent.close, client.ConnectSocket);
- param.ClientInfo = client.ClientInfo;
- _socketEventPool.PutObj(param);
- }
- catch (Exception ex)
- {
- NetLogger.Log(string.Format("Client_OnSocketClose 异常 {0}***{1}", ex.Message, ex.StackTrace));
- }
- }
- /// <summary>
- /// 放到发送缓冲
- /// </summary>
- /// <param name="socket"></param>
- /// <param name="data"></param>
- /// <returns></returns>
- public EN_SendDataResult SendData(Socket socket, byte[] data)
- {
- if (socket == null)
- return EN_SendDataResult.no_client;
- lock (_clientGroup)
- {
- if (!_clientGroup.ContainsKey(socket))
- return EN_SendDataResult.no_client;
- AsyncSocketClient client = _clientGroup[socket];
- EN_SendDataResult result = client.PutSendData(data);
- if (result == EN_SendDataResult.ok)
- {
- //发送下一条
- _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));
- }
- return result;
- }
- }
- /// <summary>
- /// 设置某个连接的发送缓冲大小
- /// </summary>
- /// <param name="socket"></param>
- /// <param name="byteCount"></param>
- /// <returns></returns>
- public bool SetClientSendBuffer(Socket socket, int byteCount)
- {
- lock (_clientGroup)
- {
- if (!_clientGroup.ContainsKey(socket))
- return false;
- AsyncSocketClient client = _clientGroup[socket];
- client.SendBufferByteCount = byteCount;
- return true;
- }
- }
- #region connect process
- NetConnectManage _netConnectManage = new NetConnectManage();
- /// <summary>
- /// 异步连接一个客户端
- /// </summary>
- /// <param name="peerIp"></param>
- /// <param name="peerPort"></param>
- /// <param name="tag"></param>
- /// <returns></returns>
- public bool ConnectAsyn(string peerIp, int peerPort, object tag)
- {
- return _netConnectManage.ConnectAsyn(peerIp, peerPort, tag);
- }
- /// <summary>
- /// 同步连接一个客户端
- /// </summary>
- /// <param name="peerIp"></param>
- /// <param name="peerPort"></param>
- /// <param name="tag"></param>
- /// <param name="socket"></param>
- /// <returns></returns>
- public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
- {
- return _netConnectManage.Connect(peerIp, peerPort, tag, out socket);
- }
- #endregion
- }
- enum EN_SocketDealEvent
- {
- read,
- send,
- }
- class SocketEventDeal
- {
- public AsyncSocketClient Client { get; set; }
- public EN_SocketDealEvent SocketEvent { get; set; }
- public SocketEventDeal(AsyncSocketClient client, EN_SocketDealEvent socketEvent)
- {
- Client = client;
- SocketEvent = socketEvent;
- }
- }
- }
使用起来非常简单,示例如下
- using IocpCore;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Net.Sockets;
- using System.Text;
- using System.Threading.Tasks;
- using System.Windows;
- namespace WarningClient
- {
- public class SocketServer
- {
- public Action<SocketEventParam> OnSocketEvent;
- public Int64 SendByteCount
- {
- get
- {
- if (_netServer == null)
- return 0;
- return _netServer.SendByteCount;
- }
- }
- public Int64 ReadByteCount
- {
- get
- {
- if (_netServer == null)
- return 0;
- return _netServer.ReadByteCount;
- }
- }
- NetServer _netServer;
- EN_PacketType _packetType = EN_PacketType.byteStream;
- public void SetPacktType(EN_PacketType packetType)
- {
- _packetType = packetType;
- if (_netServer == null)
- return;
- if (packetType == EN_PacketType.byteStream)
- {
- _netServer.SetPacketParam(0, 1024);
- }
- else
- {
- _netServer.SetPacketParam(9, 1024);
- }
- }
- public bool Init(List<int> listenPort)
- {
- NetLogger.OnLogEvent += NetLogger_OnLogEvent;
- _netServer = new NetServer();
- SetPacktType(_packetType);
- _netServer.GetPacketTotalLen_Callback += GetPacketTotalLen;
- _netServer.OnSocketPacketEvent += SocketPacketDeal;
- foreach (int n in listenPort)
- {
- _netServer.AddListenPort(n, n);
- }
- List<int> listenFault;
- bool start = _netServer.StartListen(out listenFault);
- return start;
- }
- int GetPacketTotalLen(byte[] data, int offset)
- {
- if (MainWindow._packetType == EN_PacketType.znss)
- return GetPacketZnss(data, offset);
- else
- return GetPacketAnzhiyuan(data, offset);
- }
- int GetPacketAnzhiyuan(byte[] data, int offset)
- {
- int n = data[offset + 5] + 6;
- return n;
- }
- int GetPacketZnss(byte[] data, int offset)
- {
- int packetLen = (int)(data[4]) + 5;
- return packetLen;
- }
- public bool ConnectAsyn(string peerIp, int peerPort, object tag)
- {
- return _netServer.ConnectAsyn(peerIp, peerPort, tag);
- }
- public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
- {
- return _netServer.Connect(peerIp, peerPort, tag, out socket);
- }
- private void NetLogger_OnLogEvent(string message)
- {
- AppLog.Log(message);
- }
- Dictionary<Socket, SocketEventParam> _clientGroup = new Dictionary<Socket, SocketEventParam>();
- public int ClientCount
- {
- get
- {
- lock (_clientGroup)
- {
- return _clientGroup.Count;
- }
- }
- }
- public List<Socket> ClientList
- {
- get
- {
- if (_netServer != null)
- return _netServer.ClientList;
- return new List<Socket>();
- }
- }
- void AddClient(SocketEventParam socketParam)
- {
- lock (_clientGroup)
- {
- _clientGroup.Remove(socketParam.Socket);
- _clientGroup.Add(socketParam.Socket, socketParam);
- }
- }
- void RemoveClient(SocketEventParam socketParam)
- {
- lock (_clientGroup)
- {
- _clientGroup.Remove(socketParam.Socket);
- }
- }
- ObjectPool<SocketEventParam> _readDataPool = new ObjectPool<SocketEventParam>();
- public ObjectPool<SocketEventParam> ReadDataPool
- {
- get
- {
- return _readDataPool;
- }
- }
- private void SocketPacketDeal(SocketEventParam socketParam)
- {
- OnSocketEvent?.Invoke(socketParam);
- if (socketParam.SocketEvent == EN_SocketEvent.read)
- {
- if (MainWindow._isShowReadPacket)
- _readDataPool.PutObj(socketParam);
- }
- else if (socketParam.SocketEvent == EN_SocketEvent.accept)
- {
- AddClient(socketParam);
- string peerIp = socketParam.ClientInfo.PeerIpPort;
- AppLog.Log(string.Format("客户端链接!本地端口:{0},对端:{1}",
- socketParam.ClientInfo.LocalPort, peerIp));
- }
- else if (socketParam.SocketEvent == EN_SocketEvent.connect)
- {
- string peerIp = socketParam.ClientInfo.PeerIpPort;
- if (socketParam.Socket != null)
- {
- AddClient(socketParam);
- AppLog.Log(string.Format("连接对端成功!本地端口:{0},对端:{1}",
- socketParam.ClientInfo.LocalPort, peerIp));
- }
- else
- {
- AppLog.Log(string.Format("连接对端失败!本地端口:{0},对端:{1}",
- socketParam.ClientInfo.LocalPort, peerIp));
- }
- }
- else if (socketParam.SocketEvent == EN_SocketEvent.close)
- {
- MainWindow.MainWnd.OnSocketDisconnect(socketParam.Socket);
- RemoveClient(socketParam);
- string peerIp = socketParam.ClientInfo.PeerIpPort;
- AppLog.Log(string.Format("客户端断开!本地端口:{0},对端:{1},",
- socketParam.ClientInfo.LocalPort, peerIp));
- }
- }
- public EN_SendDataResult SendData(Socket socket, byte[] data)
- {
- if(socket == null)
- {
- MessageBox.Show("还没连接!");
- return EN_SendDataResult.no_client;
- }
- return _netServer.SendData(socket, data);
- }
- internal void SendToAll(byte[] data)
- {
- lock (_clientGroup)
- {
- foreach (Socket socket in _clientGroup.Keys)
- {
- SendData(socket, data);
- }
- }
- }
- }
- }
来源: http://www.cnblogs.com/yuanchenhui/p/asyn_scoket.html