简单谈谈RxJava和多线程并发
前言
相信对于RxJava,大家应该都很熟悉,他最核心的两个字就是异步,诚然,它对异步的处理非常的出色,但是异步绝对不等于并发,更不等于线程安全,如果把这几个概念搞混了,错误的使用RxJava,是会来带非常多的问题的。
RxJava与并发
首先让我们来看一段RxJava协议的原文:
Observablesmustissuenotificationstoobserversserially(notinparallel).Theymayissuethesenotificationsfromdifferentthreads,buttheremustbeaformalhappens-beforerelationshipbetweenthenotifications.
如上所述,RxJava对多线程并发其实并没有做非常的多保护,这段话中说,如果多个Observables从多个线程中发射数据,必须要满足happens-before原则。
下面来看一个简单的例子:
finalPublishSubjectsubject=PublishSubject.create(); subject.subscribe(newSubscriber (){ @Override publicvoidonCompleted(){ } @Override publicvoidonError(Throwablee){ } @Override publicvoidonNext(Integerinteger){ unSafeCount=unSafeCount+integer; Log.d("TAG","onNext:"+unSafeCount); } }); findViewById(R.id.send).setOnClickListener(newView.OnClickListener(){ @Override publicvoidonClick(Viewv){ finalintunit=1; for(inti=0;i<10;i++){ newThread(newRunnable(){ @Override publicvoidrun(){ for(intj=0;j<1000;j++){ subject.onNext(unit); } } }).start(); } } });
这是一个最典型的多线程问题,从10个线程中发射数据并相加,这样最终得到的答案是小于10000的。虽然使用了RxJava,但是这样的使用对于并发是没有意义的,因为RxJava并没有去处理并发带来的问题。我们可以看下subject的onNext方法的源码,里面很简单,就是调用了对应observer的onNext方法而已。不止是这样,绝大多数的Subject都是线程不安全的,所以当你在使用这样的类的时候(典型场景就是自制的RxBus),如果从多个线程中发射数据,那你就要小心了。
对于这样的问题,有两种解决方案:
第一种就是简单的使用传统的解决方法,比如用AtomicInteger代替int。
第二种则是使用RxJava的解决方案,在这里就是用SerializedSubject去代替Subject:
finalPublishSubjectsubject=PublishSubject.create(); subject.subscribe(newSubscriber (){ @Override publicvoidonCompleted(){ } @Override publicvoidonError(Throwablee){ } @Override publicvoidonNext(Integerinteger){ unSafeCount=unSafeCount+integer; count.addAndGet(integer); Log.d("TAG","onNext:"+count); } }); finalSerializedSubject ser=newSerializedSubject (subject); findViewById(R.id.send).setOnClickListener(newView.OnClickListener(){ @Override publicvoidonClick(Viewv){ finalintunit=1; for(inti=0;i<10;i++){ newThread(newRunnable(){ @Override publicvoidrun(){ for(intj=0;j<1000;j++){ ser.onNext(unit); } } }).start(); } } });
可以看一下SerializedSubject的onNext方法做了什么:
@Override
publicvoidonNext(Tt){
if(terminated){
return;
}
synchronized(this){
if(terminated){
return;
}
if(emitting){
FastListlist=queue;
if(list==null){
list=newFastList();
queue=list;
}
list.add(nl.next(t));
return;
}
emitting=true;
}
try{
actual.onNext(t);
}catch(Throwablee){
terminated=true;
Exceptions.throwOrReport(e,actual,t);
return;
}
for(;;){
for(inti=0;i
处理方式很简单,如果有其他线程在发射数据,那就将数据放置到队列中,等待下次发射。这保证了同一时间只会有一个线程调用onNext,onComplete和onError这些方法。
但是这样操作显然是会造成性能的影响的,所以RxJava并不会把所有的操作都打上线程安全的标签。
在这里就要引申出一个问题,那就是使用者对create方法的滥用,其实这个方法不应该被使用者频繁的调用的,因为你必须要小心的处理所有的数据发射,接收的逻辑。相反的,使用已有的操作符能很好的解决这个问题,所以下次大家在遇到问题的时候不要简单的使用create去自己写,而是应该想想有没有现成的操作符可以完成相应的需求。
RxJava中的一些操作符
RxJava中有一些操作符也和多线程并发有关,下面让我来讲一讲merge和concat,以及他们的一些变种操作符。
对于多线程发射数据,有时候我们需要得到的结果也保持和发射时候一样的顺序,这个时候如果我们使用merge这个操作符去结合多个发射源,那么就会产生一定的问题了(例子中做了非常不好的示范——使用了create操作符,请大家不要学习这样的写法,这里单纯是为了求证结果)。
Observableo1=Observable.create(newObservable.OnSubscribe(){
@Override
publicvoidcall(finalSubscribersubscriber){
newThread(newRunnable(){
@Override
publicvoidrun(){
try{
Thread.sleep(1000);
subscriber.onNext(1);
subscriber.onCompleted();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}).start();
}
});
Observableo2=Observable.create(newObservable.OnSubscribe(){
@Override
publicvoidcall(Subscribersubscriber){
subscriber.onNext(2);
subscriber.onCompleted();
}
});
Observable.merge(o1,o2)
.subscribe(newSubscriber(){
@Override
publicvoidonCompleted(){
}
@Override
publicvoidonError(Throwablee){
}
@Override
publicvoidonNext(Integeri){
Log.d("TAG","onNext:"+i);
}
});
对于这样的场景,我们得到的答案将是2,1而不是先得到o1发射的数据,再获取o2的数据。
究其原因,就是因为merge其实就是给什么传什么,也不会去管数据发射的顺序:
@Override
publicvoidonNext(Observablet){
if(t==null){
return;
}
if(t==Observable.empty()){
emitEmpty();
}else
if(tinstanceofScalarSynchronousObservable){
tryEmit(((ScalarSynchronousObservable)t).get());
}else{
InnerSubscriberinner=newInnerSubscriber(this,uniqueId++);
addInner(inner);
t.unsafeSubscribe(inner);
emit();
}
}
可以看到在经过lift操作之后,对应的中间人MergeSubscriber的onNext,没有什么多余的代码,所以在多个Observable从多线程中发射数据的时候,顺序当然不能得到保证。
一个单词说明这个问题:interleaving——交错。merge后的数据源可能是交错的。由于merge有这样数据交错的问题,所以它的变种—flatMap也会有同样的问题。
对于这样的场景,我们可以使用concat操作符来完成:
ConcatwaitstosubscribetoeachadditionalObservablethatyoupasstoituntilthepreviousObservablecompletes.
根据文档,我们知道concat操作符是一个接一个的处理数据源的数据的。
if(wip.getAndIncrement()!=0){
return;
}
finalintdelayErrorMode=this.delayErrorMode;
for(;;){
if(actual.isUnsubscribed()){
return;
}
if(!active){
if(delayErrorMode==BOUNDARY){
if(error.get()!=null){
Throwableex=ExceptionsUtils.terminate(error);
if(!ExceptionsUtils.isTerminated(ex)){
actual.onError(ex);
}
return;
}
}
booleanmainDone=done;
Objectv=queue.poll();
booleanempty=v==null;
if(mainDone&&empty){
Throwableex=ExceptionsUtils.terminate(error);
if(ex==null){
actual.onCompleted();
}else
if(!ExceptionsUtils.isTerminated(ex)){
actual.onError(ex);
}
return;
}
if(!empty){
Observablesource;
try{
source=mapper.call(NotificationLite.instance().getValue(v));
}catch(ThrowablemapperError){
Exceptions.throwIfFatal(mapperError);
drainError(mapperError);
return;
}
if(source==null){
drainError(newNullPointerException("Thesourcereturnedbythemapperwasnull"));
return;
}
if(source!=Observable.empty()){
if(sourceinstanceofScalarSynchronousObservable){
ScalarSynchronousObservablescalarSource=(ScalarSynchronousObservable)source;
active=true;
arbiter.setProducer(newConcatMapInnerScalarProducer(scalarSource.get(),this));
}else{
ConcatMapInnerSubscriberinnerSubscriber=newConcatMapInnerSubscriber(this);
inner.set(innerSubscriber);
if(!innerSubscriber.isUnsubscribed()){
active=true;
source.unsafeSubscribe(innerSubscriber);
}else{
return;
}
}
request(1);
}else{
request(1);
continue;
}
}
}
if(wip.decrementAndGet()==0){
break;
}
}
通过源码我们可以知道,active字段就保证了如果上一个数据源还没有发射完数据,就会一直在for循环中等待,直到上一个数据源发射完了数据重置了active字段。
对于concat,其实还存在一个问题,那就是多个Observable变成了串行,会大大的增加整个RxJava事件流的处理时间,对于这个场景,我们可以使用concatEager来解决。concatEager的源码就不带大家分析了,有兴趣的同学可以自行查看。
总结
这篇文章比较短,讲的东西也比较浅显,其实就是讨论了一下RxJava中多线程并发的几个问题。最后我想说,RxJava并不是什么高大上的东西,在你的项目引入之前,要考虑一下是否真的有必要这么做。就算真的有场景需要RxJava,也请不要一口气把项目中所有的操作都换成RxJava,一些简单的操作不一定需要使用RxJava的操作符的实现,用了反而降低了代码的可读性,切勿为了使用Rx而使用Rx。
好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流。