0. 前言
本文是继续上一节来看看事件模块,事件模块是nginx的核心,它负责事件的收集、管理、分发。这里所说的事件,主要以网络事件和定时器事件为主,而网络事件中又以TCP网络事件为主(Nginx毕竟是个Web服务器)。
1. 基本结构
ngx_event_t
简单看就是一个连接data,一个定时器timer,一个事件处理函数handler,以及一堆标志位。定义如下:
struct ngx_event_s {
// 事件相关的对象。通常data都是指向ngx_connection_t连接对象。开启文件异步I/O时,它可能会指向ngx_event_aio_t结构体
void *data;
//标志位,为1时表示事件是可写的。通常情况下,它表示对应的TCP连接目前状态是可写的,也就是连接处于可以发送网络包的状态
unsigned write:1;
//标志位,为1时表示为此事件可以建立新的连接。通常情况下,在ngx_cycle_t中的listening动态数组中,每一个监听对象ngx_listening_t对应的读事件中的accept标志位才会是1
unsigned accept:1;
// 这个标志位用于区分当前事件是否是过期的,它仅仅是给事件驱动模块使用的,而事件消费模块可不用关心
unsigned instance:1;
// 为1时表示当前事件是活跃的,为0时表示事件是不活跃的。这个状态对应着事件驱动模块处理方式的不同。例如,在添加事件、删除事件和处理事件时,active标志位的不同都会对应着不同的处理方式。
unsigned active:1;
// 为1时表示禁用事件,仅在kqueue或者rtsig事件驱动模块中有效,而对于epoll事件驱动模块则无意义
unsigned disabled:1;
// 为1时表示当前事件已经准备就绪,也就是说,允许这个事件的消费模块处理这个事件。在HTTP框架中,经常会检查事件的ready标志位以确定是否可以接收请求或者发送响应
unsigned ready:1;
unsigned oneshot:1;
// 异步aio已经完成表示,该标志位用于异步AIO事件的处理
unsigned complete:1;
// 为1时表示当前处理的字符流已经结束
unsigned eof:1;
// 为1时表示事件在处理过程中出现错误
unsigned error:1;
// 为1时表示这个事件已经超时,用以提示事件的消费模块做超时处理
unsigned timedout:1;
// 为1时表示这个事件存在于定时器中
unsigned timer_set:1;
// 为1时表示需要延迟处理这个事件,它仅用于限速功能
unsigned delayed:1;
// 为1时表示延迟建立TCP连接,也就是说,经过TCP三次握手后并不建立连接,而是要等到真正收到数据包后才会建立TCP连接
unsigned deferred_accept:1;
// 只与kqueue和aio事件驱动机制有关,与epoll无关
unsigned pending_eof:1;
unsigned posted:1;
// 为1时表示当前事件已经关闭,epoll模块没有使用它
unsigned closed:1;
/* to test on worker exit */
unsigned channel:1;
unsigned resolver:1;
unsigned cancelable:1;
#if (NGX_HAVE_KQUEUE)
unsigned kq_vnode:1;
/* the pending errno reported by kqueue */
int kq_errno;
#endif
/*
* kqueue only:
* accept: number of sockets that wait to be accepted
* read: bytes to read when event is ready
* or lowat when event is set with NGX_LOWAT_EVENT flag
* write: available space in buffer when event is ready
* or lowat when event is set with NGX_LOWAT_EVENT flag
*
* iocp: TODO
*
* otherwise:
* accept: 1 if accept many, 0 otherwise
* read: bytes to read when event is ready, -1 if not known
*/
// 在epoll事件驱动机制下表示一次尽可能多地建立TCP连接,它与multi_accept配置项对应
int available;
// // 这个事件发生时的处理方法,每个事件消费模块都会重新实现它
ngx_event_handler_pt handler;
#if (NGX_HAVE_IOCP)
ngx_event_ovlp_t ovlp;
#endif
// epoll事件驱动方式不使用
ngx_uint_t index;
// 记录error_log日志的ngx_log_t对象
ngx_log_t *log;
// 定时器节点,用于定时器红黑树中
ngx_rbtree_node_t timer;
/* the posted queue */
ngx_queue_t queue;
#if 0
/* the threads support */
/*
* the event thread context, we store it here
* if $(CC) does not understand __thread declaration
* and pthread_getspecific() is too costly
*/
void *thr_ctx;
#if (NGX_EVENT_T_PADDING)
/* event should not cross cache line in SMP */
uint32_t padding[NGX_EVENT_T_PADDING];
#endif
#endif
};
事件是不需要创建的,因为Nginx在启动时已经在ngx_cycle_t的read_events成员中预分配了所有的读事件,一般在向epoll中添加可读或者可写事件时,都是使用ngx_handle_read_event或者ngx_handle_write_event方法的。
ngx_int_t
ngx_handle_read_event(ngx_event_t *rev, ngx_uint_t flags)
{
if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {
/* kqueue, epoll */
if (!rev->active && !rev->ready) {
if (ngx_add_event(rev, NGX_READ_EVENT, NGX_CLEAR_EVENT)
== NGX_ERROR)
{
return NGX_ERROR;
}
}
.....
}
没有准备好就通过ngx_add_event
加入到事件驱动模块,ngx_handle_write_event与此类似。
ngx_connection_t
这里的连接指的是指客户端发起的、服务器被动接受的连接,这样的连接都是使用ngx_connection_t结构体表示的。
这里的连接是预先分配的,什么是连接呢,首先是有socket(fd),其次是事件(read或者write),然后收发网络字节流的方法(recv、send),还有就是对这个连接的内存分配(pool、buffer),最后是一些标志位(timedout、idle、close、reusable、error等)。
struct ngx_connection_s {
// 连接未使用时,data成员用于充当连接池中空闲连接链表中的next指针。当连接被使用时,data的意义由使用它的Nginx模块而定,如在HTTP框架中,data指向ngx_http_request_t请求
void *data;
// 连接对应的读事件
ngx_event_t *read;
// 连接对应的写事件
ngx_event_t *write;
// 套接字句柄
ngx_socket_t fd;
// 直接接收网络字符流的方法
ngx_recv_pt recv;
// 直接发送网络字符流的方法
ngx_send_pt send;
// 以ngx_chain_t链表为参数来接收网络字符流的方法
ngx_recv_chain_pt recv_chain;
// 以ngx_chain_t链表为参数来发送网络字符流的方法
ngx_send_chain_pt send_chain;
// 这个连接对应的ngx_listening_t监听对象,此连接由listening监听端口的事件建立
ngx_listening_t *listening;
// // 这个连接上已经发送出去的字节数
off_t sent;
ngx_log_t *log;
// 内存池。一般在accept一个新连接时,会创建一个内存池,而在这个连接结束时会销毁内存池。注意,这里所说的连接是指成功建立的TCP连接,所有的ngx_connection_t结构体都是预分配的。这个内存池的大小将由上面的listening监听对象中的pool_size成员决定
ngx_pool_t *pool;
int type;
// 连接客户端的sockaddr结构体
struct sockaddr *sockaddr;
socklen_t socklen;
ngx_str_t addr_text;
ngx_proxy_protocol_t *proxy_protocol;
#if (NGX_SSL || NGX_COMPAT)
ngx_ssl_connection_t *ssl;
#endif
ngx_udp_connection_t *udp;
// 本机的监听端口对应的sockaddr结构体,也就是listening监听对象中的sockaddr成员
struct sockaddr *local_sockaddr;
socklen_t local_socklen;
// 用于接收、缓存客户端发来的字符流,每个事件消费模块可自由决定从连接池中分配多大的空间给buffer这个接收缓存字段。例如,在HTTP模块中,它的大小决定于client_header_buffer_size配置项
ngx_buf_t *buffer;
// 该字段用来将当前连接以双向链表元素的形式添加到ngx_cycle_t核心结构体的reusable_connections_queue双向链表中,表示可以重用的连接
ngx_queue_t queue;
// 连接使用次数。ngx_connection_t结构体每次建立一条来自客户端的连接,或者用于主动向后端服务器发起连接时(ngx_peer_connection_t也使用它),number都会加1
ngx_atomic_uint_t number;
ngx_msec_t start_time;
// 处理的请求次数
ngx_uint_t requests;
unsigned buffered:8;
unsigned log_error:3; /* ngx_connection_log_error_e */
unsigned timedout:1;
unsigned error:1;
unsigned destroyed:1;
unsigned idle:1;
unsigned reusable:1;
unsigned close:1;
unsigned shared:1;
unsigned sendfile:1;
unsigned sndlowat:1;
unsigned tcp_nodelay:2; /* ngx_connection_tcp_nodelay_e */
unsigned tcp_nopush:2; /* ngx_connection_tcp_nopush_e */
unsigned need_last_buf:1;
#if (NGX_HAVE_AIO_SENDFILE || NGX_COMPAT)
unsigned busy_count:2;
#endif
#if (NGX_THREADS || NGX_COMPAT)
ngx_thread_task_t *sendfile_task;
#endif
};
ngx_peer_connection_s
作为Web服务器,Nginx也需要向其他服务器主动发起连接,这样的连接与上边介绍的被动连接是不同的,这种主动连接用ngx_peer_connection_t来标志,其定义如下:
struct ngx_peer_connection_s {
// 一个主动连接实际上也需要ngx_connection_t 结构体中的大部分成员,并且出于重用的考虑而定义了connection 成员
ngx_connection_t *connection;
// 远端服务器的socket 地址
struct sockaddr *sockaddr;
socklen_t socklen;
// 远端服务器的名称
ngx_str_t *name;
// 表示在连接一个远端服务器时,当前连接出现异常失败后可以重试的次数,也就是允许的最多失败次数
ngx_uint_t tries;
ngx_msec_t start_time;
// 获取连接的方法,如果使用长连接构成的连接池,那么必须要实现get 方法
ngx_event_get_peer_pt get;
// 与get 方法对应的释放连接的方法
ngx_event_free_peer_pt free;
ngx_event_notify_peer_pt notify;
// 这个data 指针仅用于和上面的get 、free 方法配合传递参数
void *data;
#if (NGX_SSL || NGX_COMPAT)
ngx_event_set_peer_session_pt set_session;
ngx_event_save_peer_session_pt save_session;
#endif
// 本机地址信息
ngx_addr_t *local;
int type;
// 套接字的接收缓冲区大小
int rcvbuf;
// 记录日志的ngx_log_t 对象
ngx_log_t *log;
// 为1 时表示上面的connection 连接已经缓存
unsigned cached:1;
unsigned transparent:1;
unsigned so_keepalive:1;
unsigned down:1;
/* ngx_connection_log_error_e */
unsigned log_error:2;
NGX_COMPAT_BEGIN(2)
NGX_COMPAT_END
};
连接池
Nginx在接受客户端的连接时,所使用的ngx_connection_t结构体都是在启动阶段就预分配好的,使用时从连接池中获取空闲连接。这些连接池都是在上篇讲的ngx_cycle_t结构体中,如下图所示:
操作连接池2个方法:ngx_get_connection()
获取控线连接,同时获取相应的读/写事件,ngx_free_connection()
回收连接。
2. ngx_events_module核心模块
模块定义
ngx_events_module是一个核心模块(type:NGX_CORE_MODULE),它定义了一类新的模块:事件模块(type:NGX_EVENT_MODULE)。并定义了每个事件模块都需要实现的ngx_event_module_t接口,还需要管理这些事件模块生成的配置项结构体,并解析事件类配置项,当然,在解析配置项时会调用其在ngx_command_t数组中定义的回调方法。
ngx_events_module
ngx_module_t ngx_events_module = {
NGX_MODULE_V1,
&ngx_events_module_ctx, /* module context */
ngx_events_commands, /* module directives */
NGX_CORE_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
这里主要就是定义了ngx_events_commands。ngx_events_module_ctx的init做了一些验证工作,并没实质性的工作。
ngx_command_t
定义一个Nginx模块就是在实现ngx_modult_t结构体,这里需要先定义好ngx_command_t数组,何模块都是以配置项来定制功能的。ngx_events_commands数组决定了ngx_events_module模块是如何定制其功能的。
static ngx_command_t ngx_events_commands[] = {
{ ngx_string("events"),
NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,
ngx_events_block,
0,
0,
NULL },
ngx_null_command
};
ngx_events_module模块只对一个块配置项感兴趣,也就是nginx.conf中必须有的events配置项。
定制化的功能是在ngx_events_block
中实现的,在此函数中,完成了对所有事件模块的初始化。
模块初始化
事件初始化是在ngx_events_block()
中完成的,过程如下图所示:
ctx_index指的是该模块在该类型模块中的id,其生成方式书上写的与当前代码中有些差异,真个过程就是初始化了各个事件模块的配置。
模块的配置
每一个事件模块都必须实现ngx_event_module_t接口,这个接口中允许每个事件模块建立自己的配置项结构体,用于存储感兴趣的配置项在nginx.conf中对应的参数。
typedef struct {
ngx_str_t *name;
void *(*create_conf)(ngx_cycle_t *cycle);
char *(*init_conf)(ngx_cycle_t *cycle, void *conf);
ngx_event_actions_t actions;
} ngx_event_module_t;
每一个事件模块产生的配置结构体指针都会被放到ngx_events_module模块创建的指针数组中,这个指针数组又与ngx_cycle_t核心结构体中的conf_ctx成员相关。
void ****conf_ctx
中有4个*
,它首先指向一个存放指针的数组,这个数组中的指针成员同时又指向了另外一个存放指针的数组,所以是4个*
。
3. ngx_event_core_module事件模块
模块定义
ngx_event_core_module该模块的定义如下:
ngx_module_t ngx_event_core_module = {
NGX_MODULE_V1,
&ngx_event_core_module_ctx, /* module context */
ngx_event_core_commands, /* module directives */
NGX_EVENT_MODULE, /* module type */
NULL, /* init master */
ngx_event_module_init, /* init module */
ngx_event_process_init, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
该结构体中除了ngx_events_module中的context与command,它还实现了ngx_event_module_init方法和ngx_event_process_init方法,回想ngx_init_cycle(),在Nginx启动过程中还没有fork出worker子进程时,会首先调用ngx_event_core_module模块的ngx_event_module_init方法,而在fork出worker子进程后,每一个worker进程会在调用ngx_event_core_module模块的ngx_event_process_init方法后才会进入正式的工作循环。ngx_event_module_init方法其实很简单,它主要初始化了一些变量,而ngx_event_process_init方法就做了许多事情。
ngx_event_core_commands数组
static ngx_command_t ngx_event_core_commands[] = {
// worker进行连接池大小,也就是每个worker进程中支持的TCP最大连接数
{ ngx_string("worker_connections"),
NGX_EVENT_CONF|NGX_CONF_TAKE1,
ngx_event_connections,
0,
0,
NULL },
// 确定选择哪一个事件模块作为事件驱动机制
{ ngx_string("use"),
NGX_EVENT_CONF|NGX_CONF_TAKE1,
ngx_event_use,
0,
0,
NULL },
// 对于epoll事件驱动模式来说,意味着在接收到一个新连接事件时,调用accept以尽可能多地接收连接
{ ngx_string("multi_accept"),
NGX_EVENT_CONF|NGX_CONF_FLAG,
ngx_conf_set_flag_slot,
0,
offsetof(ngx_event_conf_t, multi_accept),
NULL },
// 确定是否使用accept_mutex负载均衡锁,默认为开启
{ ngx_string("accept_mutex"),
NGX_EVENT_CONF|NGX_CONF_FLAG,
ngx_conf_set_flag_slot,
0,
offsetof(ngx_event_conf_t, accept_mutex),
NULL },
// 启用accept_mutex负载均衡锁后,延迟accept_mutex_delay毫秒后再试图处理新连接事件
{ ngx_string("accept_mutex_delay"),
NGX_EVENT_CONF|NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
0,
offsetof(ngx_event_conf_t, accept_mutex_delay),
NULL },
// 需要对来自指定IP的TCP连接打印debug级别的调试日志
{ ngx_string("debug_connection"),
NGX_EVENT_CONF|NGX_CONF_TAKE1,
ngx_event_debug_connection,
0,
0,
NULL },
ngx_null_command
};
与这个command数组对应的是ngx_event_conf_t
,将这些配置存储到了该结构体中,如下:
typedef struct {
ngx_uint_t connections;
ngx_uint_t use;
ngx_flag_t multi_accept;
ngx_flag_t accept_mutex;
ngx_msec_t accept_mutex_delay;
u_char *name;
#if (NGX_DEBUG)
ngx_array_t debug_connection;
#endif
} ngx_event_conf_t;
ngx_event_process_init方法
这个方法中完成了很多初始化的工作,包括定时器Timer、连接池Connection、事件数组read_events、write_events,并定义读事件的处理方法为ngx_event_accept(),最后将监听读事件添加到事件驱动模块ngx_add_event()
。
4. ngx_epoll_module事件驱动模块
epoll的原理与用法
epoll在Linux内核中申请了一个简易的文件系统,把原先的一个select或者poll调用分成了3个部分:
-
调用epoll_create建立1个epoll对象eventpoll(在epoll文件系统中给这个句柄分配资源)
-
调用epoll_ctl向epoll对象中添加这待处理的100万个套接字
-
调用epoll_wait收集发生事件的连接
在实际收集事件时,epoll_wait的效率就会非常高,因为调用epoll_wait时并没有向它传递这100万个连接,内核也不需要去遍历全部的连接,只传递了有事件发生的连接。
eventpoll对象中有两个重要成员:rbr监控的连接、rdllink右键发生的连接
模块定义
ngx_module_t ngx_epoll_module = {
NGX_MODULE_V1,
&ngx_epoll_module_ctx, /* module context */
ngx_epoll_commands, /* module directives */
NGX_EVENT_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
这个模块的两个参数ngx_epoll_commands与ngx_epoll_module_ctx,ngx_epoll_commands中说明了对哪些配置项感兴趣,以及conf的存储等内容,先来看看它
static ngx_command_t ngx_epoll_commands[] = {
// 这个配置项表示调用一次epoll_wait时最多可以返回的事件数,当然,它也会预分配那么多epoll_event结构体用于存储事件
{ ngx_string("epoll_events"),
NGX_EVENT_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
0,
offsetof(ngx_epoll_conf_t, events),
NULL },
// 指明在开启异步I/O且使用io_setup系统调用初始化异步I/O上下文环境时,初始分配的异步I/O事件个数
{ ngx_string("worker_aio_requests"),
NGX_EVENT_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
0,
offsetof(ngx_epoll_conf_t, aio_requests),
NULL },
ngx_null_command
};
其配置项结构体就是这两个参数:
typedef struct {
ngx_uint_t events;
ngx_uint_t aio_requests;
} ngx_epoll_conf_t;
下边来看看epoll是如何定义ngx_event_module_t事件模块接口的,也就是这个context
static ngx_event_module_t ngx_epoll_module_ctx = {
&epoll_name,
ngx_epoll_create_conf, /* create configuration */
ngx_epoll_init_conf, /* init configuration */
{
ngx_epoll_add_event, /* add an event */
ngx_epoll_del_event, /* delete an event */
ngx_epoll_add_event, /* enable an event */
ngx_epoll_del_event, /* disable an event */
ngx_epoll_add_connection, /* add an connection */
ngx_epoll_del_connection, /* delete an connection */
#if (NGX_HAVE_EVENTFD)
ngx_epoll_notify, /* trigger a notify */
#else
NULL, /* trigger a notify */
#endif
ngx_epoll_process_events, /* process the events */
ngx_epoll_init, /* init the events */
ngx_epoll_done, /* done the events */
}
};
初始化
首先从实现init接口的ngx_epoll_init方法讲起。ngx_epoll_init(),实在ngx_event_core_commands的模块的第4部中调用,它主要做了两件事情。
- 调用epoll_create方法创建epoll对象。
- 创建event_list数组,用于进行epoll_wait调用时传递内核态的事件。
static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
ngx_epoll_conf_t *epcf;
epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);
if (ep == -1) {
ep = epoll_create(cycle->connection_n / 2);
if (ep == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"epoll_create() failed");
return NGX_ERROR;
}
....
}
if (nevents < epcf->events) {
if (event_list) {
ngx_free(event_list);
}
event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events,
cycle->log);
if (event_list == NULL) {
return NGX_ERROR;
}
}
...
}
ep是epoll对象的描述符,nevents是上面说到的epoll_events配置项参数,它既指明了epoll_wait一次返回的最大事件数,也告诉了event_list应该分配的数组大小。
添加事件
对于epoll而言,并没有enable事件和disable事件的概念,另外,从ngx_epoll_module_ctx结构体中可以看出,enable和add接口都是使用ngx_epoll_add_event方法实现的,而disable和del接口都是使用ngx_epoll_del_event方法实现的。
下边先来看看ngx_epoll_add_event
static ngx_int_t
ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
int op;
uint32_t events, prev;
ngx_event_t *e;
ngx_connection_t *c;
struct epoll_event ee;
c = ev->data;
events = (uint32_t) event;
// 根据event参数确定当前事件是读事件还是写事件,这会决定events是加上EPOLLIN标志位还是EPOLLOUT标志位
if (event == NGX_READ_EVENT) {
e = c->write;
prev = EPOLLOUT;
#if (NGX_READ_EVENT != EPOLLIN|EPOLLRDHUP)
events = EPOLLIN|EPOLLRDHUP;
#endif
} else {
e = c->read;
prev = EPOLLIN|EPOLLRDHUP;
#if (NGX_WRITE_EVENT != EPOLLOUT)
events = EPOLLOUT;
#endif
}
// 根据active标志位确定是否为活跃事件,以决定到底是修改还是添加事件
if (e->active) {
op = EPOLL_CTL_MOD;
events |= prev;
} else {
op = EPOLL_CTL_ADD;
}
#if (NGX_HAVE_EPOLLEXCLUSIVE && NGX_HAVE_EPOLLRDHUP)
if (flags & NGX_EXCLUSIVE_EVENT) {
events &= ~EPOLLRDHUP;
}
#endif
// 加入flags参数到events标志位中
ee.events = events | (uint32_t) flags;
// ptr成员存储的是ngx_connection_t连接,
ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"epoll add event: fd:%d op:%d ev:%08XD",
c->fd, op, ee.events);
// 向epoll中添加事件或者在epoll中修改事件
if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
"epoll_ctl(%d, %d) failed", op, c->fd);
return NGX_ERROR;
}
// 将事件的active标志位置为1,表示当前事件是活跃的
ev->active = 1;
#if 0
ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;
#endif
return NGX_OK;
}
对事件的处理
static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
int events;
uint32_t revents;
ngx_int_t instance, i;
ngx_uint_t level;
ngx_err_t err;
ngx_event_t *rev, *wev;
ngx_queue_t *queue;
ngx_connection_t *c;
/* NGX_TIMER_INFINITE == INFTIM */
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll timer: %M", timer);
// 调用epoll_wait获取事件
events = epoll_wait(ep, event_list, (int) nevents, timer);
err = (events == -1) ? ngx_errno : 0;
// Nginx对时间的缓存和管理,当flags标志位指示要更新时间时,就是在这里更新的
if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
ngx_time_update();
}
...
// 遍历本次epoll_wait返回的所有事件
for (i = 0; i < events; i++) {
c = event_list[i].data.ptr;
// ptr成员就是ngx_connection_t连接的地址,但最后1位有特殊含义,需要把它屏蔽掉
instance = (uintptr_t) c & 1;
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
// 取出读事件
rev = c->read;
// 判断这个读事件是否为过期事件
if (c->fd == -1 || rev->instance != instance) {
/*
* the stale event from a file descriptor
* that was just closed in this iteration
*/
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: stale event %p", c);
continue;
}
// 取出事件类型
revents = event_list[i].events;
...
// 如果是读事件且该事件是活跃的
if ((revents & EPOLLIN) && rev->active) {
...
rev->ready = 1;
rev->available = -1;
if (flags & NGX_POST_EVENTS) {
// 如果要在post队列中延后处理该事件,首先要判断它是新连接事件还是普通事件,以决定把它加入到ngx_posted_accept_events队列或者ngx_posted_events队列中
queue = rev->accept ? &ngx_posted_accept_events
: &ngx_posted_events;
ngx_post_event(rev, queue);
} else {
// 立即调用读事件的回调方法来处理这个事件
rev->handler(rev);
}
}
// 取出写事件
wev = c->write;
// 如果是写事件且该事件是活跃的
if ((revents & EPOLLOUT) && wev->active) {
// // 判断这个写事件是否为过期事件
if (c->fd == -1 || wev->instance != instance) {
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: stale event %p", c);
continue;
}
wev->ready = 1;
#if (NGX_THREADS)
wev->complete = 1;
#endif
if (flags & NGX_POST_EVENTS) {
// 将这个事件添加到post队列中延后处理
ngx_post_event(wev, &ngx_posted_events);
} else {
// 立即调用这个写事件的回调方法来处理这个事件
wev->handler(wev);
}
}
}
return NGX_OK;
}
ngx_epoll_process_events方法会收集当前触发的所有事件,对于不需要加入到post队列延后处理的事件,该方法会立刻执行它们的回调方法,这其实是在做分发事件的工作,只是它会在自己的进程中调用这些回调方法而已,因此,每一个回调方法都不能导致进程休眠或者消耗太多的时间,以免epoll不能即时地处理其他事件.
instance标志位为什么可以判断事件是否过期?从上面的代码可以看出,instance标志位的使用其实很简单,它利用了指针的最后一位一定是0这一特性。既然最后一位始终都是0,那么不如用来表示instance。这样,在使用ngx_epoll_add_event方法向epoll中添加事件时,就把epoll_event中联合成员data的ptr成员指向ngx_connection_t连接的地址,同时把最后一位置为这个事件的instance标志。而在ngx_epoll_process_events方法中取出指向连接的ptr地址时,先把最后一位instance取出来,再把ptr还原成正常的地址赋给ngx_connection_t连接。这样,instance究竟放在何处的问题也就解决了.
那么,过期事件又是怎么回事呢?举个例子,假设epoll_wait一次返回3个事件,在第1个事件的处理过程中,由于业务的需要,所以关闭了一个连接,而这个连接恰好对应第3个事件。这样的话,在处理到第3个事件时,这个事件就已经是过期事件了.
如何解决这个问题?依靠instance标志位。当调用ngx_get_connection从连接池中获取一个新连接时,instance标志位就会置反。这样,当这个ngx_connection_t连接重复使用时,它的instance标志位一定是不同的。因此,在ngx_epoll_process_events方法中一旦判断instance发生了变化,就认为这是过期事件而不予处理。
5. 定时器事件
缓存时间与时间更新
Nginx与一般的服务器不同,出于性能的考虑(不需要每次获取时间都调用gettimeofday方法),Nginx使用的时间是缓存在其内存中的,这样,在Nginx模块获取时间时,只是获取内存中的几个整型变量而已。
定时器的结构体主要有ngx_time_t
与ngx_tm_t
,一些时间的定义,这里不多说了。
这样缓存时间就带来对时间更新的要求,如何更新呢?
它是与ngx_epoll_process_events方法的调用频率及其flag参数相关的。实际上,Nginx还提供了设置更新缓存时间频率的功能(也就是至少每隔timer_resolution毫秒必须更新一次缓存时间),通过在nginx.conf文件中的timer_resolution配置项可以设置更新的最小频率,这样就保证了缓存时间的精度。下面看一下timer_resolution是如何起作用的。
- 在ngx_event_core_module模块初始化时会使用setitimer系统调用告诉内核每隔timer_resolution毫秒调用一次ngx_timer_signal_handler方法。
- 而ngx_timer_signal_handler方法则会将ngx_event_timer_alarm标志位设为1
- 这样一来,一旦调用ngx_epoll_process_events方法,如果间隔的时间超过timer_resolution毫秒,肯定会更新缓存时间。
定时器的原理
定时器是通过一棵红黑树实现的。ngx_event_timer_rbtree就是所有定时器事件组成的红黑树,而ngx_event_timer_sentinel就是这棵红黑树的哨兵节点。
这棵红黑树中的每个节点都是ngx_event_t事件中的timer成员,节点的关键字就是事件的超时时间。以这个超时时间的大小组成了二叉排序树ngx_event_timer_rbtree。
如果需要找出最有可能超时的事件,那么将ngx_event_timer_rbtree树中最左边的节点取出来即可。只要用当前时间去比较这个最左边节点的超时时间,就会知道这个事件有没有触发超时,如果还没有触发超时,那么会知道最少还要经过多少毫秒满足超时条件而触发超时。
提供了几个常用的函数:增加定时器事件void ngx_event_add_timer(ngx_event_t *ev, ngx_msec_t timer)
,删除定时器事件void ngx_event_del_timer(ngx_event_t *ev)
。以及定时器事件的过期检查ngx_event_expire_timers
,它的代码如下:
void
ngx_event_expire_timers(void)
{
ngx_event_t *ev;
ngx_rbtree_node_t *node, *root, *sentinel;
sentinel = ngx_event_timer_rbtree.sentinel;
for ( ;; ) {
root = ngx_event_timer_rbtree.root;
if (root == sentinel) {
return;
}
// 获取最小的超时时间的节点
node = ngx_rbtree_min(root, sentinel);
/* node->key > ngx_current_msec */
// 判断它是否过期
if ((ngx_msec_int_t) (node->key - ngx_current_msec) > 0) {
return;
}
// 获取事件
ev = (ngx_event_t *) ((char *) node - offsetof(ngx_event_t, timer));
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"event timer del: %d: %M",
ngx_event_ident(ev->data), ev->timer.key);
// 移除事件
ngx_rbtree_delete(&ngx_event_timer_rbtree, &ev->timer);
#if (NGX_DEBUG)
ev->timer.left = NULL;
ev->timer.right = NULL;
ev->timer.parent = NULL;
#endif
ev->timer_set = 0;
ev->timedout = 1;
// 定时器回调
ev->handler(ev);
}
}
6. 事件处理流程
以上所有的介绍,其实都是为了本节
创建新连接
在ngx_event_core_module的ngx_event_process_init
中,将接收到连接之后回调函数设为了ngx_event_accept
,在这里完成了新连接的建立,其流程如下:
惊群现象
何谓“惊群”?master进程开始监听Web端口,fork出多个worker子进程,这些子进程开始同时监听同一个Web端口。一般情况下,有多少CPU核心,就会配置多少个worker子进程,这样所有的worker子进程都在承担着Web服务器的角色。但下面假定这样一个场景:没有用户连入
服务器,某一时刻恰好所有的worker子进程都休眠且等待新连接的系统调用(如epoll_wait),这时有一个用户向服务器发起了连接,内核在收到TCP的SYN包时,会激活所有的休眠worker子进程,当然,此时只有最先开始执行accept的子进程可以成功建立新连接,而其他worker子进程都会accept失败。这些accept失败的子进程被内核唤醒是不必要的,它们被唤醒后的执行很可能也是多余的,那么这一时刻它们占用了本不需要占用的系统资源,引发了不必要的进程上下文切换,增加了系统开销。
ps:epoll的唤醒在linux内核中是有解决的,它提供了唤醒的参数,可以选择全唤醒,也可以只唤醒一个。linux自己也解决了这个问题,也就是利用accept_mutex,使同一时刻只能有唯一一个worker子进程监听Web端口,这样就不会发生“惊
群”了,此时新连接事件只能唤醒唯一正在监听端口的worker子进程。
先来看看accept_mutex的获取
ngx_int_t
ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
{
// 获取锁
if (ngx_shmtx_trylock(&ngx_accept_mutex)) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"accept mutex locked");
if (ngx_accept_mutex_held && ngx_accept_events == 0) {
return NGX_OK;
}
// 将所有监听连接的读事件添加到当前的epoll等事件驱动模块中
if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
ngx_shmtx_unlock(&ngx_accept_mutex);
return NGX_ERROR;
}
ngx_accept_events = 0;
ngx_accept_mutex_held = 1;
return NGX_OK;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"accept mutex lock failed: %ui", ngx_accept_mutex_held);
// 没有获取到,且ngx_accept_mutex_held,需要将所有监听连接的读事件从事件驱动模块中移除
if (ngx_accept_mutex_held) {
if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) {
return NGX_ERROR;
}
ngx_accept_mutex_held = 0;
}
return NGX_OK;
}
如果ngx_trylock_accept_mutex方法没有获取到锁,接下来调用事件驱动模块的process_events方法时只能处理已有的连接上的事件;如果获取到了锁,调用process_events方法时就会既处理已有连接上的事件,也处理新连接的事件。
在worker进程的工作循环中,如果获取到了,则将flag加上NGX_POST_EVENTS标志。
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}
if (ngx_accept_mutex_held) {
flags |= NGX_POST_EVENTS;
}
在process_events方法中,新连接事件全部放到ngx_posted_accept_events队列中,普通事件则放到ngx_posted_events队列中。
if ((revents & EPOLLIN) && rev->active) {
...
rev->ready = 1;
rev->available = -1;
if (flags & NGX_POST_EVENTS) {
queue = rev->accept ? &ngx_posted_accept_events
: &ngx_posted_events;
ngx_post_event(rev, queue);
} else {
rev->handler(rev);
}
}
负载均衡
有一个全局变量ngx_accept_disabled,它就是负载均衡机制实现的关键阈值。ngx_accept_disabled的值就是一个负数,其值为连接总数的7/8。其实,ngx_accept_disabled的用法很简单,当它为负数时,不会进行触发负载均衡操作;而
当ngx_accept_disabled是正数时,就会触发Nginx进行负载均衡操作了。Nginx的做法也很简单,就是当ngx_accept_disabled是正数时当前进程将不再处理新连接事件,取而代之的仅仅是ngx_accept_disabled值减1。
初始话是在ngx_event_accept()中设置的
ngx_accept_disabled = ngx_cycle->connection_n / 8
- ngx_cycle->free_connection_n;
是用是在worker主线程中中使用
if (ngx_accept_disabled > 0) {
ngx_accept_disabled--;
} else {
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}
}
ngx_process_events_and_timers流程
这里就是worker的主循环
根据不论是获取锁还是负载均衡都在这里进行着控制,这里主要的事件处理包括第7步ngx_process_events(),也就是在epoll模块中定义的ngx_epoll_process_events()
,再就是第8步处理新连接的ngx_event_process_posted
,第10步的处理定时器事件ngx_event_expire_timers();
,最后是第11步,它是处理普通的事件。
7. TCP协议
前边学习过TCP协议,从理论上进行了学习,这里讲TCP协议,是nginx中看看tcp协议的细节。
三次握手
- 内核在我们调用listen方法时,就已经为这个监听端口建立了SYN队列和ACCEPT队列,
- 当客户端使用connect方法向服务器发起TCP连接,随后图中1.1步骤客户端的SYN包到达了服务器后,内核会把这一信息放到SYN队列(即未完成握手队列)中,同时回一个SYN+ACK包给客户端。
- 2.1步骤中客户端再次发来了针对服务器SYN包的ACK网络分组时,内核会把连接从SYN队列中取出,再把这个连接放到ACCEPT队列(即已完成握手队列)中。
- 而服务器在第3步调用accept方法建立连接时,其实就是直接从ACCEPT队列中取出已经建好的连接而已。
如果大量连接同时到来,而应用程序不能及时地调用accept方法,就会导致以上两个队列满(ACCEPT队列满,进而也会导致SYN队列满),从而导致连接无法建立。这其实很常见,比如Nginx的每个worker进程都负责调用accept方法,如果一个Nginx模块在处理请求时长时间陷入了某个方法的执行中(如执行计算或者等待IO),就有可能导致新连接无法建立。
send流程
send流程如下:
这里的分片,TCP连接建立时,就可以判断出双方的网络间最适宜的、不会被再次切分的报文大小,TCP层把它叫做MSS最大报文段长度。假定待发送的内存将按照MSS被切分为3个报文,应用程序在第1步调用send方法、第10步send方法返回之间,内核的主要任务是把用户态的内存内容拷贝到内核态的TCP缓冲区上。从这里看到,send方法成功返回并不等于就把报文发送出去了。
recv流程
流程如下图:
当调用recv这样的方法接收报文时,Nginx是基于事件驱动的,也就是说只有epoll通知worker进程收到了网络报文,recv才会被调用。在第1~4步表示接收到了无序的报文后,内核是怎样重新排序的;第5步开始,应用程序调用了recv方法,内核开始把TCP读缓冲区的内容拷贝到应用程序的用户态内存中,第13步recv方法返回拷贝的字节数。
图中用到了linux内核中为TCP准备的2个队列:
receive队列是允许用户进程直接读取的,它是将已经接收到的TCP报文,去除了TCP头部、排好序放入的、用户进程可以直接按序读取的队列;
out_of_order队列存放乱序的报文。
8. 小结
本文一边看书,一边看代码,对事件的的流程进行了较为消息的整理。
这里看到了3个模块:ngx_event_module、ngx_event_core_module、ngx_epoll_module。ngx_event_module调用了各个模块的init_conf,ngx_event_core_module为各个事件模块分配了一些列资源、ngx_epoll_module是事件的驱动模块,事件的init、add、del、process等都在这里定义。
看到worker进程主循环中主要的事件处理循环:ngx_process_events_and_timers
。
模块之间的关系、模块与进程之间的关系都能清除的看到了。