Rxjava功能操作符的使用方法详解
RxJava 个人感觉很好用,里面的一些操作符很方便。RxJava 的基本模型包括:被观察者(Observable)、观察者(Observer)和订阅(subscribe)。
被观察者通过 subscribe() 与观察者建立订阅关系,从而让观察者接收被观察者发射的数据。
下面把Rxjava常用的模型代码列出来,还有一些操作符的运用:
依赖:
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
compile 'io.reactivex.rxjava2:rxjava:2.1.5'
另外还用到了 fastjson,这是阿里巴巴开源的 JSON 解析库,也是另一种解析数据的方式,据说解析速度很快:
compile 'com.alibaba:fastjson:1.2.39'
importandroid.os.Bundle;
importandroid.support.v7.app.AppCompatActivity;
importandroid.view.View;
importandroid.widget.TextView;
importcom.alibaba.fastjson.JSONObject;
importjava.io.IOException;
importjava.util.concurrent.TimeUnit;
importio.reactivex.BackpressureStrategy;
importio.reactivex.Flowable;
importio.reactivex.FlowableEmitter;
importio.reactivex.FlowableOnSubscribe;
importio.reactivex.Observable;
importio.reactivex.ObservableEmitter;
importio.reactivex.ObservableOnSubscribe;
importio.reactivex.Observer;
importio.reactivex.android.schedulers.AndroidSchedulers;
importio.reactivex.annotations.NonNull;
importio.reactivex.disposables.Disposable;
importio.reactivex.functions.BiFunction;
importio.reactivex.functions.Consumer;
importio.reactivex.functions.Function;
importio.reactivex.schedulers.Schedulers;
importokhttp3.Call;
importokhttp3.Callback;
importokhttp3.OkHttpClient;
importokhttp3.Request;
importokhttp3.Response;
publicclassMainActivityextendsAppCompatActivity{
privateTextViewname;
@Override
protectedvoidonCreate(BundlesavedInstanceState){
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
name=(TextView)findViewById(R.id.name);
//用来调用下面的方法,监听。
name.setOnClickListener(newView.OnClickListener(){
@Override
publicvoidonClick(Viewv){
interval();
}
});
}
//例1:Observer
publicvoidobserver(){
//观察者
Observerobserver=newObserver(){
@Override
publicvoidonSubscribe(@NonNullDisposabled){
}
@Override
publicvoidonNext(@NonNullStrings){
//接收从被观察者中返回的数据
System.out.println("onNext:"+s);
}
@Override
publicvoidonError(@NonNullThrowablee){
}
@Override
publicvoidonComplete(){
}
};
//被观察者
Observableobservable=newObservable(){
@Override
protectedvoidsubscribeActual(Observerobserver){
observer.onNext("11111");
observer.onNext("22222");
observer.onComplete();
}
};
//产生了订阅
observable.subscribe(observer);
}
//例2:Flowable
privatevoidflowable(){
//被观察者
Flowable.create(newFlowableOnSubscribe(){
@Override
publicvoidsubscribe(@NonNullFlowableEmittere)throwsException{
for(inti=0;i<100;i++){
e.onNext(i+"");
}
}
//背压的策略,buffer缓冲区观察者
//背压一共给了五种策略
//BUFFER、
//DROP、打印前128个,后面的删除
//ERROR、
//LATEST、打印前128个和最后一个,其余删除
//MISSING
//这里的策略若不是BUFFER那么,会出现著名的:MissingBackpressureException错误
},BackpressureStrategy.BUFFER).subscribe(newConsumer(){
@Override
publicvoidaccept(Strings)throwsException{
System.out.println("subscribeaccept"+s);
Thread.sleep(1000);
}
});
}
//例3:线程调度器Scheduler
publicvoidflowable1(){
Flowable.create(newFlowableOnSubscribe(){
@Override
publicvoidsubscribe(@NonNullFlowableEmittere)throwsException{
for(inti=0;i<100;i++){
//输出在哪个线程
System.out.println("subscribeThread.currentThread.getName="+Thread.currentThread().getName());
e.onNext(i+"");
}
}
},BackpressureStrategy.BUFFER)
//被观察者一般放在子线程
.subscribeOn(Schedulers.io())
//观察者一般放在主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(newConsumer(){
@Override
publicvoidaccept(Strings)throwsException{
System.out.println("s"+s);
Thread.sleep(100);
//输出在哪个线程
System.out.println("subscribeThread.currentThread.getName="+Thread.currentThread().getName());
}
});
}
//例4:http请求网络,map转化器,fastjson解析器
publicvoidmap1(){
Observable.create(newObservableOnSubscribe(){
@Override
publicvoidsubscribe(@NonNullfinalObservableEmittere)throwsException{
OkHttpClientclient=newOkHttpClient();
Requestrequest=newRequest.Builder()
.url("https://qhb.2dyt.com/Bwei/login")
.build();
client.newCall(request).enqueue(newCallback(){
@Override
publicvoidonFailure(Callcall,IOExceptione){
}
@Override
publicvoidonResponse(Callcall,Responseresponse)throwsIOException{
Stringresult=response.body().string();
e.onNext(result);
}
});
}
})
//map转换器flatmap(无序),concatmap(有序)
.map(newFunction(){
@Override
publicBeanapply(@NonNullStrings)throwsException{
//用fastjson来解析数据
returnJSONObject.parseObject(s,Bean.class);
}
}).subscribe(newConsumer(){
@Override
publicvoidaccept(Beanbean)throwsException{
System.out.println("bean="+bean.toString());
}
});
}
//常见rxjava操作符
//例定时发送消息
publicvoidinterval(){
Observable.interval(2,1,TimeUnit.SECONDS)
.take(10)
.subscribe(newConsumer(){
@Override
publicvoidaccept(LongaLong)throwsException{
System.out.println("aLong="+aLong);
}
});
}
//例zip字符串合并
publicvoidzip(){
Observableobservable1=Observable.create(newObservableOnSubscribe(){
@Override
publicvoidsubscribe(@NonNullObservableEmittere)throwsException{
e.onNext("1");
e.onNext("2");
e.onNext("3");
e.onNext("4");
e.onComplete();
}
});
Observableobservable2=Observable.create(newObservableOnSubscribe(){
@Override
publicvoidsubscribe(@NonNullObservableEmittere)throwsException{
e.onNext("A");
e.onNext("B");
e.onNext("C");
e.onNext("D");
e.onComplete();
}
});
Observable.zip(observable1,observable2,newBiFunction(){
@Override
publicStringapply(@NonNullStringo,@NonNullStringo2)throwsException{
returno+o2;
}
}).subscribe(newConsumer(){
@Override
publicvoidaccept(Stringo)throwsException{
System.out.println("o"+o);
}
});
}
总结
以上就是本文关于 RxJava 功能操作符的使用方法详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:《Javaweb应用使用限流处理大量的并发请求详解》、《分享一个简单的java爬虫框架》、《Java线程之线程同步synchronized和volatile详解》等,有什么问题可以随时留言,小编会及时回复大家的。感谢朋友们对本站的支持!