本系列文章主要用户整理redis,分成4节:
本节为本系列第二篇
整理看的资料,将redis原理分成以下几个部分:
左侧部分主要是单点内容,右侧部分主要是集群内容。单点内容可分成了存储、控制,集群内容可以分成基于存储的数据同步以及搭建集群的3种控制方案。
本文是第二篇,主要介绍单线程的事件体系。
事件循环
Redis服务中因为包含了时间事件和文件事件,事情也就变得复杂了,服务器要决定何时处理文件事件、何时处理时间事件、并且还要明确知道处理时间的时间长度,因此事件的执行和调度就成为重点。
Redis服务器会轮流处理文件事件和时间事件,这两种事件的处理都是同步、有序、原子地执行的,服务器也不会终止正在执行的事件,也不会对事件进行抢占。
整个的处理流程是在src/ae.c中的aeProcessEvents中实现的
这里看一下伪码:
def aeProcessEvents()
#获取当前最近的待执行的时间事件
time_event = aeGetNearestTimer()
#计算最近执行事件与当前时间的差值
remain_gap_time = time_event.when - uinx_time_now()
#判断时间事件是否已经到期 则重置 马上执行
if remain_gap_time < 0:
remain_gap_time = 0
#阻塞等待文件事件 具体的阻塞等待时间由remain_gap_time决定
#如果remain_gap_time为0 那么不阻塞立刻返回
aeApiPoll(remain_gap_time)
#处理所有文件事件
ProcessAllFileEvent()
#处理所有时间事件
ProcessAllTimeEvent()
可以看到Redis服务器是边阻塞边执行的,具体的阻塞事件由最近待执行时间事件的等待时间决定的,在阻塞该最小等待时间返回之后,开始处理事件任务,并且先执行文件事件、再执行时间事件,所有即使时间事件要即刻执行,也需要等待文件事件完成之后再执行时间事件,所以比预期的稍晚。
流程如下图所示:
时间事件
Redis的时间事件分为两类:
-
定时事件:任务在等待指定大小的等待时间之后就执行,执行完成就不再执行,只触发一次;
-
周期事件:任务每隔一定时间就执行,执行完成之后等待下一次执行,会周期性的触发;
Redis中大部分是周期事件,周期事件主要是服务器定期对自身运行情况进行检测和调整,从而保证稳定性,这项工作主要是ServerCron函数来完成的,周期事件的内容主要包括:
- 删除数据库的key
- 触发RDB和AOF持久化
- 主从同步
- 集群化保活
- 关闭清理死客户端链接
- 统计更新服务器的内存、key数量等信息
可见 Redis的周期性事件虽然主要处理辅助任务,但是对整个服务的稳定运行,起到至关重要的作用。
时间事件的结构:
Redis的每个时间事件分为三个部分:
- 事件ID 全局唯一 依次递增
- 触发时间戳 ms级精度
- 事件处理函数 事件回调函数
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
int refcount; /* refcount to prevent timer events from being
* freed in recursive time event calls. */
} aeTimeEvent;
Redis的时间事件是存储在链表中的,并且是按照ID存储的,新事件在头部旧事件在尾部,但是并不是按照即将被执行的顺序存储的。(最小栈MinStack思路会更快)
代码如下:
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);
/* If the system clock is moved to the future, and then set back to the
* right value, time events may be delayed in a random way. Often this
* means that scheduled operations will not be performed soon enough.
*
* Here we try to detect system clock skews, and force all the time
* events to be processed ASAP when this happens: the idea is that
* processing events earlier is less dangerous than delaying them
* indefinitely, and practice suggests it is. */
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
eventLoop->lastTime = now;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) {
long now_sec, now_ms;
long long id;
/* Remove events scheduled for deletion. */
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
/* If a reference exists for this timer event,
* don't free it. This is currently incremented
* for recursive timerProc calls */
if (te->refcount) {
te = next;
continue;
}
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc)
te->finalizerProc(eventLoop, te->clientData);
zfree(te);
te = next;
continue;
}
/* Make sure we don't process time events created by time events in
* this iteration. Note that this check is currently useless: we always
* add new timers on the head, however if we change the implementation
* detail, this check may be useful again: we keep it here for future
* defense. */
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
te->refcount++;
retval = te->timeProc(eventLoop, id, te->clientData);
te->refcount--;
processed++;
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
}
return processed;
}
遍历整个链表,先判断该te是否是是执行完之后的定时事件,这种事件需要删除;然后判断时间是否到时,若到时,则调用相关的事件处理函数,并根据是否是定时事件,重置标志。
文件事件
这个文件事件的模型,被称为reactor(反应堆)模型,先来看看这个reactor模型:
- 功能Func 可以理解为读写事件 可以注册到Reactor进行监控
- EventController可以理解为epoll/kqueue/select等作为IO事件的采集器
- Dispatcher 提供注册/删除事件并进行分发,作为事件分发器
- Event Handler 事件处理器 完成具体事件的回调 供Dispatcher调用
来看看redis中的处理
-
初始化:首先在redis启动初始化的时候,redis会先将事件处理器中的连接应答处理器和AE_READABLE事件关联起来;
-
建立连接:如果客户端向redis发起连接,会产生AE_READABLE事件(步骤A),产生该事件后会被IO多路复用程序监听到(步骤B),然后IO多路复用程序会把监听到的socket信息放入到队列中(步骤C),事件分配器每次从队列中取出一个socket(步骤D),然后事件分派器把socket给对应的事件处理器(步骤E)。
由于连接应答处理器和AE_READABLE事件在redis初始化的时候已经关联起来,所以由连接应答处理器来处理跟客户端建立连接,然后通过ServerSocket创建一个与客户端一对一对应的socket,如叫socket01,同时将这个socket01的AE_READABLE事件和命令请求处理器关联起来。
-
请求请求:当客户端向redis发生请求时(读、写操作),首先就会在对应的socket如socket01上会产生AE_READABLE事件(步骤A),产生该事件后会被IO多路复用程序监听到(步骤B),然后IO多路复用程序会把监听到的socket信息放入到队列中(步骤C),事件分配器每次从队列中取出一个socket(步骤D),然后事件分派器把socket给对应的事件处理器(步骤E)。
这个命令请求处理器会从事件分配器传递过来的socket01上读取相关的数据,如何执行相应的读写处理。操作执行完之后,redis就会将准备好相应的响应数据(如你在redis客户端输入 set a 123回车时会看到响应ok),并将socket01的AE_WRITABLE事件和命令回复处理器关联起来。
-
数据的返回
当客户端会查询redis是否完成相应的操作,就会在socket01上产生一个AE_WRITABLE事件,会由对应的命令回复处理器来处理,就是将准备好的相应数据写入socket01(由于socket连接是双向的),返回给客户端,如读操作,客户端会显示ok。