RxJava2 线程调度的方法
subscribeOn和observeOn负责线程切换,同时某些操作符也默认指定了线程.
我们这里不分析在线程中怎么执行的.只看如何切换到某个指定线程.
subscribeOn
Observable.subscribeOn()在方法内部生成了一个ObservableSubscribeOn对象.
主要看一下ObservableSubscribeOn的subscribeActual方法.
@Override
publicvoidsubscribeActual(finalObserverobserver){
finalSubscribeOnObserverparent=newSubscribeOnObserver(observer);
//调用下游的Observer的onSubscribe方法
observer.onSubscribe(parent);
//通过SubscribeTask执行了上游Observable的subscribeActual方法
parent.setDisposable(scheduler.scheduleDirect(newSubscribeTask(parent)));
}
scheduler.scheduleDirect(Runnable)用于执行SubscribeTask这个任务.SubscribeTask本身是Runnable的实现类.看一下其run方法.
@Override
publicvoidrun(){
//上游的Observable.subscribe方法被切换到了新的线程
source.subscribe(parent);
}
首先可以得出结论:subscribeOn将上游的Observable的subscribe方法切换到了新的线程.
如果多次调用subscribeOn切换线程,会有什么效果?
由下往上,每次调用subscribeOn,都会导致上游的Observable的subscribeActual切换到指定的线程.那么最后一次调用的切换最上游的创建型操作符的subscribeActual的执行线程.如果操作符有默认执行线程怎么办?
操作符默认线程
如果是创建型操作符,处于最上游,那么subscribeOn的线程切换对它不起作用.天高皇帝远,县官不如现管.就是这个道理.
如果是其它操作符,会是怎样的?
以操作符timeout为例:它对应ObservableTimeoutTimed和TimeoutObserver
@Override
publicvoidonNext(Tt){
downstream.onNext(t);
//超时计时
startTimeout(idx+1);
}
voidstartTimeout(longnextIndex){
//交给操作符默认的线程执行
task.replace(worker.schedule(newTimeoutTask(nextIndex,this),timeout,unit));
}
@Override
publicvoidonError(Throwablet){
downstream.onError(t);
}
@Override
publicvoidonComplete(){
downstream.onComplete();
}
}
@Override
publicvoidonTimeout(longidx){
downstream.onError(newTimeoutException(timeoutMessage(timeout,unit)));
}
//TimeoutTask.java
staticfinalclassTimeoutTaskimplementsRunnable{
@Override
publicvoidrun(){
parent.onTimeout(idx);
}
}
可以看到操作符默认的执行线程只用来做超时计时任务,如果超时了,会在操作符的默认线程执行onError方法..操作符默认线程对下游的observer造成什么影响要做具体对待.
observeOn
observeOn对应ObservableObserveOn和ObserveOnObserver.
//ObservableObserveOn.java
@Override
protectedvoidsubscribeActual(Observerobserver){
if(schedulerinstanceofTrampolineScheduler){
source.subscribe(observer);
}else{
Scheduler.Workerw=scheduler.createWorker();
source.subscribe(newObserveOnObserver(observer,w,delayError,bufferSize));
}
}
//ObserveOnObserver.java
@Override
publicvoidonSubscribe(Disposabled){
if(DisposableHelper.validate(this.upstream,d)){
if(dinstanceofQueueDisposable){
if(m==QueueDisposable.SYNC){
//执行下游Observer的onSubscribe方法
downstream.onSubscribe(this);
schedule();
return;
}
if(m==QueueDisposable.ASYNC){
//执行下游Observer的onSubscribe方法
downstream.onSubscribe(this);
return;
}
}
//执行下游Observer的onSubscribe方法
downstream.onSubscribe(this);
}
}
@Override
publicvoidonNext(Tt){
//省略
schedule();
}
@Override
publicvoidonError(Throwablet){
//省略
schedule();
}
voidschedule(){
if(getAndIncrement()==0){
/*
ObserveOnObserver是Runnable的实现类.交给线程池执行
*/
worker.schedule(this);
}
}
voiddrainNormal(){
finalObservera=downstream;
for(;;){
for(;;){
Tv;
try{
v=q.poll();
}catch(Throwableex){
a.onError(ex);
return;
}
//执行下游Observer的onNext方法
a.onNext(v);
}
}
}
voiddrainFused(){
for(;;){
if(!delayError&&d&&ex!=null){
//执行下游Observer的onError方法
downstream.onError(error);
return;
}
downstream.onNext(null);
if(d){
ex=error;
if(ex!=null){
//执行下游Observer的onError方法
downstream.onError(ex);
}else{
//执行下游Observer的onComplete方法
downstream.onComplete();
}
return;
}
}
}
//执行线程任务
@Override
publicvoidrun(){
if(outputFused){
drainFused();
}else{
drainNormal();
}
}
从上面可以看出ObservableObserveOn在其subscribeActual方法中并没有切换上游Observable的subscribe方法的执行线程.但是ObserveOnObserver在其onNext,onError和onComplete中通过schedule()方法将下游Observer的各个方法切换到了新的线程.
得出结论:observeOn负责切换的是下游Observer的各个方法的执行线程
如果下游多次通过observeOn切换线程,会有什么效果?
每次切换都会对其下游造成影响,直到遇到下一个observeOn为止.
Observer(onSubscribe,onNext,onError,onComplete)
onNext,onError,onComplete与上游最近的observeOn所切换的线程保持一致.onSubscribe则不同.
遇到线程切换的时候,会首先在对应的Observable的subscribeActual方法内,先调用observer.onSubscribe方法.而observer.onSubscribe会逐级向上传递直到最上游,而最上游的observer.onSubscribe是在subscribeActual方法内调用,这是在主线程执行的.所以onSubscribe方法无论如何都是在主线程执行.
doOnSubscribe
.doOnSubscribe(newConsumer(){ @Override publicvoidaccept(Disposabledisposable)throwsException{ } })
我们要看的是方法accept的执行线程.
通过源码找到对应的DisposableLambdaObserver.
@Override
publicvoidonSubscribe(Disposabled){
//在这里调用了accept方法.
onSubscribe.accept(d);
}
这就要看上游在哪个线程执行了Observer.onSubscribe(disposable)方法.
在创建型操作符的subscribeActual方法和subscribeOn对应的Observable的subscribeActual方法内调用了Observer.onSubscribe(disposable)方法.那么这两处的执行线程就决定了onSubscribe.accept(d);的执行线程.
doFinally
对应ObservableDoFinally和DoFinallyObserver
//DoFinallyObserver.java
@Override
publicvoidonError(Throwablet){
runFinally();
}
@Override
publicvoidonComplete(){
runFinally();
}
@Override
publicvoiddispose(){
runFinally();
}
voidrunFinally(){
onFinally.run();
}
可以看到与它所对应的DoFinallyObserver的onError,onComplete,dispose方法的执行线程有关,这三个方法的执行线程又受到上游的observeOn的影响.如果没有observeOn,则会受到最上游的observable.subscribeActual方法影响.
doOnError
对应ObservableDoOnEach和DoOnEachObserver
//DoOnEachObserver.java
@Override
publicvoidonError(Throwablet){
onError.accept(t);
}
和自身对应的observer.onError所在线程保持一致.
doOnNext
对应ObservableDoOnEach和DoOnEachObserver
//DoOnEachObserver.java
@Override
publicvoidonNext(Tt){
onNext.accept(t);
}
和自身对应的observer.onNext所在线程保持一致.
操作符对应方法参数的执行线程
包io.reactivex.functions下的接口类一般用于处理上游数据然后往下传递.这些接口类的方法一般在对应的observer.onNext中调用.所以他们的线程保持一致.
总结:
subscribeOn由下往上逐级切换Observable.subscribe的执行线程,不受observeOn影响,也不受具有默认指定线程的非创建型操作符影响,但是会被更上游的subscribeOn夺取线程切换的权利,直到最上游.如果最上游的创建型操作符也有默认执行线程,那么任何一个subscribeOn的线程切换不起作用.subscribeOn由下向上到达最上游后,然后由上往下影响下游的observer的执行线程.遇到observeOn会被夺取线程切换的权利.observeOn影响的是下游的observer的执行线程,由上往下,遇到另一个observeOn会移交线程控制权力,遇到指定默认线程非创建型的操作符,要视具体情况对待.
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。