1、開始正常監聽以后,就要開始接受數據了,整體流程圖如下:
2、上一節看到我們在程序初始化的時候,初始化了很多個SocketConnection,用于管理客戶端的鏈接,那應用層如何來操作,又什么時候來接受數據?于是我們便有了Socketsession,用于給應用層來管理整個會話過程,代碼如下:
public class SocketSession : IDisposable { public string SessionId { get; PRivate set; } private System.Net.Sockets.Socket _connectSocket; private iprotocol _protocol; private SocketConnection _connect; public SocketConnection Connection { get { return _connect; } } private MemoryStream _memStream; private delegate void ReceiveDataHandler(SocketAsyncEventArgs e); private ReceiveDataHandler ReceiveHandler; private delegate void ReceiveReadPackageHandler(byte[] b, int offset, SocketAsyncEventArgs e); private ReceiveReadPackageHandler ReadPackageHandler; public System.Net.Sockets.Socket ConnectSocket { get { return _connectSocket; } private set { } } public SocketSession(string sessionId) { this.SessionId = sessionId; } public SocketSession(System.Net.Sockets.Socket client, SocketConnection connect) : this(Guid.NewGuid().ToString()) { this._connectSocket = client; this._connect = connect; this._protocol = connect.Pool.AppServer.AppProtocol; _memStream = new MemoryStream(); ReceiveHandler = ReceiveData; ReadPackageHandler = this.ReadPackage; } internal void ReceiveData(SocketAsyncEventArgs e) { if (e.SocketError != SocketError.Success) { this.Close(); return; } if (e.BytesTransferred <= 0) { this.Close(); return; } try { if (this.Connection.Flag == SocketFlag.Busy) { byte[] buffer = new byte[e.BytesTransferred]; Array.Copy(e.Buffer, 0, buffer, 0, e.BytesTransferred); ReadPackage(buffer, 0, e); buffer = null; } } catch (Exception ex) { this.Close(); return; } } internal void ReceiveAsync(SocketAsyncEventArgs e) { if (e == null) { return; } bool isCompleted = true; try { isCompleted = this._connectSocket.ReceiveAsync(e); } catch (Exception ex) { LogHelper.Debug(this.SessionId + ex.ToString()); this.Close(); } if (!isCompleted) { this.ReceiveHandler.BeginInvoke(e, ReceiveHandlerCallBack, ReceiveHandler); } } void ReceiveHandlerCallBack(IAsyncResult result) { try { (result.AsyncState as ReceiveDataHandler).EndInvoke(result); } catch (Exception e) { LogHelper.Debug(e.Message); } } internal void OnDataRecevied(SessionEventArgs arg) { if (DataRecevied != null) { this._memStream.SetLength(0); DataRecevied.Invoke(this, arg); } } internal void Close() { try { this._connectSocket.Close(); } catch (Exception ex) { LogHelper.Debug("關閉socket異常" + ex.ToString()); } if (this.Closed != null) { this.Closed(); } } internal Action Closed; internal Action<SocketSession, SessionEventArgs> DataRecevied; public void Dispose() { if (_memStream != null) { _memStream.Close(); _memStream.Dispose(); _memStream = null; } } public void Send(byte[] data) { try { if (this.Connection.Flag == SocketFlag.Busy) { this._connectSocket.Send(data); } } catch (Exception ex) { this.Close(); } } private void ReadPackage(byte[] data, int offset, SocketAsyncEventArgs e) { if (data == null || data.Length == 0) { return; } if (offset >= data.Length) { return; } if (offset == 0) { if (_memStream.Length > 0) { _memStream.Write(data, 0, data.Length); data = _memStream.ToArray(); } } //粘包處理 OnReceivedCallBack(data, offset, e); data = null; } private void OnReceivedCallBack(byte[] buffer, int offset, SocketAsyncEventArgs e) { byte[] data = this._protocol.OnDataReceivedCallBack(buffer, ref offset); if (offset == -1) { this.Close(); return; } if (data == null || data.Length == 0) { this._memStream.Write(buffer, offset, buffer.Length - offset); this.ReceiveAsync(e); return; } SessionEventArgs session_args = new SessionEventArgs(); session_args.Data = data; this.OnDataRecevied(session_args); if (offset < buffer.Length) { this.ReadPackageHandler.BeginInvoke(buffer, offset, e, ReadPackageCallBack, ReadPackageHandler); } else { this.ReceiveAsync(e); } data = null; } void ReadPackageCallBack(IAsyncResult result) { try { (result.AsyncState as ReceiveReadPackageHandler).EndInvoke(result); } catch (Exception ex) { LogHelper.Debug(ex.Message); } } }View Code
細心的童鞋可以發現,在ReceiveAsync方法里面,接收數據的地方,當同步接收完成的時候,我們調用了一個異步委托ReceiveHandler.BeginInvoke。
在解析出一個獨立的包,并且緩沖區的數據里面還有多余的包的時候,我們也調用了一個異步的委托ReadPackageHandler.BeginInvoke。
如果緩沖區比較大,比如我現在是8K,而單個包很小,客戶端又發送比較頻繁的時候。會導致在解析包的時候,形成一個短暫的遞歸。遞歸就會不停的壓堆,資源得不到釋放。
運行一段時間后,有可能導致OutOfMemoryException,如果一直是同步接收數據,在Receive的地方,也有可能形成一個遞歸。于是便采用了異步調用的方式。
3、因為socket屬于無邊界的,代碼層面的每一次Send,并不是真正意義上的直接發送給服務器,而只是寫到了緩沖區,由系統來決定什么時候發。如果客戶 端發送非常頻繁的情況下,就可能導致服務器從緩沖區取出來的包,是由多個包一起組成的。從緩沖區取出來的包,并不能保證是一個獨立的應用層的包,需要按既定的協議來解析包。
我們先假定一個簡單的協議,一個包的前4個字節,表明這個包內容的長度。代碼如下:
public class DefaultProtocol : IProtocol { public byte[] OnDataReceivedCallBack(byte[] data, ref int offset) { int length = BitConverter.ToInt32(data, offset); int package_head = 4; int package_length = length + package_head; byte[] buffer = null; if (length > 0) { if (offset + package_length <= data.Length)
新聞熱點
疑難解答