亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 學院 > 開發設計 > 正文

《java.util.concurrent 包源碼閱讀》10 線程池系列之AbstractExecutorService

2019-11-14 20:52:12
字體:
來源:轉載
供稿:網友
java.util.concurrent 包源碼閱讀》10 線程池系列之AbstractExecutorService

AbstractExecutorService對ExecutorService的執行任務類型的方法提供了一個默認實現。這些方法包括submit,invokeAny和InvokeAll。

注意的是來自Executor接口的execute方法是未被實現,execute方法是整個體系的核心,所有的任務都是在這個方法里被真正執行的,因此該方法的不同實現會帶來不同的執行策略。這個在后面分析ThreadPoolExecutor和ScheduledThreadPoolExecutor就能看出來。

首先來看submit方法,它的基本邏輯是這樣的:

1. 生成一個任務類型和Future接口的包裝接口RunnableFuture的對象

2. 執行任務

3. 返回future。

    public Future<?> submit(Runnable task) {        if (task == null) throw new NullPointerException();        RunnableFuture<Void> ftask = newTaskFor(task, null);        execute(ftask);        return ftask;    }    public <T> Future<T> submit(Callable<T> task) {        if (task == null) throw new NullPointerException();        RunnableFuture<T> ftask = newTaskFor(task);        execute(ftask);        return ftask;    }

因為submit支持Callable和Runnable兩種類型的任務,因此newTaskFor方法有兩個重載方法:

    PRotected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {        return new FutureTask<T>(callable);    }    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {        return new FutureTask<T>(runnable, value);    }

上一篇文章里曾經說過Callable和Runnable的區別在于前者帶返回值,也就是說Callable=Runnable+返回值。因此java中提供了一種adapter,把Runnable+返回值轉換成Callable類型。這點可以在newTaskFor中的FutureTask類型的構造函數的代碼中看到:

    public FutureTask(Callable<V> callable) {        if (callable == null)            throw new NullPointerException();        sync = new Sync(callable);    }    public FutureTask(Runnable runnable, V result) {        sync = new Sync(Executors.callable(runnable, result));    }

以下是Executors.callable方法的代碼:

    public static <T> Callable<T> callable(Runnable task, T result) {        if (task == null)            throw new NullPointerException();        return new RunnableAdapter<T>(task, result);    }

那么RunnableAdapter的代碼就很好理解了,它是一個Callable的實現,call方法的實現就是執行Runnable的run方法,然后返回那個value。

    static final class RunnableAdapter<T> implements Callable<T> {        final Runnable task;        final T result;        RunnableAdapter(Runnable task, T result) {            this.task = task;            this.result = result;        }        public T call() {            task.run();            return result;        }    }

接下來先說說較為簡單的invokeAll:

1. 為每個task調用newTaskFor方法生成得到一個既是Task也是Future的包裝類對象的List

2. 循環調用execute執行每個任務

3. 再次循環調用每個Future的get方法等待每個task執行完成

4. 最后返回Future的list。

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,                                         long timeout, TimeUnit unit)        throws InterruptedException {        if (tasks == null || unit == null)            throw new NullPointerException();        long nanos = unit.toNanos(timeout);        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());        boolean done = false;        try {            // 為每個task生成包裝對象            for (Callable<T> t : tasks)                futures.add(newTaskFor(t));            long lastTime = System.nanoTime();            // 循環調用execute執行每個方法            // 這里因為設置了超時時間,所以每次執行完成后            // 檢查是否超時,超時了就直接返回future集合            Iterator<Future<T>> it = futures.iterator();            while (it.hasNext()) {                execute((Runnable)(it.next()));                long now = System.nanoTime();                nanos -= now - lastTime;                lastTime = now;                if (nanos <= 0)                    return futures;            }            // 等待每個任務執行完成            for (Future<T> f : futures) {                if (!f.isDone()) {                    if (nanos <= 0)                        return futures;                    try {                        f.get(nanos, TimeUnit.NANOSECONDS);                    } catch (CancellationException ignore) {                    } catch (ExecutionException ignore) {                    } catch (TimeoutException toe) {                        return futures;                    }                    long now = System.nanoTime();                    nanos -= now - lastTime;                    lastTime = now;                }            }            done = true;            return futures;        } finally {            if (!done)                for (Future<T> f : futures)                    f.cancel(true);        }    }

最后說說invokeAny,它的難點在于只要一個任務執行成功就要返回,并且會取消其他任務,也就是說重點在于找到第一個執行成功的任務。

這里我想到了BlockingQueue,當所有的任務被提交后,任務執行返回的Future會被依次添加到一個BlockingQueue中,然后找到第一個執行成功任務的方法就是從BlockingQueue取出第一個元素,這個就是doInvokeAny方法用到的ExecutorCompletionService的基本原理。

因為兩個invokeAny方法都是調用doInvokeAny方法,下面是doInvokeAny的代碼分析:

    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,                            boolean timed, long nanos)        throws InterruptedException, ExecutionException, TimeoutException {        if (tasks == null)            throw new NullPointerException();        int ntasks = tasks.size();        if (ntasks == 0)            throw new IllegalArgumentException();        List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);        // ExecutorCompletionService負責執行任務,后面調用用poll返回第一個執行結果        ExecutorCompletionService<T> ecs =            new ExecutorCompletionService<T>(this);        // 這里出于效率的考慮,每次提交一個任務之后,就檢查一下有沒有執行完成的任務        try {            ExecutionException ee = null;            long lastTime = timed ? System.nanoTime() : 0;            Iterator<? extends Callable<T>> it = tasks.iterator();            // 先提交一個任務            futures.add(ecs.submit(it.next()));            --ntasks;            int active = 1;            for (;;) {                // 嘗試獲取有沒有執行結果(這個結果是立刻返回的)                Future<T> f = ecs.poll();                // 沒有執行結果                if (f == null) {                    // 如果還有任務沒有被提交執行的,就再提交一個任務                    if (ntasks > 0) {                        --ntasks;                        futures.add(ecs.submit(it.next()));                        ++active;                    }                    // 沒有任務在執行了,而且沒有拿到一個成功的結果。                    else if (active == 0)                        break;                    // 如果設置了超時情況                    else if (timed) {                        // 等待執行結果直到有結果或者超時                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);                        if (f == null)                            throw new TimeoutException();                        // 這里的更新不可少,因為這個Future可能是執行失敗的情況,那么還需要再次等待下一個結果,超時的設置還是需要用到。                        long now = System.nanoTime();                        nanos -= now - lastTime;                        lastTime = now;                    }                    // 沒有設置超時,并且所有任務都被提交了,則一直等到第一個執行結果出來                    else                        f = ecs.take();                }                // 有返回結果了,嘗試從future中獲取結果,如果失敗了,那么需要接著等待下一個執行結果                if (f != null) {                    --active;                    try {                        return f.get();                    } catch (ExecutionException eex) {                        ee = eex;                    } catch (RuntimeException rex) {                        ee = new ExecutionException(rex);                    }                }            }            // ExecutorCompletionService執行時發生錯誤返回了全是null的future            if (ee == null)                ee = new ExecutionException();            throw ee;        } finally {            // 嘗試取消所有的任務(對于已經完成的任務沒有影響)            for (Future<T> f : futures)                f.cancel(true);        }    }

后面接著分析ThreadPoolExecutor和ScheduledThreadPoolExecutor。


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
2018中文字幕一区二区三区| 国产精品高潮粉嫩av| 欧美视频二区36p| 欧美日韩爱爱视频| 亚洲精品狠狠操| 亚洲精品美女视频| 国产亚洲一区精品| 岛国精品视频在线播放| 日韩国产欧美精品在线| 国产主播精品在线| 日韩精品极品在线观看播放免费视频| 狠狠做深爱婷婷久久综合一区| 亚洲精品电影网| 国产成人午夜视频网址| 国产精品国模在线| 91亚洲精品视频| 日韩欧美国产高清91| 国产成人高清激情视频在线观看| 日韩av中文字幕在线播放| 青草青草久热精品视频在线网站| 青青a在线精品免费观看| 国产一级揄自揄精品视频| 久久精品视频一| 亚洲男人第一网站| 在线观看欧美日韩| 久久久久日韩精品久久久男男| 欧美性猛交xxxxx免费看| 国产一区二区三区日韩欧美| 精品国产欧美成人夜夜嗨| 欧美日韩aaaa| 久久精品国产69国产精品亚洲| 91伊人影院在线播放| 蜜臀久久99精品久久久无需会员| 欧美猛交ⅹxxx乱大交视频| 亚洲第一区中文99精品| 亚洲欧洲激情在线| 国产精品亚洲美女av网站| 色综合伊人色综合网站| 91人人爽人人爽人人精88v| 一本久久综合亚洲鲁鲁| 亚洲欧美日韩精品久久奇米色影视| 国产精品一区二区av影院萌芽| 国产成人中文字幕| 操91在线视频| 中文字幕亚洲情99在线| 欧美在线观看www| 丰满岳妇乱一区二区三区| 91在线免费视频| 精品国产乱码久久久久久婷婷| 国产精品久久久久久久av电影| 一本色道久久88亚洲综合88| 久久中文字幕一区| 国产亚洲a∨片在线观看| 欧美性猛交视频| 国产精品免费一区| 91色中文字幕| 亚洲精品久久久久久久久久久久| 国产成人在线一区| 欧美日韩一区二区三区在线免费观看| 亚洲欧美日韩直播| 久久久精品影院| 欧美野外猛男的大粗鳮| 日本国产欧美一区二区三区| 国产精品av在线| 国产精品视频不卡| 97国产精品人人爽人人做| 97国产精品免费视频| 国产午夜精品一区二区三区| 欧美一级淫片videoshd| 91精品一区二区| 黑人巨大精品欧美一区二区一视频| 成人av.网址在线网站| 欧美日韩激情小视频| 亚洲在线免费看| 日韩精品在线观看一区| 欧美刺激性大交免费视频| 亚洲品质视频自拍网| 亚洲女同精品视频| 国产精品久久久999| 精品亚洲精品福利线在观看| 亚洲欧洲在线观看| 在线播放精品一区二区三区| 亚洲免费电影一区| 色偷偷av一区二区三区乱| 亚洲精品第一国产综合精品| 亚洲一区二区三区四区在线播放| 亚洲午夜色婷婷在线| 亚洲直播在线一区| 午夜精品一区二区三区在线播放| 欧美在线视频导航| 亚洲欧美制服中文字幕| 国产免费一区二区三区在线观看| 欧美不卡视频一区发布| 国产91|九色| 韩国19禁主播vip福利视频| 狠狠做深爱婷婷久久综合一区| 亚洲成人网久久久| 疯狂做受xxxx欧美肥白少妇| 亚洲欧洲视频在线| 在线一区二区日韩| 国产一区私人高清影院| 亚洲在线观看视频网站| 国产九九精品视频| 中文字幕成人精品久久不卡| 91国产精品91| 国产成人av在线播放| 国产精品视频导航| 国产精品一二三在线| 精品国产一区久久久| 久久激情视频免费观看| 成人女保姆的销魂服务| 久久亚洲精品一区| 国产一区二区丝袜| 日本伊人精品一区二区三区介绍| 精品国产欧美一区二区三区成人| 欧美成年人网站| 亚洲图中文字幕| 激情成人在线视频| 久久久999精品视频| 亚洲精品国产精品国产自| 国产精品久久久久久久7电影| 色琪琪综合男人的天堂aⅴ视频| 亚洲www在线| 日韩精品在线视频观看| 国产精品久久77777| 国产精品色午夜在线观看| 清纯唯美日韩制服另类| 午夜免费久久久久| 中文字幕亚洲图片| 久久久视频在线| 欧美日韩国产在线看| 色偷偷亚洲男人天堂| 国产婷婷色综合av蜜臀av| 日韩一级黄色av| 日韩欧美国产成人| 国产亚洲精品成人av久久ww| 中文精品99久久国产香蕉| 人人澡人人澡人人看欧美| 国产欧美va欧美va香蕉在| 丰满岳妇乱一区二区三区| 国产欧美精品日韩精品| 在线观看久久久久久| 国产午夜精品全部视频播放| 2020国产精品视频| 伊人久久久久久久久久久久久| 97视频免费在线看| 成人av番号网| 欧美在线观看网址综合| 国产精品久久久久久久久久东京| 欧美乱大交xxxxx| 亚洲最新av网址| 亚洲欧美一区二区精品久久久| 97视频在线观看免费高清完整版在线观看| 免费不卡在线观看av| 欧美性猛交视频| 亚洲免费视频在线观看| 亚洲欧美一区二区三区情侣bbw| 亚洲一区二区三区香蕉| 国产91在线播放九色快色| 国模精品视频一区二区| 日韩电影中文字幕在线| 日韩av免费看网站| 久久精品亚洲一区| 国产精品高潮呻吟久久av黑人|