Rxjava功能操作符的使用方法详解
RxJava 个人感觉很好用,里面的一些操作符很方便。RxJava 有三个核心概念:被观察者(Observable)、观察者(Observer)和订阅(subscribe)。
被观察者通过 subscribe() 与观察者建立订阅关系,从而让观察者能够监听并接收被观察者发出的数据。
下面把Rxjava常用的模型代码列出来,还有一些操作符的运用:
依赖:
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
compile 'io.reactivex.rxjava2:rxjava:2.1.5'
下面这个依赖是另一种解析数据的方式:fastjson,阿里巴巴旗下的 JSON 解析库,据说是解析速度最快的解析器之一。
compile 'com.alibaba:fastjson:1.2.39'
importandroid.os.Bundle; importandroid.support.v7.app.AppCompatActivity; importandroid.view.View; importandroid.widget.TextView; importcom.alibaba.fastjson.JSONObject; importjava.io.IOException; importjava.util.concurrent.TimeUnit; importio.reactivex.BackpressureStrategy; importio.reactivex.Flowable; importio.reactivex.FlowableEmitter; importio.reactivex.FlowableOnSubscribe; importio.reactivex.Observable; importio.reactivex.ObservableEmitter; importio.reactivex.ObservableOnSubscribe; importio.reactivex.Observer; importio.reactivex.android.schedulers.AndroidSchedulers; importio.reactivex.annotations.NonNull; importio.reactivex.disposables.Disposable; importio.reactivex.functions.BiFunction; importio.reactivex.functions.Consumer; importio.reactivex.functions.Function; importio.reactivex.schedulers.Schedulers; importokhttp3.Call; importokhttp3.Callback; importokhttp3.OkHttpClient; importokhttp3.Request; importokhttp3.Response; publicclassMainActivityextendsAppCompatActivity{ privateTextViewname; @Override protectedvoidonCreate(BundlesavedInstanceState){ super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); name=(TextView)findViewById(R.id.name); //用来调用下面的方法,监听。 name.setOnClickListener(newView.OnClickListener(){ @Override publicvoidonClick(Viewv){ interval(); } }); } //例1:Observer publicvoidobserver(){ //观察者 Observerobserver=newObserver (){ @Override publicvoidonSubscribe(@NonNullDisposabled){ } @Override publicvoidonNext(@NonNullStrings){ //接收从被观察者中返回的数据 System.out.println("onNext:"+s); } @Override publicvoidonError(@NonNullThrowablee){ } @Override publicvoidonComplete(){ } }; //被观察者 Observable observable=newObservable (){ @Override protectedvoidsubscribeActual(Observerobserver){ observer.onNext("11111"); observer.onNext("22222"); observer.onComplete(); } }; //产生了订阅 observable.subscribe(observer); } //例2:Flowable privatevoidflowable(){ //被观察者 Flowable.create(newFlowableOnSubscribe (){ @Override publicvoidsubscribe(@NonNullFlowableEmitter 
e)throwsException{ for(inti=0;i<100;i++){ e.onNext(i+""); } } //背压的策略,buffer缓冲区观察者 //背压一共给了五种策略 //BUFFER、 //DROP、打印前128个,后面的删除 //ERROR、 //LATEST、打印前128个和最后一个,其余删除 //MISSING //这里的策略若不是BUFFER那么,会出现著名的:MissingBackpressureException错误 },BackpressureStrategy.BUFFER).subscribe(newConsumer (){ @Override publicvoidaccept(Strings)throwsException{ System.out.println("subscribeaccept"+s); Thread.sleep(1000); } }); } //例3:线程调度器Scheduler publicvoidflowable1(){ Flowable.create(newFlowableOnSubscribe (){ @Override publicvoidsubscribe(@NonNullFlowableEmitter e)throwsException{ for(inti=0;i<100;i++){ //输出在哪个线程 System.out.println("subscribeThread.currentThread.getName="+Thread.currentThread().getName()); e.onNext(i+""); } } },BackpressureStrategy.BUFFER) //被观察者一般放在子线程 .subscribeOn(Schedulers.io()) //观察者一般放在主线程 .observeOn(AndroidSchedulers.mainThread()) .subscribe(newConsumer (){ @Override publicvoidaccept(Strings)throwsException{ System.out.println("s"+s); Thread.sleep(100); //输出在哪个线程 System.out.println("subscribeThread.currentThread.getName="+Thread.currentThread().getName()); } }); } //例4:http请求网络,map转化器,fastjson解析器 publicvoidmap1(){ Observable.create(newObservableOnSubscribe (){ @Override publicvoidsubscribe(@NonNullfinalObservableEmitter e)throwsException{ OkHttpClientclient=newOkHttpClient(); Requestrequest=newRequest.Builder() .url("https://qhb.2dyt.com/Bwei/login") .build(); client.newCall(request).enqueue(newCallback(){ @Override publicvoidonFailure(Callcall,IOExceptione){ } @Override publicvoidonResponse(Callcall,Responseresponse)throwsIOException{ Stringresult=response.body().string(); e.onNext(result); } }); } }) //map转换器flatmap(无序),concatmap(有序) .map(newFunction (){ @Override publicBeanapply(@NonNullStrings)throwsException{ //用fastjson来解析数据 returnJSONObject.parseObject(s,Bean.class); } }).subscribe(newConsumer (){ @Override publicvoidaccept(Beanbean)throwsException{ System.out.println("bean="+bean.toString()); } }); } //常见rxjava操作符 //例定时发送消息 publicvoidinterval(){ 
Observable.interval(2,1,TimeUnit.SECONDS) .take(10) .subscribe(newConsumer (){ @Override publicvoidaccept(LongaLong)throwsException{ System.out.println("aLong="+aLong); } }); } //例zip字符串合并 publicvoidzip(){ Observableobservable1=Observable.create(newObservableOnSubscribe (){ @Override publicvoidsubscribe(@NonNullObservableEmitter e)throwsException{ e.onNext("1"); e.onNext("2"); e.onNext("3"); e.onNext("4"); e.onComplete(); } }); Observableobservable2=Observable.create(newObservableOnSubscribe (){ @Override publicvoidsubscribe(@NonNullObservableEmitter e)throwsException{ e.onNext("A"); e.onNext("B"); e.onNext("C"); e.onNext("D"); e.onComplete(); } }); Observable.zip(observable1,observable2,newBiFunction (){ @Override publicStringapply(@NonNullStringo,@NonNullStringo2)throwsException{ returno+o2; } }).subscribe(newConsumer (){ @Override publicvoidaccept(Stringo)throwsException{ System.out.println("o"+o); } }); }
总结
以上就是本文关于Rxjava功能操作符的使用方法详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:Javaweb应用使用限流处理大量的并发请求详解、分享一个简单的java爬虫框架、Java线程之线程同步synchronized和volatile详解等,有什么问题可以随时留言,小编会及时回复大家的。感谢朋友们对本站的支持!