详细介绍RxJS在Angular中的应用
RxJS是一种针对异步数据流编程工具,或者叫响应式扩展编程;可不管如何解释RxJS其目标就是异步编程,Angular引入RxJS为了就是让异步可控、更简单。
而今就是要探讨什么是Observable、observer、operator、Submit、EventEmmit,以及如何去使用它们。
什么是Observable?
Observable只是一个普通函数,要想让他有所作为,就需要跟observer一起使用;前者是受后者是攻。而这个observer(后面我们会介绍)只是一个带有next、error、complete的简单对象而已。最后,还需要通过subscribe订阅来启动Observable;否则它是不会有任何反应;可以理解为陌*为了他们能在一起而提供的环境,而订阅也会返回一个可用于取消操作(在RxJS里叫unsubscribe)。
当Observable设置观察者后,而连接并获取原始数据的这个过程叫生产者,可能是DOM中的click事件、input事件、或者更加复杂的HTTP通信。
为了更好理解,先从一个简单的示例开始:
import{Component}from'@angular/core';
import{Observable,Subscription}from'rxjs';
@Component({
selector:'app-home',
template:``
})
exportclassHomeComponent{
ngOnInit(){
constnode=document.querySelector('input[type=text]');
//第二个参数input是事件名,对于input元素有一个oninput事件用于接受用户输入
constinput$=Observable.fromEvent(node,'input');
input$.subscribe({
next:(event:any)=>console.log(`Youjusttyped${event.target.value}!`),
error:(err)=>console.log(`Oops...${err}`),
complete:()=>console.log(`Complete!`)
});
}
}
示例中Observable.fromEvent()会返回一个Observable,并且监听input事件,当事件被触发后会发送一个Event给对应的observer观察者。
什么是observer?
observer非常简单,像上面示例中subscribe订阅就是接收一个observer方法。
一般在Angular我们subscribe会这么写:
input$.subscribe((event:any)=>{
});
从语法角度来讲和subscribe({next,error,complete})是一样的。
当Observable产生一个新值时,会通知observer的next(),而当捕获失败可以调用error()。
当Observable被订阅后,除非调用observer的complete()或unsubscribe()取消订阅两情况以外;会一直将值传递给observer。
Observable的生产的值允许经过一序列格式化或操作,最终得到一个有价值的数据给观察者,而这一切是由一序列链式operator来完成的,每一个operator都会产生一个新的Observable。而我们也称这一序列过程为:流。
什么是operator?
正如前面说到的,Observable可以链式写法,这意味着我们可以这样:
Observable.fromEvent(node,'input')
.map((event:any)=>event.target.value)
.filter(value=>value.length>=2)
.subscribe(value=>{console.log(value);});
下面是整个顺序步骤:
- 假设用户输入:a
- Observable对触发oninput事件作出反应,将值以参数的形式传递给observer的next()。
- map()根据event.target.value的内容返回一个新的Observable,并调用next()传递给下一个observer。
- filter()如果值长度>=2的话,则返回一个新的Observable,并调用next()传递给下一个observer。
- 最后,将结果传递给subscribe订阅块。
你只要记住每一次operator都会返回一个新的Observable,不管operator有多少个,最终只有最后一个Observable会被订阅。
不要忘记取消订阅
为什么需要取消订阅
Observable当有数据产生时才会推送给订阅者,所以它可能会无限次向订阅者推送数据。正因为如此,在Angular里面创建组件的时候务必要取消订阅操作,以避免内存泄漏,要知道在SPA世界里懂得擦屁股是一件必须的事。
unsubscribe
前面示例讲过,调用subscribe()后,会返回一个Subscription可用于取消操作unsubscribe()。最合理的方式在ngOnDestroy调用它。
ngOnDestroy(){
this.inputSubscription.unsubscribe();
}
takeWhile
如果组件有很多订阅者的话,则需要将这些订阅者存储在数组中,并组件被销毁时再逐个取消订阅。但,我们有更好的办法:
使用takeWhile()operator,它会在你传递一个布尔值是调用next()还是complete()。
privatealive:boolean=true;
ngOnInit(){
constnode=document.querySelector('input[type=text]');
this.s=Observable.fromEvent(node,'input')
.takeWhile(()=>this.alive)
.map((event:any)=>event.target.value)
.filter(value=>value.length>=2)
.subscribe(value=>{console.log(value)});
}
ngOnDestroy(){
this.alive=false;
}
简单有效,而且优雅。
Subject
如果说Observable与observer是攻受结合体的话,那么Subject就是一个人即攻亦受。正因为如此,我们在写一个Service用于数据传递时,总是使用newSubject。
@Injectable()
exportclassMessageService{
privatesubject=newSubject();
send(message:any){
this.subject.next(message);
}
get():Observable{
returnthis.subject.asObservable();
}
}
当F组件需要向M组件传递数据时,我们可以在F组件中使用send()。
constructor(publicsrv:MessageService){}
ngOnInit(){
this.srv.send('wskfm?')
}
而M组件只需要订阅内容就行:
constructor(privatesrv:MessageService){}
message:any;
ngOnInit(){
this.srv.get().subscribe((result)=>{
this.message=result;
})
}
EventEmitter
其实EventEmitter跟RxJS没有直接关系,因为他是Angular的产物,而非RxJS的东西。或者我们压根没必要去谈,因为EventEmitter就是Subject。
EventEmitter的作用是使指令或组件能自定义事件。
@Output()changed=newEventEmitter(); click(){ this.changed.emit('hi~'); }
@Component({
template:` `
})
exportclassHomeComponent{
subscribe(message:string){
//接收:hi~
}
}
上面示例其实和上一个示例中MessageService如出一辙,只不过是将next()换成emit()仅此而已。
结论
RxJS最难我想就是各种operator的应用了,这需要一些经验的积累。
RxJS很火很大原因我认还是提供了丰富的API,以下是摘抄:
创建数据流:
- 单值:of,empty,never
- 多值:from
- 定时:interval,timer
- 从事件创建:fromEvent
- 从Promise创建:fromPromise
- 自定义创建:create
转换操作:
- 改变数据形态:map,mapTo,pluck
- 过滤一些值:filter,skip,first,last,take
- 时间轴上的操作:delay,timeout,throttle,debounce,audit,bufferTime
- 累加:reduce,scan
- 异常处理:throw,catch,retry,finally
- 条件执行:takeUntil,delayWhen,retryWhen,subscribeOn,ObserveOn
- 转接:switch
组合数据流:
- concat,保持原来的序列顺序连接两个数据流
- merge,合并序列
- race,预设条件为其中一个数据流完成
- forkJoin,预设条件为所有数据流都完成
- zip,取各来源数据流最后一个值合并为对象
- combineLatest,取各来源数据流最后一个值合并为数组
另,最好使用$结尾的命名方式来表示Observable,例:input$。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。