Linux I/O多路复用详解及实例
LinuxI/O多路复用
Linux中一切皆文件,不论是我们存储在磁盘上的字符文件,可执行文件还是我们的接入电脑的I/O设备等都被VFS抽象成了文件,比如标准输入设备默认是键盘,我们在操作标准输入设备的时候,其实操作的是默认打开的一个文件描述符是0的文件,而一切软件操作硬件都需要通过OS,而OS操作一切硬件都需要相应的驱动程序,这个驱动程序里配置了这个硬件的相应配置和使用方法。Linux的I/O分为阻塞I/O,非阻塞I/O,I/O多路复用,信号驱动I/O四种。对于I/O设备的驱动,一般都会提供关于阻塞和非阻塞两种配置。我们最常见的I/O设备之一--键盘(标准输入设备)的驱动程序默认是阻塞的。
多路复用就是为了使进程能够从多个阻塞I/O中获得自己想要的数据并继续执行接下来的任务。其主要的思路就是同时监视多个文件描述符,如果有文件描述符的设定状态的被触发,就继续执行进程,如果没有任何一个文件描述符的设定状态被触发,进程进入sleep
多路复用的一个主要用途就是实现"I/O多路复用并发服务器",和多线程并发或者多进程并发相比,这种服务器的系统开销更低,更适合做web服务器。
阻塞I/O
阻塞I/O,就是当进程试图访问这个I/O设备而这个设备并没有准备好的时候,设备的驱动程序会通过内核让这个试图访问的进程进入sleep状态。阻塞I/O的一个好处就是可以大大的节约CPU时间,因为一旦一个进程试图访问一个没有准备好的阻塞I/O,就会进入sleep状态,而进入sleep状态的进程是不在内核的进程调度链表中,直到目标I/O准备好了将其唤醒并加入调度链表,这样就可以节约CPU时间。当然阻塞I/O也有其固有的缺点,如果进程试图访问一个阻塞I/O,但是否访问成功并不对接下来的任务有决定性影响,那么直接使其进入sleep状态显然会延误其任务的完成。
典型的默认阻塞IO有标准输入设备,socket设备,管道设备等,当我们使用gets(),scanf(),read()等操作请求这些IO时而IO并没有数据流入,就会造成进程的sleep。
假设一个进程希望通过三个管道中任意一个中读取数据并显示,伪代码如下
read(pipe_0,buf,sizeof(buf));//sleep printbuf; read(pipe_1,buf,sizeof(buf)); printbuf; read(pipe_2,buf,sizeof(buf)); printbuf;
由于管道是阻塞I/O,所以如果pipe_0没有数据流入,进程就是在第一个read()处进入sleep状态而即使pipe_1和pipe_2有数据流入也不会被读取。
如果我们使用下述代码重新设置管道的阻塞属性,显然,如果三个管道都没有数据流入,那么进程就无法获得请求的数据而继续执行,倘若这些数据很重要(所以我们才要用阻塞I/O),那结果就会十分的糟糕,改为轮询却又大量的占据CPU时间。
intfl=fcntl(pipe_fd,F_GETFL);
fcntl(pipe_fd,F_SETFL,fl|O_NONBLOCK);
如何让进程同时监视三个管道,其中一个有数据就继续执行而不会sleep,如果全部没有数据流入再sleep,就是多路复用技术需要解决的问题。
非阻塞I/O
非阻塞I/O就是当一个进程试图访问一个I/O设备的时候,无论是否从中获取了请求的数据都会返回并继续执行接下来的任务。,但非常适合请求是否成功对接下来的任务影响不大的I/O请求。但如果访问一个非阻塞I/O,但这个请求如果失败对进程接下来的任务有致命影响,最粗暴的就是使用while(1){read()}轮询。显然,这种方式会占用大量的CPU时间。
select机制
select是一种非常"古老"的同步I/O接口,但是提供了一种很好的I/O多路复用的思路
模型
fd_set//创建fd_set对象,将来从中增减需要监视的fd FD_ZERO()//清空fd_set对象 FD_SET()//将一个fd加入fd_set对象中 select()//监视fd_set对象中的文件描述符 pselect()//先设定信号屏蔽,再监视 FD_ISSET()//测试fd是否属于fd_set对象 FD_CLR()//从fd_set对象中删除fd
Note:
select的第一个参数nfds是指集合中的最大的文件描述符+1,因为select会无差别遍历整个文件描述符表直到找到目标,而文件描述符是从0开始的,所以一共是集合中的最大的文件描述符+1次。
上一条导致了这种机制的低效,如果需要监视的文件描述符是0和100那么每一次都会遍历101次
select()每次返回都会修改fd_set,如果要循环select(),需要先对初始的fd_set进行备
例子_I/O多路复用并发服务器
关于server本身的编程模型,参见tcp/ip协议服务器模型和udp/ip协议服务器模型这里仅是使用select实现伪并行的部分模型
#defineBUFSIZE100 #defineMAXNFD1024 intmain() { /***********服务器的listenfd已经准本好了**************/ fd_setreadfds; fd_setwritefds; FD_ZERO(&readfds); FD_ZERO(&writefds); FD_SET(listenfd,&readfds); fd_settemprfds=readfds; fd_settempwfds=writefds; intmaxfd=listenfd; intnready; charbuf[MAXNFD][BUFSIZE]={0}; while(1){ temprfds=readfds; tempwfds=writefds; nready=select(maxfd+1,&temprfds,&tempwfds,NULL,NULL) if(FD_ISSET(listenfd,&temprfds)){ //如果监听到的是listenfd就进行accept intsockfd=accept(listenfd,(structsockaddr*)&clientaddr,&len); //将新accept的scokfd加入监听集合,并保持maxfd为最大fd FD_SET(sockfd,&readfds); maxfd=maxfd>sockfd?maxfd:sockfd; //如果意见检查了nready个fd,就没有必要再等了,直接下一个循环 if(--nready==0) continue; } intfd=0; //遍历文件描述符表,处理接收到的消息 for(;fd<=maxfd;fd++){ if(fd==listenfd) continue; if(FD_ISSET(fd,&temprfds)){ intret=read(fd,buf[fd],sizeofbuf[0]); if(0==ret){//客户端链接已经断开 close(fd); FD_CLR(fd,&readfds); if(maxfd==fd) --maxfd; continue; } //将fd加入监听可写的集合 FD_SET(fd,&writefds); } //找到了接收消息的socket的fd,接下来将其加入到监视写的fd_set中 //将在下一次while()循环开始监视 if(FD_ISSET(fd,&tempwfds)){ intret=write(fd,buf[fd],sizeofbuf[0]); printf("ret%d:%d\n",fd,ret); FD_CLR(fd,&writefds); } } } close(listenfd); }
poll机制
poll是SystemV提出的一种基于select的改良机制,其针对select的诸多明显的缺陷进行了重新设计,包括只遍历被触发个数个文件描述符,不需要备份fd_set等等
模型
structpollfdfds//创建一个pollfd类型的数组 fds[0].fd//向fds[0]中放入需要监视的fd fds[0].events//向fds[0]中放入需要监视的fd的触发事件 POLLIN//I/O有输入 POLLPRI//有紧急数据需要读取 POLLOUT//I/O可写 POLLRDHUP//流式套接字连接断开或套接字处于半关闭状态 POLLERR//错误条件(仅针对输出) POLLHUP//挂起(仅针对输出) POLLNVAL//无效的请求:fd没有被打开(仅针对输出)
例子_I/O多路复用并发服务器
/*...*/ intmain() { /*...*/ structpollfdmyfds[MAXNFD]={0}; myfds[0].fd=listenfd; myfds[0].events=POLLIN; intmaxnum=1; intnready; //准备二维数组buf,每个fd使用buf的一行,数据干扰 charbuf[MAXNFD][BUFSIZE]={0}; while(1){ //poll直接返回event被触发的fd的个数 nready=poll(myfds,maxnum,-1) inti=0; for(;i<maxnum;i++){ //poll通过将相应的二进制位置一来表示已经设置 //如果下面的条件成立,表示revent[i]里的POLLIN位已经是1了 if(myfds[i].revents&POLLIN){ if(myfds[i].fd==listenfd){ intsockfd=accept(listenfd,(structsockaddr*)&clientaddr,&len); //将新accept的scokfd加入监听集合 myfds[maxnum].fd=sockfd; myfds[maxnum].events=POLLIN; maxnum++; //如果意见检查了nready个fd,就直接下一个循环 if(--nready==0) continue; } else{ intret=read(myfds[i].fd,buf[myfds[i].fd],sizeofbuf[0]); if(0==ret){//如果连接断开了 close(myfds[i].fd); //初始化将文件描述符表所有的文件描述符标记为-1 //close的文件描述符也标记为-1 //打开新的描述符时从表中搜索第一个-1 //open()就是这样实现始终使用最小的fd //这里为了演示并没有使用这种机制 myfds[i].fd=-1; continue; } myfds[i].events=POLLOUT; } } elseif(myfds[i].revents&POLLOUT){ intret=write(myfds[i].fd,buf[myfds[i].fd],sizeofbuf[0]); myfds[i].events=POLLIN; } } } close(listenfd); }
epoll
epoll在poll基础上实现的更为健壮的接口,也是现在主流的web服务器使用的多路复用技术,epoll一大特色就是支持EPOLLET(边沿触发)和EPOLLLT(水平触发),前者表示如果读取之后缓冲区还有数据,那么只要读取结束,剩余的数据也会丢弃,而后者表示里面的数据不会丢弃,下次读的时候还在,默认是EPOLLLT
模型
epoll_create()//创建epoll对象 structepoll_event//准备事件结构体和事件结构体数组 event.events event.data.fd... epoll_ctl()//配置epoll对象 epoll_wait()//监控epoll对象中的fd及其相应的event
例子_I/O多路复用并发服务器
/*...*/ intmain() { /*...*/ /*创建epoll对象*/ intepoll_fd=epoll_create(1024); //准备一个事件结构体 structepoll_eventevent={0}; event.events=EPOLLIN; event.data.fd=listenfd;//data是一个共用体,除了fd还可以返回其他数据 //ctl是监控listenfd是否有event被触发 //如果发生了就把event通过wait带出。 //所以,如果event里不标明fd,我们将来获取就不知道哪个fd epoll_ctl(epoll_fd,EPOLL_CTL_ADD,listenfd,&event); structepoll_eventrevents[MAXNFD]={0}; intnready; charbuf[MAXNFD][BUFSIZE]={0}; while(1){ //wait返回等待的event发生的数目 //并把相应的event放到event类型的数组中 nready=epoll_wait(epoll_fd,revents,MAXNFD,-1) inti=0; for(;i<nready;i++){ //wait通过在events中设置相应的位来表示相应事件的发生 //如果输入可用,那么下面的这个结果应该为真 if(revents[i].events&EPOLLIN){ //如果是listenfd有数据输入 if(revents[i].data.fd==listenfd){ intsockfd=accept(listenfd,(structsockaddr*)&clientaddr,&len); structepoll_eventevent={0}; event.events=EPOLLIN; event.data.fd=sockfd; epoll_ctl(epoll_fd,EPOLL_CTL_ADD,sockfd,&event); } else{ intret=read(revents[i].data.fd,buf[revents[i].data.fd],sizeofbuf[0]); if(0==ret){ close(revents[i].data.fd); epoll_ctl(epoll_fd,EPOLL_CTL_DEL,revents[i].data.fd,&revents[i]); } revents[i].events=EPOLLOUT; epoll_ctl(epoll_fd,EPOLL_CTL_MOD,revents[i].data.fd,&revents[i]); } } elseif(revents[i].events&EPOLLOUT){ intret=write(revents[i].data.fd,buf[revents[i].data.fd],sizeofbuf[0]); revents[i].events=EPOLLIN; epoll_ctl(epoll_fd,EPOLL_CTL_MOD,revents[i].data.fd,&revents[i]); } } } close(listenfd); }
感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!