亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 開發 > Java > 正文

深入理解Java編程線程池的實現原理

2024-07-13 10:13:57
字體:
來源:轉載
供稿:網友

在前面的文章中,我們使用線程的時候就去創建一個線程,這樣實現起來非常簡便,但是就會有一個問題:

如果并發的線程數量很多,并且每個線程都是執行一個時間很短的任務就結束了,這樣頻繁創建線程就會大大降低系統的效率,因為頻繁創建線程和銷毀線程需要時間。

那么有沒有一種辦法使得線程可以復用,就是執行完一個任務,并不被銷毀,而是可以繼續執行其他的任務?

在Java中可以通過線程池來達到這樣的效果。今天我們就來詳細講解一下Java的線程池,首先我們從最核心的ThreadPoolExecutor類中的方法講起,然后再講述它的實現原理,接著給出了它的使用示例,最后討論了一下如何合理配置線程池的大小。

以下是本文的目錄大綱:

一.Java中的ThreadPoolExecutor類

二.深入剖析線程池實現原理

三.使用示例

四.如何合理配置線程池的大小

若有不正之處請多多諒解,并歡迎批評指正。

一.Java中的ThreadPoolExecutor類

java/79123.html">java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,因此如果要透徹地了解Java中的線程池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現源碼。

在ThreadPoolExecutor類中提供了四個構造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {  .....  public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,      BlockingQueue<Runnable> workQueue);   public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,      BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);   public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,      BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);   public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,    BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);  ...}

從上面的代碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類,并提供了四個構造器,事實上,通過觀察每個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工作。

下面解釋下一下構造器中各個參數的含義:

corePoolSize:核心池的大小,這個參數跟后面講述的線程池的實現原理有非常大的關系。在創建了線程池后,默認情況下,線程池中并沒有任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建corePoolSize個線程或者一個線程。默認情況下,在創建了線程池后,線程池中的線程數為0,當有任務來之后,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize后,就會把到達的任務放到緩存隊列當中;
maximumPoolSize:線程池最大線程數,這個參數也是一個非常重要的參數,它表示在線程池中最多能創建多少個線程;
keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大于corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大于corePoolSize,即當線程池中的線程數大于corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大于corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;
unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:

TimeUnit.DAYS;        //天TimeUnit.HOURS;       //小時TimeUnit.MINUTES;      //分鐘TimeUnit.SECONDS;      //秒TimeUnit.MILLISECONDS;   //毫秒TimeUnit.MICROSECONDS;   //微妙TimeUnit.NANOSECONDS;    //納秒

workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,一般來說,這里的阻塞隊列有以下幾種選擇:

ArrayBlockingQueue;LinkedBlockingQueue;SynchronousQueue;

ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。

threadFactory:線程工廠,主要用來創建線程;
handler:表示當拒絕處理任務時的策略,有以下四種取值:

ThreadPoolExecutor.AbortPolicy:丟棄任務并拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

具體參數的配置與線程池的關系將在下一節講述。

從上面給出的ThreadPoolExecutor類的代碼可以知道,ThreadPoolExecutor繼承了AbstractExecutorService,我們來看一下AbstractExecutorService的實現:

public abstract class AbstractExecutorService implements ExecutorService {   protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };  public Future<?> submit(Runnable task) {};  public <T> Future<T> submit(Runnable task, T result) { };  public <T> Future<T> submit(Callable<T> task) { };  private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,              boolean timed, long nanos)    throws InterruptedException, ExecutionException, TimeoutException {  };  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)    throws InterruptedException, ExecutionException {  };  public <T> T invokeAny(Collection<? extends Callable<T>> tasks,              long timeout, TimeUnit unit)    throws InterruptedException, ExecutionException, TimeoutException {  };  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)    throws InterruptedException {  };  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,                     long timeout, TimeUnit unit)    throws InterruptedException {  };}

AbstractExecutorService是一個抽象類,它實現了ExecutorService接口。

我們接著看ExecutorService接口的實現:

public interface ExecutorService extends Executor {   void shutdown();  boolean isShutdown();  boolean isTerminated();  boolean awaitTermination(long timeout, TimeUnit unit)    throws InterruptedException;  <T> Future<T> submit(Callable<T> task);  <T> Future<T> submit(Runnable task, T result);  Future<?> submit(Runnable task);  <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)    throws InterruptedException;  <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,                 long timeout, TimeUnit unit)    throws InterruptedException;   <T> T invokeAny(Collection<? extends Callable<T>> tasks)    throws InterruptedException, ExecutionException;  <T> T invokeAny(Collection<? extends Callable<T>> tasks,          long timeout, TimeUnit unit)    throws InterruptedException, ExecutionException, TimeoutException;}

而ExecutorService又是繼承了Executor接口,我們看一下Executor接口的實現:

public interface Executor {  void execute(Runnable command);}

到這里,大家應該明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關系了。

Executor是一個頂層接口,在它里面只聲明了一個方法execute(Runnable),返回值為void,參數為Runnable類型,從字面意思可以理解,就是用來執行傳進去的任務的;

然后ExecutorService接口繼承了Executor接口,并聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的所有方法;

然后ThreadPoolExecutor繼承了類AbstractExecutorService。

在ThreadPoolExecutor類中有幾個非常重要的方法:

execute()submit()shutdown()shutdownNow()

execute()方法實際上是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池提交一個任務,交由線程池去執行。

submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中并沒有對其進行重寫,這個方法也是用來向線程池提交任務的,但是它和execute()方法不同,它能夠返回任務執行的結果,去看submit()方法的實現,會發現它實際上還是調用的execute()方法,只不過它利用了Future來獲取任務執行結果(Future相關內容將在下一篇講述)。

shutdown()和shutdownNow()是用來關閉線程池的。

還有很多其他的方法:

比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關屬性的方法,有興趣的朋友可以自行查閱API。

二.深入剖析線程池實現原理

在上一節我們從宏觀上介紹了ThreadPoolExecutor,下面我們來深入解析一下線程池的具體實現原理,將從下面幾個方面講解:

1.線程池狀態

  2.任務的執行

  3.線程池中的線程初始化

  4.任務緩存隊列及排隊策略

  5.任務拒絕策略

  6.線程池的關閉

  7.線程池容量的動態調整

1.線程池狀態

在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀態:

volatile int runState;static final int RUNNING  = 0;static final int SHUTDOWN  = 1;static final int STOP    = 2;static final int TERMINATED = 3;

runState表示當前線程池的狀態,它是一個volatile變量用來保證線程之間的可見性;

下面的幾個static final變量表示runState可能的幾個取值。

當創建線程池后,初始時,線程池處于RUNNING狀態;

如果調用了shutdown()方法,則線程池處于SHUTDOWN狀態,此時線程池不能夠接受新的任務,它會等待所有任務執行完畢;

如果調用了shutdownNow()方法,則線程池處于STOP狀態,此時線程池不能接受新的任務,并且會去嘗試終止正在執行的任務;

當線程池處于SHUTDOWN或STOP狀態,并且所有工作線程已經銷毀,任務緩存隊列已經清空或執行結束后,線程池被設置為TERMINATED狀態。

2.任務的執行

在了解將任務提交給線程池到任務執行完畢整個過程之前,我們先來看一下ThreadPoolExecutor類中其他的一些比較重要成員變量:

private final BlockingQueue<Runnable> workQueue;       //任務緩存隊列,用來存放等待執行的任務private final ReentrantLock mainLock = new ReentrantLock();  //線程池的主要狀態鎖,對線程池狀態(比如線程池大小                               //、runState等)的改變都要使用這個鎖private final HashSet<Worker> workers = new HashSet<Worker>(); //用來存放工作集 private volatile long keepAliveTime;  //線程存貨時間  private volatile boolean allowCoreThreadTimeOut;  //是否允許為核心線程設置存活時間private volatile int  corePoolSize;   //核心池的大?。淳€程池中的線程數目大于這個參數時,提交的任務會被放進任務緩存隊列)private volatile int  maximumPoolSize;  //線程池最大能容忍的線程數 private volatile int  poolSize;    //線程池中當前的線程數 private volatile RejectedExecutionHandler handler; //任務拒絕策略 private volatile ThreadFactory threadFactory;  //線程工廠,用來創建線程 private int largestPoolSize;  //用來記錄線程池中曾經出現過的最大線程數 private long completedTaskCount;  //用來記錄已經執行完畢的任務個數

每個變量的作用都已經標明出來了,這里要重點解釋一下corePoolSize、maximumPoolSize、largestPoolSize三個變量。

corePoolSize在很多地方被翻譯成核心池大小,其實我的理解這個就是線程池的大小。舉個簡單的例子:

假如有一個工廠,工廠里面有10個工人,每個工人同時只能做一件任務。

因此只要當10個工人中有工人是空閑的,來了任務就分配給空閑的工人做;

當10個工人都有任務在做時,如果還來了任務,就把任務進行排隊等待;

如果說新任務數目增長的速度遠遠大于工人做任務的速度,那么此時工廠主管可能會想補救措施,比如重新招4個臨時工人進來;

然后就將任務也分配給這4個臨時工人做;

如果說著14個工人做任務的速度還是不夠,此時工廠主管可能就要考慮不再接收新的任務或者拋棄前面的一些任務了。

當這14個工人當中有人空閑時,而新任務增長的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的。

這個例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務量突然過大時的一種補救措施。

不過為了方便理解,在本文后面還是將corePoolSize翻譯成核心池大小。

largestPoolSize只是一個用來起記錄作用的變量,用來記錄線程池中曾經有過的最大線程數目,跟線程池的容量沒有任何關系。

下面我們進入正題,看一下任務從提交到最終執行完畢經歷了哪些過程。

在ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法,雖然通過submit也可以提交任務,但是實際上submit方法里面最終調用的還是execute()方法,所以我們只需要研究execute()方法的實現原理即可:

public void execute(Runnable command) {  if (command == null)    throw new NullPointerException();  if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {    if (runState == RUNNING && workQueue.offer(command)) {      if (runState != RUNNING || poolSize == 0)        ensureQueuedTaskHandled(command);    }    else if (!addIfUnderMaximumPoolSize(command))      reject(command); // is shutdown or saturated  }}

上面的代碼可能看起來不是那么容易理解,下面我們一句一句解釋:

首先,判斷提交的任務command是否為null,若是null,則拋出空指針異常;

接著是這句,這句要好好理解一下:

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))

由于是或條件運算符,所以先計算前半部分的值,如果線程池中當前線程數不小于核心池大小,那么就會直接進入下面的if語句塊了。

如果線程池中當前線程數小于核心池大小,則接著執行后半部分,也就是執行

addIfUnderCorePoolSize(command)

如果執行完addIfUnderCorePoolSize這個方法返回false,則繼續執行下面的if語句塊,否則整個方法就直接執行完畢了。

如果執行完addIfUnderCorePoolSize這個方法返回false,然后接著判斷:

if (runState == RUNNING && workQueue.offer(command))

如果當前線程池處于RUNNING狀態,則將任務放入任務緩存隊列;如果當前線程池不處于RUNNING狀態或者任務放入緩存隊列失敗,則執行:

addIfUnderMaximumPoolSize(command)

如果執行addIfUnderMaximumPoolSize方法失敗,則執行reject()方法進行任務拒絕處理。

回到前面:

if (runState == RUNNING && workQueue.offer(command))

這句的執行,如果說當前線程池處于RUNNING狀態且將任務放入任務緩存隊列成功,則繼續進行判斷:

if (runState != RUNNING || poolSize == 0)

這句判斷是為了防止在將此任務添加進任務緩存隊列的同時其他線程突然調用shutdown或者shutdownNow方法關閉了線程池的一種應急措施。如果是這樣就執行:

ensureQueuedTaskHandled(command)

進行應急處理,從名字可以看出是保證 添加到任務緩存隊列中的任務得到處理。

我們接著看2個關鍵方法的實現:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {  Thread t = null;  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {    if (poolSize < corePoolSize && runState == RUNNING)      t = addThread(firstTask);    //創建線程去執行firstTask任務      } finally {    mainLock.unlock();  }  if (t == null)    return false;  t.start();  return true;}

這個是addIfUnderCorePoolSize方法的具體實現,從名字可以看出它的意圖就是當低于核心吃大小時執行的方法。下面看其具體實現,首先獲取到鎖,因為這地方涉及到線程池狀態的變化,先通過if語句判斷當前線程池中的線程數目是否小于核心池大小,有朋友也許會有疑問:前面在execute()方法中不是已經判斷過了嗎,只有線程池當前線程數目小于核心池大小才會執行addIfUnderCorePoolSize方法的,為何這地方還要繼續判斷?原因很簡單,前面的判斷過程中并沒有加鎖,因此可能在execute方法判斷的時候poolSize小于corePoolSize,而判斷完之后,在其他線程中又向線程池提交了任務,就可能導致poolSize不小于corePoolSize了,所以需要在這個地方繼續判斷。然后接著判斷線程池的狀態是否為RUNNING,原因也很簡單,因為有可能在其他線程中調用了shutdown或者shutdownNow方法。然后就是執行

t = addThread(firstTask);

這個方法也非常關鍵,傳進去的參數為提交的任務,返回值為Thread類型。然后接著在下面判斷t是否為空,為空則表明創建線程失?。磒oolSize>=corePoolSize或者runState不等于RUNNING),否則調用t.start()方法啟動線程。

我們來看一下addThread方法的實現:

private Thread addThread(Runnable firstTask) {  Worker w = new Worker(firstTask);  Thread t = threadFactory.newThread(w); //創建一個線程,執行任務    if (t != null) {    w.thread = t;      //將創建的線程的引用賦值為w的成員變量        workers.add(w);    int nt = ++poolSize;   //當前線程數加1        if (nt > largestPoolSize)      largestPoolSize = nt;  }  return t;}

在addThread方法中,首先用提交的任務創建了一個Worker對象,然后調用線程工廠threadFactory創建了一個新的線程t,然后將線程t的引用賦值給了Worker對象的成員變量thread,接著通過workers.add(w)將Worker對象添加到工作集當中。

下面我們看一下Worker類的實現:

private final class Worker implements Runnable {  private final ReentrantLock runLock = new ReentrantLock();  private Runnable firstTask;  volatile long completedTasks;  Thread thread;  Worker(Runnable firstTask) {    this.firstTask = firstTask;  }  boolean isActive() {    return runLock.isLocked();  }  void interruptIfIdle() {    final ReentrantLock runLock = this.runLock;    if (runLock.tryLock()) {      try {    if (thread != Thread.currentThread())    thread.interrupt();      } finally {        runLock.unlock();      }    }  }  void interruptNow() {    thread.interrupt();  }   private void runTask(Runnable task) {    final ReentrantLock runLock = this.runLock;    runLock.lock();    try {      if (runState < STOP &&        Thread.interrupted() &&        runState >= STOP)      boolean ran = false;      beforeExecute(thread, task);  //beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現,用戶可以根據      //自己需要重載這個方法和后面的afterExecute方法來進行一些統計信息,比如某個任務的執行時間等            try {        task.run();        ran = true;        afterExecute(task, null);        ++completedTasks;      } catch (RuntimeException ex) {        if (!ran)          afterExecute(task, ex);        throw ex;      }    } finally {      runLock.unlock();    }  }   public void run() {    try {      Runnable task = firstTask;      firstTask = null;      while (task != null || (task = getTask()) != null) {        runTask(task);        task = null;      }    } finally {      workerDone(this);  //當任務隊列中沒有任務時,進行清理工作        }  }}

它實際上實現了Runnable接口,因此上面的Thread t = threadFactory.newThread(w);效果跟下面這句的效果基本一樣:

Thread t = new Thread(w);

相當于傳進去了一個Runnable任務,在線程t中執行這個Runnable。

既然Worker實現了Runnable接口,那么自然最核心的方法便是run()方法了:

public void run() {  try {    Runnable task = firstTask;    firstTask = null;    while (task != null || (task = getTask()) != null) {      runTask(task);      task = null;    }  } finally {    workerDone(this);  }}

從run方法的實現可以看出,它首先執行的是通過構造器傳進來的任務firstTask,在調用runTask()執行完firstTask之后,在while循環里面不斷通過getTask()去取新的任務來執行,那么去哪里取呢?自然是從任務緩存隊列里面去取,getTask是ThreadPoolExecutor類中的方法,并不是Worker類中的方法,下面是getTask方法的實現:

Runnable getTask() {  for (;;) {    try {      int state = runState;      if (state > SHUTDOWN)        return null;      Runnable r;      if (state == SHUTDOWN) // Help drain queue        r = workQueue.poll();      else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果線程數大于核心池大小或者允許為核心池線程設置空閑時間,        //則通過poll取任務,若等待一定的時間取不到任務,則返回null        r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);      else        r = workQueue.take();      if (r != null)        return r;      if (workerCanExit()) {  //如果沒取到任務,即r為null,則判斷當前的worker是否可以退出        if (runState >= SHUTDOWN) // Wake up others          interruptIdleWorkers();  //中斷處于空閑狀態的worker        return null;      }      // Else retry    } catch (InterruptedException ie) {      // On interruption, re-check runState    }  }

在getTask中,先判斷當前線程池狀態,如果runState大于SHUTDOWN(即為STOP或者TERMINATED),則直接返回null。

如果runState為SHUTDOWN或者RUNNING,則從任務緩存隊列取任務。

如果當前線程池的線程數大于核心池大小corePoolSize或者允許為核心池中的線程設置空閑存活時間,則調用poll(time,timeUnit)來取任務,這個方法會等待一定的時間,如果取不到任務就返回null。

然后判斷取到的任務r是否為null,為null則通過調用workerCanExit()方法來判斷當前worker是否可以退出,我們看一下workerCanExit()的實現:

private boolean workerCanExit() {  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  boolean canExit;  //如果runState大于等于STOP,或者任務緩存隊列為空了  //或者 允許為核心池線程設置空閑存活時間并且線程池中的線程數目大于1  try {    canExit = runState >= STOP ||      workQueue.isEmpty() ||      (allowCoreThreadTimeOut &&       poolSize > Math.max(1, corePoolSize));  } finally {    mainLock.unlock();  }  return canExit;}

也就是說如果線程池處于STOP狀態、或者任務隊列已為空或者允許為核心池線程設置空閑存活時間并且線程數大于1時,允許worker退出。如果允許worker退出,則調用interruptIdleWorkers()中斷處于空閑狀態的worker,我們看一下interruptIdleWorkers()的實現:

void interruptIdleWorkers() {  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {    for (Worker w : workers) //實際上調用的是worker的interruptIfIdle()方法      w.interruptIfIdle();  } finally {    mainLock.unlock();  }}

從實現可以看出,它實際上調用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:

void interruptIfIdle() {  final ReentrantLock runLock = this.runLock;  if (runLock.tryLock()) {  //注意這里,是調用tryLock()來獲取鎖的,因為如果當前worker正在執行任務,鎖已經被獲取了,是無法獲取到鎖的                //如果成功獲取了鎖,說明當前worker處于空閑狀態    try {  if (thread != Thread.currentThread())   thread.interrupt();    } finally {      runLock.unlock();    }  }}

這里有一個非常巧妙的設計方式,假如我們來設計線程池,可能會有一個任務分派線程,當發現有線程空閑時,就從任務緩存隊列中取一個任務交給空閑線程執行。但是在這里,并沒有采用這樣的方式,因為這樣會要額外地對任務分派線程進行管理,無形地會增加難度和復雜度,這里直接讓執行完任務的線程去任務緩存隊列里面取任務來執行。

我們再看addIfUnderMaximumPoolSize方法的實現,這個方法的實現思想和addIfUnderCorePoolSize方法的實現思想非常相似,唯一的區別在于addIfUnderMaximumPoolSize方法是在線程池中的線程數達到了核心池大小并且往任務隊列中添加任務失敗的情況下執行的:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {  Thread t = null;  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {    if (poolSize < maximumPoolSize && runState == RUNNING)      t = addThread(firstTask);  } finally {    mainLock.unlock();  }  if (t == null)    return false;  t.start();  return true;}

看到沒有,其實它和addIfUnderCorePoolSize方法的實現基本一模一樣,只是if語句判斷條件中的poolSize < maximumPoolSize不同而已。

到這里,大部分朋友應該對任務提交給線程池之后到被執行的整個過程有了一個基本的了解,下面總結一下:

1)首先,要清楚corePoolSize和maximumPoolSize的含義;

2)其次,要知道Worker是用來起到什么作用的;

3)要知道任務提交給線程池之后的處理策略,這里總結一下主要有4點:

如果當前線程池中的線程數目小于corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;若添加失?。ㄒ话銇碚f是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;
如果當前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕策略進行處理;
如果線程池中的線程數量大于 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大于corePoolSize;如果允許為核心池中的線程設置存活時間,那么核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。

3.線程池中的線程初始化

默認情況下,創建線程池之后,線程池中是沒有線程的,需要提交任務之后才會創建線程。

在實際中如果需要線程池創建之后立即創建線程,可以通過以下兩個方法辦到:

prestartCoreThread():初始化一個核心線程;
prestartAllCoreThreads():初始化所有核心線程
下面是這2個方法的實現:

public boolean prestartCoreThread() {  return addIfUnderCorePoolSize(null); //注意傳進去的參數是null} public int prestartAllCoreThreads() {  int n = 0;  while (addIfUnderCorePoolSize(null))//注意傳進去的參數是null    ++n;  return n;}

注意上面傳進去的參數是null,根據第2小節的分析可知如果傳進去的參數為null,則最后執行線程會阻塞在getTask方法中的

r = workQueue.take();

即等待任務隊列中有任務。

4.任務緩存隊列及排隊策略

在前面我們多次提到了任務緩存隊列,即workQueue,它用來存放等待執行的任務。

workQueue的類型為BlockingQueue<Runnable>,通??梢匀∠旅嫒N類型:

1)ArrayBlockingQueue:基于數組的先進先出隊列,此隊列創建時必須指定大?。?/p>

2)LinkedBlockingQueue:基于鏈表的先進先出隊列,如果創建時沒有指定此隊列大小,則默認為Integer.MAX_VALUE;

3)synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。

5.任務拒絕策略

當線程池的任務緩存隊列已滿并且線程池中的線程數目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕策略,通常有以下四種策略:

ThreadPoolExecutor.AbortPolicy:丟棄任務并拋出RejectedExecutionException異常。ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

6.線程池的關閉

ThreadPoolExecutor提供了兩個方法,用于線程池的關閉,分別是shutdown()和shutdownNow(),其中:

shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完后才終止,但再也不會接受新的任務
shutdownNow():立即終止線程池,并嘗試打斷正在執行的任務,并且清空任務緩存隊列,返回尚未執行的任務
7.線程池容量的動態調整

ThreadPoolExecutor提供了動態調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

setCorePoolSize:設置核心池大小
setMaximumPoolSize:設置線程池最大能創建的線程數目大小
當上述參數從小變大時,ThreadPoolExecutor進行線程賦值,還可能立即創建新的線程來執行任務。

三.使用示例

前面我們討論了關于線程池的實現原理,這一節我們來看一下它的具體使用:

public class Test {   public static void main(String[] args) {       ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,         new ArrayBlockingQueue<Runnable>(5));      for(int i=0;i<15;i++){       MyTask myTask = new MyTask(i);       executor.execute(myTask);       System.out.println("線程池中線程數目:"+executor.getPoolSize()+",隊列中等待執行的任務數目:"+       executor.getQueue().size()+",已執行玩別的任務數目:"+executor.getCompletedTaskCount());     }     executor.shutdown();   }} class MyTask implements Runnable {  private int taskNum;   public MyTask(int num) {    this.taskNum = num;  }   @Override  public void run() {    System.out.println("正在執行task "+taskNum);    try {      Thread.currentThread().sleep(4000);    } catch (InterruptedException e) {      e.printStackTrace();    }    System.out.println("task "+taskNum+"執行完畢");  }}

執行結果:

正在執行task 0線程池中線程數目:1,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0線程池中線程數目:2,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0正在執行task 1線程池中線程數目:3,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0正在執行task 2線程池中線程數目:4,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0正在執行task 3線程池中線程數目:5,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0正在執行task 4線程池中線程數目:5,隊列中等待執行的任務數目:1,已執行玩別的任務數目:0線程池中線程數目:5,隊列中等待執行的任務數目:2,已執行玩別的任務數目:0線程池中線程數目:5,隊列中等待執行的任務數目:3,已執行玩別的任務數目:0線程池中線程數目:5,隊列中等待執行的任務數目:4,已執行玩別的任務數目:0線程池中線程數目:5,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0線程池中線程數目:6,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0正在執行task 10線程池中線程數目:7,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0正在執行task 11線程池中線程數目:8,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0正在執行task 12線程池中線程數目:9,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0正在執行task 13線程池中線程數目:10,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0正在執行task 14task 3執行完畢task 0執行完畢task 2執行完畢task 1執行完畢正在執行task 8正在執行task 7正在執行task 6正在執行task 5task 4執行完畢task 10執行完畢task 11執行完畢task 13執行完畢task 12執行完畢正在執行task 9task 14執行完畢task 8執行完畢task 5執行完畢task 7執行完畢task 6執行完畢task 9執行完畢

從執行結果可以看出,當線程池中線程的數目大于5時,便將任務放入任務緩存隊列里面,當任務緩存隊列滿了之后,便創建新的線程。如果上面程序中,將for循環中改成執行20個任務,就會拋出任務拒絕異常了。

不過在java doc中,并不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來創建線程池:

Executors.newCachedThreadPool();    //創建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUEExecutors.newSingleThreadExecutor();  //創建容量為1的緩沖池Executors.newFixedThreadPool(int);  //創建固定容量大小的緩沖池

下面是這三個靜態方法的具體實現;

public static ExecutorService newFixedThreadPool(int nThreads) {  return new ThreadPoolExecutor(nThreads, nThreads,                 0L, TimeUnit.MILLISECONDS,                 new LinkedBlockingQueue<Runnable>());}public static ExecutorService newSingleThreadExecutor() {  return new FinalizableDelegatedExecutorService    (new ThreadPoolExecutor(1, 1,                0L, TimeUnit.MILLISECONDS,                new LinkedBlockingQueue<Runnable>()));}public static ExecutorService newCachedThreadPool() {  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                 60L, TimeUnit.SECONDS,                 new SynchronousQueue<Runnable>());}

從它們的具體實現來看,它們實際上也是調用了ThreadPoolExecutor,只不過參數都已配置好了。

newFixedThreadPool創建的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置為1,也使用的LinkedBlockingQueue;

newCachedThreadPool將corePoolSize設置為0,將maximumPoolSize設置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就創建線程運行,當線程空閑超過60秒,就銷毀線程。

實際中,如果Executors提供的三個靜態方法能滿足要求,就盡量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的參數有點麻煩,要根據實際任務的類型和數量來進行配置。

另外,如果ThreadPoolExecutor達不到要求,可以自己繼承ThreadPoolExecutor類進行重寫。

四.如何合理配置線程池的大小

本節來討論一個比較重要的話題:如何合理配置線程池大小,僅供參考。

一般需要根據任務的類型來配置線程池大?。?/p>

如果是CPU密集型任務,就需要盡量壓榨CPU,參考值可以設為 NCPU+1

如果是IO密集型任務,參考值可以設置為2*NCPU

當然,這只是一個參考值,具體的設置還需要根據實際情況進行調整,比如可以先將線程池大小設置為參考值,再觀察任務運行情況和系統負載、資源利用率來進行適當調整。

總結

以上就是本文關于深入理解Java編程線程池的實現原理的全部內容,希望對大家有所幫助。有什么問題可以隨時留言,小編會及時回復大家的。感謝朋友們對本站的支持!


注:相關教程知識閱讀請移步到JAVA教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
欧美日韩免费一区| 黄色成人在线播放| 日韩第一页在线| 欧美成人在线免费视频| 久久久久国产精品免费| 欧美日韩综合视频网址| 欧美视频中文在线看| 日韩高清电影免费观看完整版| 日韩成人av在线| 97久久精品人搡人人玩| 91高清免费视频| 欧美日韩另类视频| 久久久久久久久亚洲| 亚洲香蕉成视频在线观看| 91免费看片在线| 视频一区视频二区国产精品| 亚洲美腿欧美激情另类| 亚洲国产精品成人va在线观看| 最近2019中文字幕第三页视频| 欧美国产极速在线| 久久精品视频中文字幕| 欧美性猛交xxxx富婆| 亚洲直播在线一区| 在线亚洲午夜片av大片| 欧美美女操人视频| 国产日韩中文字幕在线| 亚洲欧洲美洲在线综合| 国产精品盗摄久久久| 亚洲国产欧美在线成人app| 亚洲成人激情小说| 欧美性猛交xxxx乱大交极品| 亚洲影院色在线观看免费| 福利微拍一区二区| 国产v综合v亚洲欧美久久| 欧美最猛性xxxxx亚洲精品| 国产日韩欧美在线| 国产精品欧美激情在线播放| 亚洲精品女av网站| 日本免费一区二区三区视频观看| 国产欧美欧洲在线观看| 欧美黑人xxxx| 国产欧美精品在线播放| 2019中文在线观看| 国产69精品99久久久久久宅男| 欧美激情欧美激情| 欧美成人免费一级人片100| 在线成人激情视频| 亚洲一区二区黄| 亚洲人a成www在线影院| 亚洲国产97在线精品一区| 97视频在线观看视频免费视频| 国产精品自产拍高潮在线观看| 久久精品这里热有精品| 国产精品稀缺呦系列在线| 久热精品视频在线观看一区| 中文字幕亚洲欧美日韩在线不卡| 亚洲成人久久久| 青青草成人在线| 中文字幕无线精品亚洲乱码一区| 色婷婷av一区二区三区在线观看| 97国产精品人人爽人人做| 国产成人精品最新| 中文字幕亚洲综合久久| 国自产精品手机在线观看视频| 91久久久久久久一区二区| 久久久噜噜噜久久久| 国产精品欧美日韩一区二区| 日韩视频免费观看| 亚洲精品一区二三区不卡| 日韩av片免费在线观看| 国产精品r级在线| 亚洲日本aⅴ片在线观看香蕉| 一区二区欧美激情| 一区二区三区四区视频| 亚洲国产精久久久久久久| 亚洲电影免费观看高清| 亚洲精品永久免费精品| 欧美成人一区二区三区电影| 久久躁狠狠躁夜夜爽| 亚洲性69xxxbbb| 国内精品久久久| 亚洲在线一区二区| 欧美日韩国产综合视频在线观看中文| 亚洲国产精品va在线看黑人| 日韩高清人体午夜| 欧洲美女免费图片一区| 亚洲精品久久在线| 亚洲国产精品久久久久秋霞蜜臀| 欧美成人一区二区三区电影| 国产午夜一区二区| 欧美激情手机在线视频| 亚洲精品电影网在线观看| 亚洲日本aⅴ片在线观看香蕉| 久久精品99久久香蕉国产色戒| 粉嫩av一区二区三区免费野| 午夜精品久久久久久久99热| 福利视频第一区| 国产精品jizz在线观看麻豆| 亚洲色图第一页| 久久久欧美精品| 成人中文字幕在线观看| 国产精品国产三级国产专播精品人| 亚洲最大成人在线| 国产精品极品尤物在线观看| 国产亚洲美女久久| 亚洲老板91色精品久久| 日本一区二区不卡| 欧美在线免费视频| 国产成人91久久精品| 亚洲欧美日韩综合| 91成人精品网站| 欧美一区亚洲一区| 韩剧1988在线观看免费完整版| 国外成人在线视频| 7m精品福利视频导航| 欧美激情视频一区二区| 亚洲福利视频在线| 精品五月天久久| 欧美中文在线免费| 午夜精品国产精品大乳美女| 大量国产精品视频| 精品久久久久久国产| 高清日韩电视剧大全免费播放在线观看| 亚洲欧美日韩精品久久奇米色影视| 亚洲已满18点击进入在线看片| 在线免费观看羞羞视频一区二区| 国产91精品久久久久久| 欧美成人在线影院| 亚洲无av在线中文字幕| 欧美日韩精品国产| 亚洲欧美在线第一页| 欧美亚洲伦理www| 国产一区二区视频在线观看| 亚洲综合在线小说| 欧美在线观看网站| 国产一区二区精品丝袜| www亚洲欧美| 成人淫片在线看| 日本一区二区在线播放| 国产精品女主播| 亚洲色图13p| 久久成人18免费网站| 91精品国产网站| 亚洲精品一区中文字幕乱码| 欧美精品成人91久久久久久久| 91高潮在线观看| 91av视频在线免费观看| 精品一区二区三区四区| 亚洲精品99久久久久中文字幕| 国产精品精品一区二区三区午夜版| 久久99久久99精品免观看粉嫩| 国产精品r级在线| 黄网站色欧美视频| 日韩欧美国产黄色| 国产91免费观看| 爽爽爽爽爽爽爽成人免费观看| 欧美专区在线观看| 欧美精品在线免费观看| 在线性视频日韩欧美| 2021久久精品国产99国产精品| 色yeye香蕉凹凸一区二区av| 51久久精品夜色国产麻豆| 欧美激情videos| 国产精品免费一区二区三区都可以|