Java扩展库RxJava的基本结构与适用场景小结
基本结构
我们先来看一段最基本的代码,分析这段代码在RxJava中是如何实现的。
Observable.OnSubscribe<String>onSubscriber1=newObservable.OnSubscribe<String>(){ @Override publicvoidcall(Subscriber<?superString>subscriber){ subscriber.onNext("1"); subscriber.onCompleted(); } }; Subscriber<String>subscriber1=newSubscriber<String>(){ @Override publicvoidonCompleted(){ } @Override publicvoidonError(Throwablee){ } @Override publicvoidonNext(Strings){ } }; Observable.create(onSubscriber1) .subscribe(subscriber1);
首先我们来看一下Observable.create的代码
publicfinalstatic<T>Observable<T>create(OnSubscribe<T>f){ returnnewObservable<T>(hook.onCreate(f)); } protectedObservable(OnSubscribe<T>f){ this.onSubscribe=f; }
直接就是调用了Observable的构造函数来创建一个新的Observable对象,这个对象我们暂时标记为observable1,以便后面追溯。
同时,会将我们传入的OnSubscribe对象onSubscribe1保存在observable1的onSubscribe属性中,这个属性在后面的上下文中很重要,大家留心一下。
接下来我们来看看subscribe方法。
publicfinalSubscriptionsubscribe(Subscriber<?superT>subscriber){ returnObservable.subscribe(subscriber,this); } privatestatic<T>Subscriptionsubscribe(Subscriber<?superT>subscriber,Observable<T>observable){ ... subscriber.onStart(); hook.onSubscribeStart(observable,observable.onSubscribe).call(subscriber); returnhook.onSubscribeReturn(subscriber); }
可以看到,subscribe之后,就直接调用了observable1.onSubscribe.call方法,也就是我们代码中的onSubscribe1对象的call方法
,传入的参数就是我们代码中定义的subscriber1对象。call方法中所做的事情就是调用传入的subscriber1对象的onNext和onComplete方法。
这样就实现了观察者和被观察者之间的通讯,是不是很简单?
publicvoidcall(Subscriber<?superString>subscriber){ subscriber.onNext("1"); subscriber.onCompleted(); }
RxJava使用场景小结
1.取数据先检查缓存的场景
取数据,首先检查内存是否有缓存
然后检查文件缓存中是否有
最后才从网络中取
前面任何一个条件满足,就不会执行后面的
finalObservable<String>memory=Observable.create(newObservable.OnSubscribe<String>(){ @Override publicvoidcall(Subscriber<?superString>subscriber){ if(memoryCache!=null){ subscriber.onNext(memoryCache); }else{ subscriber.onCompleted(); } } }); Observable<String>disk=Observable.create(newObservable.OnSubscribe<String>(){ @Override publicvoidcall(Subscriber<?superString>subscriber){ StringcachePref=rxPreferences.getString("cache").get(); if(!TextUtils.isEmpty(cachePref)){ subscriber.onNext(cachePref); }else{ subscriber.onCompleted(); } } }); Observable<String>network=Observable.just("network"); //主要就是靠concatoperator来实现 Observable.concat(memory,disk,network) .first() .subscribeOn(Schedulers.newThread()) .subscribe(s->{ memoryCache="memory"; System.out.println("--------------subscribe:"+s); });
2.界面需要等到多个接口并发取完数据,再更新
//拼接两个Observable的输出,不保证顺序,按照事件产生的顺序发送给订阅者 privatevoidtestMerge(){ Observable<String>observable1=DemoUtils.createObservable1().subscribeOn(Schedulers.newThread()); Observable<String>observable2=DemoUtils.createObservable2().subscribeOn(Schedulers.newThread()); Observable.merge(observable1,observable2) .subscribeOn(Schedulers.newThread()) .subscribe(System.out::println); }
3.一个接口的请求依赖另一个API请求返回的数据
举个例子,我们经常在需要登陆之后,根据拿到的token去获取消息列表。
这里用RxJava主要解决嵌套回调的问题,有一个专有名词叫Callbackhell
NetworkService.getToken("username","password") .flatMap(s->NetworkService.getMessage(s)) .subscribe(s->{ System.out.println("message:"+s); });
4.界面按钮需要防止连续点击的情况
RxView.clicks(findViewById(R.id.btn_throttle)) .throttleFirst(1,TimeUnit.SECONDS) .subscribe(aVoid->{ System.out.println("click"); });
5.响应式的界面
比如勾选了某个checkbox,自动更新对应的preference
SharedPreferencespreferences=PreferenceManager.getDefaultSharedPreferences(this); RxSharedPreferencesrxPreferences=RxSharedPreferences.create(preferences); Preference<Boolean>checked=rxPreferences.getBoolean("checked",true); CheckBoxcheckBox=(CheckBox)findViewById(R.id.cb_test); RxCompoundButton.checkedChanges(checkBox) .subscribe(checked.asAction());
6.复杂的数据变换
Observable.just("1","2","2","3","4","5") .map(Integer::parseInt) .filter(s->s>1) .distinct() .take(3) .reduce((integer,integer2)->integer.intValue()+integer2.intValue()) .subscribe(System.out::println);//9