亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 開發 > Java > 正文

詳解Java 網絡IO編程總結(BIO、NIO、AIO均含完整實例代碼)

2024-07-13 10:15:41
字體:
來源:轉載
供稿:網友

本文會從傳統的BIO到NIO再到AIO自淺至深介紹,并附上完整的代碼講解。

下面代碼中會使用這樣一個例子:客戶端發送一段算式的字符串到服務器,服務器計算后返回結果到客戶端。

代碼的所有說明,都直接作為注釋,嵌入到代碼中,看代碼時就能更容易理解,代碼中會用到一個計算結果的工具類,見文章代碼部分。

1、BIO編程

1.1、傳統的BIO編程

網絡編程的基本模型是C/S模型,即兩個進程間的通信。

服務端提供IP和監聽端口,客戶端通過連接操作想服務端監聽的地址發起連接請求,通過三次握手連接,如果連接成功建立,雙方就可以通過套接字進行通信。

傳統的同步阻塞模型開發中,ServerSocket負責綁定IP地址,啟動監聽端口;Socket負責發起連接操作。連接成功后,雙方通過輸入和輸出流進行同步阻塞式通信。 

簡單的描述一下BIO的服務端通信模型:采用BIO通信模型的服務端,通常由一個獨立的Acceptor線程負責監聽客戶端的連接,它接收到客戶端連接請求之后為每個客戶端創建一個新的線程進行鏈路處理沒處理完成后,通過輸出流返回應答給客戶端,線程銷毀。即典型的一請求一應答通宵模型。

傳統BIO通信模型圖:

Java,網絡IO編程,網絡IO

該模型最大的問題就是缺乏彈性伸縮能力,當客戶端并發訪問量增加后,服務端的線程個數和客戶端并發訪問數呈1:1的正比關系,Java中的線程也是比較寶貴的系統資源,線程數量快速膨脹后,系統的性能將急劇下降,隨著訪問量的繼續增大,系統最終就死-掉-了。

同步阻塞式I/O創建的Server源碼:

package com.anxpp.io.calculator.bio; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; /**  * BIO服務端源碼  * @author yangtao__anxpp.com  * @version 1.0  */ public final class ServerNormal {  //默認的端口號  private static int DEFAULT_PORT = 12345;  //單例的ServerSocket  private static ServerSocket server;  //根據傳入參數設置監聽端口,如果沒有參數調用以下方法并使用默認值  public static void start() throws IOException{   //使用默認值   start(DEFAULT_PORT);  }  //這個方法不會被大量并發訪問,不太需要考慮效率,直接進行方法同步就行了  public synchronized static void start(int port) throws IOException{   if(server != null) return;   try{    //通過構造函數創建ServerSocket    //如果端口合法且空閑,服務端就監聽成功    server = new ServerSocket(port);    System.out.println("服務器已啟動,端口號:" + port);    //通過無線循環監聽客戶端連接    //如果沒有客戶端接入,將阻塞在accept操作上。    while(true){     Socket socket = server.accept();     //當有新的客戶端接入時,會執行下面的代碼     //然后創建一個新的線程處理這條Socket鏈路     new Thread(new ServerHandler(socket)).start();    }   }finally{    //一些必要的清理工作    if(server != null){     System.out.println("服務器已關閉。");     server.close();     server = null;    }   }  } } 

客戶端消息處理線程ServerHandler源碼:

package com.anxpp.io.calculator.bio; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket;  import com.anxpp.io.utils.Calculator; /**  * 客戶端線程  * @author yangtao__anxpp.com  * 用于處理一個客戶端的Socket鏈路  */ public class ServerHandler implements Runnable{  private Socket socket;  public ServerHandler(Socket socket) {   this.socket = socket;  }  @Override  public void run() {   BufferedReader in = null;   PrintWriter out = null;   try{    in = new BufferedReader(new InputStreamReader(socket.getInputStream()));    out = new PrintWriter(socket.getOutputStream(),true);    String expression;    String result;    while(true){     //通過BufferedReader讀取一行     //如果已經讀到輸入流尾部,返回null,退出循環     //如果得到非空值,就嘗試計算結果并返回     if((expression = in.readLine())==null) break;     System.out.println("服務器收到消息:" + expression);     try{      result = Calculator.cal(expression).toString();     }catch(Exception e){      result = "計算錯誤:" + e.getMessage();     }     out.println(result);    }   }catch(Exception e){    e.printStackTrace();   }finally{    //一些必要的清理工作    if(in != null){     try {      in.close();     } catch (IOException e) {      e.printStackTrace();     }     in = null;    }    if(out != null){     out.close();     out = null;    }    if(socket != null){     try {      socket.close();     } catch (IOException e) {      e.printStackTrace();     }     socket = null;    }   }  } } 

同步阻塞式I/O創建的Client源碼:

package com.anxpp.io.calculator.bio; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; /**  * 阻塞式I/O創建的客戶端  * @author yangtao__anxpp.com  * @version 1.0  */ public class Client {  //默認的端口號  private static int DEFAULT_SERVER_PORT = 12345;  private static String DEFAULT_SERVER_IP = "127.0.0.1";  public static void send(String expression){   send(DEFAULT_SERVER_PORT,expression);  }  public static void send(int port,String expression){   System.out.println("算術表達式為:" + expression);   Socket socket = null;   BufferedReader in = null;   PrintWriter out = null;   try{    socket = new Socket(DEFAULT_SERVER_IP,port);    in = new BufferedReader(new InputStreamReader(socket.getInputStream()));    out = new PrintWriter(socket.getOutputStream(),true);    out.println(expression);    System.out.println("___結果為:" + in.readLine());   }catch(Exception e){    e.printStackTrace();   }finally{    //一下必要的清理工作    if(in != null){     try {      in.close();     } catch (IOException e) {      e.printStackTrace();     }     in = null;    }    if(out != null){     out.close();     out = null;    }    if(socket != null){     try {      socket.close();     } catch (IOException e) {      e.printStackTrace();     }     socket = null;    }   }  } } 

測試代碼,為了方便在控制臺看輸出結果,放到同一個程序(jvm)中運行:

package com.anxpp.io.calculator.bio; import java.io.IOException; import java.util.Random; /**  * 測試方法  * @author yangtao__anxpp.com  * @version 1.0  */ public class Test {  //測試主方法  public static void main(String[] args) throws InterruptedException {   //運行服務器   new Thread(new Runnable() {    @Override    public void run() {     try {      ServerBetter.start();     } catch (IOException e) {      e.printStackTrace();     }    }   }).start();   //避免客戶端先于服務器啟動前執行代碼   Thread.sleep(100);   //運行客戶端   char operators[] = {'+','-','*','/'};   Random random = new Random(System.currentTimeMillis());   new Thread(new Runnable() {    @SuppressWarnings("static-access")    @Override    public void run() {     while(true){      //隨機產生算術表達式      String expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1);      Client.send(expression);      try {       Thread.currentThread().sleep(random.nextInt(1000));      } catch (InterruptedException e) {       e.printStackTrace();      }     }    }   }).start();  } } 

其中一次的運行結果:

服務器已啟動,端口號:12345算術表達式為:4-2服務器收到消息:4-2___結果為:2算術表達式為:5-10服務器收到消息:5-10___結果為:-5算術表達式為:0-9服務器收到消息:0-9___結果為:-9算術表達式為:0+6服務器收到消息:0+6___結果為:6算術表達式為:1/6服務器收到消息:1/6___結果為:0.16666666666666666...

從以上代碼,很容易看出,BIO主要的問題在于每當有一個新的客戶端請求接入時,服務端必須創建一個新的線程來處理這條鏈路,在需要滿足高性能、高并發的場景是沒法應用的(大量創建新的線程會嚴重影響服務器性能,甚至罷工)。

1.2、偽異步I/O編程

為了改進這種一連接一線程的模型,我們可以使用線程池來管理這些線程(需要了解更多請參考前面提供的文章),實現1個或多個線程處理N個客戶端的模型(但是底層還是使用的同步阻塞I/O),通常被稱為“偽異步I/O模型“。

偽異步I/O模型圖:

Java,網絡IO編程,網絡IO

實現很簡單,我們只需要將新建線程的地方,交給線程池管理即可,只需要改動剛剛的Server代碼即可:

package com.anxpp.io.calculator.bio; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /**  * BIO服務端源碼__偽異步I/O  * @author yangtao__anxpp.com  * @version 1.0  */ public final class ServerBetter {  //默認的端口號  private static int DEFAULT_PORT = 12345;  //單例的ServerSocket  private static ServerSocket server;  //線程池 懶漢式的單例  private static ExecutorService executorService = Executors.newFixedThreadPool(60);  //根據傳入參數設置監聽端口,如果沒有參數調用以下方法并使用默認值  public static void start() throws IOException{   //使用默認值   start(DEFAULT_PORT);  }  //這個方法不會被大量并發訪問,不太需要考慮效率,直接進行方法同步就行了  public synchronized static void start(int port) throws IOException{   if(server != null) return;   try{    //通過構造函數創建ServerSocket    //如果端口合法且空閑,服務端就監聽成功    server = new ServerSocket(port);    System.out.println("服務器已啟動,端口號:" + port);    //通過無線循環監聽客戶端連接    //如果沒有客戶端接入,將阻塞在accept操作上。    while(true){     Socket socket = server.accept();     //當有新的客戶端接入時,會執行下面的代碼     //然后創建一個新的線程處理這條Socket鏈路     executorService.execute(new ServerHandler(socket));    }   }finally{    //一些必要的清理工作    if(server != null){     System.out.println("服務器已關閉。");     server.close();     server = null;    }   }  } } 

測試運行結果是一樣的。

我們知道,如果使用CachedThreadPool線程池(不限制線程數量,如果不清楚請參考文首提供的文章),其實除了能自動幫我們管理線程(復用),看起來也就像是1:1的客戶端:線程數模型,而使用FixedThreadPool我們就有效的控制了線程的最大數量,保證了系統有限的資源的控制,實現了N:M的偽異步I/O模型。

但是,正因為限制了線程數量,如果發生大量并發請求,超過最大數量的線程就只能等待,直到線程池中的有空閑的線程可以被復用。而對Socket的輸入流就行讀取時,會一直阻塞,直到發生:

  1. 有數據可讀
  2. 可用數據以及讀取完畢
  3. 發生空指針或I/O異常

所以在讀取數據較慢時(比如數據量大、網絡傳輸慢等),大量并發的情況下,其他接入的消息,只能一直等待,這就是最大的弊端。

而后面即將介紹的NIO,就能解決這個難題。

2、NIO 編程

JDK 1.4中的java.nio.*包中引入新的Java I/O庫,其目的是提高速度。實際上,“舊”的I/O包已經使用NIO重新實現過,即使我們不顯式的使用NIO編程,也能從中受益。速度的提高在文件I/O和網絡I/O中都可能會發生,但本文只討論后者。

2.1、簡介

NIO我們一般認為是New I/O(也是官方的叫法),因為它是相對于老的I/O類庫新增的(其實在JDK 1.4中就已經被引入了,但這個名詞還會繼續用很久,即使它們在現在看來已經是“舊”的了,所以也提示我們在命名時,需要好好考慮),做了很大的改變。但民間跟多人稱之為Non-block I/O,即非阻塞I/O,因為這樣叫,更能體現它的特點。而下文中的NIO,不是指整個新的I/O庫,而是非阻塞I/O。

NIO提供了與傳統BIO模型中的Socket和ServerSocket相對應的SocketChannel和ServerSocketChannel兩種不同的套接字通道實現。

新增的著兩種通道都支持阻塞和非阻塞兩種模式。

阻塞模式使用就像傳統中的支持一樣,比較簡單,但是性能和可靠性都不好;非阻塞模式正好與之相反。

對于低負載、低并發的應用程序,可以使用同步阻塞I/O來提升開發速率和更好的維護性;對于高負載、高并發的(網絡)應用,應使用NIO的非阻塞模式來開發。

下面會先對基礎知識進行介紹。

2.2、緩沖區 Buffer

Buffer是一個對象,包含一些要寫入或者讀出的數據。

在NIO庫中,所有數據都是用緩沖區處理的。在讀取數據時,它是直接讀到緩沖區中的;在寫入數據時,也是寫入到緩沖區中。任何時候訪問NIO中的數據,都是通過緩沖區進行操作。

緩沖區實際上是一個數組,并提供了對數據結構化訪問以及維護讀寫位置等信息。

具體的緩存區有這些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他們實現了相同的接口:Buffer。

2.3、通道 Channel

我們對數據的讀取和寫入要通過Channel,它就像水管一樣,是一個通道。通道不同于流的地方就是通道是雙向的,可以用于讀、寫和同時讀寫操作。

底層的操作系統的通道一般都是全雙工的,所以全雙工的Channel比流能更好的映射底層操作系統的API。

Channel主要分兩大類:

  1. SelectableChannel:用戶網絡讀寫
  2. FileChannel:用于文件操作

后面代碼會涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子類。

 2.4、多路復用器 Selector

Selector是Java  NIO 編程的基礎。

Selector提供選擇已經就緒的任務的能力:Selector會不斷輪詢注冊在其上的Channel,如果某個Channel上面發生讀或者寫事件,這個Channel就處于就緒狀態,會被Selector輪詢出來,然后通過SelectionKey可以獲取就緒Channel的集合,進行后續的I/O操作。

一個Selector可以同時輪詢多個Channel,因為JDK使用了epoll()代替傳統的select實現,所以沒有最大連接句柄1024/2048的限制。所以,只需要一個線程負責Selector的輪詢,就可以接入成千上萬的客戶端。

2.5、NIO服務端

代碼比傳統的Socket編程看起來要復雜不少。

直接貼代碼吧,以注釋的形式給出代碼說明。

NIO創建的Server源碼:

package com.anxpp.io.calculator.nio; public class Server {  private static int DEFAULT_PORT = 12345;  private static ServerHandle serverHandle;  public static void start(){   start(DEFAULT_PORT);  }  public static synchronized void start(int port){   if(serverHandle!=null)    serverHandle.stop();   serverHandle = new ServerHandle(port);   new Thread(serverHandle,"Server").start();  }  public static void main(String[] args){   start();  } } 

ServerHandle:

package com.anxpp.io.calculator.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set;  import com.anxpp.io.utils.Calculator; /**  * NIO服務端  * @author yangtao__anxpp.com  * @version 1.0  */ public class ServerHandle implements Runnable{  private Selector selector;  private ServerSocketChannel serverChannel;  private volatile boolean started;  /**   * 構造方法   * @param port 指定要監聽的端口號   */  public ServerHandle(int port) {   try{    //創建選擇器    selector = Selector.open();    //打開監聽通道    serverChannel = ServerSocketChannel.open();    //如果為 true,則此通道將被置于阻塞模式;如果為 false,則此通道將被置于非阻塞模式    serverChannel.configureBlocking(false);//開啟非阻塞模式    //綁定端口 backlog設為1024    serverChannel.socket().bind(new InetSocketAddress(port),1024);    //監聽客戶端連接請求    serverChannel.register(selector, SelectionKey.OP_ACCEPT);    //標記服務器已開啟    started = true;    System.out.println("服務器已啟動,端口號:" + port);   }catch(IOException e){    e.printStackTrace();    System.exit(1);   }  }  public void stop(){   started = false;  }  @Override  public void run() {   //循環遍歷selector   while(started){    try{     //無論是否有讀寫事件發生,selector每隔1s被喚醒一次     selector.select(1000);     //阻塞,只有當至少一個注冊的事件發生的時候才會繼續. //    selector.select();     Set<SelectionKey> keys = selector.selectedKeys();     Iterator<SelectionKey> it = keys.iterator();     SelectionKey key = null;     while(it.hasNext()){      key = it.next();      it.remove();      try{       handleInput(key);      }catch(Exception e){       if(key != null){        key.cancel();        if(key.channel() != null){         key.channel().close();        }       }      }     }    }catch(Throwable t){     t.printStackTrace();    }   }   //selector關閉后會自動釋放里面管理的資源   if(selector != null)    try{     selector.close();    }catch (Exception e) {     e.printStackTrace();    }  }  private void handleInput(SelectionKey key) throws IOException{   if(key.isValid()){    //處理新接入的請求消息    if(key.isAcceptable()){     ServerSocketChannel ssc = (ServerSocketChannel) key.channel();     //通過ServerSocketChannel的accept創建SocketChannel實例     //完成該操作意味著完成TCP三次握手,TCP物理鏈路正式建立     SocketChannel sc = ssc.accept();     //設置為非阻塞的     sc.configureBlocking(false);     //注冊為讀     sc.register(selector, SelectionKey.OP_READ);    }    //讀消息    if(key.isReadable()){     SocketChannel sc = (SocketChannel) key.channel();     //創建ByteBuffer,并開辟一個1M的緩沖區     ByteBuffer buffer = ByteBuffer.allocate(1024);     //讀取請求碼流,返回讀取到的字節數     int readBytes = sc.read(buffer);     //讀取到字節,對字節進行編解碼     if(readBytes>0){      //將緩沖區當前的limit設置為position=0,用于后續對緩沖區的讀取操作      buffer.flip();      //根據緩沖區可讀字節數創建字節數組      byte[] bytes = new byte[buffer.remaining()];      //將緩沖區可讀字節數組復制到新建的數組中      buffer.get(bytes);      String expression = new String(bytes,"UTF-8");      System.out.println("服務器收到消息:" + expression);      //處理數據      String result = null;      try{       result = Calculator.cal(expression).toString();      }catch(Exception e){       result = "計算錯誤:" + e.getMessage();      }      //發送應答消息      doWrite(sc,result);     }     //沒有讀取到字節 忽略 //    else if(readBytes==0);     //鏈路已經關閉,釋放資源     else if(readBytes<0){      key.cancel();      sc.close();     }    }   }  }  //異步發送應答消息  private void doWrite(SocketChannel channel,String response) throws IOException{   //將消息編碼為字節數組   byte[] bytes = response.getBytes();   //根據數組容量創建ByteBuffer   ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);   //將字節數組復制到緩沖區   writeBuffer.put(bytes);   //flip操作   writeBuffer.flip();   //發送緩沖區的字節數組   channel.write(writeBuffer);   //****此處不含處理“寫半包”的代碼  } } 

    可以看到,創建NIO服務端的主要步驟如下:

  1. 打開ServerSocketChannel,監聽客戶端連接
  2. 綁定監聽端口,設置連接為非阻塞模式
  3. 創建Reactor線程,創建多路復用器并啟動線程
  4. 將ServerSocketChannel注冊到Reactor線程中的Selector上,監聽ACCEPT事件
  5. Selector輪詢準備就緒的key
  6. Selector監聽到新的客戶端接入,處理新的接入請求,完成TCP三次握手,簡歷物理鏈路
  7. 設置客戶端鏈路為非阻塞模式
  8. 將新接入的客戶端連接注冊到Reactor線程的Selector上,監聽讀操作,讀取客戶端發送的網絡消息
  9. 異步讀取客戶端消息到緩沖區
  10. 對Buffer編解碼,處理半包消息,將解碼成功的消息封裝成Task
  11. 將應答消息編碼為Buffer,調用SocketChannel的write將消息異步發送給客戶端

因為應答消息的發送,SocketChannel也是異步非阻塞的,所以不能保證一次能吧需要發送的數據發送完,此時就會出現寫半包的問題。我們需要注冊寫操作,不斷輪詢Selector將沒有發送完的消息發送完畢,然后通過Buffer的hasRemain()方法判斷消息是否發送完成。

2.6、NIO客戶端

還是直接上代碼吧,過程也不需要太多解釋了,跟服務端代碼有點類似。

Client:

package com.anxpp.io.calculator.nio; public class Client {  private static String DEFAULT_HOST = "127.0.0.1";  private static int DEFAULT_PORT = 12345;  private static ClientHandle clientHandle;  public static void start(){   start(DEFAULT_HOST,DEFAULT_PORT);  }  public static synchronized void start(String ip,int port){   if(clientHandle!=null)    clientHandle.stop();   clientHandle = new ClientHandle(ip,port);   new Thread(clientHandle,"Server").start();  }  //向服務器發送消息  public static boolean sendMsg(String msg) throws Exception{   if(msg.equals("q")) return false;   clientHandle.sendMsg(msg);   return true;  }  public static void main(String[] args){   start();  } } 

ClientHandle:

package com.anxpp.io.calculator.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /**  * NIO客戶端  * @author yangtao__anxpp.com  * @version 1.0  */ public class ClientHandle implements Runnable{  private String host;  private int port;  private Selector selector;  private SocketChannel socketChannel;  private volatile boolean started;   public ClientHandle(String ip,int port) {   this.host = ip;   this.port = port;   try{    //創建選擇器    selector = Selector.open();    //打開監聽通道    socketChannel = SocketChannel.open();    //如果為 true,則此通道將被置于阻塞模式;如果為 false,則此通道將被置于非阻塞模式    socketChannel.configureBlocking(false);//開啟非阻塞模式    started = true;   }catch(IOException e){    e.printStackTrace();    System.exit(1);   }  }  public void stop(){   started = false;  }  @Override  public void run() {   try{    doConnect();   }catch(IOException e){    e.printStackTrace();    System.exit(1);   }   //循環遍歷selector   while(started){    try{     //無論是否有讀寫事件發生,selector每隔1s被喚醒一次     selector.select(1000);     //阻塞,只有當至少一個注冊的事件發生的時候才會繼續. //    selector.select();     Set<SelectionKey> keys = selector.selectedKeys();     Iterator<SelectionKey> it = keys.iterator();     SelectionKey key = null;     while(it.hasNext()){      key = it.next();      it.remove();      try{       handleInput(key);      }catch(Exception e){       if(key != null){        key.cancel();        if(key.channel() != null){         key.channel().close();        }       }      }     }    }catch(Exception e){     e.printStackTrace();     System.exit(1);    }   }   //selector關閉后會自動釋放里面管理的資源   if(selector != null)    try{     selector.close();    }catch (Exception e) {     e.printStackTrace();    }  }  private void handleInput(SelectionKey key) throws IOException{   if(key.isValid()){    SocketChannel sc = (SocketChannel) key.channel();    if(key.isConnectable()){     if(sc.finishConnect());     else System.exit(1);    }    //讀消息    if(key.isReadable()){     //創建ByteBuffer,并開辟一個1M的緩沖區     ByteBuffer buffer = ByteBuffer.allocate(1024);     //讀取請求碼流,返回讀取到的字節數     int readBytes = sc.read(buffer);     //讀取到字節,對字節進行編解碼     if(readBytes>0){      //將緩沖區當前的limit設置為position=0,用于后續對緩沖區的讀取操作      buffer.flip();      //根據緩沖區可讀字節數創建字節數組      byte[] bytes = new byte[buffer.remaining()];      //將緩沖區可讀字節數組復制到新建的數組中      buffer.get(bytes);      String result = new String(bytes,"UTF-8");      System.out.println("客戶端收到消息:" + result);     }     //沒有讀取到字節 忽略 //    else if(readBytes==0);     //鏈路已經關閉,釋放資源     else if(readBytes<0){      key.cancel();      sc.close();     }    }   }  }  //異步發送消息  private void doWrite(SocketChannel channel,String request) throws IOException{   //將消息編碼為字節數組   byte[] bytes = request.getBytes();   //根據數組容量創建ByteBuffer   ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);   //將字節數組復制到緩沖區   writeBuffer.put(bytes);   //flip操作   writeBuffer.flip();   //發送緩沖區的字節數組   channel.write(writeBuffer);   //****此處不含處理“寫半包”的代碼  }  private void doConnect() throws IOException{   if(socketChannel.connect(new InetSocketAddress(host,port)));   else socketChannel.register(selector, SelectionKey.OP_CONNECT);  }  public void sendMsg(String msg) throws Exception{   socketChannel.register(selector, SelectionKey.OP_READ);   doWrite(socketChannel, msg);  } } 

2.7、演示結果

首先運行服務器,順便也運行一個客戶端:

package com.anxpp.io.calculator.nio; import java.util.Scanner; /**  * 測試方法  * @author yangtao__anxpp.com  * @version 1.0  */ public class Test {  //測試主方法  @SuppressWarnings("resource")  public static void main(String[] args) throws Exception{   //運行服務器   Server.start();   //避免客戶端先于服務器啟動前執行代碼   Thread.sleep(100);   //運行客戶端   Client.start();   while(Client.sendMsg(new Scanner(System.in).nextLine()));  } } 

我們也可以單獨運行客戶端,效果都是一樣的。

一次測試的結果:

服務器已啟動,端口號:123451+2+3+4+5+6服務器收到消息:1+2+3+4+5+6客戶端收到消息:211*2/3-4+5*6/7-8服務器收到消息:1*2/3-4+5*6/7-8客戶端收到消息:-7.0476190476190474

運行多個客戶端,都是沒有問題的。

3、AIO編程

NIO 2.0引入了新的異步通道的概念,并提供了異步文件通道和異步套接字通道的實現。

異步的套接字通道時真正的異步非阻塞I/O,對應于UNIX網絡編程中的事件驅動I/O(AIO)。他不需要過多的Selector對注冊的通道進行輪詢即可實現異步讀寫,從而簡化了NIO的編程模型。

直接上代碼吧。

3.1、Server端代碼

Server:

package com.anxpp.io.calculator.aio.server; /**  * AIO服務端  * @author yangtao__anxpp.com  * @version 1.0  */ public class Server {  private static int DEFAULT_PORT = 12345;  private static AsyncServerHandler serverHandle;  public volatile static long clientCount = 0;  public static void start(){   start(DEFAULT_PORT);  }  public static synchronized void start(int port){   if(serverHandle!=null)    return;   serverHandle = new AsyncServerHandler(port);   new Thread(serverHandle,"Server").start();  }  public static void main(String[] args){   Server.start();  } } 

AsyncServerHandler:

package com.anxpp.io.calculator.aio.server; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.AsynchronousServerSocketChannel; import java.util.concurrent.CountDownLatch; public class AsyncServerHandler implements Runnable {  public CountDownLatch latch;  public AsynchronousServerSocketChannel channel;  public AsyncServerHandler(int port) {   try {    //創建服務端通道    channel = AsynchronousServerSocketChannel.open();    //綁定端口    channel.bind(new InetSocketAddress(port));    System.out.println("服務器已啟動,端口號:" + port);   } catch (IOException e) {    e.printStackTrace();   }  }  @Override  public void run() {   //CountDownLatch初始化   //它的作用:在完成一組正在執行的操作之前,允許當前的現場一直阻塞   //此處,讓現場在此阻塞,防止服務端執行完成后退出   //也可以使用while(true)+sleep   //生成環境就不需要擔心這個問題,以為服務端是不會退出的   latch = new CountDownLatch(1);   //用于接收客戶端的連接   channel.accept(this,new AcceptHandler());   try {    latch.await();   } catch (InterruptedException e) {    e.printStackTrace();   }  } } 

AcceptHandler:

package com.anxpp.io.calculator.aio.server; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; //作為handler接收客戶端連接 public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> {  @Override  public void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) {   //繼續接受其他客戶端的請求   Server.clientCount++;   System.out.println("連接的客戶端數:" + Server.clientCount);   serverHandler.channel.accept(serverHandler, this);   //創建新的Buffer   ByteBuffer buffer = ByteBuffer.allocate(1024);   //異步讀 第三個參數為接收消息回調的業務Handler   channel.read(buffer, buffer, new ReadHandler(channel));  }  @Override  public void failed(Throwable exc, AsyncServerHandler serverHandler) {   exc.printStackTrace();   serverHandler.latch.countDown();  } } 

ReadHandler:

package com.anxpp.io.calculator.aio.server; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import com.anxpp.io.utils.Calculator; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {  //用于讀取半包消息和發送應答  private AsynchronousSocketChannel channel;  public ReadHandler(AsynchronousSocketChannel channel) {    this.channel = channel;  }  //讀取到消息后的處理  @Override  public void completed(Integer result, ByteBuffer attachment) {   //flip操作   attachment.flip();   //根據   byte[] message = new byte[attachment.remaining()];   attachment.get(message);   try {    String expression = new String(message, "UTF-8");    System.out.println("服務器收到消息: " + expression);    String calrResult = null;    try{     calrResult = Calculator.cal(expression).toString();    }catch(Exception e){     calrResult = "計算錯誤:" + e.getMessage();    }    //向客戶端發送消息    doWrite(calrResult);   } catch (UnsupportedEncodingException e) {    e.printStackTrace();   }  }  //發送消息  private void doWrite(String result) {   byte[] bytes = result.getBytes();   ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);   writeBuffer.put(bytes);   writeBuffer.flip();   //異步寫數據 參數與前面的read一樣   channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {    @Override    public void completed(Integer result, ByteBuffer buffer) {     //如果沒有發送完,就繼續發送直到完成     if (buffer.hasRemaining())      channel.write(buffer, buffer, this);     else{      //創建新的Buffer      ByteBuffer readBuffer = ByteBuffer.allocate(1024);      //異步讀 第三個參數為接收消息回調的業務Handler      channel.read(readBuffer, readBuffer, new ReadHandler(channel));     }    }    @Override    public void failed(Throwable exc, ByteBuffer attachment) {     try {      channel.close();     } catch (IOException e) {     }    }   });  }  @Override  public void failed(Throwable exc, ByteBuffer attachment) {   try {    this.channel.close();   } catch (IOException e) {    e.printStackTrace();   }  } } 

OK,這樣就已經完成了,其實說起來也簡單,雖然代碼感覺很多,但是API比NIO的使用起來真的簡單多了,主要就是監聽、讀、寫等各種CompletionHandler。此處本應有一個WriteHandler的,確實,我們在ReadHandler中,以一個匿名內部類實現了它。

下面看客戶端代碼。

3.2、Client端代碼

Client:

package com.anxpp.io.calculator.aio.client; import java.util.Scanner; public class Client {  private static String DEFAULT_HOST = "127.0.0.1";  private static int DEFAULT_PORT = 12345;  private static AsyncClientHandler clientHandle;  public static void start(){   start(DEFAULT_HOST,DEFAULT_PORT);  }  public static synchronized void start(String ip,int port){   if(clientHandle!=null)    return;   clientHandle = new AsyncClientHandler(ip,port);   new Thread(clientHandle,"Client").start();  }  //向服務器發送消息  public static boolean sendMsg(String msg) throws Exception{   if(msg.equals("q")) return false;   clientHandle.sendMsg(msg);   return true;  }  @SuppressWarnings("resource")  public static void main(String[] args) throws Exception{   Client.start();   System.out.println("請輸入請求消息:");   Scanner scanner = new Scanner(System.in);   while(Client.sendMsg(scanner.nextLine()));  } } 

AsyncClientHandler:

package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable {  private AsynchronousSocketChannel clientChannel;  private String host;  private int port;  private CountDownLatch latch;  public AsyncClientHandler(String host, int port) {   this.host = host;   this.port = port;   try {    //創建異步的客戶端通道    clientChannel = AsynchronousSocketChannel.open();   } catch (IOException e) {    e.printStackTrace();   }  }  @Override  public void run() {   //創建CountDownLatch等待   latch = new CountDownLatch(1);   //發起異步連接操作,回調參數就是這個類本身,如果連接成功會回調completed方法   clientChannel.connect(new InetSocketAddress(host, port), this, this);   try {    latch.await();   } catch (InterruptedException e1) {    e1.printStackTrace();   }   try {    clientChannel.close();   } catch (IOException e) {    e.printStackTrace();   }  }  //連接服務器成功  //意味著TCP三次握手完成  @Override  public void completed(Void result, AsyncClientHandler attachment) {   System.out.println("客戶端成功連接到服務器...");  }  //連接服務器失敗  @Override  public void failed(Throwable exc, AsyncClientHandler attachment) {   System.err.println("連接服務器失敗...");   exc.printStackTrace();   try {    clientChannel.close();    latch.countDown();   } catch (IOException e) {    e.printStackTrace();   }  }  //向服務器發送消息  public void sendMsg(String msg){   byte[] req = msg.getBytes();   ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);   writeBuffer.put(req);   writeBuffer.flip();   //異步寫   clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch));  } } 

WriteHandler:
 

package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {  private AsynchronousSocketChannel clientChannel;  private CountDownLatch latch;  public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {   this.clientChannel = clientChannel;   this.latch = latch;  }  @Override  public void completed(Integer result, ByteBuffer buffer) {   //完成全部數據的寫入   if (buffer.hasRemaining()) {    clientChannel.write(buffer, buffer, this);   }   else {    //讀取數據    ByteBuffer readBuffer = ByteBuffer.allocate(1024);    clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch));   }  }  @Override  public void failed(Throwable exc, ByteBuffer attachment) {   System.err.println("數據發送失敗...");   try {    clientChannel.close();    latch.countDown();   } catch (IOException e) {   }  } } 

ReadHandler:

package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {  private AsynchronousSocketChannel clientChannel;  private CountDownLatch latch;  public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {   this.clientChannel = clientChannel;   this.latch = latch;  }  @Override  public void completed(Integer result,ByteBuffer buffer) {   buffer.flip();   byte[] bytes = new byte[buffer.remaining()];   buffer.get(bytes);   String body;   try {    body = new String(bytes,"UTF-8");    System.out.println("客戶端收到結果:"+ body);   } catch (UnsupportedEncodingException e) {    e.printStackTrace();   }  }  @Override  public void failed(Throwable exc,ByteBuffer attachment) {   System.err.println("數據讀取失敗...");   try {    clientChannel.close();    latch.countDown();   } catch (IOException e) {   }  } } 

這個API使用起來真的是很順手。

3.3、測試

Test:

package com.anxpp.io.calculator.aio; import java.util.Scanner; import com.anxpp.io.calculator.aio.client.Client; import com.anxpp.io.calculator.aio.server.Server; /**  * 測試方法  * @author yangtao__anxpp.com  * @version 1.0  */ public class Test {  //測試主方法  @SuppressWarnings("resource")  public static void main(String[] args) throws Exception{   //運行服務器   Server.start();   //避免客戶端先于服務器啟動前執行代碼   Thread.sleep(100);   //運行客戶端   Client.start();   System.out.println("請輸入請求消息:");   Scanner scanner = new Scanner(System.in);   while(Client.sendMsg(scanner.nextLine()));  } } 

我們可以在控制臺輸入我們需要計算的算數字符串,服務器就會返回結果,當然,我們也可以運行大量的客戶端,都是沒有問題的,以為此處設計為單例客戶端,所以也就沒有演示大量客戶端并發。

讀者可以自己修改Client類,然后開辟大量線程,并使用構造方法創建很多的客戶端測試。

下面是其中一次參數的輸出:

服務器已啟動,端口號:12345請輸入請求消息:客戶端成功連接到服務器...連接的客戶端數:1123456+789+456服務器收到消息: 123456+789+456客戶端收到結果:1247019526*56服務器收到消息: 9526*56客戶端收到結果:533456...

AIO是真正的異步非阻塞的,所以,在面對超級大量的客戶端,更能得心應手。

下面就比較一下,幾種I/O編程的優缺點。

4、各種I/O的對比

先以一張表來直觀的對比一下:

Java,網絡IO編程,網絡IO

具體選擇什么樣的模型或者NIO框架,完全基于業務的實際應用場景和性能需求,如果客戶端很少,服務器負荷不重,就沒有必要選擇開發起來相對不那么簡單的NIO做服務端;相反,就應考慮使用NIO或者相關的框架了。

5、附錄

上文中服務端使用到的用于計算的工具類:

package com.anxpp.utils;import javax.script.ScriptEngine;import javax.script.ScriptEngineManager;import javax.script.ScriptException;public final class Calculator {  private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript"); public static Object cal(String expression) throws ScriptException{  return jse.eval(expression); }}

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持VeVb武林網。


注:相關教程知識閱讀請移步到JAVA教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
久久久久久久999精品视频| 91成人福利在线| 色在人av网站天堂精品| 国产精品ⅴa在线观看h| 91天堂在线观看| 成人福利视频网| 国产精品视频xxxx| 91禁国产网站| 国产精品丝袜视频| 黑人巨大精品欧美一区免费视频| 日本久久久久久久| 中文字幕国内精品| 国产91精品视频在线观看| 亚洲最大在线视频| 九九视频这里只有精品| 超碰日本道色综合久久综合| 欧美激情在线观看| 中文字幕亚洲无线码在线一区| 欧美久久久精品| 亚洲欧美综合区自拍另类| 亚洲国产女人aaa毛片在线| 精品日本美女福利在线观看| 亚洲美女精品成人在线视频| 一本久久综合亚洲鲁鲁| 亚洲电影成人av99爱色| 91在线精品视频| 91香蕉亚洲精品| 激情懂色av一区av二区av| 2020国产精品视频| 国产精品成人va在线观看| 国产精品久久久久一区二区| 国产999精品久久久| 色先锋资源久久综合5566| 成人激情免费在线| 久久久久久久亚洲精品| 1769国内精品视频在线播放| 精品一区二区电影| 美女av一区二区| 欧美日韩一区免费| 美女啪啪无遮挡免费久久网站| 精品日韩中文字幕| 在线激情影院一区| 91成人免费观看网站| 欧美第一页在线| 欧美黄色成人网| 亚洲乱亚洲乱妇无码| 成人免费xxxxx在线观看| 欧美视频一区二区三区…| 国产精品极品尤物在线观看| 亚洲男人第一av网站| 国产欧美一区二区三区在线| 欧美黑人一级爽快片淫片高清| 国产亚洲精品一区二区| 国产精品欧美在线| 国产美女精品视频| 久久国产精品影视| 亚洲欧美综合另类中字| 欧美激情精品久久久久久久变态| 日韩经典中文字幕| 亚洲人成在线观| 国产精品入口尤物| 国产精品中文字幕在线观看| 亚洲精品天天看| 精品亚洲一区二区三区在线观看| 欧美怡红院视频一区二区三区| 中文字幕av一区中文字幕天堂| 国产精品九九久久久久久久| 亚洲韩国欧洲国产日产av| 国产视频精品免费播放| 日本成人在线视频网址| 91久久嫩草影院一区二区| 亚洲视频在线观看视频| 97视频在线观看免费高清完整版在线观看| 日韩av中文在线| 欧美日韩中国免费专区在线看| 欧美日韩在线视频一区| 国产色婷婷国产综合在线理论片a| 成人两性免费视频| 操人视频在线观看欧美| 91精品久久久久久综合乱菊| 日韩资源在线观看| 国产精品久久久久9999| 欧美国产日韩一区| 狠狠操狠狠色综合网| 亚洲情综合五月天| 91精品国产乱码久久久久久久久| 亚洲免费成人av电影| 97色在线视频| 美女视频黄免费的亚洲男人天堂| 欧美国产日韩xxxxx| 国产一区二区三区精品久久久| 久久99青青精品免费观看| 性色av一区二区咪爱| 欧美一级大片在线观看| 欧美成人精品激情在线观看| 久久久精品一区二区三区| 日本高清久久天堂| 欧美在线视频观看免费网站| 精品久久久久久中文字幕大豆网| 97香蕉久久超级碰碰高清版| 九九九热精品免费视频观看网站| 亚洲黄色www网站| 日韩在线观看av| 亚洲国产精品字幕| 在线观看日韩专区| 亚洲欧美一区二区三区情侣bbw| 97在线视频免费| 亚洲视频自拍偷拍| 亚洲电影免费观看高清| 国产精品视频区| 久久在线观看视频| 日韩精品视频免费专区在线播放| 夜夜嗨av一区二区三区四区| 亚洲欧美综合v| 色综合伊人色综合网| 国产精品综合久久久| 欧美中文字幕在线播放| 精品福利一区二区| 亚洲综合小说区| 久久久久久久影视| 97视频色精品| 亚洲自拍小视频| 欧美日韩精品二区| 亚洲免费高清视频| 国产91对白在线播放| 中文国产亚洲喷潮| 精品亚洲国产成av人片传媒| 2024亚洲男人天堂| 亚洲人成网7777777国产| 国产成人一区二区| 国产欧美精品xxxx另类| 韩国19禁主播vip福利视频| 亚洲香蕉成视频在线观看| 中文.日本.精品| 在线视频日韩精品| 亚洲精品久久久久久久久久久久久| 久久噜噜噜精品国产亚洲综合| 亚洲一区制服诱惑| 久久久久久久一区二区| 55夜色66夜色国产精品视频| 欧美午夜影院在线视频| 热re99久久精品国产66热| 久久五月天色综合| 国产精品久久国产精品99gif| 91福利视频在线观看| 国产91精品网站| 91在线国产电影| xvideos成人免费中文版| 精品女同一区二区三区在线播放| 欧美国产欧美亚洲国产日韩mv天天看完整| 亚洲精品电影在线观看| 亚洲综合在线播放| 久久精品中文字幕免费mv| 欧美亚洲成人精品| 亚洲大胆人体视频| 亚洲电影在线观看| 欧美成人性色生活仑片| 免费91麻豆精品国产自产在线观看| 国产精品96久久久久久| 国产精品永久免费观看| 国产精品爽黄69天堂a| 国产精品久久久久免费a∨| 亚洲国产精彩中文乱码av| 国产精品对白刺激|