RxJava取消订阅的各种方式的实现
手动取消订阅
Consumer类型
Observable创建返回Disposable取消
publicclassSecondActivityextendsAppCompatActivity{
privatestaticfinalStringTAG="SecondActivity";
privateDisposabledisposable;
@Override
protectedvoidonCreate(BundlesavedInstanceState){
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_second);
disposable=Observable.create(newObservableOnSubscribe(){
@Override
publicvoidsubscribe(ObservableEmitteremitter)throwsException{
try{
Thread.sleep(5000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(newConsumer(){
@Override
publicvoidaccept(Strings)throwsException{
Log.d(TAG,"accept:"+s);
}
});
}
@Override
protectedvoidonDestroy(){
super.onDestroy();
Log.d(TAG,"onDestroy:");
//取消订阅
if(disposable!=null&&!disposable.isDisposed()){
disposable.dispose();
Log.d(TAG,"onDestroy:dispose");
}
}
}
普通类型Observer
在Observer中获取Disposable然后取消
publicclassThirdActivityextendsAppCompatActivity{
privatestaticfinalStringTAG="ThirdActivity";
Disposabledisposable;
@Override
protectedvoidonCreate(BundlesavedInstanceState){
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_third);
Observable.create(newObservableOnSubscribe(){
@Override
publicvoidsubscribe(ObservableEmitteremitter)throwsException{
try{
Thread.sleep(5000);
emitter.onNext("testInfo");
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(newObserver(){
@Override
publicvoidonSubscribe(Disposabled){
disposable=d;
}
@Override
publicvoidonNext(Strings){
Log.d(TAG,"onNext:"+s);
}
@Override
publicvoidonError(Throwablee){
Log.d(TAG,"onError:");
}
@Override
publicvoidonComplete(){
Log.d(TAG,"onComplete:");
}
});
}
@Override
protectedvoidonDestroy(){
super.onDestroy();
Log.d(TAG,"onDestroy:");
//然后在需要取消订阅的地方调用即可
if(disposable!=null&&!disposable.isDisposed()){
Log.d(TAG,"dispose:");
disposable.dispose();
}
}
}
DisposableObserver类型
利用DisposableObserver和subscribeWith直接返回Disposable,然后取消
publicclassFourthActivityextendsAppCompatActivity{
privatestaticfinalStringTAG="FourthActivity";
privateDisposableObserverobserver;
@Override
protectedvoidonCreate(BundlesavedInstanceState){
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_fourth);
observer=Observable.create(newObservableOnSubscribe(){
@Override
publicvoidsubscribe(ObservableEmitteremitter)throwsException{
try{
Thread.sleep(5000);
emitter.onNext("testInfo");
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(newDisposableObserver(){
@Override
publicvoidonNext(Stringo){
Log.d(TAG,"onNext:"+o);
}
@Override
publicvoidonError(Throwablee){
Log.d(TAG,"onError:");
}
@Override
publicvoidonComplete(){
Log.d(TAG,"onComplete:");
}
});
}
@Override
protectedvoidonDestroy(){
super.onDestroy();
if(observer!=null&&!observer.isDisposed()){
Log.d(TAG,"dispose:");
observer.dispose();
}
}
}
取消多个Observer
把多个Observer对应的Disposable添加到CompositeDisposable中,一次性全部取消
publicclassComDisposableActivityextendsAppCompatActivity{
privateDisposabledisposable1;
privateDisposabledisposable2;
privatestaticfinalStringTAG="ComDisposableActivity";
@Override
protectedvoidonCreate(BundlesavedInstanceState){
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_com_disposable);
Observable.create(newObservableOnSubscribe(){
@Override
publicvoidsubscribe(ObservableEmitteremitter)throwsException{
try{
Thread.sleep(5000);
emitter.onNext("testInfo");
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnDispose(newAction(){
@Override
publicvoidrun()throwsException{
Log.d(TAG,"run:UnsubscribingsubscriptionfromonCreate()");
}
})
.subscribe(newObserver(){
@Override
publicvoidonSubscribe(Disposabled){
disposable1=d;
}
@Override
publicvoidonNext(Strings){
Log.d(TAG,"onNext:"+s);
}
@Override
publicvoidonError(Throwablee){
Log.d(TAG,"onError:");
}
@Override
publicvoidonComplete(){
Log.d(TAG,"onComplete:");
}
});
Observable.create(newObservableOnSubscribe(){
@Override
publicvoidsubscribe(ObservableEmitteremitter)throwsException{
try{
Thread.sleep(5000);
emitter.onNext("testInfo");
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(newObserver(){
@Override
publicvoidonSubscribe(Disposabled){
disposable2=d;
}
@Override
publicvoidonNext(Strings){
Log.d(TAG,"onNext:"+s);
}
@Override
publicvoidonError(Throwablee){
Log.d(TAG,"onError:");
}
@Override
publicvoidonComplete(){
Log.d(TAG,"onComplete:");
}
});
}
@Override
protectedvoidonDestroy(){
super.onDestroy();
CompositeDisposablecompositeDisposable=newCompositeDisposable();
//批量添加
compositeDisposable.add(disposable1);
compositeDisposable.add(disposable2);
//最后一次性全部取消订阅
compositeDisposable.dispose();
}
}
RxLifecycle取消
在onDestroy时取消
Observable.interval(1,TimeUnit.SECONDS)
.doOnDispose(newAction(){
@Override
publicvoidrun()throwsException{
Log.d(TAG,"UnsubscribingbindToLifecyclefromonDestroy()");
}
})
.compose(this.bindToLifecycle())
.subscribe(newConsumer(){
@Override
publicvoidaccept(Longnum)throwsException{
Log.d(TAG,"accept:"+num);
}
});
指定生命周期取消
Observable.interval(1,TimeUnit.SECONDS)
.doOnDispose(newAction(){
@Override
publicvoidrun()throwsException{
Log.d(TAG,"UnsubscribingUbindUntilEventfromonPause()");
}
}).compose(this.bindUntilEvent(ActivityEvent.PAUSE))
.subscribe(newConsumer(){
@Override
publicvoidaccept(LongaLong)throwsException{
Log.d(TAG,"bindUntilEventaccept:"+aLong);
}
});
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。