版本并未选用最新的2.1.12stable,而用的是1.4.15stable。

首先定义了一个evepoll结构体,这里官方也给出了注释,大意是由于epoll接口的限制,我们需要自己手动记录每个文件描述符上注册的读事件和写事件。

1
2
3
4
5
6
/*
 * Per-file-descriptor bookkeeping for the epoll backend.
 * Due to limitations in the epoll interface, we need to keep track of
 * all file descriptors ourselves.
 */
struct evepoll {
/* Event registered for reading on this descriptor, or NULL if none. */
struct event* evread;
/* Event registered for writing on this descriptor, or NULL if none. */
struct event* evwrite;
};

然后顺带看一下struct event结构体的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/*
 * One registered event: what to watch (descriptor, signal or timeout),
 * the callback to run, and the link fields that tie it into the queues
 * of its owning event_base.
 */
struct event {
/* Link in the list of all registered events (event_base.eventqueue). */
TAILQ_ENTRY (event) ev_next;
/* Link in one of the active lists (event_base.activequeues). */
TAILQ_ENTRY (event) ev_active_next; /* active list */
/* Link in the signal list (event_base.sig.evsigevents). */
TAILQ_ENTRY (event) ev_signal_next; /* signal list */
/* Index of this event inside the timeout min-heap (event_base.timeheap). */
unsigned int min_heap_idx; /* for managing timeouts */
/* The event_base (event loop) this event belongs to. */
struct event_base* ev_base;
/* File descriptor associated with the event. */
int ev_fd;
/* Kinds of events being watched (EV_READ, EV_WRITE, EV_SIGNAL, ...). */
short ev_events;
/* Number of times the callback is to be invoked once the event is active. */
short ev_ncalls;
/*
 * Lets an event be deleted from inside its own callback: some events
 * remove themselves while being called back, and this pointer exists so
 * that such an event can still be removed from the loop correctly.
 */
short* ev_pncalls; /* Allows deletes in callback */
/* Timeout value; used together with min_heap_idx to order the binary heap. */
struct timeval ev_timeout;
/*
 * Priority. When the event fires it is placed on
 * event_base.activequeues[ev_pri]; a smaller ev_pri means a higher priority.
 */
int ev_pri;
/* User callback and the argument passed to it. */
void (*ev_callback)(int, short, void* arg);
void* ev_arg;
/* Tells the callback which event(s) actually occurred ("event result"). */
int ev_res; /* result passed to event callback */
/* Flags recording which lists the event is currently on (EVLIST_* bits). */
int ev_flags;
};

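其中TAILQ_ENTRY(type)的定义在queue.h中如下,可以看到它是双向链表(尾队列)节点的链接域定义:struct event通过ev_next、ev_active_next、ev_signal_next这三个链接域分别挂入事件队列、active队列和信号队列;定时器使用的最小堆则不依赖它,而是通过min_heap_idx来管理。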

1
2
3
4
5
/*
 * Link fields embedded in every element of a tail queue of `struct type`:
 * a pointer to the next element, and a pointer to the previous element's
 * "next" pointer.
 */
#define TAILQ_ENTRY(type)						\
struct { \
struct type *tqe_next; /* next element */ \
struct type **tqe_prev; /* address of previous next element */ \
}

这里其实我们阅读epoll.c的源码时要关注的只有

1
2
3
4
/* File descriptor associated with the event. */
int ev_fd;
/* Kinds of events being watched (EV_READ, EV_WRITE, EV_SIGNAL, ...). */
short ev_events;

再往下走又定义了如下结构体,它对epoll实例(epfd)、epoll_wait使用的事件数组以及每个文件描述符对应的evepoll进行统一管理。

1
2
3
4
5
6
7
8
9
10
11
/*
 * State of the epoll backend: the epoll instance, the buffer handed to
 * epoll_wait(), and the per-descriptor table of registered events.
 */
struct epollop {
/* Per-descriptor table: fds[fd] holds the read/write events registered on fd. */
struct evepoll* fds;
/* Number of entries allocated in fds. */
int nfds;

/* epoll-related state */
/* Buffer that epoll_wait() fills with ready events. */
struct epoll_event* events;
/* Number of entries allocated in events. */
int nevents;
int epfd /* descriptor returned by epoll_create(32000) */;
};

接下来是epoll_init函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/*
 * Create the epoll backend state for an event base.
 *
 * Creates the epoll instance, then allocates the epollop structure, the
 * buffer handed to epoll_wait() (INITIAL_NEVENTS entries) and the
 * zero-filled per-descriptor table (INITIAL_NFILES entries), and finally
 * initializes signal handling for `base`.
 *
 * Returns the new struct epollop (as void *), or NULL when epoll is
 * disabled through the EVENT_NOEPOLL environment variable, when
 * epoll_create() fails, or when an allocation fails.  On every failure
 * after epoll_create() succeeded, the epoll descriptor is closed so that
 * it is not leaked.
 */
static void *
epoll_init(struct event_base *base)
{
	int epfd;
	struct epollop *epollop = NULL;

	/* Disable epoll when this environment variable is set */
	if (evutil_getenv("EVENT_NOEPOLL"))
		return (NULL);

	/* Initialize the kernel queue */
	if ((epfd = epoll_create(32000)) == -1) {
		/* ENOSYS is not worth a warning; anything else is reported. */
		if (errno != ENOSYS)
			event_warn("epoll_create");
		return (NULL);
	}

	FD_CLOSEONEXEC(epfd);

	/* calloc leaves events/fds NULL, which the cleanup path relies on. */
	epollop = calloc(1, sizeof(struct epollop));
	if (epollop == NULL)
		goto fail;

	epollop->epfd = epfd;

	/* Initialize fields */
	epollop->events = malloc(INITIAL_NEVENTS * sizeof(struct epoll_event));
	if (epollop->events == NULL)
		goto fail;
	epollop->nevents = INITIAL_NEVENTS;

	epollop->fds = calloc(INITIAL_NFILES, sizeof(struct evepoll));
	if (epollop->fds == NULL)
		goto fail;
	epollop->nfds = INITIAL_NFILES;

	evsignal_init(base);

	return (epollop);

fail:
	/* Release whatever was allocated, and the epoll descriptor itself. */
	if (epollop != NULL) {
		free(epollop->events);
		free(epollop);
	}
	close(epfd);
	return (NULL);
}

首先是这样一段代码

1
2
3
/* Disable the epoll backend when this environment variable is set */
if (evutil_getenv("EVENT_NOEPOLL"))
return (NULL);

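这里首先是evutil_getenv这个函数,它内部的调用嵌套比较深,最终会调用到下面这个evutil_issetugid函数(它是一个函数而不是宏,只是函数体由条件编译宏决定):它用来判断当前进程是否以setuid/setgid方式运行(真实uid/gid与有效uid/gid不一致)。如果是,evutil_getenv会出于安全考虑忽略环境变量并返回NULL;否则才会读取EVENT_NOEPOLL,从而决定是否禁用epoll。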

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
 * Report whether the process runs with changed user or group identity.
 *
 * When issetugid() is available its result is returned directly.
 * Otherwise returns 1 if the real and effective user ids differ, or the
 * real and effective group ids differ (each check only where the
 * corresponding call is available), and 0 in every other case.
 */
static int
evutil_issetugid(void)
{
#if defined(_EVENT_HAVE_ISSETUGID)
	return issetugid();
#else
	int elevated = 0;

#if defined(_EVENT_HAVE_GETEUID)
	elevated = elevated || (getuid() != geteuid());
#endif
#if defined(_EVENT_HAVE_GETEGID)
	/* Short-circuits: the gids are not compared once the uids differ. */
	elevated = elevated || (getgid() != getegid());
#endif
	return elevated;
#endif
}

之后是创建epoll实例,并用epollop记录epfd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/* Initialize the kernel queue */
if ((epfd = epoll_create(32000)) == -1) {
/* ENOSYS is not reported; any other failure produces a warning. */
if (errno != ENOSYS)
event_warn("epoll_create");
return (NULL);
}

/*
 * NOTE(review): FD_CLOSEONEXEC is defined outside this excerpt; presumably it
 * sets the close-on-exec flag so that the descriptor is closed automatically
 * when the process calls exec -- confirm against the macro definition.
 * Background article: https://www.cnblogs.com/fengtai/p/12903626.html
 */
FD_CLOSEONEXEC(epfd);

if (!(epollop = calloc(1, sizeof(struct epollop))))
return (NULL);

epollop->epfd = epfd; /* remember the epoll instance */

接下来是初始化其他字段,由前文的宏定义可知初始化的事件数量为32

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/* Initialize fields */
/* events is the buffer later handed to epoll_wait(); INITIAL_NEVENTS == 32 per the original note. */
epollop->events = malloc(INITIAL_NEVENTS * sizeof(struct epoll_event));
if (epollop->events == NULL) {
free(epollop);
return (NULL);
}
epollop->nevents = INITIAL_NEVENTS; /* capacity of the events buffer */

/* fds is the zero-filled per-descriptor table of registered read/write events. */
epollop->fds = calloc(INITIAL_NFILES, sizeof(struct evepoll));
if (epollop->fds == NULL) {
free(epollop->events);
free(epollop);
return (NULL);
}
epollop->nfds = INITIAL_NFILES;

/*
 * NOTE(review): evsignal_init is defined outside this excerpt; per the
 * original author's note it sets up sockets and makes them non-blocking --
 * confirm against its definition.
 */
evsignal_init(base);

return (epollop);

以下为event base的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/*
 * One event loop: the selected I/O multiplexing backend and its state,
 * the queues of registered and active events, signal handling state and
 * the timer heap.
 */
struct event_base {
/* The OS-specific I/O multiplexing backend in use. */
const struct eventop* evsel;
/*
 * Value returned by the backend's init function (evsel->init); it acts as
 * the backend's context and is passed to every later evsel call.
 */
void* evbase;
/* Total number of registered events. */
int event_count; /* counts number of total events */
/* Number of events on the active queues, i.e. fired and awaiting their callback. */
int event_count_active; /* counts number of active events */

int event_gotterm; /* Set to terminate loop (normal exit from dispatch) */
int event_break; /* Set to terminate loop immediately */

/* active event management */
/*
 * 1. Active lists: events that have fired and are waiting for their callback.
 *    - a 2s timer is moved here once the 2 seconds have elapsed;
 *    - a socket read event is moved here once the socket becomes readable.
 * 2. It is an array of list pointers in order to implement priorities.
 * 3. Lists earlier in the array have higher priority.
 */
struct event_list** activequeues;
/* Number of priority queues. */
int nactivequeues;

/* signal handling info */
struct evsignal_info sig; /* signal-related state */

struct event_list eventqueue; /* every event added to the event loop */
struct timeval event_tv;

struct min_heap timeheap; /* binary min-heap used for timers */

struct timeval tv_cache;
};

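epollop中有两个会扩容的数组:fds数组以文件描述符为下标,当epoll_add遇到fd >= nfds时,会调用下面的epoll_recalc把容量不断翻倍直到能容纳该fd,这里没有设置大小上限;events数组则由epoll_dispatch自动扩容,上限为MAX_NEVENTS(4096)个事件,在后面的epoll_dispatch函数中会有所体现。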

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/*
 * Make sure the per-descriptor table can be indexed by `max`.
 *
 * If `max` is already a valid index nothing happens.  Otherwise the
 * table size is doubled until it exceeds `max`, the table is
 * reallocated, and the newly added entries are zero-filled.
 *
 * base: unused here.
 * arg:  the backend state (struct epollop *).
 * max:  highest index that must be addressable.
 *
 * Returns 0 on success, or -1 (after a warning) if realloc() fails, in
 * which case the existing table is left untouched.
 */
static int
epoll_recalc(struct event_base *base, void *arg, int max)
{
	struct epollop *op = arg;
	struct evepoll *grown;
	int capacity;

	/* The table already covers index `max`. */
	if (max < op->nfds)
		return (0);

	/* Keep doubling until index `max` fits. */
	for (capacity = op->nfds; capacity <= max; capacity <<= 1)
		;

	grown = realloc(op->fds, capacity * sizeof(struct evepoll));
	if (grown == NULL) {
		event_warn("realloc");
		return (-1);
	}

	/* Only the freshly added tail needs clearing. */
	memset(grown + op->nfds, 0,
	    (capacity - op->nfds) * sizeof(struct evepoll));
	op->fds = grown;
	op->nfds = capacity;

	return (0);
}

epoll_dispatch函数主要是做了如下几件事情,这也是epoll.c中最重要的函数

第一:进入epoll_wait。由于用户可能设置了超时参数,因此先把超时时间换算成毫秒;又由于超时时间过大时Linux内核可能会一直等待下去,因此当超时时间超过上限时会被自动截断为MAX_EPOLL_TIMEOUT_MSEC == 35*60*1000(msec,即2100000毫秒,35分钟)。

第二:处理对应的socket读写事件。

第三:假如本次epoll_wait返回的就绪事件数量恰好等于events数组的容量(即数组被填满),就考虑自动扩容:如果当前容量小于MAX_NEVENTS,则将容量扩大为原来的2倍。扩容采用的是realloc,因此不需要手动free旧的数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/*
 * Wait for events with epoll_wait() and activate the matching events.
 *
 * base: the event base whose signal state is consulted.
 * arg:  the backend state (struct epollop *).
 * tv:   maximum time to wait, or NULL to wait without a timeout.
 *
 * The timeout is converted to milliseconds (rounding microseconds up)
 * and capped at MAX_EPOLL_TIMEOUT_MSEC.  The cap is applied before the
 * value is narrowed to int, so a very large tv_sec cannot wrap around
 * into a short or negative timeout.
 *
 * Returns 0 on success (including an EINTR-interrupted wait), or -1 if
 * epoll_wait() fails for any other reason.
 */
static int
epoll_dispatch(struct event_base *base, void *arg, struct timeval *tv)
{
	struct epollop *epollop = arg;
	struct epoll_event *events = epollop->events;
	struct evepoll *evep;
	int i, res, timeout = -1;

	if (tv != NULL) {
		if (tv->tv_sec > MAX_EPOLL_TIMEOUT_MSEC / 1000) {
			/* Linux kernels can wait forever if the timeout is
			 * too big; see comment on MAX_EPOLL_TIMEOUT_MSEC.
			 * Clamping here also keeps the multiplication
			 * below from overflowing. */
			timeout = MAX_EPOLL_TIMEOUT_MSEC;
		} else {
			/* Milliseconds, with microseconds rounded up. */
			long msec = (long)tv->tv_sec * 1000 +
			    (tv->tv_usec + 999) / 1000;

			if (msec > MAX_EPOLL_TIMEOUT_MSEC)
				msec = MAX_EPOLL_TIMEOUT_MSEC;
			timeout = (int)msec;
		}
	}

	res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);

	if (res == -1) {
		if (errno != EINTR) {
			event_warn("epoll_wait");
			return (-1);
		}
		/* Interrupted: process signals. */
		evsignal_process(base);
		return (0);
	} else if (base->sig.evsignal_caught) {
		/* A signal was caught while waiting. */
		evsignal_process(base);
	}

	event_debug(("%s: epoll_wait reports %d", __func__, res));

	/* Activate the read/write events of every ready descriptor. */
	for (i = 0; i < res; i++) {
		int what = events[i].events;
		struct event *evread = NULL, *evwrite = NULL;
		int fd = events[i].data.fd;

		/* Ignore descriptors outside the tracking table. */
		if (fd < 0 || fd >= epollop->nfds)
			continue;
		evep = &epollop->fds[fd];

		if (what & (EPOLLHUP | EPOLLERR)) {
			/* Hang-up or error: wake both directions. */
			evread = evep->evread;
			evwrite = evep->evwrite;
		} else {
			if (what & EPOLLIN) {
				evread = evep->evread;
			}

			if (what & EPOLLOUT) {
				evwrite = evep->evwrite;
			}
		}

		if (!(evread || evwrite))
			continue;

		if (evread != NULL)
			event_active(evread, EV_READ, 1);
		if (evwrite != NULL)
			event_active(evwrite, EV_WRITE, 1);
	}

	if (res == epollop->nevents && epollop->nevents < MAX_NEVENTS) {
		/* We used all of the event space this time.  We should
		   be ready for more events next time. */
		int new_nevents = epollop->nevents * 2;
		struct epoll_event *new_events;

		new_events = realloc(epollop->events,
		    new_nevents * sizeof(struct epoll_event));
		/* On failure the old buffer and size simply stay in use. */
		if (new_events) {
			epollop->events = new_events;
			epollop->nevents = new_nevents;
		}
	}

	return (0);
}

接下来是epoll_add,这个函数没啥好说的,就是把相应的事件类型设置一遍,注册到epoll里去,假如epoll_ctl添加失败会返回-1。又由于fds采用的是以fd为下标的数组记录方式,因此当fd >= nfds时需要通过epoll_recalc扩容epollop的fds,扩容后的容量是原容量不断翻倍、直到大于fd为止,扩容失败也会返回-1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/*
 * Register an event with the epoll backend.
 *
 * arg: the backend state (struct epollop *).
 * ev:  the event to add.
 *
 * Signal events are handed to evsignal_add().  For descriptor events the
 * per-descriptor table is grown if needed, the new interest is merged
 * with whatever is already registered on the descriptor, and the result
 * is passed to epoll_ctl().  The table entry is updated only after
 * epoll_ctl() succeeded.
 *
 * Returns 0 on success and -1 if growing the table or epoll_ctl() fails;
 * for signal events the result of evsignal_add() is returned.
 */
static int
epoll_add(void *arg, struct event *ev)
{
	struct epollop *op = arg;
	struct epoll_event request = {0, {0}};
	struct evepoll *slot;
	int fd;
	int ctl = EPOLL_CTL_ADD;
	int mask = 0;

	/* Signals are not handled through epoll. */
	if (ev->ev_events & EV_SIGNAL)
		return (evsignal_add(ev));

	fd = ev->ev_fd;

	/* Extend the file descriptor array as necessary. */
	if (fd >= op->nfds && epoll_recalc(ev->ev_base, op, fd) == -1)
		return (-1);

	slot = &op->fds[fd];

	/* Anything already registered on fd turns the add into a modify. */
	if (slot->evread != NULL) {
		mask |= EPOLLIN;
		ctl = EPOLL_CTL_MOD;
	}
	if (slot->evwrite != NULL) {
		mask |= EPOLLOUT;
		ctl = EPOLL_CTL_MOD;
	}

	/* Merge in the directions requested by the new event. */
	if (ev->ev_events & EV_READ)
		mask |= EPOLLIN;
	if (ev->ev_events & EV_WRITE)
		mask |= EPOLLOUT;

	request.data.fd = fd;
	request.events = mask;
	if (epoll_ctl(op->epfd, ctl, fd, &request) == -1)
		return (-1);

	/* Record which event is now responsible for each direction. */
	if (ev->ev_events & EV_READ)
		slot->evread = ev;
	if (ev->ev_events & EV_WRITE)
		slot->evwrite = ev;

	return (0);
}

接下来是epoll_del

这个函数也比较简单,值得一提的是一开始epoll_ctl的op给的选项是EPOLL_CTL_DEL;如果只删除读、写其中一个方向的事件,而该fd上另一个方向仍然注册着事件,就会把op由EPOLL_CTL_DEL改为EPOLL_CTL_MOD,只保留对另一个方向的监听;之后会清理evepoll中对应的event指针。假如epoll_ctl失败会返回-1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/*
 * Remove an event from the epoll backend.
 *
 * arg: the backend state (struct epollop *).
 * ev:  the event to remove.
 *
 * Signal events are handed to evsignal_del().  A descriptor beyond the
 * per-descriptor table is treated as already removed.
 *
 * By default the descriptor is dropped from the epoll set
 * (EPOLL_CTL_DEL) and both table pointers are cleared.  If the event
 * covers only one direction and the other direction still has an event
 * registered, the descriptor is instead modified (EPOLL_CTL_MOD) to keep
 * watching just that other direction, and its table pointer is kept.
 *
 * The table pointers are cleared before epoll_ctl() is called, so they
 * stay cleared even when epoll_ctl() fails.
 *
 * Returns 0 on success and -1 if epoll_ctl() fails; for signal events
 * the result of evsignal_del() is returned.
 */
static int
epoll_del(void *arg, struct event *ev)
{
	struct epollop *op = arg;
	struct epoll_event request = {0, {0}};
	struct evepoll *slot;
	int fd;
	int ctl = EPOLL_CTL_DEL;
	int mask = 0;
	int keep_read = 0, keep_write = 0;
	int removing_read, removing_write;

	if (ev->ev_events & EV_SIGNAL)
		return (evsignal_del(ev));

	fd = ev->ev_fd;
	if (fd >= op->nfds)
		return (0);
	slot = &op->fds[fd];

	removing_read = (ev->ev_events & EV_READ) != 0;
	removing_write = (ev->ev_events & EV_WRITE) != 0;
	if (removing_read)
		mask |= EPOLLIN;
	if (removing_write)
		mask |= EPOLLOUT;

	/* Only one direction (or none) is being removed. */
	if (!(removing_read && removing_write)) {
		if (removing_read && slot->evwrite != NULL) {
			/* A writer remains: keep watching for writability. */
			keep_write = 1;
			mask = EPOLLOUT;
			ctl = EPOLL_CTL_MOD;
		} else if (removing_write && slot->evread != NULL) {
			/* A reader remains: keep watching for readability. */
			keep_read = 1;
			mask = EPOLLIN;
			ctl = EPOLL_CTL_MOD;
		}
	}

	request.events = mask;
	request.data.fd = fd;

	/* Forget every direction that is not explicitly kept. */
	if (!keep_read)
		slot->evread = NULL;
	if (!keep_write)
		slot->evwrite = NULL;

	if (epoll_ctl(op->epfd, ctl, fd, &request) == -1)
		return (-1);

	return (0);
}

最后是epoll_dealloc函数

清理所有的epollop的所有资源,包括epoll实例,记录的events等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/*
 * Tear down the epoll backend state.
 *
 * base: the event base, passed on to evsignal_dealloc().
 * arg:  the backend state (struct epollop *) to destroy.
 *
 * Releases signal handling state, the per-descriptor table, the
 * epoll_wait() buffer and the epoll descriptor, then clears and frees
 * the structure itself.
 */
static void
epoll_dealloc(struct event_base *base, void *arg)
{
	struct epollop *op = arg;

	evsignal_dealloc(base);

	/* free(NULL) is a no-op, so no NULL guards are needed. */
	free(op->fds);
	free(op->events);

	if (!(op->epfd < 0))
		close(op->epfd);

	memset(op, 0, sizeof(*op));
	free(op);
}