ctl是一個AtomicInteger類型的原子對象。ctl記錄了"線程池中的任務數量"和"線程池狀態"2個信息。ctl共包括32位。其中,高3位表示"線程池狀態",低29位表示"線程池中的任務數量"。
RUNNING -- 對應的高3位值是111SHUTDOWN -- 對應的高3位值是000STOP -- 對應的高3位值是001TIDYING -- 對應的高3位值是010TERMINATED -- 對應的高3位值是011線程池各個狀態之間的切換如下圖所示:
線程池各個狀態間的轉換的詳細解釋如下所示。
1> RUNNING(111) -> SHUTDOWN(000) : 調用了shutdown方法,線程池實現了finalize方法,在里面調用了shutdown方法,因此shutdown可能是在finalize中被隱式調用的
2> (RUNNING(111) or SHUTDOWN(000)) -> STOP(001) 調用了shutdownNow方法
3> SHUTDOWN(000) -> TIDYING(010) : 當隊列和線程池均為空的時候
4> STOP(001) -> TIDYING(010) : 當線程池為空的時候
5> TIDYING(010) -> TERMINATED(011) : terminated()方法調用完畢
說明:擴號后的3位數字表示ctl的高3位二進制值,并不關注低29位二進制的值
還有一些對常量的操作方法,只說明部分,其他的有興趣自己可以去查看,如下:
[java] view plain copyprint?
private static int runStateOf(int c) { return c & ~CAPACITY; } // 得到線程運行狀態 private static int workerCountOf(int c) { return c & CAPACITY; } // 得到活動線程數 private static int ctlOf(int rs, int wc) { return rs | wc; } // 得到兩者表示的值 ![]()
private static int runStateOf(int c) { return c & ~CAPACITY; } // 得到線程運行狀態 private static int workerCountOf(int c) { return c & CAPACITY; } // 得到活動線程數 private static int ctlOf(int rs, int wc) { return rs | wc; } // 得到兩者表示的值來看一下ThreadPoolExecutor()中最主要的一個構造函數,如下:
[java] view plain copyprint?
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } ![]()
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}調用Executors方法中的幾個方法,如newCachedThreadPool()、newFixedThreadPool()時,都會間接調用上面的構造方法來初始化所有的線程池相關變量。
1、創建線程池并執行任務
有了Executor對象后,就可以調用execute()方法執行任務了。方法的源代碼如下:
[java] view plain copyprint?
public void execute(Runnable command) { if (command == null) // 任務為null,則拋出異常 throw new NullPointerException(); int c = ctl.get(); // 取出記錄著runState和workerCount 的 ctl的當前值 /* * 通過workerCountOf方法從ctl所表示的int值中提取出低29位的值,也就是當前活動的線程數。如果當前 * 活動的線程數少于corePoolSize,則通過addWorker(command, true)新建一個線程,并將任務(command) * 添加到該線程中 */ if (workerCountOf(c) < corePoolSize) { /* * addWorker()返回值表示: * 1、true 表示需要檢測當前運行的線程是否小于corePoolSize * 2、false 表示需要檢測當前運行的線程數量是否小于maxPoolSize */ if (addWorker(command, true)) return; // 新線程創建成功,終止該方法的執行 c = ctl.get(); // 任務添加到線程失敗,取出記錄著runState和workerCount 的 ctl的當前值 } /* * 方法解釋: * isRunning(c) 當前線程池是否處于運行狀態。源代碼是通過判斷c < SHUTDOWN 來確定返回值。由于RUNNING才會接收新任務,且只有這個值-1才小于SHUTDOWN * workQueue.offer(command) 任務添加到緩沖隊列 */ if (isRunning(c) && workQueue.offer(command)) {// 當前線程處于運行狀態且成功添加到緩沖隊列 int recheck = ctl.get(); /* * 如果 線程池已經處于非運行狀態,則從緩沖隊列中移除任務然后采用線程池指定的策略拒絕任務 * 如果 線程池中任務數量為0,則通過addWorker(null, false)嘗試新建一個線程,新建線程對應的任務為null */ if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) // 得到活動線程數為0 addWorker(null, false); } /* * 當不滿足以下兩個條件時執行如下代碼: * 1. 當前線程池并不處于Running狀態 * 2. 當前線程池處于Running狀態,但是緩沖隊列已經滿了 */ else if (!addWorker(command, false)) reject(command); // 采用線程池指定的策略拒絕任務 } ![]()
public void execute(Runnable command) { if (command == null) // 任務為null,則拋出異常 throw new NullPointerException(); int c = ctl.get(); // 取出記錄著runState和workerCount 的 ctl的當前值 /* * 通過workerCountOf方法從ctl所表示的int值中提取出低29位的值,也就是當前活動的線程數。如果當前 * 活動的線程數少于corePoolSize,則通過addWorker(command, true)新建一個線程,并將任務(command) * 添加到該線程中 */ if (workerCountOf(c) < corePoolSize) { /* * addWorker()返回值表示: * 1、true 表示需要檢測當前運行的線程是否小于corePoolSize * 2、false 表示需要檢測當前運行的線程數量是否小于maxPoolSize */ if (addWorker(command, true)) return; // 新線程創建成功,終止該方法的執行 c = ctl.get(); // 任務添加到線程失敗,取出記錄著runState和workerCount 的 ctl的當前值 } /* * 方法解釋: * isRunning(c) 當前線程池是否處于運行狀態。源代碼是通過判斷c < SHUTDOWN 來確定返回值。由于RUNNING才會接收新任務,且只有這個值-1才小于SHUTDOWN * workQueue.offer(command) 任務添加到緩沖隊列 */ if (isRunning(c) && workQueue.offer(command)) {// 當前線程處于運行狀態且成功添加到緩沖隊列 int recheck = ctl.get(); /* * 如果 線程池已經處于非運行狀態,則從緩沖隊列中移除任務然后采用線程池指定的策略拒絕任務 * 如果 線程池中任務數量為0,則通過addWorker(null, false)嘗試新建一個線程,新建線程對應的任務為null */ if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) // 得到活動線程數為0 addWorker(null, false); } /* * 當不滿足以下兩個條件時執行如下代碼: * 1. 當前線程池并不處于Running狀態 * 2. 當前線程池處于Running狀態,但是緩沖隊列已經滿了 */ else if (!addWorker(command, false)) reject(command); // 采用線程池指定的策略拒絕任務 } 當前活動的線程小于corePoolSize了,那么等于和大于corePoolSize怎么處理呢?
1> 當前活動的線程數量 >= corePoolSize 的時候,都是優先添加到隊列中,直到隊列滿了才會去創建新的線程,在這里第20行的if語句已經體現出來了。這里利用了&&的特性,只有當第一個條件會真時才會去判斷第二個條件,第一個條件是isRunning(),判斷線程池是否處于RUNNING狀態,因為只有在這個狀態下才會接受新任務,否則就拒絕,如果正處于RUNNING狀態,那么就加入隊列,如果加入失敗可能就是隊列已經滿了,這時候直接執行第29行。
2> 在execute()方法中,當 當前活動的線程數量 < corePoolSize 時,會執行addWorker()方法,關于addWorker(),它是用來直接新建線程用的,之所以叫addWorker而不是addThread是因為在線程池中,所有的線程都用一個Worker對象包裝著,來看一下這個方法:
[java] view plain copyprint?
/** * 創建并執行新線程 * @param firstTack 用于指定新增的線程執行的第一個任務 * * @param core true表示在新增線程時會判斷當前活動線程數是否少于corePoolSize, * false表示新增線程前需要判斷當前活動線程數是否少于maximumPoolSize * * @return 是否成功新增一個線程 */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 獲取記錄著runState和workCount的int變量的當前值 int rs = runStateOf(c); // 獲取當前線程池運行的狀態 /* 這個條件代表著以下幾個情景,就直接返回false說明線程創建失?。?nbsp; 1.rs > SHUTDOWN; 此時不再接收新任務,且所有的任務已經執行完畢 2.rs = SHUTDOWN; 此時不再接收新任務,但是會執行隊列中的任務,在后買年的或語句中,第一個不成立,firstTask != null成立 3.rs = SHUTDOWN;此時不再接收新任務,fistTask == null,任務隊列workQueue已經空了 */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //獲取當前活動的線程數 int wc = workerCountOf(c); //先判斷當前活動的線程數是否大于最大值,如果超過了就直接返回false說明線程創建失敗 //如果沒有超過再根據core的值再進行以下判斷 /* 1.core為true,則判斷當前活動的線程數是否大于corePoolSize 2.core為false,則判斷當前活動線程數是否大于maximumPoolSize */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //比較當前值是否和c相同,如果相同,則改為c+1,并且跳出大循環,直接執行Worker進行線程創建 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // 獲取ctl的當前值 if (runStateOf(c) != rs) //檢查下當前線程池的狀態是否已經發生改變 continue retry; //如果已經改變了,則進行外層retry大循環,否則只進行內層的循環 // else CAS failed due to workerCount change; retry inner loop } } //下面這里就是開始創建新的線程了 //Worker的也是Runnable的實現類 Worker w = new Worker(firstTask); //因為不可以直接在Worker的構造方法中進行線程創建 //所以要把它的引用賦給t方便后面進行線程創建 Thread t = w.thread; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //再次取出ctl的當前值,用于進行狀態的檢查,防止線程池的已經狀態改變了 int c = ctl.get(); int rs = runStateOf(c); //將if語句中的條件轉換為一個等價實現 :t == null || (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null)) //有個t == null是因為如果使用的是默認的ThreadFactory的話,那么它的newThread()可能會返回null /* 1. 如果t == null, 則減少一個線程數,如果線程池處于的狀態 > SHUTDOWN,則嘗試終止線程池 2. 如果t != null,且rs == SHUTDOWN,則不再接收新任務,若firstTask != null,則此時也是返回false,創建線程失敗 3. 如果t != null, 且rs > SHUTDOWN,同樣不再接受新任務,此時也是返回false,創建線程失敗 */ if (t == null || (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null))) { decrementWorkerCount(); //減少一個活動的當前線程數 tryTerminate(); //嘗試終止線程池 return false; //返回線程創建失敗 } workers.add(w); //將創建的線程添加到workers容器中 int s = workers.size(); //獲取當前線程活動的數量 if (s > largestPoolSize) //判斷當前線程活動的數量是否超過線程池最大的線程數量 largestPoolSize = s; //當池中的工作線程創新高時,會將這個數記錄到largestPoolSize字段中。然后就可以啟動這個線程t了 } finally { mainLock.unlock(); } t.start(); //開啟線程 //若start后,狀態又變成了SHUTDOWN狀態(如調用了shutdownNow方法)且新建的線程沒有被中斷過, //就要中斷該線程(shutdownNow方法要求中斷正在執行的線程), //shutdownNow方法本身也會去中斷存儲在workers中的所有線程 if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted()) t.interrupt(); return true; } ![]()
/** * 創建并執行新線程 * @param firstTack 用于指定新增的線程執行的第一個任務 * * @param core true表示在新增線程時會判斷當前活動線程數是否少于corePoolSize, * false表示新增線程前需要判斷當前活動線程數是否少于maximumPoolSize * * @return 是否成功新增一個線程 */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 獲取記錄著runState和workCount的int變量的當前值 int rs = runStateOf(c); // 獲取當前線程池運行的狀態 /* 這個條件代表著以下幾個情景,就直接返回false說明線程創建失?。? 1.rs > SHUTDOWN; 此時不再接收新任務,且所有的任務已經執行完畢 2.rs = SHUTDOWN; 此時不再接收新任務,但是會執行隊列中的任務,在后買年的或語句中,第一個不成立,firstTask != null成立 3.rs = SHUTDOWN;此時不再接收新任務,fistTask == null,任務隊列workQueue已經空了 */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //獲取當前活動的線程數 int wc = workerCountOf(c); //先判斷當前活動的線程數是否大于最大值,如果超過了就直接返回false說明線程創建失敗 //如果沒有超過再根據core的值再進行以下判斷 /* 1.core為true,則判斷當前活動的線程數是否大于corePoolSize 2.core為false,則判斷當前活動線程數是否大于maximumPoolSize */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //比較當前值是否和c相同,如果相同,則改為c+1,并且跳出大循環,直接執行Worker進行線程創建 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // 獲取ctl的當前值 if (runStateOf(c) != rs) //檢查下當前線程池的狀態是否已經發生改變 continue retry; //如果已經改變了,則進行外層retry大循環,否則只進行內層的循環 // else CAS failed due to workerCount change; retry inner loop } } //下面這里就是開始創建新的線程了 //Worker的也是Runnable的實現類 Worker w = new Worker(firstTask); //因為不可以直接在Worker的構造方法中進行線程創建 //所以要把它的引用賦給t方便后面進行線程創建 Thread t = w.thread; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //再次取出ctl的當前值,用于進行狀態的檢查,防止線程池的已經狀態改變了 int c = ctl.get(); int rs = runStateOf(c); //將if語句中的條件轉換為一個等價實現 :t == null || (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null)) //有個t == null是因為如果使用的是默認的ThreadFactory的話,那么它的newThread()可能會返回null /* 1. 如果t == null, 則減少一個線程數,如果線程池處于的狀態 > SHUTDOWN,則嘗試終止線程池 2. 如果t != null,且rs == SHUTDOWN,則不再接收新任務,若firstTask != null,則此時也是返回false,創建線程失敗 3. 如果t != null, 且rs > SHUTDOWN,同樣不再接受新任務,此時也是返回false,創建線程失敗 */ if (t == null || (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null))) { decrementWorkerCount(); //減少一個活動的當前線程數 tryTerminate(); //嘗試終止線程池 return false; //返回線程創建失敗 } workers.add(w); //將創建的線程添加到workers容器中 int s = workers.size(); //獲取當前線程活動的數量 if (s > largestPoolSize) //判斷當前線程活動的數量是否超過線程池最大的線程數量 largestPoolSize = s; //當池中的工作線程創新高時,會將這個數記錄到largestPoolSize字段中。然后就可以啟動這個線程t了 } finally { mainLock.unlock(); } t.start(); //開啟線程 //若start后,狀態又變成了SHUTDOWN狀態(如調用了shutdownNow方法)且新建的線程沒有被中斷過, //就要中斷該線程(shutdownNow方法要求中斷正在執行的線程), //shutdownNow方法本身也會去中斷存儲在workers中的所有線程 if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted()) t.interrupt(); return true; }那么在創建線程的時候,線程執行的是什么的呢?
我們前面提到Worker繼承的其實也是Runnable,它在創建線程的時候是以自身作為任務傳進先創建的線程中的,這段比較簡單,我就不一一注釋了,只是給出源代碼給大家看吧。
Worker(Runnable firstTask) { this.firstTask = firstTask; //this指的是worker對象本身 this.thread = getThreadFactory().newThread(this); } 它以自身的對象作為線程任務傳進去,那么它的run方法又是怎樣的呢?
public void run() { runWorker(this); } 竟然只有一句話調用runWorker()方法,這個可是重頭戲,我們來看看,究竟運行的是什么。
[java] view plain copyprint?
/** * 執行Worker中的任務,它的執行流程是這樣的: * 若存在第一個任務,則先執行第一個任務,否則,從隊列中拿任務,不斷的執行, * 直到getTask()返回null或執行任務出錯(中斷或任務本身拋出異常),就退出while循環。 * @param w woker */ final void runWorker(Worker w) { Runnable task = w.firstTask; //將當前Worker中的任務取出來交給task,并釋放掉w.firstTask占用的內存 w.firstTask = null; //用于判斷線程是否由于異常終止,如果不是異常終止,在后面將會將該變量的值改為false //該變量的值在processWorkerExit()會使用來判斷線程是否由于異常終止 boolean completedAbruptly = true; try { //執行任務,直到getTask()返回的值為null,在此處就相當于復用了線程,讓線程執行了多個任務 while (task != null || (task = getTask()) != null) { w.lock(); clearInterruptsForTaskRun();//對線程池狀態進行一次判斷,后面我們會講解一下該方法 try { beforeExecute(w.thread, task); //在任務執行前需要做的邏輯方法,該方面可以由用戶進行重寫自定義 Throwable thrown = null; try { task.run(); //開始執行任務 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); //在任務執行后需要做的邏輯方法,該方面可以由用戶進行重寫自定義 } } finally { task = null; w.completedTasks++; //增加該線程完成的任務 w.unlock(); } } completedAbruptly = false; //線程不是異常終止 } finally { processWorkerExit(w, completedAbruptly); //結束該線程 } } ![]()
/** * 執行Worker中的任務,它的執行流程是這樣的: * 若存在第一個任務,則先執行第一個任務,否則,從隊列中拿任務,不斷的執行, * 直到getTask()返回null或執行任務出錯(中斷或任務本身拋出異常),就退出while循環。 * @param w woker */ final void runWorker(Worker w) { Runnable task = w.firstTask; //將當前Worker中的任務取出來交給task,并釋放掉w.firstTask占用的內存 w.firstTask = null; //用于判斷線程是否由于異常終止,如果不是異常終止,在后面將會將該變量的值改為false //該變量的值在processWorkerExit()會使用來判斷線程是否由于異常終止 boolean completedAbruptly = true; try { //執行任務,直到getTask()返回的值為null,在此處就相當于復用了線程,讓線程執行了多個任務 while (task != null || (task = getTask()) != null) { w.lock(); clearInterruptsForTaskRun();//對線程池狀態進行一次判斷,后面我們會講解一下該方法 try { beforeExecute(w.thread, task); //在任務執行前需要做的邏輯方法,該方面可以由用戶進行重寫自定義 Throwable thrown = null; try { task.run(); //開始執行任務 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); //在任務執行后需要做的邏輯方法,該方面可以由用戶進行重寫自定義 } } finally { task = null; w.completedTasks++; //增加該線程完成的任務 w.unlock(); } } completedAbruptly = false; //線程不是異常終止 } finally { processWorkerExit(w, completedAbruptly); //結束該線程 } }下面就是線程在執行任務之前對線程池狀態的一次判斷:[java] view plain copyprint?
/** * 對線程的結束做一些清理和數據同步 * @param w 封裝線程的Worker * @param completedAbruptly 表示該線程是否結束于異常 */ private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果completedAbruptly值為true,則說明線程是結束于異常 //如果不是結束于異常,那么它降在runWorker方法的while循環中的getTask()方法中已經減一了 if (completedAbruptly) decrementWorkerCount(); //此時將線程數量減一 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; //統計總共完成的任務數 workers.remove(w); //將該線程數從workers容器中移除 } finally { mainLock.unlock(); } tryTerminate(); //嘗試終止線程池 int c = ctl.get(); //接下來的這個if塊要做的事兒了。當池的狀態還是RUNNING, //又要分兩種情況,一種是異常結束,一種是正常結束。異常結束比較好弄,直接加個線程替換死掉的線程就好了, //也就是最后的addWorker操作 if (runStateLessThan(c, STOP)) { //如果當前運行狀態為RUNNING,SHUTDOWN if (!completedAbruptly) { //如果線程不是結束于異常 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //是否允許線程超時結束 if (min == 0 && ! workQueue.isEmpty()) //如果允許把那個且隊列不為空 min = 1; //至少要保留一個線程來完成任務 //如果當前活動的線程數大于等于最小的值 // 1.不允許核心線程超時結束,則必須要使得活動線程數超過corePoolSize數才可以 // 2. 允許核心線程超時結束,但是隊列中有任務,必須留至少一個線程 if (workerCountOf(c) >= min) return; // replacement not needed } //直接加個線程 addWorker(null, false); } } ![]()
/** * 對線程的結束做一些清理和數據同步 * @param w 封裝線程的Worker * @param completedAbruptly 表示該線程是否結束于異常 */ private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果completedAbruptly值為true,則說明線程是結束于異常 //如果不是結束于異常,那么它降在runWorker方法的while循環中的getTask()方法中已經減一了 if (completedAbruptly) decrementWorkerCount(); //此時將線程數量減一 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; //統計總共完成的任務數 workers.remove(w); //將該線程數從workers容器中移除 } finally { mainLock.unlock(); } tryTerminate(); //嘗試終止線程池 int c = ctl.get(); //接下來的這個if塊要做的事兒了。當池的狀態還是RUNNING, //又要分兩種情況,一種是異常結束,一種是正常結束。異常結束比較好弄,直接加個線程替換死掉的線程就好了, //也就是最后的addWorker操作 if (runStateLessThan(c, STOP)) { //如果當前運行狀態為RUNNING,SHUTDOWN if (!completedAbruptly) { //如果線程不是結束于異常 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //是否允許線程超時結束 if (min == 0 && ! workQueue.isEmpty()) //如果允許把那個且隊列不為空 min = 1; //至少要保留一個線程來完成任務 //如果當前活動的線程數大于等于最小的值 // 1.不允許核心線程超時結束,則必須要使得活動線程數超過corePoolSize數才可以 // 2. 允許核心線程超時結束,但是隊列中有任務,必須留至少一個線程 if (workerCountOf(c) >= min) return; // replacement not needed } //直接加個線程 addWorker(null, false); } }前面我們的方法遇見過很多次tryTerminate()方法,到底他是怎樣嘗試結束線程池的呢?[java] view plain copyprint?
/** * 執行該方法,根據線程池狀態進行 判斷是否結束線程池 */ final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || //線程池正在運行中,自然不能結束線程池啦 runStateAtLeast(c, TIDYING) || //如果狀態為TIDYING或TERMINATED,池中的活動線程數已經是0,自然也不需要做什么操作了 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) //線程池出于SHUTDOWN狀態,但是任務隊列不為空,自然不能結束線程池啦 return; if (workerCountOf(c) != 0) { // Eligible to terminate /* 調用這個方法的目的是將shutdown信號傳播給其它線程。 調用shutdown方法的時候會去中斷所有空閑線程,如果這時候池中所有的線程都正在執行任務, 那么就不會有線程被中斷,調用shutdown方法只是設置了線程池的狀態為SHUTDOWN, 在取任務(getTask,后面會細說)的時候,假如很多線程都發現隊列里還有任務(沒有使用鎖,存在競態條件), 然后都去調用take,如果任務數小于池中的線程數,那么必然有方法調用take后會一直等待(shutdown的時候這些線程正在執行任務, 所以沒能調用它的interrupt,其中斷狀態沒有被設置),那么在沒有任務且線程池的狀態為SHUTDWON的時候, 這些等待中的空閑線程就需要被終止iinterruptIdleWorkers(ONLY_ONE)回去中斷一個線程,讓其從take中退出, 然后這個線程也進入同樣的邏輯,去終止一個其它空閑線程,直到池中的活動線程數為0。 */ interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { /* 當狀態為SHUTDOWN,且活動線程數為0的時候,就可以進入TIDYING狀態了, 進入TIDYING狀態就可以執行方法terminated(), 該方法執行結束就進入了TERMINATED狀態(參考前文中各狀態的含義以及可能的狀態轉變) */ if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); //執行該方法,結束線程池 } finally { ctl.set(ctlOf(TERMINATED, 0)); /* 當線程池shutdown后,外部可能還有很多線程在等待線程池真正結束, 即調用了awaitTermination方法,該方法中,外部線程就是在termination上await的, 所以,線程池關閉之前要喚醒這些等待的線程,告訴它們線程池關閉結束了。 */ termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } } ![]()
/** * 執行該方法,根據線程池狀態進行 判斷是否結束線程池 */ final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || //線程池正在運行中,自然不能結束線程池啦 runStateAtLeast(c, TIDYING) || //如果狀態為TIDYING或TERMINATED,池中的活動線程數已經是0,自然也不需要做什么操作了 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) //線程池出于SHUTDOWN狀態,但是任務隊列不為空,自然不能結束線程池啦 return; if (workerCountOf(c) != 0) { // Eligible to terminate /* 調用這個方法的目的是將shutdown信號傳播給其它線程。 調用shutdown方法的時候會去中斷所有空閑線程,如果這時候池中所有的線程都正在執行任務, 那么就不會有線程被中斷,調用shutdown方法只是設置了線程池的狀態為SHUTDOWN, 在取任務(getTask,后面會細說)的時候,假如很多線程都發現隊列里還有任務(沒有使用鎖,存在競態條件), 然后都去調用take,如果任務數小于池中的線程數,那么必然有方法調用take后會一直等待(shutdown的時候這些線程正在執行任務, 所以沒能調用它的interrupt,其中斷狀態沒有被設置),那么在沒有任務且線程池的狀態為SHUTDWON的時候, 這些等待中的空閑線程就需要被終止iinterruptIdleWorkers(ONLY_ONE)回去中斷一個線程,讓其從take中退出, 然后這個線程也進入同樣的邏輯,去終止一個其它空閑線程,直到池中的活動線程數為0。 */ interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { /* 當狀態為SHUTDOWN,且活動線程數為0的時候,就可以進入TIDYING狀態了, 進入TIDYING狀態就可以執行方法terminated(), 該方法執行結束就進入了TERMINATED狀態(參考前文中各狀態的含義以及可能的狀態轉變) */ if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); //執行該方法,結束線程池 } finally { ctl.set(ctlOf(TERMINATED, 0)); /* 當線程池shutdown后,外部可能還有很多線程在等待線程池真正結束, 即調用了awaitTermination方法,該方法中,外部線程就是在termination上await的, 所以,線程池關閉之前要喚醒這些等待的線程,告訴它們線程池關閉結束了。 */ termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }2、關閉線程池關閉時使用shutdown()方法,源碼如下:[java] view plain copyprint?
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownaccess(); // 檢查終止線程池的線程是否有權限。 advanceRunState(SHUTDOWN);// 設置線程池的狀態為關閉狀態。 interruptIdleWorkers(); // 中斷線程池中空閑的線程 onShutdown(); // 鉤子函數,在ThreadPoolExecutor中沒有任何動作 } finally { mainLock.unlock(); } tryTerminate(); // 嘗試終止線程池 } ![]()
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 檢查終止線程池的線程是否有權限。 advanceRunState(SHUTDOWN);// 設置線程池的狀態為關閉狀態。 interruptIdleWorkers(); // 中斷線程池中空閑的線程 onShutdown(); // 鉤子函數,在ThreadPoolExecutor中沒有任何動作 } finally { mainLock.unlock(); } tryTerminate(); // 嘗試終止線程池 }轉載自http://blog.csdn.net/mazhimazh/