在上一篇博文中,有介紹到用換行符分割消息的方法。但是這種方法有個小問題,如果消息中本身就包含換行符,那將會將這條消息分割成兩條,結果就不對了。
本文介紹另外一種消息分割方式,即上一篇博文中講的第2條:use a fixed length header that indicates the length of the body,用一個固定字節數的Header前綴來指定Body的字節數,以此來分割消息。
上面圖中Header固定為4字節,Header中保存的是一個4字節(32位)的整數,例如12即為0x0000000C,這個整數用來指定Body的長度(字節數)。當讀完這么多字節的Body之后,又是下一條消息的Header。
下面分別用MINA、Netty、Twisted來實現對這種消息的切合和解碼。
MINA:
MINA提供了PRefixedStringCodecFactory來對這種類型的消息進行編碼解碼,PrefixedStringCodecFactory默認Header的大小是4字節,當然也可以指定成1或2。
public class TcpServer { public static void main(String[] args) throws IOException { IoAcceptor acceptor = new NioSocketAcceptor(); // 4字節的Header指定Body的字節數,對這種消息的處理 acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new PrefixedStringCodecFactory(Charset.forName("UTF-8")))); acceptor.setHandler(new TcpServerHandle()); acceptor.bind(new InetSocketAddress(8080)); } } class TcpServerHandle extends IoHandlerAdapter { @Override public void exceptionCaught(Iosession session, Throwable cause) throws Exception { cause.printStackTrace(); } // 接收到新的數據 @Override public void messageReceived(IoSession session, Object message) throws Exception { String msg = (String) message; System.out.println("messageReceived:" + msg); } @Override public void sessionCreated(IoSession session) throws Exception { System.out.println("sessionCreated"); } @Override public void sessionClosed(IoSession session) throws Exception { System.out.println("sessionClosed"); } }
Netty:
Netty使用LengthFieldBasedFrameDecoder來處理這種消息。下面代碼中的new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4)中包含5個參數,分別是int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip。maxFrameLength為消息的最大長度,lengthFieldOffset為Header的位置,lengthFieldLength為Header的長度,lengthAdjustment為長度調整(默認Header中的值表示Body的長度,并不包含Header自己),initialBytesToStrip為去掉字節數(默認解碼后返回Header+Body的全部內容,這里設為4表示去掉4字節的Header,只留下Body)。
public class TcpServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // LengthFieldBasedFrameDecoder按行分割消息,取出body pipeline.addLast(new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4)); // 再按UTF-8編碼轉成字符串 pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new TcpServerHandler()); } }); ChannelFuture f = b.bind(8080).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } } class TcpServerHandler extends ChannelInboundHandlerAdapter { // 接收到新的數據 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { String message = (String) msg; System.out.println("channelRead:" + message); } @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("channelActive"); } @Override public void channelInactive(ChannelHandlerContext ctx) { System.out.println("channelInactive"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
Twisted:
在Twisted中需要繼承Int32StringReceiver,不再繼承Protocol。Int32StringReceiver表示固定32位(4字節)的Header,另外還有Int16StringReceiver、Int8StringReceiver等。而需要實現的接受數據事件的方法不再是dataReceived,也不是lineReceived,而是stringReceived。
# -*- coding:utf-8 –*- from twisted.protocols.basic import Int32StringReceiver from twisted.internet.protocol import Factory from twisted.internet import reactor class TcpServerHandle(Int32StringReceiver): # 新的連接建立 def connectionMade(self): print 'connectionMade' # 連接斷開 def connectionLost(self, reason): print 'connectionLost' # 接收到新的數據 def stringReceived(self, data): print 'stringReceived:' + data factory = Factory() factory.protocol = TcpServerHandle reactor.listenTCP(8080, factory) reactor.run()
下面是java編寫的一個客戶端測試程序:
public class TcpClient { public static void main(String[] args) throws IOException { Socket socket = null; DataOutputStream out = null; try { socket = new Socket("localhost", 8080); out = new DataOutputStream(socket.getOutputStream()); // 請求服務器 String data1 = "牛頓"; byte[] outputBytes1 = data1.getBytes("UTF-8"); out.writeInt(outputBytes1.length); // write header out.write(outputBytes1); // write body String data2 = "愛因斯坦"; byte[] outputBytes2 = data2.getBytes("UTF-8"); out.writeInt(outputBytes2.length); // write header out.write(outputBytes2); // write body out.flush(); } finally { // 關閉連接 out.close(); socket.close(); } } }
MINA服務器輸出結果:
sessionCreatedmessageReceived:牛頓messageReceived:愛因斯坦sessionClosed
Netty服務器輸出結果:
channelActivechannelRead:牛頓channelRead:愛因斯坦channelInactive
Twisted服務器輸出結果:
connectionMadestringReceived:牛頓stringReceived:愛因斯坦connectionLost
新聞熱點
疑難解答