缝合文章,缝合太多了就不注明出处了。
五种基本的IO模型 阻塞IO (blocking IO) 在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:
当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。 所以,blocking IO的特点就是在IO执行的两个阶段都被block了。
非阻塞IO ( nonblocking IO) linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:
从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。 所以,用户进程其实是需要不断的主动询问kernel数据好了没有。
多路复用IO (IO multiplexing) IO multiplexing这个词可能有点陌生,但是如果我说select,epoll,大概就都能明白了。有些地方也称这种IO方式为event driven IO 。我们都知道,select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:
信号驱动IO (signal driven IO) 应用进程使用 sigaction 系统调用,内核立即返回,应用进程可以继续执行,也就是说等待数据阶段应用进程是非阻塞的。内核在数据到达时向应用进程发送 SIGIO 信号,应用进程收到之后在信号处理程序中调用 recvfrom 将数据从内核复制到应用进程中。信号驱动 I/O 的 CPU 利用率很高。
异步IO (asynchronous IO) linux下的asynchronous IO其实用得很少。先看一下它的流程:
用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。
各个IO Model的比较如图所示:
经过上面的介绍,会发现non-blocking IO和asynchronous IO的区别还是很明显的。在non-blocking IO中,虽然进程大部分时间都不会被block,但是它仍然要求进程去主动的check,并且当数据准备完成以后,也需要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronous IO则完全不同。它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查IO操作的状态,也不需要主动的去拷贝数据。
select函数相关 select函数声明: 1 2 3 4 5 6 int select (int maxfdp,fd_set *readfds,fd_set *writefds,fd_set *exceptfds,struct timeval *timeout) ; int maxfdp; fd_set *readfds, *writefds, exceptfds; struct timeval *timeout ; return int ;
select参数获取: Maxfdp; 1 2 3 4 for (auto it = fd.begin(); it != fd.end(); it++) if (*it > maxfdp) maxfdp = *it; maxfdp += 1 ;
Fd_set 1 2 3 4 5 6 7 8 9 fd_set read_fds; FD_ZERO(read_fds); for (auto it = fd.begin(); it != fd.end(); it++) FD_SET(*it,&read_fds);
Time_out 1 2 3 4 5 6 7 8 9 struct timeval tv ;tv.tv_sec = (int )nums; tv.tv_usec = (int )num;
select优缺点 select原理
时空复杂度 首先是fds 准备阶段,先是清空FD_ZERO(&fds) ,之后需要扫描一遍vectorfd 里面所有的fd(fd_set(&fds,*it)),看看是否有新的fd进来,同时记录fd的最大值,
之后是select函数调用,(定时或阻塞),此处时间不再讨论范围内,需要注意的是此时需要将bitmap(fds)copy到kernel空间,此处空间拷贝有一定的开销,如果有相应的fd有动作时select会返回,此时我们需要在遍历一遍vectorfd 寻找触发事件的fd并把消息取出来(此时触发事件的fd可能有一个或者多个)。
时间复杂度是 O(N)(俩次fd扫描),空间复杂度是常数(bitmap)。 关于bitmap的说明,监听端口数与机型有关,32位上线是1024,64位是2048.
优点 实现了多路复用IO ,在低于1024个端口数监听时效率最高 (此时总体开销低于epoll,据说)。适用于低并发场景。
缺点 单进程可以打开fd有限制 fds不可重用 对socket进行扫描时是线性扫描,即采用轮询的方法,效率较低 用户空间和内核空间的复制非常消耗资源; poll函数相关 pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。同时,pollfd并没有最大数量限制(但是数量过大后性能也是会下降)。 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。
poll函数声明 1 int poll (struct pollfd *fds, unsigned int nfds, int timeout) ;
不同与select使用三个位图来表示三个fdset的方式,poll使用一个 pollfd的指针实现。
poll参数获取 Struct pollfd 1 2 3 4 5 struct pollfd { int fd; short events; short revents; };
revents 1 2 3 4 5 6 7 8 9 10 11 12 13 14 #define POLLIN 0x0001 #define POLLPRI 0x0002 #define POLLOUT 0x0004 #define POLLERR 0x0008 #define POLLHUP 0x0010 #define POLLNVAL 0x0020 #define POLLRDNORM 0x0040 #define POLLRDBAND 0x0080 #define POLLWRNORM 0x0100 #define POLLWRBAND 0x0200 #define POLLMSG 0x0400 #define POLLREMOVE 0x1000 #define POLLRDHUP 0x2000
POLLIN|POLLPRI类似于select的读事件,POLLOUT|POLLWRBAND类似于select的写事件。当events属性为POLLIN|POLLOUT,表示监控是否可读或可写。在poll返回时,即可通过检查revents变量对应的标志位与events是否相同,比如revents中POLLIN事件标志位被设置,则表示文件描述符可以被读取
timeout timeout
指定poll()
需要block多少ms,如果发生timeout,那么poll()的返回值为0。
返回值 poll()
函数有三种返回值:
返回值>0,表示已侦听到events指定的事件或者出错返回给revents;
返回值=0,表示timeout时间到或者出错返回给revents;
返回值<0,也就是-1,表示出错。
实例代码(比selecet简单多了) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 int sockfd; struct pollfd pollfds ;int timeout; timeout = 5000 ; pollfds.fd = sockfd; pollfds.events = POLLIN|POLLPRI; for (;;){ switch (poll(&pollfds,1 ,timeout)){ case -1 : printf ("poll error \r\n" ); break ; case 0 : printf ("time out \r\n" ); break ; default : printf ("sockfd have some event \r\n" ); printf ("event value is 0x%x" ,pollfds.revents); break ; } }
Poll优缺点 优点 解决了select端口数量的限制 避免了select的fds重复设置,只需要在检查revents时将其清空即可。 缺点 从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降 。
epoll函数相关 epoll的原理和流程 创建epoll对象
如下图所示,当某个进程调用epoll_create方法时,内核会创建一个eventpoll对象(也就是程序中epfd所代表的对象)。eventpoll对象也是文件系统中的一员,和socket一样,它也会有等待队列。
创建一个代表该epoll的eventpoll对象是必须的,因为内核要维护“就绪列表”等数据,“就绪列表”可以作为eventpoll的成员。
维护监视列表
创建epoll对象后,可以用epoll_ctl添加或删除所要监听的socket。以添加socket为例,如下图,如果通过epoll_ctl添加sock1、sock2和sock3的监视,内核会将eventpoll添加到这三个socket的等待队列中。
当socket收到数据后,中断程序会操作eventpoll对象,而不是直接操作进程。
接收数据
当socket收到数据后,中断程序会给eventpoll的“就绪列表”添加socket引用。如下图展示的是sock2和sock3收到数据后,中断程序让rdlist引用这两个socket。
eventpoll对象相当于是socket和进程之间的中介,socket的数据接收并不直接影响进程,而是通过改变eventpoll的就绪列表来改变进程状态。
当程序执行到epoll_wait时,如果rdlist已经引用了socket,那么epoll_wait直接返回,如果rdlist为空,阻塞进程。
阻塞和唤醒进程
假设计算机中正在运行进程A和进程B,在某时刻进程A运行到了epoll_wait语句。如下图所示,内核会将进程A放入eventpoll的等待队列中,阻塞进程。
当socket接收到数据,中断程序一方面修改rdlist,另一方面唤醒eventpoll等待队列中的进程,进程A再次进入运行状态(如下图)。也因为rdlist的存在,进程A可以知道哪些socket发生了变化。
epoll函数相关 创建epoll 1 2 #include <sys/epoll.h> int epoll_create (int size) ;
int size epoll_create() 可以创建一个epoll实例。在linux 内核版本大于2.6.8 后,这个size 参数就被弃用了,但是传入的值必须大于0。
在 epoll_create () 的最初实现版本时, size参数的作用是创建epoll实例时候告诉内核需要使用多少个文件描述符。内核会使用 size 的大小去申请对应的内存(如果在使用的时候超过了给定的size, 内核会申请更多的空间)。现在,这个size参数不再使用了(内核会动态的申请需要的内存)。但要注意的是,这个size必须要大于0,为了兼容旧版的linux 内核的代码。
return int epoll_create() 会返回新的epoll对象的文件描述符。这个文件描述符用于后续的epoll操作 。如果不需要使用这个描述符,请使用close关闭 。
设置epoll事件 1 2 3 #include <sys/epoll.h> int epoll_ctl (int epfd, int op, int fd, struct epoll_event *event) ;
int epfd 这个系统调用能够控制给定的文件描述符*epfd* 指向的epoll实例,之前epoll_create的返回值
int fd fd\ 是目标文件描述符。被监听的socket
op是添加事件的类型 EPOLL_CTL_ADD 在*epfd* 中注册指定的fd文件描述符并能把*event* 和*fd* 关联起来。
EPOLL_CTL_MOD 改变*** fd*和* evetn***之间的联系。
EPOLL_CTL_DEL 从指定的*epfd* 中删除*fd* 文件描述符。在这种模式中*event* 是被忽略的,并且为可以等于NULL。
struct event 1 2 3 4 5 6 7 8 9 10 11 typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t ; struct epoll_event { uint32_t events; epoll_data_t data; };
1 2 3 4 5 6 7 8 9 10 events这个参数是一个字节的掩码构成的。下面是可以用的事件: EPOLLIN - 当关联的文件可以执行 read ()操作时。 EPOLLOUT - 当关联的文件可以执行 write ()操作时。 EPOLLRDHUP - (从 linux 2.6 .17 开始)当socket关闭的时候,或者半关闭写段的(当使用边缘触发的时候,这个标识在写一些测试代码去检测关闭的时候特别好用) EPOLLPRI - 当 read ()能够读取紧急数据的时候。 EPOLLERR - 当关联的文件发生错误的时候,epoll_wait() 总是会等待这个事件,并不是需要必须设置的标识。 EPOLLHUP - 当指定的文件描述符被挂起的时候。epoll_wait() 总是会等待这个事件,并不是需要必须设置的标识。当socket从某一个地方读取数据的时候(管道或者socket),这个事件只是标识出这个已经读取到最后了(EOF)。所有的有效数据已经被读取完毕了,之后任何的读取都会返回0 (EOF)。 EPOLLET - 设置指定的文件描述符模式为边缘触发,默认的模式是水平触发。 EPOLLONESHOT - (从 linux 2.6 .17 开始)设置指定文件描述符为单次模式。这意味着,在设置后只会有一次从epoll_wait() 中捕获到事件,之后你必须要重新调用 epoll_ctl() 重新设置。
return int 如果成功,返回0。如果失败,会返回-1, errno\ 将会被设置
1 2 3 4 5 6 EBADF - epfd 或者 fd 是无效的文件描述符。 EEXIST - op是EPOLL_CTL_ADD,同时 fd 在之前,已经被注册到epoll中了。 EINVAL - epfd不是一个epoll描述符。或者fd和epfd相同,或者op参数非法。 ENOENT - op是EPOLL_CTL_MOD或者EPOLL_CTL_DEL,但是fd还没有被注册到epoll上。 ENOMEM - 内存不足。 EPERM - 目标的fd不支持epoll。
等待epoll事件 1 2 3 4 5 6 7 8 #include <sys/epoll.h> int epoll_wait (int epfd, struct epoll_event *events, int maxevents, int timeout) ; int epoll_pwait (int epfd, struct epoll_event *events, int maxevents, int timeout, const sigset_t *sigmask) ;
epoll_wait 这个系统调用是用来等待*epfd* 中的事件。*events* 指向调用者可以使用的事件的内存区域。*maxevents* 告知内核有多少个events,必须要大于0.
timeout\ 这个参数是用来制定epoll_wait 会阻塞多少毫秒,会一直阻塞到下面几种情况:
一个文件描述符触发了事件。
被一个信号处理函数打断,或者timeout超时。
当*timeout* 等于-1的时候这个函数会无限期的阻塞下去,当*timeout* 等于0的时候,就算没有任何事件,也会立刻返回。
struct epoll_event 如下定义: 1 2 3 4 5 6 7 8 9 10 11 typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t ; struct epoll_event { uint32_t events; epoll_data_t data; };
返回值 有多少个IO事件已经准备就绪。如果返回0说明没有IO事件就绪,而是timeout超时。遇到错误的时候,会返回-1,并设置 errno。
1 2 3 4 5 6 有以下几种错误: EBADF - epfd是无效的文件描述符 EFAULT - 指针events指向的内存没有访问权限 EINTR - 这个调用被信号打断。 EINVAL - epfd不是一个epoll的文件描述符,或者maxevents小于等于0
epoll工作方式 epoll的两种工作方式:1.水平触发(LT)2.边缘触发(ET) LT模式:若就绪的事件一次没有处理完要做的事件,就会一直去处理。即就会将没有处理完的事件继续放回到就绪队列之中(即那个内核中的链表),一直进行处理。 ET模式:就绪的事件只能处理一次,若没有处理完会在下次的其它事件就绪时再进行处理。而若以后再也没有就绪的事件,那么剩余的那部分数据也会随之而丢失。 由此可见:ET模式的效率比LT模式的效率要高很多。只是如果使用ET模式,就要保证每次进行数据处理时,要将其处理完,不能造成数据丢失,这样对编写代码的人要求就比较高。 注意:ET模式只支持非阻塞的读写:为了保证数据的完整性。
官方demo 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 #define MAX_EVENTS 10 struct epoll_event ev , events [MAX_EVENTS ];int listen_sock, conn_sock, nfds, epollfd;epollfd = epoll_create1( 0 ); if ( epollfd == -1 ){ perror( "epoll_create1" ); exit ( EXIT_FAILURE ); } ev.events = EPOLLIN; ev.data.fd = listen_sock; if ( epoll_ctl( epollfd, EPOLL_CTL_ADD, listen_sock, &ev ) == -1 ){ perror( "epoll_ctl: listen_sock" ); exit ( EXIT_FAILURE ); } for (;; ){ nfds = epoll_wait( epollfd, events, MAX_EVENTS, -1 ); if ( nfds == -1 ) { perror( "epoll_wait" ); exit ( EXIT_FAILURE ); } for ( n = 0 ; n < nfds; ++n ) { if ( events[n].data.fd == listen_sock ) { conn_sock = accept( listen_sock, (struct sockaddr *) &local, &addrlen ); if ( conn_sock == -1 ) { perror( "accept" ); exit ( EXIT_FAILURE ); } setnonblocking( conn_sock ); ev.events = EPOLLIN | EPOLLET; ev.data.fd = conn_sock; if ( epoll_ctl( epollfd, EPOLL_CTL_ADD, conn_sock, &ev ) == -1 ) { perror( "epoll_ctl: conn_sock" ); exit ( EXIT_FAILURE ); } } else { do_use_fd( events[n].data.fd ); } } }
完整可运行的DEMO 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 #include <stdio.h> #include <sys/epoll.h> #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <arpa/inet.h> #include <fcntl.h> #include <unistd.h> #include <netdb.h> #include <errno.h> #define MAX_EVENT 20 #define READ_BUF_LEN 256 static int make_socket_non_blocking (int fd) { int flags, s; flags = fcntl(fd, F_GETFL, 0 ); if (-1 == flags) { perror("Get fd status" ); return -1 ; } flags |= O_NONBLOCK; s = fcntl(fd, F_SETFL, flags); if (-1 == s) { perror("Set fd status" ); return -1 ; } return 0 ; } int main () { int epfd = 0 ; int listenfd = 0 ; int result = 0 ; struct epoll_event ev , event [MAX_EVENT ]; const char * const local_addr = "192.168.0.45" ; struct sockaddr_in server_addr = { 0 }; listenfd = socket(AF_INET, SOCK_STREAM, 0 ); if (-1 == listenfd) { perror("Open listen socket" ); return -1 ; } int on = 1 ; result = setsockopt( listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on) ); if (-1 == result) { perror ("Set socket" ); return 0 ; } server_addr.sin_family = AF_INET; inet_aton (local_addr, &(server_addr.sin_addr)); server_addr.sin_port = htons(8080 ); result = bind(listenfd, (const struct sockaddr *)&server_addr, sizeof (server_addr)); if (-1 == result) { perror("Bind port" ); return 0 ; } result = make_socket_non_blocking(listenfd); if (-1 == result) { return 0 ; } result = listen(listenfd, 200 ); if (-1 == result) { perror("Start listen" ); return 0 ; } epfd = epoll_create1(0 ); if (1 == epfd) { perror("Create epoll instance" ); return 0 ; } ev.data.fd = listenfd; ev.events = EPOLLIN | EPOLLET ; result = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); if (-1 == result) { perror("Set epoll_ctl" ); return 0 ; } for ( ; ; ) { int wait_count; wait_count = epoll_wait(epfd, event, MAX_EVENT, -1 ); for (int i = 0 ; i < wait_count; i++) { uint32_t events = event[i].events; char host_buf[NI_MAXHOST]; char port_buf[NI_MAXSERV]; int __result; if ( events & EPOLLERR || events & EPOLLHUP || (! events & EPOLLIN)) { printf ("Epoll has error\n" ); close (event[i].data.fd); continue ; } else if (listenfd == event[i].data.fd) { for ( ; ; ) { struct sockaddr in_addr = { 0 }; socklen_t in_addr_len = sizeof (in_addr); int accp_fd = accept(listenfd, &in_addr, &in_addr_len); if (-1 == accp_fd) { perror("Accept" ); break ; } __result = getnameinfo(&in_addr, sizeof (in_addr), host_buf, sizeof (host_buf) / sizeof (host_buf[0 ]), port_buf, sizeof (port_buf) / sizeof (port_buf[0 ]), NI_NUMERICHOST | NI_NUMERICSERV); if (! __result) { printf ("New connection: host = %s, port = %s\n" , host_buf, port_buf); } __result = make_socket_non_blocking(accp_fd); if (-1 == __result) { return 0 ; } ev.data.fd = accp_fd; ev.events = EPOLLIN | EPOLLET; __result = epoll_ctl(epfd, EPOLL_CTL_ADD, accp_fd, &ev); if (-1 == __result) { perror("epoll_ctl" ); return 0 ; } } continue ; } else { int done = 0 ; for ( ; ;) { ssize_t result_len = 0 ; char buf[READ_BUF_LEN] = { 0 }; result_len = read(event[i].data.fd, buf, sizeof (buf) / sizeof (buf[0 ])); if (-1 == result_len) { if (EAGAIN != errno) { perror ("Read data" ); done = 1 ; } break ; } else if (! result_len) { done = 1 ; break ; } write(STDOUT_FILENO, buf, result_len); } if (done) { printf ("Closed connection\n" ); close (event[i].data.fd); } } } } close (epfd); return 0 ; }