合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
        事件驱动这个名词出现的越来越频繁了,听起来非常高大上,今天本人把Redis内部的驱动模型研究了一番,感觉收获颇丰啊。一个ae.c主程序,加上4个事件类型的文件,让你彻底弄清楚,Redis是如何处理这些事件的。在Redis的事件处理中,用到了epoll,select,kqueue和evport,evport可能大家会陌生许多。前面3个都是非常常见的事件,在libevent的事件网络库中也都有出现。作者在写这个事件驱动模型的时候,也说了,这只是为了简单的复用了,设计的一个小小的处理模型: ~~~ /* A simple event-driven programming library. Originally I wrote this code * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated * it in form of a library for easy reuse. * * ae是作者写的一个简单的事件驱动库,后面进行了转化,变得更为简单的复用 ~~~ 所以不是很复杂。在了解整个事件驱动的模型前,有先了解一些定义的事件结构体,事件类型总共2个一个FileEvent,TimeEvent: ~~~ /* File event structure */ /* 文件事件结构体 */ typedef struct aeFileEvent { //只为读事件或者写事件中的1种 int mask; /* one of AE_(READABLE|WRITABLE) */ //读方法 aeFileProc *rfileProc; //写方法 aeFileProc *wfileProc; //客户端数据 void *clientData; } aeFileEvent; /* Time event structure */ /* 时间事件结构体 */ typedef struct aeTimeEvent { //时间事件id long long id; /* time event identifier. */ //时间秒数 long when_sec; /* seconds */ //时间毫秒 long when_ms; /* milliseconds */ //时间事件中的处理函数 aeTimeProc *timeProc; //被删除的时候将会调用的方法 aeEventFinalizerProc *finalizerProc; //客户端数据 void *clientData; //时间结构体内的下一个结构体 struct aeTimeEvent *next; } aeTimeEvent; /* A fired event */ /* fired结构体,用来表示将要被处理的文件事件 */ typedef struct aeFiredEvent { //文件描述符id int fd; int mask; } aeFiredEvent; ~~~ FireEvent只是用来标记要处理的文件Event。 这些事件都存在于一个aeEventLoop的结构体内: ~~~ /* State of an event based program */ typedef struct aeEventLoop { //目前创建的最高的文件描述符 int maxfd; /* highest file descriptor currently registered */ int setsize; /* max number of file descriptors tracked */ //下一个时间事件id long long timeEventNextId; time_t lastTime; /* Used to detect system clock skew */ //3种事件类型 aeFileEvent *events; /* Registered events */ aeFiredEvent *fired; /* Fired events */ aeTimeEvent *timeEventHead; //事件停止标志符 int stop; //这里存放的是event API的数据,包括epoll,select等事件 void *apidata; /* This is used for polling API specific data */ aeBeforeSleepProc *beforesleep; } aeEventLoop; ~~~ 在每种事件内部,都有定义相应的处理函数,把函数当做变量一样存在结构体中。下面看下ae.c中的一些API的组成: ~~~ /* Prototypes */ aeEventLoop *aeCreateEventLoop(int setsize); /* 创建aeEventLoop,内部的fileEvent和Fired事件的个数为setSize个 */ void aeDeleteEventLoop(aeEventLoop *eventLoop); /* 删除EventLoop,释放相应的事件所占的空间 */ void aeStop(aeEventLoop *eventLoop); /* 设置eventLoop中的停止属性为1 */ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData); /* 在eventLoop中创建文件事件 */ void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); /* 删除文件事件 */ int aeGetFileEvents(aeEventLoop *eventLoop, int fd); //根据文件描述符id,找出文件的属性,是读事件还是写事件 long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc); /* 在eventLoop中添加时间事件,创建的时间为当前时间加上自己传入的时间 */ int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id); //根据时间id,删除时间事件,涉及链表的操作 int aeProcessEvents(aeEventLoop *eventLoop, int flags); /* 处理eventLoop中的所有类型事件 */ int aeWait(int fd, int mask, long long milliseconds); /* 让某事件等待 */ void aeMain(aeEventLoop *eventLoop); /* ae事件执行主程序 */ char *aeGetApiName(void); void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep); /* 每次eventLoop事件执行完后又重新开始执行时调用 */ int aeGetSetSize(aeEventLoop *eventLoop); /* 获取eventLoop的大小 */ int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); /* EventLoop重新调整大小 */ ~~~ 无非涉及一些文件,时间事件的添加,修改等,都是在eventLoop内部的修改,我们来看下最主要,最核心的方法: ~~~ /* ae事件执行主程序 */ void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; //如果eventLoop中的stop标志位不为1,就循环处理 while (!eventLoop->stop) { //每次eventLoop事件执行完后又重新开始执行时调用 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); //while循环处理所有的evetLoop的事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS); } } ~~~ 道理很简单通过,while循环,处理eventLoop中的所有类型事件,截取部分processEvents()代码: ~~~ numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; /* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn't * processed, so we check if the event is still valid. */ if (fe->mask & mask & AE_READABLE) { rfired = 1; //根据掩码计算判断是否为ae读事件,调用时间中的读的处理方法 fe->rfileProc(eventLoop,fd,fe->clientData,mask); } if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; } } ~~~ ae中创建时间事件都是以当前时间为基准创建的; ~~~ /* 在eventLoop中添加时间事件,创建的时间为当前时间加上自己传入的时间 */ long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { long long id = eventLoop->timeEventNextId++; aeTimeEvent *te; te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; te->id = id; aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms); te->timeProc = proc; te->finalizerProc = finalizerProc; te->clientData = clientData; //新加的变为timeEvent的头部 te->next = eventLoop->timeEventHead; eventLoop->timeEventHead = te; //返回新创建的时间事件的id return id; } ~~~   下面说说如何调用事件API库里的方法呢。首先隆重介绍什么是epoll,poll,select,kqueu和evport。这些都是一种事件模型。 select事件的模型  (1)创建所关注的事件的描述符集合(fd_set),对于一个描述符,可以关注其上面的读(read)、写(write)、异常(exception)事件,所以通常,要创建三个fd_set, 一个用来收集关注读事件的描述符,一个用来收集关注写事件的描述符,另外一个用来收集关注 异常事件的描述符集合。 (2)轮询所有fd_set中的每一个fd ,检查是否有相应的事件发生,如果有,就进行处理。   poll和上面的区别是可以复用文件描述符,上面对一个文件需要轮询3个文件描述符集合,而poll只需要一个,效率更高 epoll是poll的升级版本,把描述符列表交给内核,一旦有事件发生,内核把发生事件的描述符列表通知给进程,这样就避免了轮询整个描述符列表。效率极大提高  evport这个出现的比较少,大致意思是evport将某一个对象的特定 event 与 Event port 相关联: 在了解了3种事件模型的原理之后,我们看看ae.c在Redis中是如何调用的呢, ~~~ //这里存放的是event API的数据,包括epoll,select等事件 void *apidata; /* This is used for polling API specific data */ ~~~ 就是上面这个属性,在上面的4种事件中,分别对应着3个文件,分别为ae_poll.c,ae_select.c,但是他们的API结构是类似的,我举其中一个例子,epoll的例子,首先都会有此事件特定的结构体: ~~~ typedef struct aeApiState { int epfd; struct epoll_event *events; } aeApiState; ~~~ 还有共同套路的模板方法: ~~~ static int aeApiCreate(aeEventLoop *eventLoop) static int aeApiResize(aeEventLoop *eventLoop, int setsize) static void aeApiFree(aeEventLoop *eventLoop) static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) static char *aeApiName(void) ~~~ 在创建的时候赋值到eventloop的API data里面去: ~~~ state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */ if (state->epfd == -1) { zfree(state->events); zfree(state); return -1; } //最后将state的数据赋值到eventLoop的API data中 eventLoop->apidata = state; return 0; ~~~ 在取出事件的poll方法的时候是这些方法的一个区分点: ~~~ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); if (retval > 0) { ..... ~~~ 而在select中的poll方法是这样的: ~~~ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, j, numevents = 0; memcpy(&state->_rfds,&state->rfds,sizeof(fd_set)); memcpy(&state->_wfds,&state->wfds,sizeof(fd_set)); retval = select(eventLoop->maxfd+1, &state->_rfds,&state->_wfds,NULL,tvp); ...... ~~~ 最后都是基于state中的事件和eventLoop之间的转化实现操作。传入eventLoop中的信息,传入state的信息,经过内部的处理得出终的事件结果。调用就这么简单。