亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 學院 > 開發設計 > 正文

基于Netty的RPC架構筆記3之線程模型源碼分析(1)

2019-11-14 09:02:40
字體:
來源:轉載
供稿:網友

      隨著用戶量上升,項目的架構也在不斷的升級,由最開始的MVC的垂直架構(傳統項目)到RPC架構(webservice,rest,netty,mina),再到SOA模型(dubbo),再到最近的微服務,又比如Tomcat6之前的IO模型都是BIO 也就是阻塞IO,到后來變成多路復用,也是阻塞IO。到非阻塞NIO,再到異步非阻塞AIO,

     言歸正傳,接著談netty,傳統IO是一個線程服務一個客戶,后來通過netty,可以一個線程服務多個客戶,下面的那個圖展示的是netty的NIO通過引入多線程來提高性能,既一個線程負責一片用戶

直接上代碼

package com.cn;import java.net.InetSocketAddress;import java.util.concurrent.Executors;import com.cn.pool.NioSelectorRunnablePool;/** * 啟動函數 * */public class Start {	public static void main(String[] args) {				//初始化線程		NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());				//獲取服務類		ServerBootstrap bootstrap = new ServerBootstrap(nioSelectorRunnablePool);				//綁定端口		bootstrap.bind(new InetSocketAddress(10101));				System.out.PRintln("start");	}}
package com.cn.pool;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicInteger;import com.cn.NioServerBoss;import com.cn.NioServerWorker;/** * selector線程管理者 * */public class NioSelectorRunnablePool {	/**	 * boss線程數組	 */	private final AtomicInteger bossIndex = new AtomicInteger();	private Boss[] bosses;	/**	 * worker線程數組	 */	private final AtomicInteger workerIndex = new AtomicInteger();	private Worker[] workeres;		public NioSelectorRunnablePool(Executor boss, Executor worker) {		initBoss(boss, 1);		initWorker(worker, Runtime.getRuntime().availableProcessors() * 2);	}	/**	 * 初始化boss線程	 * @param boss	 * @param count	 */	private void initBoss(Executor boss, int count) {		this.bosses = new NioServerBoss[count];		for (int i = 0; i < bosses.length; i++) {			bosses[i] = new NioServerBoss(boss, "boss thread " + (i+1), this);		}	}	/**	 * 初始化worker線程	 * @param worker	 * @param count	 */	private void initWorker(Executor worker, int count) {		this.workeres = new NioServerWorker[count];		for (int i = 0; i < workeres.length; i++) {			workeres[i] = new NioServerWorker(worker, "worker thread " + (i+1), this);		}	}	/**	 * 獲取一個worker	 * @return	 */	public Worker nextWorker() {		 return workeres[Math.abs(workerIndex.getAndIncrement() % workeres.length)];	}	/**	 * 獲取一個boss	 * @return	 */	public Boss nextBoss() {		 return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];	}}
package com.cn;import java.net.SocketAddress;import java.nio.channels.ServerSocketChannel;import com.cn.pool.Boss;import com.cn.pool.NioSelectorRunnablePool;/** * 服務類 * */public class ServerBootstrap {private NioSelectorRunnablePool selectorRunnablePool;		public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) {		this.selectorRunnablePool = selectorRunnablePool;	}		/**	 * 綁定端口	 * @param localAddress	 */	public void bind(final SocketAddress localAddress){		try {			// 獲得一個ServerSocket通道			ServerSocketChannel serverChannel = ServerSocketChannel.open();			// 設置通道為非阻塞			serverChannel.configureBlocking(false);			// 將該通道對應的ServerSocket綁定到port端口			serverChannel.socket().bind(localAddress);						//獲取一個boss線程			Boss nextBoss = selectorRunnablePool.nextBoss();			//向boss注冊一個ServerSocket通道			nextBoss.registerAcceptChannelTask(serverChannel);		} catch (Exception e) {			e.printStackTrace();		}	}}
package com.cn.pool;import java.nio.channels.SocketChannel;/** * worker接口 * */public interface Worker {		/**	 * 加入一個新的客戶端會話	 * @param channel	 */	public void registerNewChannelTask(SocketChannel channel);}
package com.cn.pool;import java.nio.channels.ServerSocketChannel;/** * boss接口 * */public interface Boss {		/**	 * 加入一個新的ServerSocket	 * @param serverChannel	 */	public void registerAcceptChannelTask(ServerSocketChannel serverChannel);}
package com.cn;import java.io.IOException;import java.nio.channels.Selector;import java.util.Queue;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicBoolean;import com.cn.pool.NioSelectorRunnablePool;/** * 抽象selector線程類 *  *  */public abstract class AbstractNioSelector implements Runnable {	/**	 * 線程池	 */	private final Executor executor;	/**	 * 選擇器	 */	protected Selector selector;	/**	 * 選擇器wakenUp狀態標記	 */	protected final AtomicBoolean wakenUp = new AtomicBoolean();	/**	 * 任務隊列	 */	private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();	/**	 * 線程名稱	 */	private String threadName;		/**	 * 線程管理對象	 */	protected NioSelectorRunnablePool selectorRunnablePool;	AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {		this.executor = executor;		this.threadName = threadName;		this.selectorRunnablePool = selectorRunnablePool;		openSelector();	}	/**	 * 獲取selector并啟動線程	 */	private void openSelector() {		try {			this.selector = Selector.open();		} catch (IOException e) {			throw new RuntimeException("Failed to create a selector.");		}		executor.execute(this);	}	@Override	public void run() {				Thread.currentThread().setName(this.threadName);		while (true) {			try {				wakenUp.set(false);				select(selector);				processTaskQueue();				process(selector);			} catch (Exception e) {				// ignore			}		}	}	/**	 * 注冊一個任務并激活selector	 * 	 * @param task	 */	protected final void registerTask(Runnable task) {		taskQueue.add(task);		Selector selector = this.selector;		if (selector != null) {			if (wakenUp.compareAndSet(false, true)) {				selector.wakeup();			}		} else {			taskQueue.remove(task);		}	}	/**	 * 執行隊列里的任務	 */	private void processTaskQueue() {		for (;;) {			final Runnable task = taskQueue.poll();			if (task == null) {				break;			}			task.run();		}	}		/**	 * 獲取線程管理對象	 * @return	 */	public NioSelectorRunnablePool getSelectorRunnablePool() {		return selectorRunnablePool;	}	/**	 * select抽象方法	 * 	 * @param selector	 * @return	 * @throws IOException	 */	protected abstract int select(Selector selector) throws IOException;	/**	 * selector的業務處理	 * 	 * @param selector	 * @throws IOException	 */	protected abstract void process(Selector selector) throws IOException;}
package com.cn;import java.io.IOException;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;import java.util.concurrent.Executor;import com.cn.pool.Boss;import com.cn.pool.NioSelectorRunnablePool;import com.cn.pool.Worker;/** * boss實現類 * */public class NioServerBoss extends AbstractNioSelector implements Boss{	public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {		super(executor, threadName, selectorRunnablePool);	}	@Override	protected void process(Selector selector) throws IOException {		Set<SelectionKey> selectedKeys = selector.selectedKeys();        if (selectedKeys.isEmpty()) {            return;        }                for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {            SelectionKey key = i.next();            i.remove();            ServerSocketChannel server = (ServerSocketChannel) key.channel();    		// 新客戶端    		SocketChannel channel = server.accept();    		// 設置為非阻塞    		channel.configureBlocking(false);    		// 獲取一個worker    		Worker nextworker = getSelectorRunnablePool().nextWorker();    		// 注冊新客戶端接入任務    		nextworker.registerNewChannelTask(channel);    		    		System.out.println("新客戶端鏈接");        }	}			public void registerAcceptChannelTask(final ServerSocketChannel serverChannel){		 final Selector selector = this.selector;		 registerTask(new Runnable() {			@Override			public void run() {				try {					//注冊serverChannel到selector					serverChannel.register(selector, SelectionKey.OP_ACCEPT);				} catch (ClosedChannelException e) {					e.printStackTrace();				}			}		});	}		@Override	protected int select(Selector selector) throws IOException {		return selector.select();	}}
package com.cn;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;import java.util.concurrent.Executor;import com.cn.pool.NioSelectorRunnablePool;import com.cn.pool.Worker;/** * worker實現類 * */public class NioServerWorker extends AbstractNioSelector implements Worker{	public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {		super(executor, threadName, selectorRunnablePool);	}	@Override	protected void process(Selector selector) throws IOException {		Set<SelectionKey> selectedKeys = selector.selectedKeys();        if (selectedKeys.isEmpty()) {            return;        }        Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();		while (ite.hasNext()) {			SelectionKey key = (SelectionKey) ite.next();			// 移除,防止重復處理			ite.remove();						// 得到事件發生的Socket通道			SocketChannel channel = (SocketChannel) key.channel();						// 數據總長度			int ret = 0;			boolean failure = true;			ByteBuffer buffer = ByteBuffer.allocate(1024);			//讀取數據			try {				ret = channel.read(buffer);				failure = false;			} catch (Exception e) {				// ignore			}			//判斷是否連接已斷開			if (ret <= 0 || failure) {				key.cancel();				System.out.println("客戶端斷開連接");	        }else{	        	 System.out.println("收到數據:" + new String(buffer.array()));	        	 	     		//回寫數據	     		ByteBuffer outBuffer = ByteBuffer.wrap("收到/n".getBytes());	     		channel.write(outBuffer);// 將消息回送給客戶端	        }		}	}	/**	 * 加入一個新的socket客戶端	 */	public void registerNewChannelTask(final SocketChannel channel){		 final Selector selector = this.selector;		 registerTask(new Runnable() {			@Override			public void run() {				try {					//將客戶端注冊到selector中					channel.register(selector, SelectionKey.OP_READ);				} catch (ClosedChannelException e) {					e.printStackTrace();				}			}		});	}	@Override	protected int select(Selector selector) throws IOException {		return selector.select(500);	}	}


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
久久久精品2019中文字幕神马| 欧美日韩国产精品| 欧美成人中文字幕| 国内精品久久久久久久久| 精品国偷自产在线视频99| 日韩视频免费大全中文字幕| 欧美在线一级va免费观看| 亚洲天堂av高清| 欧美日本中文字幕| 国产精品成人av在线| 中文字幕少妇一区二区三区| 日韩欧美成人精品| 欧美精品情趣视频| 91精品国产99| 欧美日韩国产一中文字不卡| 热re99久久精品国产66热| 亚洲人成电影网站色| 国产视频久久久久久久| 91精品国产高清| 国产精品久久久久久av下载红粉| 精品免费在线观看| 中文字幕v亚洲ⅴv天堂| 92看片淫黄大片看国产片| 欧美激情乱人伦| 亚洲黄色有码视频| 国产一区二区在线免费| 国产91精品高潮白浆喷水| 亚洲国产精品成人av| 日韩中文在线中文网三级| 欧美重口另类videos人妖| 亚洲国产成人久久综合一区| 欧美激情一二三| 亚洲一区二区三区在线免费观看| 丰满岳妇乱一区二区三区| 欧美另类在线播放| 亚洲人成电影网站| 日韩电影在线观看免费| 欧美成人中文字幕在线| 欧美激情一区二区三区高清视频| 欧美激情伊人电影| 亚洲国产天堂久久国产91| 日韩美女写真福利在线观看| 国产国产精品人在线视| 久久久久久999| 91久久精品在线| 91久久久久久久久久| 青青久久av北条麻妃黑人| 亚洲午夜小视频| 国产69精品久久久久久| 国产精品视频大全| 国产精品视频一区二区高潮| 国产亚洲人成a一在线v站| 欧美中文在线视频| 久久精品国产2020观看福利| 免费97视频在线精品国自产拍| 91免费人成网站在线观看18| 欧美成年人在线观看| 韩国19禁主播vip福利视频| 国产精品专区h在线观看| 色青青草原桃花久久综合| 色悠久久久久综合先锋影音下载| 精品国产31久久久久久| 欧美激情国产日韩精品一区18| 欧美成人手机在线| 日韩毛片在线看| 欧美电影在线观看高清| 精品久久久精品| 成人网在线观看| 欧美一区二区三区精品电影| 精品magnet| 在线播放日韩专区| 丝袜亚洲欧美日韩综合| 日韩小视频在线| 国产91久久婷婷一区二区| 红桃视频成人在线观看| 国产精品久久久久久一区二区| 午夜精品久久久久久99热软件| 国产精品久久77777| 精品视频在线播放| 欧美第一黄网免费网站| 91chinesevideo永久地址| 久久国内精品一国内精品| 国产精品日本精品| 亚洲美女视频网| 国语自产精品视频在线看一大j8| 欧美日韩亚洲激情| 欧美成人精品一区| 欧美日韩一区二区三区| 欧美激情中文字幕乱码免费| 91日韩在线视频| 久国内精品在线| 最新国产成人av网站网址麻豆| 国产日产久久高清欧美一区| 久久精品视频一| 亚洲影视九九影院在线观看| 日韩av在线高清| 亚洲夜晚福利在线观看| 国产成人精品在线| 欧美日韩一区二区免费在线观看| 国产精品r级在线| 亚洲春色另类小说| 成人在线视频网| 韩国三级日本三级少妇99| 亚洲国产日韩精品在线| 久久久久久久久久久免费| 国产欧美一区二区三区久久人妖| 久久久久北条麻妃免费看| 国产精品第一区| 国产精品亚洲激情| 日韩av影院在线观看| 亚洲欧美综合图区| 在线观看成人黄色| 久久精品国产亚洲| 国产色综合天天综合网| 97国产精品视频人人做人人爱| 国产成人亚洲综合| 91免费国产网站| 色综合五月天导航| 美女999久久久精品视频| 亚洲电影成人av99爱色| 中文字幕一区二区三区电影| 日韩成人av一区| 亚洲视频精品在线| 国产成人aa精品一区在线播放| 久久久久久久久网站| 国产精品扒开腿做爽爽爽男男| 国产日韩在线精品av| 亚洲激情免费观看| …久久精品99久久香蕉国产| 欧美国产精品va在线观看| 国产欧美久久久久久| 精品免费在线观看| 九九精品在线视频| 亚洲亚裔videos黑人hd| 日韩在线视频二区| 一个色综合导航| 黄色精品在线看| 国产丝袜精品第一页| 97精品一区二区视频在线观看| 欧美夫妻性生活视频| 中文字幕日韩欧美| 欧美成在线观看| 久久精品一本久久99精品| 欧美日韩国产限制| 亚洲精品国产精品国自产在线| 久久久久免费精品国产| 久久视频在线视频| 精品亚洲一区二区三区在线观看| 欧美最近摘花xxxx摘花| 日韩美女视频中文字幕| 亚洲欧美国产日韩中文字幕| 精品亚洲精品福利线在观看| 欧美日韩在线观看视频| 欧美中文在线视频| 91亚洲一区精品| 国产这里只有精品| 中文字幕精品在线视频| 国产精品18久久久久久首页狼| 成人网在线免费观看| 中文字幕日韩精品在线| 欧美激情一区二区三区高清视频| 最近2019年好看中文字幕视频| 亚洲欧美日韩精品久久奇米色影视| 久久视频在线看|