亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 系統 > Android > 正文

RxJava2 線程調度的方法

2019-10-21 21:19:19
字體:
來源:轉載
供稿:網友

subscribeOn和observeOn負責線程切換,同時某些操作符也默認指定了線程.

我們這里不分析在線程中怎么執行的.只看如何切換到某個指定線程.

subscribeOn

Observable.subscribeOn()在方法內部生成了一個ObservableSubscribeOn對象.

主要看一下ObservableSubscribeOn的subscribeActual方法.

 @Override  public void subscribeActual(final Observer<? super T> observer) {    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);    //調用下游的Observer的onSubscribe方法    observer.onSubscribe(parent);    //通過SubscribeTask執行了上游Observable的subscribeActual方法    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));  }

scheduler.scheduleDirect(Runnable)用于執行SubscribeTask這個任務.SubscribeTask本身是Runnable的實現類.看一下其run方法.

    @Override    public void run() {      //上游的Observable.subscribe方法被切換到了新的線程      source.subscribe(parent);    }

首先可以得出結論:subscribeOn將上游的Observable的subscribe方法切換到了新的線程.

如果多次調用subscribeOn切換線程,會有什么效果?

由下往上,每次調用subscribeOn,都會導致上游的Observable的subscribeActual切換到指定的線程.那么最后一次調用的切換最上游的創建型操作符的subscribeActual的執行線程.如果操作符有默認執行線程怎么辦?

操作符默認線程

如果是創建型操作符,處于最上游,那么subscribeOn的線程切換對它不起作用.天高皇帝遠,縣官不如現管.就是這個道理.
如果是其它操作符,會是怎樣的?

以操作符timeout為例:它對應ObservableTimeoutTimed和TimeoutObserver

 @Override    public void onNext(T t) {      downstream.onNext(t);      //超時計時      startTimeout(idx + 1);    }    void startTimeout(long nextIndex) {      //交給操作符默認的線程執行      task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit));    }    @Override    public void onError(Throwable t) {        downstream.onError(t);     }    @Override    public void onComplete() {        downstream.onComplete();      }    }    @Override    public void onTimeout(long idx) {        downstream.onError(new TimeoutException(timeoutMessage(timeout, unit)));    }
//TimeoutTask.javastatic final class TimeoutTask implements Runnable {    @Override    public void run() {      parent.onTimeout(idx);    }  }

可以看到操作符默認的執行線程只用來做超時計時任務,如果超時了,會在操作符的默認線程執行onError方法..操作符默認線程對下游的observer造成什么影響要做具體對待.

observeOn

observeOn對應ObservableObserveOnObserveOnObserver.

 //ObservableObserveOn.java @Override  protected void subscribeActual(Observer<? super T> observer) {    if (scheduler instanceof TrampolineScheduler) {      source.subscribe(observer);    } else {      Scheduler.Worker w = scheduler.createWorker();      source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));    }  }
 //ObserveOnObserver.java   @Override    public void onSubscribe(Disposable d) {      if (DisposableHelper.validate(this.upstream, d)) {        if (d instanceof QueueDisposable) {          if (m == QueueDisposable.SYNC) {          //執行下游Observer的onSubscribe方法            downstream.onSubscribe(this);            schedule();            return;          }          if (m == QueueDisposable.ASYNC) {           //執行下游Observer的onSubscribe方法            downstream.onSubscribe(this);            return;          }        }         //執行下游Observer的onSubscribe方法        downstream.onSubscribe(this);      }    }    @Override    public void onNext(T t) {     //省略      schedule();    }    @Override    public void onError(Throwable t) {     //省略      schedule();    }     void schedule() {      if (getAndIncrement() == 0) {      /*      ObserveOnObserver是Runnable的實現類.交給線程池執行      */        worker.schedule(this);      }    }            void drainNormal() {      final Observer<? super T> a = downstream;      for (;;) {        for (;;) {          T v;          try {            v = q.poll();          } catch (Throwable ex) {            a.onError(ex);            return;          }          //執行下游Observer的onNext方法          a.onNext(v);        }      }    }    void drainFused() {      for (;;) {        if (!delayError && d && ex != null) {          //執行下游Observer的onError方法          downstream.onError(error);          return;        }        downstream.onNext(null);        if (d) {          ex = error;          if (ex != null) {            //執行下游Observer的onError方法            downstream.onError(ex);          } else {            //執行下游Observer的onComplete方法            downstream.onComplete();          }          return;        }      }    }    //執行線程任務    @Override    public void run() {      if (outputFused) {        drainFused();      } else {        drainNormal();      }    }

從上面可以看出ObservableObserveOn在其subscribeActual方法中并沒有切換上游Observable的subscribe方法的執行線程.但是ObserveOnObserver在其onNext,onError和onComplete中通過schedule()方法將下游Observer的各個方法切換到了新的線程.

得出結論: observeOn負責切換的是下游Observer的各個方法的執行線程

如果下游多次通過observeOn切換線程,會有什么效果?

每次切換都會對其下游造成影響,直到遇到下一個observeOn為止.

Observer(onSubscribe,onNext,onError,onComplete)

onNext,onError,onComplete與上游最近的observeOn所切換的線程保持一致.onSubscribe則不同.
遇到線程切換的時候,會首先在對應的Observable的subscribeActual方法內,先調用observer.onSubscribe方法.而observer.onSubscribe會逐級向上傳遞直到最上游,而最上游的observer.onSubscribe是在subscribeActual方法內調用,這是在主線程執行的.所以onSubscribe方法無論如何都是在主線程執行.

doOnSubscribe

.doOnSubscribe(new Consumer<Disposable>() {          @Override          public void accept(Disposable disposable) throws Exception {                     }        })

我們要看的是方法accept的執行線程.

通過源碼找到對應的DisposableLambdaObserver.

 @Override  public void onSubscribe(Disposable d) {  //在這里調用了accept方法.      onSubscribe.accept(d);  }

這就要看上游在哪個線程執行了Observer.onSubscribe(disposable)方法.

在創建型操作符的subscribeActual方法和subscribeOn對應的Observable的subscribeActual方法內調用了Observer.onSubscribe(disposable)方法.那么這兩處的執行線程就決定了onSubscribe.accept(d);的執行線程.

doFinally

對應ObservableDoFinally和DoFinallyObserver

 //DoFinallyObserver.java @Override    public void onError(Throwable t) {      runFinally();    }    @Override    public void onComplete() {      runFinally();    }    @Override    public void dispose() {      runFinally();    }         void runFinally() {       onFinally.run();    }

可以看到與它所對應的DoFinallyObserver的onError,onComplete,dispose方法的執行線程有關,這三個方法的執行線程又受到上游的observeOn的影響.如果沒有observeOn,則會受到最上游的observable.subscribeActual方法影響.

doOnError

對應ObservableDoOnEach和DoOnEachObserver

//DoOnEachObserver.java @Override    public void onError(Throwable t) {        onError.accept(t);    }

和自身對應的observer.onError所在線程保持一致.

doOnNext

對應ObservableDoOnEach和DoOnEachObserver

//DoOnEachObserver.java @Override    public void onNext(T t) {        onNext.accept(t);    }

和自身對應的observer.onNext所在線程保持一致.

操作符對應方法參數的執行線程

包io.reactivex.functions下的接口類一般用于處理上游數據然后往下傳遞.這些接口類的方法一般在對應的observer.onNext中調用.所以他們的線程保持一致.

總結:

subscribeOn由下往上逐級切換Observable.subscribe的執行線程,不受observeOn影響,也不受具有默認指定線程的非創建型操作符影響,但是會被更上游的subscribeOn奪取線程切換的權利,直到最上游.如果最上游的創建型操作符也有默認執行線程,那么任何一個subscribeOn的線程切換不起作用.subscribeOn由下向上到達最上游后,然后由上往下影響下游的observer的執行線程.遇到observeOn會被奪取線程切換的權利.observeOn影響的是下游的observer的執行線程,由上往下,遇到另一個observeOn會移交線程控制權力,遇到指定默認線程非創建型的操作符,要視具體情況對待.

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持VEVB武林網。


注:相關教程知識閱讀請移步到Android開發頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
亚洲男人第一av网站| 国产精品日韩电影| 久久99久久99精品免观看粉嫩| 琪琪亚洲精品午夜在线| 亚洲精品久久久久久久久久久| 亚洲区免费影片| 国产精品久久久久免费a∨| 久久久精品影院| 欧美激情欧美激情| 粉嫩老牛aⅴ一区二区三区| 久久精品成人一区二区三区| 欧美最猛黑人xxxx黑人猛叫黄| 中文字幕日韩精品在线| 久久99久久99精品免观看粉嫩| 日韩电影中文字幕一区| 成人网页在线免费观看| 日韩中文字幕网址| 欧美日韩国产一区中文午夜| 久久综合五月天| 亚洲影院在线看| 欧美多人爱爱视频网站| 国产婷婷色综合av蜜臀av| 色老头一区二区三区在线观看| 亚洲成人国产精品| 亚洲欧洲日产国码av系列天堂| 欧美一区二区大胆人体摄影专业网站| 亚洲女人初尝黑人巨大| 亚洲视频在线观看免费| 91麻豆国产精品| 欧美一区深夜视频| 色综合视频网站| 日韩视频免费看| 欧美在线视频一区二区| 欧美激情中文字幕在线| 久久91精品国产| 欧美丝袜美女中出在线| 欧美精品videosex极品1| 欧美性猛交xxxx免费看久久久| 久久九九有精品国产23| 欧美大片大片在线播放| xvideos亚洲人网站| 日韩中文字幕久久| 日韩电影第一页| 久久精品色欧美aⅴ一区二区| 久久夜色精品国产| 国产日韩在线免费| 国产精品麻豆va在线播放| 欧美性猛交xxxxx水多| 欧美日韩在线观看视频| 国产精品亚洲综合天堂夜夜| 中文字幕无线精品亚洲乱码一区| 欧美日本亚洲视频| 亚洲深夜福利视频| 91日本在线视频| 影音先锋欧美精品| 中文字幕欧美国内| 精品久久久久国产| 亚洲黄页视频免费观看| 色综合天天综合网国产成人网| 国产一区二区欧美日韩| 久久精品亚洲一区| 国产精品美女无圣光视频| xxx欧美精品| 成人深夜直播免费观看| 亚洲日本欧美日韩高观看| 久久久久久中文| 原创国产精品91| 伊人亚洲福利一区二区三区| 亚洲最大成人网色| 日韩精品中文在线观看| 性日韩欧美在线视频| 亚洲永久免费观看| 国产香蕉一区二区三区在线视频| 久久天天躁狠狠躁夜夜躁2014| 久久久久久久999精品视频| 黄色成人在线播放| 亚洲福利视频二区| 亚洲第一福利在线观看| 国产精欧美一区二区三区| 永久免费精品影视网站| 69**夜色精品国产69乱| 日韩av电影在线免费播放| 日韩经典一区二区三区| 51精品在线观看| 国产精品美女久久久久久免费| 久久久亚洲影院| 久久99精品久久久久久青青91| 久久精品国产69国产精品亚洲| 91在线中文字幕| 中文字幕日韩精品有码视频| 美日韩精品免费视频| 日本成熟性欧美| 九九热精品视频国产| 国产精品电影一区| 亚洲在线观看视频| 久久影视电视剧凤归四时歌| 成人午夜小视频| 欧美激情网站在线观看| 不卡av电影在线观看| 久久久精品免费| 国产一区视频在线| 红桃视频成人在线观看| 色与欲影视天天看综合网| 九九热在线精品视频| 亚洲欧美日韩国产中文| 国产美女搞久久| 俺去啦;欧美日韩| 色婷婷久久av| 国产91色在线|| 久久精品久久精品亚洲人| 色妞在线综合亚洲欧美| 日本乱人伦a精品| 少妇高潮 亚洲精品| 奇米4444一区二区三区| 最近2019中文字幕第三页视频| 九九视频这里只有精品| 国产精品久久久久久av下载红粉| 久久精品99无色码中文字幕| 成人黄色在线播放| 久久的精品视频| 亚洲精品99999| 欧美成人国产va精品日本一级| 色综合色综合久久综合频道88| 欧美日韩精品二区| 久久夜色撩人精品| 国产精品美女免费视频| 2018日韩中文字幕| 亚洲第一页中文字幕| 亚洲综合日韩中文字幕v在线| 亚洲在线观看视频| 精品久久久久久电影| 国产精品99久久久久久久久| 都市激情亚洲色图| 日本成人精品在线| 在线精品视频视频中文字幕| 成人免费在线视频网址| 国产综合在线视频| 亚洲成人精品视频在线观看| 久久99精品国产99久久6尤物| 欧美一级电影在线| 成人精品视频久久久久| 亚洲视频在线免费看| 亚洲伦理中文字幕| 最新91在线视频| 欧美性xxxxx| 久久精品国产清自在天天线| 欧美高清视频在线播放| 国产精品美女久久| 在线成人免费网站| 国产精品久久精品| 久99九色视频在线观看| 深夜福利日韩在线看| 国产成人精品久久二区二区| 日韩av一区二区在线| 国自产精品手机在线观看视频| 亚洲国产精品va在看黑人| 国内精品久久久| 日韩成人在线播放| 69国产精品成人在线播放| 精品亚洲va在线va天堂资源站| 日韩欧美精品免费在线| 欧美大尺度激情区在线播放| 欧美性色视频在线| 欧美日韩精品在线视频|