linux内核select/poll,epoll实现与区别
下面文章在这段时间内研究select/poll/epoll的内核实现的一点心得体会:
select,poll,epoll都是多路复用IO的函数,简单说就是在一个线程里,可以同时处理多个文件描述符的读写。
select/poll的实现很类似,epoll是从select/poll扩展而来,主要是为了解决select/poll天生的缺陷。
epoll在内核版本2.6以上才出现的新的函数,而他们在linux内核中的实现都是十分相似。
这三种函数都需要设备驱动提供poll回调函数,对于套接字而言,他们是tcp_poll,udp_poll和datagram_poll;
对于自己开发的设备驱动而言,是自己实现的poll接口函数。
select实现(2.6的内核,其他版本的内核,应该都相差不多)
应用程序调用select,进入内核调用sys_select,做些简单初始化工作,接着进入core_sys_select,
此函数主要工作是把描述符集合从用户空间复制到内核空间,最终进入do_select,完成其主要的功能。
do_select里,调用poll_initwait,主要工作是注册poll_wait的回调函数为__pollwait,
当在设备驱动的poll回调函数里调用poll_wait,其实就是调用__pollwait,
__pollwait的主要工作是把当前进程挂载到等待队列里,当等待的事件到来就会唤醒此进程。
接着执行for循环,循环里首先遍历每个文件描述符,调用对应描述符的poll回调函数,检测是否就绪,
遍历完所有描述符之后,只要有描述符处于就绪状态,信号中断,出错或者超时,就退出循环,
否则会调用schedule_xxx函数,让当前进程睡眠,一直到超时或者有描述符就绪被唤醒。
接着又会再次遍历每个描述符,调用poll再次检测。
如此循环,直到符合条件才会退出。
以下是2.6.31内核的有关select函数的部分片段:
他们调用关系:
select-->sys_select-->core_sys_select-->do_select
intdo_select(intn,fd_set_bits*fds,structtimespec*end_time) { ktime_texpire,*to=NULL; structpoll_wqueuestable; poll_table*wait; intretval,i,timed_out=0; unsignedlongslack=0; ///这里为了获得集合中的最大描述符,这样可减少循环中遍历的次数。 ///也就是为什么linux中select第一个参数为何如此重要了 rcu_read_lock(); retval=max_select_fd(n,fds); rcu_read_unlock(); if(retval<0) returnretval; n=retval; ////初始化poll_table结构,其中一个重要任务是把__pollwait函数地址赋值给它, poll_initwait(&table); wait=&table.pt; if(end_time&&!end_time->tv_sec&&!end_time->tv_nsec){ wait=NULL; timed_out=1; } if(end_time&&!timed_out) slack=estimate_accuracy(end_time); retval=0; ///主循环,将会在这里完成描述符的状态轮训 for(;;){ unsignedlong*rinp,*routp,*rexp,*inp,*outp,*exp; inp=fds->in;outp=fds->out;exp=fds->ex; rinp=fds->res_in;routp=fds->res_out;rexp=fds->res_ex; for(i=0;i<n;++rinp,++routp,++rexp){ unsignedlongin,out,ex,all_bits,bit=1,mask,j; unsignedlongres_in=0,res_out=0,res_ex=0; conststructfile_operations*f_op=NULL; structfile*file=NULL; ///select中fd_set以及do_select中的fd_set_bits参数,都是按照位来保存描述符,意思是比如申请一个1024位的内存, ///如果第28位置1,说明此集合有描述符28, in=*inp++;out=*outp++;ex=*exp++; all_bits=in|out|ex;//检测读写异常3个集合中有无描述符 if(all_bits==0){ i+=__NFDBITS; continue; } for(j=0;j<__NFDBITS;++j,++i,bit<<=1){ intfput_needed; if(i>=n) break; if(!(bit&all_bits)) continue; file=fget_light(i,&fput_needed);///通过描述符index获得structfile结构指针, if(file){ f_op=file->f_op;//通过structfile获得file_operations,这是操作文件的回调函数集合。 mask=DEFAULT_POLLMASK; if(f_op&&f_op->poll){ wait_key_set(wait,in,out,bit); mask=(*f_op->poll)(file,wait);//调用我们的设备中实现的poll函数, //因此,为了能让select正常工作,在我们设备驱动中,必须要提供poll的实现, } fput_light(file,fput_needed); if((mask&POLLIN_SET)&&(in&bit)){ res_in|=bit; retval++; wait=NULL;///此处包括以下的,把wait设置为NULL,是因为检测到mask=(*f_op->poll)(file,wait);描述符已经就绪 ///无需再把当前进程添加到等待队列里,do_select遍历完所有描述符之后就会退出。 } if((mask&POLLOUT_SET)&&(out&bit)){ res_out|=bit; retval++; wait=NULL; } if((mask&POLLEX_SET)&&(ex&bit)){ res_ex|=bit; retval++; wait=NULL; } } } if(res_in) *rinp=res_in; if(res_out) *routp=res_out; if(res_ex) *rexp=res_ex; cond_resched(); } wait=NULL;//已经遍历完一遍,该加到等待队列的,都已经加了,无需再加,因此设置为NULL if(retval||timed_out||signal_pending(current))//描述符就绪,超时,或者信号中断就退出循环 break; if(table.error){//出错退出循环 retval=table.error; break; } /* *Ifthisisthefirstloopandwehaveatimeout *given,thenweconverttoktime_tandsettheto *pointertotheexpiryvalue. */ if(end_time&&!to){ expire=timespec_to_ktime(*end_time); to=&expire; } /////让进程休眠,直到超时,或者被就绪的描述符唤醒, if(!poll_schedule_timeout(&table,TASK_INTERRUPTIBLE, to,slack)) timed_out=1; } poll_freewait(&table); returnretval; } voidpoll_initwait(structpoll_wqueues*pwq) { init_poll_funcptr(&pwq->pt,__pollwait);//设置poll_table的回调函数为__pollwait,这样当我们在驱动中调用poll_wait就会调用到__pollwait ........ } staticvoid__pollwait(structfile*filp,wait_queue_head_t*wait_address, poll_table*p) { ................... init_waitqueue_func_entry(&entry->wait,pollwake);//设置唤醒进程调用的回调函数,当在驱动中调用wake_up唤醒队列时候, //pollwake会被调用,这里其实就是调用队列的默认函数default_wake_function //用来唤醒睡眠的进程。 add_wait_queue(wait_address,&entry->wait);//加入到等待队列 } intcore_sys_select(intn,fd_set__user*inp,fd_set__user*outp, fd_set__user*exp,structtimespec*end_time) { ........ //把描述符集合从用户空间复制到内核空间 if((ret=get_fd_set(n,inp,fds.in))|| (ret=get_fd_set(n,outp,fds.out))|| (ret=get_fd_set(n,exp,fds.ex))) ......... ret=do_select(n,&fds,end_time); ............. ////把do_select返回集合,从内核空间复制到用户空间 if(set_fd_set(n,inp,fds.res_in)|| set_fd_set(n,outp,fds.res_out)|| set_fd_set(n,exp,fds.res_ex)) ret=-EFAULT; ............ }
poll的实现跟select基本差不多,按照
poll-->do_sys_poll-->do_poll-->do_pollfd的调用序列
其中do_pollfd是对每个描述符调用其回调poll状态轮训。
poll比select的好处就是没有描述多少限制,select有1024的限制,描述符不能超过此值,poll不受限制。
我们从上面代码分析,可以总结出select/poll天生的缺陷:
1)每次调用select/poll都需要要把描述符集合从用户空间copy到内核空间,检测完成之后,又要把检测的结果集合从内核空间copy到用户空间
当描述符很多,而且select经常被唤醒,这种开销会比较大
2)如果说描述符集合来回复制不算什么,那么多次的全部描述符遍历就比较恐怖了,
我们在应用程序中,每次调用select/poll都必须首先遍历描述符,把他们加到fd_set集合里,这是应用层的第一次遍历,
接着进入内核空间,至少进行一次遍历和调用每个描述符的poll回调检测,一般可能是2次遍历,第一次没发现就绪描述符,
加入等待队列,第二次是被唤醒,接着再遍历一遍。再回到应用层,我们还必须再次遍历所有描述符,用FD_ISSET检测结果集。
如果描述符很多,这种遍历就很消耗CPU资源了。
3)描述符多少限制,当然poll没有限制,select却有1024的硬性限制,除了修改内核增加1024限制外没别的办法。
既然有这么些缺点,那不是select/poll变得一无是处了,那就大错特错了。
他们依然是代码移植的最好函数,因为几乎所有平台都有对它们的实现提供接口。
在描述符不是太多,他们依然十分出色的完成多路复用IO,
而且如果每个连接上的描述符都处于活跃状态,他们的效率其实跟epoll也差不了多少。
曾经使用多个线程+每个线程采用poll的办法开发TCP服务器,处理文件收发,连接达到几千个,
当时的瓶颈已经不在网络IO,而在磁盘IO了。
我们再来看epoll为了解决select/poll天生的缺陷,是如何实现的。
epoll只是select/poll的扩展,他不是在linux内核中另起炉灶,做颠覆性的设计的,他只是在select的基础上来解决他们的缺陷。
他的底层依然需要设备驱动提供poll回调来作为状态检测基础。
epoll分为三个函数epoll_create,epoll_ctl,epoll_wait。
他们的实现在eventpoll.c代码里。
epoll_create创建epoll设备,用来管理所有添加进去的描述符,epoll_ctl用来添加新的描述符,修改或者删除描述符。
epoll_wait等待描述符事件。
epoll_wait的等待已经不再是轮训方式的等待了,epoll内部有个描述符就绪队列,epoll_wait只检测这个队列即可,
他采用睡眠一会检测一下的方式,如果发现描述符就绪队列不为空,就把此队列中的描述符copy到用户空间,然后返回。
描述符就绪队列里的数据又是从何而来的?
原来使用epoll_ctl添加新描述符时候,epoll_ctl内核实现里会修改两个回调函数,
一个是poll_table结构里的qproc回调函数指针,
在select中是__pollwait函数,在epoll中换成ep_ptable_queue_proc,
当在epoll_ctl中调用新添加的描述符的poll回调时候,底层驱动就会调用poll_wait添加等待队列,
底层驱动调用poll_wait时候,
其实就是调用ep_ptable_queue_proc,此函数会修改等待队列的回调函数为ep_poll_callback,并加入到等待队列头里;
一旦底层驱动发现数据就绪,就会调用wake_up唤醒等待队列,从而ep_poll_callback将被调用,
在ep_poll_callback中会把这个就绪的描述符添加到epoll的描述符就绪队列里,并同时唤醒epoll_wait所在的进程。
如此这般,就是epoll的内核实现的精髓。
看他是如何解决select/poll的缺陷的,首先他通过epoll_ctl的EPOLL_CTL_ADD命令把描述符添加进epoll内部管理器里,
只需添加一次即可,直到用epoll_ctl的EPOLL_CTL_DEL命令删除此描述符为止,
而不像select/poll是每次执行都必须添加,很显然大量减少了描述符在内核和用户空间不断的来回copy的开销。
其次虽然epoll_wait内部也是循环检测,但是它只需检测描述符就绪队列是否为空即可,
比起select/poll必须轮训每个描述符的poll,其开销简直可以忽略不计。
他同时也没描述符多少的限制,只要你机器的内存够大,就能容纳非常多的描述符。
以下是epoll相关部分内核代码片段:
structepitem{ /*RBtreenodeusedtolinkthisstructuretotheeventpollRBtree*/ structrb_noderbn;//红黑树节点, structepoll_filefdffd;//存储此变量对应的描述符 structepoll_eventevent;//用户定义的结构 /*其他成员*/ }; structeventpoll{ /*其他成员*/ ....... /*Waitqueueusedbyfile->poll()*/ wait_queue_head_tpoll_wait; /*Listofreadyfiledescriptors*/ structlist_headrdllist;///描述符就绪队列,挂载的是epitem结构 /*RBtreerootusedtostoremonitoredfdstructs*/ structrb_rootrbr;///存储新添加的描述符的红黑树根,此成员用来存储添加进来的所有描述符。挂载的是epitem结构 ......... }; //epoll_create SYSCALL_DEFINE1(epoll_create1,int,flags) { interror; structeventpoll*ep=NULL; /*其他代码*/ ...... //分配eventpoll结构,这个结构是epoll的灵魂,他包含了所有需要处理得数据。 error=ep_alloc(&ep); if(error<0) returnerror; error=anon_inode_getfd("[eventpoll]",&eventpoll_fops,ep, flags&O_CLOEXEC);///打开eventpoll的描述符,并把ep存储到file->private_data变量里。 if(error<0) ep_free(ep); returnerror; } SYSCALL_DEFINE4(epoll_ctl,int,epfd,int,op,int,fd, structepoll_event__user*,event) { /*其他代码*/ ..... ep=file->private_data; ...... epi=ep_find(ep,tfile,fd);///从eventpoll的rbr里查找描述符是fd的epitem, error=-EINVAL; switch(op){ caseEPOLL_CTL_ADD: if(!epi){ epds.events|=POLLERR|POLLHUP; error=ep_insert(ep,&epds,tfile,fd);//在这个函数里添加新描述符,同时修改重要的回调函数。 //同时还调用描述符的poll,查看就绪状态 }else error=-EEXIST; break; /*其他代码*/ ........ } staticintep_insert(structeventpoll*ep,structepoll_event*event, structfile*tfile,intfd) { ...../*其他代码*/ init_poll_funcptr(&epq.pt,ep_ptable_queue_proc);//设置poll_tabe回调函数为ep_ptable_queue_proc //ep_ptable_queue_proc会设置等待队列的回调指针为ep_epoll_callback,同时添加等待队列。 ......../*其他代码*/ revents=tfile->f_op->poll(tfile,&epq.pt);//调用描述符的poll回调,在此函数里ep_ptable_queue_proc会被调用 ......./*其他代码*/ ep_rbtree_insert(ep,epi);//把新生成关于epitem添加到红黑树里 ....../*其他代码*/ if((revents&event->events)&&!ep_is_linked(&epi->rdllink)){ list_add_tail(&epi->rdllink,&ep->rdllist);//如果上边的poll调用,检测到描述符就绪,添加本描述符到就绪队列里。 if(waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); if(waitqueue_active(&ep->poll_wait)) pwake++; } ....../*其他代码*/ /*Wehavetocallthisoutsidethelock*/ if(pwake) ep_poll_safewake(&ep->poll_wait);//如果描述符就绪队列不为空,则唤醒epoll_wait所在的进程。 ........./*其他代码*/ } //这个函数设置等待队列回调函数为ep_poll_callback, //这样到底层有数据唤醒等待队列时候,ep_poll_callback就会被调用,从而把就绪的描述符加到就绪队列。 staticvoidep_ptable_queue_proc(structfile*file,wait_queue_head_t*whead, poll_table*pt) { structepitem*epi=ep_item_from_epqueue(pt); structeppoll_entry*pwq; if(epi->nwait>=0&&(pwq=kmem_cache_alloc(pwq_cache,GFP_KERNEL))){ init_waitqueue_func_entry(&pwq->wait,ep_poll_callback); pwq->whead=whead; pwq->base=epi; add_wait_queue(whead,&pwq->wait); list_add_tail(&pwq->llink,&epi->pwqlist); epi->nwait++; }else{ /*Wehavetosignalthatanerroroccurred*/ epi->nwait=-1; } } staticintep_poll_callback(wait_queue_t*wait,unsignedmode,intsync,void*key) { intpwake=0; unsignedlongflags; structepitem*epi=ep_item_from_wait(wait); structeventpoll*ep=epi->ep; ........./*其他代码*/ if(!ep_is_linked(&epi->rdllink)) list_add_tail(&epi->rdllink,&ep->rdllist);//把当前就绪的描述epitem结构添加到就绪队列里 ........./*其他代码*/ if(pwake) ep_poll_safewake(&ep->poll_wait);//如果队列不为空,唤醒epoll_wait所在进程 ........./*其他代码*/ } epoll_wait内核代码里主要是调用ep_poll,列出ep_poll部分代码片段: staticintep_poll(structeventpoll*ep,structepoll_event__user*events, intmaxevents,longtimeout) { intres,eavail; unsignedlongflags; longjtimeout; wait_queue_twait; ........./*其他代码*/ if(list_empty(&ep->rdllist)){ init_waitqueue_entry(&wait,current); wait.flags|=WQ_FLAG_EXCLUSIVE; __add_wait_queue(&ep->wq,&wait); //如果检测到就绪队列为空,添加当前进程到等待队列,并执行否循环 for(;;){ set_current_state(TASK_INTERRUPTIBLE); if(!list_empty(&ep->rdllist)||!jtimeout)//如果就绪队列不为空,或者超时则退出循环 break; if(signal_pending(current)){//如果信号中断,退出循环 res=-EINTR; break; } spin_unlock_irqrestore(&ep->lock,flags); jtimeout=schedule_timeout(jtimeout);//睡眠,知道被唤醒或者超时为止。 spin_lock_irqsave(&ep->lock,flags); } __remove_wait_queue(&ep->wq,&wait); set_current_state(TASK_RUNNING); } ........./*其他代码*/ if(!res&&eavail&& !(res=ep_send_events(ep,events,maxevents))&&jtimeout) gotoretry; //ep_send_events主要任务是把就绪队列的就绪描述符copy到用户空间的epoll_event数组里, returnres; }
可以看到ep_poll既epoll_wait的循环是相当轻松的循环,他只是简单检测就绪队列而已,因此他的开销很小。
我们最后看看描述符就绪时候,是如何通知给select/poll/epoll的,以网络套接字的TCP协议来进行说明。
tcp协议对应的poll回调是tcp_poll,对应的等待队列头是structsock结构里sk_sleep成员,
在tcp_poll中会把sk_sleep加入到等待队列,等待数据就绪。
当物理网卡接收到数据包,引发硬件中断,驱动在中断ISR例程里,构建skb包,把数据copy进skb,接着调用netif_rx
把skb挂载到CPU相关的input_pkt_queue队列,同时引发软中断,在软中断的net_rx_action回调函数里从input_pkt_queue里取出
skb数据包,通过分析,调用协议相关的回调函数,这样层层传递,一直到structsock,此结构里的sk_data_ready回调指针被调用
sk_data_ready指向sock_def_readable函数,sock_def_readable函数其实就是wake_up唤醒sock结构里的sk_sleep。
以上机制,对select/poll/epoll都是一样的,接下来唤醒sk_sleep方式就不一样了,因为他们指向了不同的回调函数。
在select/poll实现中,等待队列回调函数是pollwake其实就是调用default_wake_function,唤醒被select阻塞住的进程。
epoll实现中,等待回调函数是ep_poll_callback,此回调函数只是把就绪描述符加入到epoll的就绪队列里。
所以呢select/poll/epoll其实他们在内核实现中,差别也不是太大,其实都差不多。
epoll虽然效率不错,可以跟windows平台中的完成端口比美,但是移植性太差,
目前几乎就只有linux平台才实现了epoll而且必须是2.6以上的内核版本。