nginx事件模块

0. 前言

本文是继续上一节来看看事件模块,事件模块是nginx的核心,它负责事件的收集、管理、分发。这里所说的事件,主要以网络事件和定时器事件为主,而网络事件中又以TCP网络事件为主(Nginx毕竟是个Web服务器)。

1. 基本结构

ngx_event_t

简单看就是一个连接data,一个定时器timer,一个事件处理函数handler,以及一堆标志位。定义如下:

struct ngx_event_s {
    // 事件相关的对象。通常data都是指向ngx_connection_t连接对象。开启文件异步I/O时,它可能会指向ngx_event_aio_t结构体
    void            *data;

    //标志位,为1时表示事件是可写的。通常情况下,它表示对应的TCP连接目前状态是可写的,也就是连接处于可以发送网络包的状态
    unsigned         write:1;

    //标志位,为1时表示为此事件可以建立新的连接。通常情况下,在ngx_cycle_t中的listening动态数组中,每一个监听对象ngx_listening_t对应的读事件中的accept标志位才会是1
    unsigned         accept:1;

    // 这个标志位用于区分当前事件是否是过期的,它仅仅是给事件驱动模块使用的,而事件消费模块可不用关心
    unsigned         instance:1;

    // 为1时表示当前事件是活跃的,为0时表示事件是不活跃的。这个状态对应着事件驱动模块处理方式的不同。例如,在添加事件、删除事件和处理事件时,active标志位的不同都会对应着不同的处理方式。
    unsigned         active:1;

    // 为1时表示禁用事件,仅在kqueue或者rtsig事件驱动模块中有效,而对于epoll事件驱动模块则无意义
    unsigned         disabled:1;

	// 为1时表示当前事件已经准备就绪,也就是说,允许这个事件的消费模块处理这个事件。在HTTP框架中,经常会检查事件的ready标志位以确定是否可以接收请求或者发送响应
    unsigned         ready:1;

    unsigned         oneshot:1;

    // 异步aio已经完成表示,该标志位用于异步AIO事件的处理
    unsigned         complete:1;

    // 为1时表示当前处理的字符流已经结束
    unsigned         eof:1;
    
    // 为1时表示事件在处理过程中出现错误
    unsigned         error:1;

    // 为1时表示这个事件已经超时,用以提示事件的消费模块做超时处理
    unsigned         timedout:1;
    // 为1时表示这个事件存在于定时器中
    unsigned         timer_set:1;

    // 为1时表示需要延迟处理这个事件,它仅用于限速功能
    unsigned         delayed:1;

    // 为1时表示延迟建立TCP连接,也就是说,经过TCP三次握手后并不建立连接,而是要等到真正收到数据包后才会建立TCP连接
    unsigned         deferred_accept:1;

    // 只与kqueue和aio事件驱动机制有关,与epoll无关
    unsigned         pending_eof:1;

    unsigned         posted:1;

    // 为1时表示当前事件已经关闭,epoll模块没有使用它
    unsigned         closed:1;

    /* to test on worker exit */
    unsigned         channel:1;
    unsigned         resolver:1;

    unsigned         cancelable:1;

#if (NGX_HAVE_KQUEUE)
    unsigned         kq_vnode:1;

    /* the pending errno reported by kqueue */
    int              kq_errno;
#endif

    /*
     * kqueue only:
     *   accept:     number of sockets that wait to be accepted
     *   read:       bytes to read when event is ready
     *               or lowat when event is set with NGX_LOWAT_EVENT flag
     *   write:      available space in buffer when event is ready
     *               or lowat when event is set with NGX_LOWAT_EVENT flag
     *
     * iocp: TODO
     *
     * otherwise:
     *   accept:     1 if accept many, 0 otherwise
     *   read:       bytes to read when event is ready, -1 if not known
     */

    // 在epoll事件驱动机制下表示一次尽可能多地建立TCP连接,它与multi_accept配置项对应
    int              available;

    // // 这个事件发生时的处理方法,每个事件消费模块都会重新实现它
    ngx_event_handler_pt  handler;


#if (NGX_HAVE_IOCP)
    ngx_event_ovlp_t ovlp;
#endif

    // epoll事件驱动方式不使用
    ngx_uint_t       index;

    // 记录error_log日志的ngx_log_t对象
    ngx_log_t       *log;

    // 定时器节点,用于定时器红黑树中
    ngx_rbtree_node_t   timer;

    /* the posted queue */
    ngx_queue_t      queue;

#if 0

    /* the threads support */

    /*
     * the event thread context, we store it here
     * if $(CC) does not understand __thread declaration
     * and pthread_getspecific() is too costly
     */

    void            *thr_ctx;

#if (NGX_EVENT_T_PADDING)

    /* event should not cross cache line in SMP */

    uint32_t         padding[NGX_EVENT_T_PADDING];
#endif
#endif
};

事件是不需要创建的,因为Nginx在启动时已经在ngx_cycle_t的read_events成员中预分配了所有的读事件,一般在向epoll中添加可读或者可写事件时,都是使用ngx_handle_read_event或者ngx_handle_write_event方法的。

ngx_int_t
ngx_handle_read_event(ngx_event_t *rev, ngx_uint_t flags)
{
    if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {

        /* kqueue, epoll */
        if (!rev->active && !rev->ready) {
            if (ngx_add_event(rev, NGX_READ_EVENT, NGX_CLEAR_EVENT)
                == NGX_ERROR)
            {
                return NGX_ERROR;
            }
        }
        .....
    }

没有准备好就通过ngx_add_event加入到事件驱动模块,ngx_handle_write_event与此类似。

ngx_connection_t

这里的连接指的是指客户端发起的、服务器被动接受的连接,这样的连接都是使用ngx_connection_t结构体表示的。

这里的连接是预先分配的,什么是连接呢,首先是有socket(fd),其次是事件(read或者write),然后收发网络字节流的方法(recv、send),还有就是对这个连接的内存分配(pool、buffer),最后是一些标志位(timedout、idle、close、reusable、error等)。

struct ngx_connection_s {
    
    // 连接未使用时,data成员用于充当连接池中空闲连接链表中的next指针。当连接被使用时,data的意义由使用它的Nginx模块而定,如在HTTP框架中,data指向ngx_http_request_t请求
    void               *data;
    
    // 连接对应的读事件
    ngx_event_t        *read;
    
    // 连接对应的写事件
    ngx_event_t        *write;

    // 套接字句柄
    ngx_socket_t        fd;

    // 直接接收网络字符流的方法
    ngx_recv_pt         recv;
    // 直接发送网络字符流的方法
    ngx_send_pt         send;
    // 以ngx_chain_t链表为参数来接收网络字符流的方法
    ngx_recv_chain_pt   recv_chain;
    // 以ngx_chain_t链表为参数来发送网络字符流的方法
    ngx_send_chain_pt   send_chain;

    // 这个连接对应的ngx_listening_t监听对象,此连接由listening监听端口的事件建立
    ngx_listening_t    *listening;

    // // 这个连接上已经发送出去的字节数
    off_t               sent;

    ngx_log_t          *log;

    // 内存池。一般在accept一个新连接时,会创建一个内存池,而在这个连接结束时会销毁内存池。注意,这里所说的连接是指成功建立的TCP连接,所有的ngx_connection_t结构体都是预分配的。这个内存池的大小将由上面的listening监听对象中的pool_size成员决定
    ngx_pool_t         *pool;

    int                 type;

    // 连接客户端的sockaddr结构体
    struct sockaddr    *sockaddr;
    socklen_t           socklen;
    ngx_str_t           addr_text;

    ngx_proxy_protocol_t  *proxy_protocol;

#if (NGX_SSL || NGX_COMPAT)
    ngx_ssl_connection_t  *ssl;
#endif

    ngx_udp_connection_t  *udp;

    // 本机的监听端口对应的sockaddr结构体,也就是listening监听对象中的sockaddr成员
    struct sockaddr    *local_sockaddr;
    socklen_t           local_socklen;

    // 用于接收、缓存客户端发来的字符流,每个事件消费模块可自由决定从连接池中分配多大的空间给buffer这个接收缓存字段。例如,在HTTP模块中,它的大小决定于client_header_buffer_size配置项
    ngx_buf_t          *buffer;

    // 该字段用来将当前连接以双向链表元素的形式添加到ngx_cycle_t核心结构体的reusable_connections_queue双向链表中,表示可以重用的连接
    ngx_queue_t         queue;

    // 连接使用次数。ngx_connection_t结构体每次建立一条来自客户端的连接,或者用于主动向后端服务器发起连接时(ngx_peer_connection_t也使用它),number都会加1
    ngx_atomic_uint_t   number;

    ngx_msec_t          start_time;
    // 处理的请求次数
    ngx_uint_t          requests;

    unsigned            buffered:8;

    unsigned            log_error:3;     /* ngx_connection_log_error_e */

    unsigned            timedout:1;
    unsigned            error:1;
    unsigned            destroyed:1;

    unsigned            idle:1;
    unsigned            reusable:1;
    unsigned            close:1;
    unsigned            shared:1;

    unsigned            sendfile:1;
    unsigned            sndlowat:1;
    unsigned            tcp_nodelay:2;   /* ngx_connection_tcp_nodelay_e */
    unsigned            tcp_nopush:2;    /* ngx_connection_tcp_nopush_e */

    unsigned            need_last_buf:1;

#if (NGX_HAVE_AIO_SENDFILE || NGX_COMPAT)
    unsigned            busy_count:2;
#endif

#if (NGX_THREADS || NGX_COMPAT)
    ngx_thread_task_t  *sendfile_task;
#endif
};

ngx_peer_connection_s

作为Web服务器,Nginx也需要向其他服务器主动发起连接,这样的连接与上边介绍的被动连接是不同的,这种主动连接用ngx_peer_connection_t来标志,其定义如下:

struct ngx_peer_connection_s {
    
    // 一个主动连接实际上也需要ngx_connection_t 结构体中的大部分成员,并且出于重用的考虑而定义了connection 成员
    ngx_connection_t                *connection;

    // 远端服务器的socket 地址
    struct sockaddr                 *sockaddr;
    socklen_t                        socklen;
    // 远端服务器的名称
    ngx_str_t                       *name;

    // 表示在连接一个远端服务器时,当前连接出现异常失败后可以重试的次数,也就是允许的最多失败次数
    ngx_uint_t                       tries;
    ngx_msec_t                       start_time;

    // 获取连接的方法,如果使用长连接构成的连接池,那么必须要实现get 方法
    ngx_event_get_peer_pt            get;
    // 与get 方法对应的释放连接的方法
    ngx_event_free_peer_pt           free;
    ngx_event_notify_peer_pt         notify;
    
    // 这个data 指针仅用于和上面的get 、free 方法配合传递参数
    void                            *data;

#if (NGX_SSL || NGX_COMPAT)
    ngx_event_set_peer_session_pt    set_session;
    ngx_event_save_peer_session_pt   save_session;
#endif

    // 本机地址信息
    ngx_addr_t                      *local;

    int                              type;
    // 套接字的接收缓冲区大小
    int                              rcvbuf;

    // 记录日志的ngx_log_t 对象
    ngx_log_t                       *log;

    // 为1 时表示上面的connection 连接已经缓存
    unsigned                         cached:1;
    unsigned                         transparent:1;
    unsigned                         so_keepalive:1;
    unsigned                         down:1;

                                     /* ngx_connection_log_error_e */
    unsigned                         log_error:2;

    NGX_COMPAT_BEGIN(2)
    NGX_COMPAT_END
};

连接池

Nginx在接受客户端的连接时,所使用的ngx_connection_t结构体都是在启动阶段就预分配好的,使用时从连接池中获取空闲连接。这些连接池都是在上篇讲的ngx_cycle_t结构体中,如下图所示:

操作连接池2个方法:ngx_get_connection()获取控线连接,同时获取相应的读/写事件,ngx_free_connection()回收连接。

2. ngx_events_module核心模块

模块定义

ngx_events_module是一个核心模块(type:NGX_CORE_MODULE),它定义了一类新的模块:事件模块(type:NGX_EVENT_MODULE)。并定义了每个事件模块都需要实现的ngx_event_module_t接口,还需要管理这些事件模块生成的配置项结构体,并解析事件类配置项,当然,在解析配置项时会调用其在ngx_command_t数组中定义的回调方法。

ngx_events_module

ngx_module_t  ngx_events_module = {
    NGX_MODULE_V1,
    &ngx_events_module_ctx,                /* module context */
    ngx_events_commands,                   /* module directives */
    NGX_CORE_MODULE,                       /* module type */
    NULL,                                  /* init master */
    NULL,                                  /* init module */
    NULL,                                  /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};

这里主要就是定义了ngx_events_commands。ngx_events_module_ctx的init做了一些验证工作,并没实质性的工作。

ngx_command_t

定义一个Nginx模块就是在实现ngx_modult_t结构体,这里需要先定义好ngx_command_t数组,何模块都是以配置项来定制功能的。ngx_events_commands数组决定了ngx_events_module模块是如何定制其功能的。

static ngx_command_t  ngx_events_commands[] = {

    { ngx_string("events"),
      NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,
      ngx_events_block,
      0,
      0,
      NULL },

      ngx_null_command
};

ngx_events_module模块只对一个块配置项感兴趣,也就是nginx.conf中必须有的events配置项。

定制化的功能是在ngx_events_block中实现的,在此函数中,完成了对所有事件模块的初始化。

模块初始化

事件初始化是在ngx_events_block()中完成的,过程如下图所示:

nginx核心模块加载事件模块

ctx_index指的是该模块在该类型模块中的id,其生成方式书上写的与当前代码中有些差异,真个过程就是初始化了各个事件模块的配置。

模块的配置

每一个事件模块都必须实现ngx_event_module_t接口,这个接口中允许每个事件模块建立自己的配置项结构体,用于存储感兴趣的配置项在nginx.conf中对应的参数。

typedef struct {
    ngx_str_t              *name;
    void                 *(*create_conf)(ngx_cycle_t *cycle);
    char                 *(*init_conf)(ngx_cycle_t *cycle, void *conf);

    ngx_event_actions_t     actions;
} ngx_event_module_t;

每一个事件模块产生的配置结构体指针都会被放到ngx_events_module模块创建的指针数组中,这个指针数组又与ngx_cycle_t核心结构体中的conf_ctx成员相关。

void ****conf_ctx中有4个*,它首先指向一个存放指针的数组,这个数组中的指针成员同时又指向了另外一个存放指针的数组,所以是4个*

3. ngx_event_core_module事件模块

模块定义

ngx_event_core_module该模块的定义如下:

ngx_module_t  ngx_event_core_module = {
    NGX_MODULE_V1,
    &ngx_event_core_module_ctx,            /* module context */
    ngx_event_core_commands,               /* module directives */
    NGX_EVENT_MODULE,                      /* module type */
    NULL,                                  /* init master */
    ngx_event_module_init,                 /* init module */
    ngx_event_process_init,                /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};

该结构体中除了ngx_events_module中的context与command,它还实现了ngx_event_module_init方法和ngx_event_process_init方法,回想ngx_init_cycle(),在Nginx启动过程中还没有fork出worker子进程时,会首先调用ngx_event_core_module模块的ngx_event_module_init方法,而在fork出worker子进程后,每一个worker进程会在调用ngx_event_core_module模块的ngx_event_process_init方法后才会进入正式的工作循环。ngx_event_module_init方法其实很简单,它主要初始化了一些变量,而ngx_event_process_init方法就做了许多事情。

ngx_event_core_commands数组

static ngx_command_t  ngx_event_core_commands[] = {
	// worker进行连接池大小,也就是每个worker进程中支持的TCP最大连接数
    { ngx_string("worker_connections"),
      NGX_EVENT_CONF|NGX_CONF_TAKE1,
      ngx_event_connections,
      0,
      0,
      NULL },

    // 确定选择哪一个事件模块作为事件驱动机制
    { ngx_string("use"),
      NGX_EVENT_CONF|NGX_CONF_TAKE1,
      ngx_event_use,
      0,
      0,
      NULL },
	
    // 对于epoll事件驱动模式来说,意味着在接收到一个新连接事件时,调用accept以尽可能多地接收连接
    { ngx_string("multi_accept"),
      NGX_EVENT_CONF|NGX_CONF_FLAG,
      ngx_conf_set_flag_slot,
      0,
      offsetof(ngx_event_conf_t, multi_accept),
      NULL },

    // 确定是否使用accept_mutex负载均衡锁,默认为开启
    { ngx_string("accept_mutex"),
      NGX_EVENT_CONF|NGX_CONF_FLAG,
      ngx_conf_set_flag_slot,
      0,
      offsetof(ngx_event_conf_t, accept_mutex),
      NULL },

    // 启用accept_mutex负载均衡锁后,延迟accept_mutex_delay毫秒后再试图处理新连接事件
    { ngx_string("accept_mutex_delay"),
      NGX_EVENT_CONF|NGX_CONF_TAKE1,
      ngx_conf_set_msec_slot,
      0,
      offsetof(ngx_event_conf_t, accept_mutex_delay),
      NULL },

    // 需要对来自指定IP的TCP连接打印debug级别的调试日志
    { ngx_string("debug_connection"),
      NGX_EVENT_CONF|NGX_CONF_TAKE1,
      ngx_event_debug_connection,
      0,
      0,
      NULL },

      ngx_null_command
};

​ 与这个command数组对应的是ngx_event_conf_t,将这些配置存储到了该结构体中,如下:

typedef struct {
    ngx_uint_t    connections;
    ngx_uint_t    use;

    ngx_flag_t    multi_accept;
    ngx_flag_t    accept_mutex;

    ngx_msec_t    accept_mutex_delay;

    u_char       *name;

#if (NGX_DEBUG)
    ngx_array_t   debug_connection;
#endif
} ngx_event_conf_t;

ngx_event_process_init方法

这个方法中完成了很多初始化的工作,包括定时器Timer、连接池Connection、事件数组read_events、write_events,并定义读事件的处理方法为ngx_event_accept(),最后将监听读事件添加到事件驱动模块ngx_add_event()

4. ngx_epoll_module事件驱动模块

epoll的原理与用法

epoll在Linux内核中申请了一个简易的文件系统,把原先的一个select或者poll调用分成了3个部分:

  • 调用epoll_create建立1个epoll对象eventpoll(在epoll文件系统中给这个句柄分配资源)

  • 调用epoll_ctl向epoll对象中添加这待处理的100万个套接字

  • 调用epoll_wait收集发生事件的连接

    在实际收集事件时,epoll_wait的效率就会非常高,因为调用epoll_wait时并没有向它传递这100万个连接,内核也不需要去遍历全部的连接,只传递了有事件发生的连接。

eventpoll对象中有两个重要成员:rbr监控的连接、rdllink右键发生的连接

模块定义

ngx_module_t  ngx_epoll_module = {
    NGX_MODULE_V1,
    &ngx_epoll_module_ctx,               /* module context */
    ngx_epoll_commands,                  /* module directives */
    NGX_EVENT_MODULE,                    /* module type */
    NULL,                                /* init master */
    NULL,                                /* init module */
    NULL,                                /* init process */
    NULL,                                /* init thread */
    NULL,                                /* exit thread */
    NULL,                                /* exit process */
    NULL,                                /* exit master */
    NGX_MODULE_V1_PADDING
};

这个模块的两个参数ngx_epoll_commands与ngx_epoll_module_ctx,ngx_epoll_commands中说明了对哪些配置项感兴趣,以及conf的存储等内容,先来看看它

static ngx_command_t  ngx_epoll_commands[] = {
	// 这个配置项表示调用一次epoll_wait时最多可以返回的事件数,当然,它也会预分配那么多epoll_event结构体用于存储事件
    { ngx_string("epoll_events"),
      NGX_EVENT_CONF|NGX_CONF_TAKE1,
      ngx_conf_set_num_slot,
      0,
      offsetof(ngx_epoll_conf_t, events),
      NULL },

    // 指明在开启异步I/O且使用io_setup系统调用初始化异步I/O上下文环境时,初始分配的异步I/O事件个数
    { ngx_string("worker_aio_requests"),
      NGX_EVENT_CONF|NGX_CONF_TAKE1,
      ngx_conf_set_num_slot,
      0,
      offsetof(ngx_epoll_conf_t, aio_requests),
      NULL },

      ngx_null_command
};

其配置项结构体就是这两个参数:

typedef struct {
    ngx_uint_t  events;
    ngx_uint_t  aio_requests;
} ngx_epoll_conf_t;

下边来看看epoll是如何定义ngx_event_module_t事件模块接口的,也就是这个context

static ngx_event_module_t  ngx_epoll_module_ctx = {
    &epoll_name,
    ngx_epoll_create_conf,               /* create configuration */
    ngx_epoll_init_conf,                 /* init configuration */

    {
        ngx_epoll_add_event,             /* add an event */
        ngx_epoll_del_event,             /* delete an event */
        ngx_epoll_add_event,             /* enable an event */
        ngx_epoll_del_event,             /* disable an event */
        ngx_epoll_add_connection,        /* add an connection */
        ngx_epoll_del_connection,        /* delete an connection */
#if (NGX_HAVE_EVENTFD)
        ngx_epoll_notify,                /* trigger a notify */
#else
        NULL,                            /* trigger a notify */
#endif
        ngx_epoll_process_events,        /* process the events */
        ngx_epoll_init,                  /* init the events */
        ngx_epoll_done,                  /* done the events */
    }
};

初始化

首先从实现init接口的ngx_epoll_init方法讲起。ngx_epoll_init(),实在ngx_event_core_commands的模块的第4部中调用,它主要做了两件事情。

  • 调用epoll_create方法创建epoll对象。
  • 创建event_list数组,用于进行epoll_wait调用时传递内核态的事件。
static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
    ngx_epoll_conf_t  *epcf;

    epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);

    if (ep == -1) {
        ep = epoll_create(cycle->connection_n / 2);

        if (ep == -1) {
            ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                          "epoll_create() failed");
            return NGX_ERROR;
        }
        ....
    }
    if (nevents < epcf->events) {
        if (event_list) {
            ngx_free(event_list);
        }

        event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events,
                               cycle->log);
        if (event_list == NULL) {
            return NGX_ERROR;
        }
    }
   ...
}

ep是epoll对象的描述符,nevents是上面说到的epoll_events配置项参数,它既指明了epoll_wait一次返回的最大事件数,也告诉了event_list应该分配的数组大小。

添加事件

对于epoll而言,并没有enable事件和disable事件的概念,另外,从ngx_epoll_module_ctx结构体中可以看出,enable和add接口都是使用ngx_epoll_add_event方法实现的,而disable和del接口都是使用ngx_epoll_del_event方法实现的。

下边先来看看ngx_epoll_add_event

static ngx_int_t
ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
    int                  op;
    uint32_t             events, prev;
    ngx_event_t         *e;
    ngx_connection_t    *c;
    struct epoll_event   ee;

    c = ev->data;

    events = (uint32_t) event;

    // 根据event参数确定当前事件是读事件还是写事件,这会决定events是加上EPOLLIN标志位还是EPOLLOUT标志位
    if (event == NGX_READ_EVENT) {
        e = c->write;
        prev = EPOLLOUT;
#if (NGX_READ_EVENT != EPOLLIN|EPOLLRDHUP)
        events = EPOLLIN|EPOLLRDHUP;
#endif

    } else {
        e = c->read;
        prev = EPOLLIN|EPOLLRDHUP;
#if (NGX_WRITE_EVENT != EPOLLOUT)
        events = EPOLLOUT;
#endif
    }

    // 根据active标志位确定是否为活跃事件,以决定到底是修改还是添加事件
    if (e->active) {
        op = EPOLL_CTL_MOD;
        events |= prev;

    } else {
        op = EPOLL_CTL_ADD;
    }

#if (NGX_HAVE_EPOLLEXCLUSIVE && NGX_HAVE_EPOLLRDHUP)
    if (flags & NGX_EXCLUSIVE_EVENT) {
        events &= ~EPOLLRDHUP;
    }
#endif

    // 加入flags参数到events标志位中
    ee.events = events | (uint32_t) flags;
    // ptr成员存储的是ngx_connection_t连接,
    ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);

    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                   "epoll add event: fd:%d op:%d ev:%08XD",
                   c->fd, op, ee.events);

    // 向epoll中添加事件或者在epoll中修改事件
    if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
                      "epoll_ctl(%d, %d) failed", op, c->fd);
        return NGX_ERROR;
    }
	// 将事件的active标志位置为1,表示当前事件是活跃的
    ev->active = 1;
#if 0
    ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;
#endif

    return NGX_OK;
}


对事件的处理

static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
    int                events;
    uint32_t           revents;
    ngx_int_t          instance, i;
    ngx_uint_t         level;
    ngx_err_t          err;
    ngx_event_t       *rev, *wev;
    ngx_queue_t       *queue;
    ngx_connection_t  *c;

    /* NGX_TIMER_INFINITE == INFTIM */

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "epoll timer: %M", timer);

    // 调用epoll_wait获取事件
    events = epoll_wait(ep, event_list, (int) nevents, timer);

    err = (events == -1) ? ngx_errno : 0;

    // Nginx对时间的缓存和管理,当flags标志位指示要更新时间时,就是在这里更新的
    if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
        ngx_time_update();
    }

    ...
     // 遍历本次epoll_wait返回的所有事件

    for (i = 0; i < events; i++) {
        c = event_list[i].data.ptr;

        // ptr成员就是ngx_connection_t连接的地址,但最后1位有特殊含义,需要把它屏蔽掉
        instance = (uintptr_t) c & 1;
        c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

        // 取出读事件
        rev = c->read;

        // 判断这个读事件是否为过期事件
        if (c->fd == -1 || rev->instance != instance) {

            /*
             * the stale event from a file descriptor
             * that was just closed in this iteration
             */

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll: stale event %p", c);
            continue;
        }

        // 取出事件类型
        revents = event_list[i].events;
		...

         // 如果是读事件且该事件是活跃的
        if ((revents & EPOLLIN) && rev->active) {
		...

            rev->ready = 1;
            rev->available = -1;

            if (flags & NGX_POST_EVENTS) {
                // 如果要在post队列中延后处理该事件,首先要判断它是新连接事件还是普通事件,以决定把它加入到ngx_posted_accept_events队列或者ngx_posted_events队列中
                queue = rev->accept ? &ngx_posted_accept_events
                                    : &ngx_posted_events;

                ngx_post_event(rev, queue);

            } else {
                // 立即调用读事件的回调方法来处理这个事件
                rev->handler(rev);
            }
        }

  		// 取出写事件
        wev = c->write;

        // 如果是写事件且该事件是活跃的
        if ((revents & EPOLLOUT) && wev->active) {

            // // 判断这个写事件是否为过期事件
            if (c->fd == -1 || wev->instance != instance) {
                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                               "epoll: stale event %p", c);
                continue;
            }

            wev->ready = 1;
#if (NGX_THREADS)
            wev->complete = 1;
#endif

            if (flags & NGX_POST_EVENTS) {
                // 将这个事件添加到post队列中延后处理
                ngx_post_event(wev, &ngx_posted_events);

            } else {
                // 立即调用这个写事件的回调方法来处理这个事件
                wev->handler(wev);
            }
        }
    }

    return NGX_OK;
}

ngx_epoll_process_events方法会收集当前触发的所有事件,对于不需要加入到post队列延后处理的事件,该方法会立刻执行它们的回调方法,这其实是在做分发事件的工作,只是它会在自己的进程中调用这些回调方法而已,因此,每一个回调方法都不能导致进程休眠或者消耗太多的时间,以免epoll不能即时地处理其他事件.

instance标志位为什么可以判断事件是否过期?从上面的代码可以看出,instance标志位的使用其实很简单,它利用了指针的最后一位一定是0这一特性。既然最后一位始终都是0,那么不如用来表示instance。这样,在使用ngx_epoll_add_event方法向epoll中添加事件时,就把epoll_event中联合成员data的ptr成员指向ngx_connection_t连接的地址,同时把最后一位置为这个事件的instance标志。而在ngx_epoll_process_events方法中取出指向连接的ptr地址时,先把最后一位instance取出来,再把ptr还原成正常的地址赋给ngx_connection_t连接。这样,instance究竟放在何处的问题也就解决了.

那么,过期事件又是怎么回事呢?举个例子,假设epoll_wait一次返回3个事件,在第1个事件的处理过程中,由于业务的需要,所以关闭了一个连接,而这个连接恰好对应第3个事件。这样的话,在处理到第3个事件时,这个事件就已经是过期事件了.

如何解决这个问题?依靠instance标志位。当调用ngx_get_connection从连接池中获取一个新连接时,instance标志位就会置反。这样,当这个ngx_connection_t连接重复使用时,它的instance标志位一定是不同的。因此,在ngx_epoll_process_events方法中一旦判断instance发生了变化,就认为这是过期事件而不予处理。

5. 定时器事件

缓存时间与时间更新

Nginx与一般的服务器不同,出于性能的考虑(不需要每次获取时间都调用gettimeofday方法),Nginx使用的时间是缓存在其内存中的,这样,在Nginx模块获取时间时,只是获取内存中的几个整型变量而已。

定时器的结构体主要有ngx_time_tngx_tm_t,一些时间的定义,这里不多说了。

这样缓存时间就带来对时间更新的要求,如何更新呢?

它是与ngx_epoll_process_events方法的调用频率及其flag参数相关的。实际上,Nginx还提供了设置更新缓存时间频率的功能(也就是至少每隔timer_resolution毫秒必须更新一次缓存时间),通过在nginx.conf文件中的timer_resolution配置项可以设置更新的最小频率,这样就保证了缓存时间的精度。下面看一下timer_resolution是如何起作用的。

  • 在ngx_event_core_module模块初始化时会使用setitimer系统调用告诉内核每隔timer_resolution毫秒调用一次ngx_timer_signal_handler方法。
  • 而ngx_timer_signal_handler方法则会将ngx_event_timer_alarm标志位设为1
  • 这样一来,一旦调用ngx_epoll_process_events方法,如果间隔的时间超过timer_resolution毫秒,肯定会更新缓存时间。

定时器的原理

定时器是通过一棵红黑树实现的。ngx_event_timer_rbtree就是所有定时器事件组成的红黑树,而ngx_event_timer_sentinel就是这棵红黑树的哨兵节点。

这棵红黑树中的每个节点都是ngx_event_t事件中的timer成员,节点的关键字就是事件的超时时间。以这个超时时间的大小组成了二叉排序树ngx_event_timer_rbtree。

如果需要找出最有可能超时的事件,那么将ngx_event_timer_rbtree树中最左边的节点取出来即可。只要用当前时间去比较这个最左边节点的超时时间,就会知道这个事件有没有触发超时,如果还没有触发超时,那么会知道最少还要经过多少毫秒满足超时条件而触发超时。

提供了几个常用的函数:增加定时器事件void ngx_event_add_timer(ngx_event_t *ev, ngx_msec_t timer),删除定时器事件void ngx_event_del_timer(ngx_event_t *ev)。以及定时器事件的过期检查ngx_event_expire_timers,它的代码如下:

void
ngx_event_expire_timers(void)
{
    ngx_event_t        *ev;
    ngx_rbtree_node_t  *node, *root, *sentinel;

    sentinel = ngx_event_timer_rbtree.sentinel;

    for ( ;; ) {
        root = ngx_event_timer_rbtree.root;

        if (root == sentinel) {
            return;
        }
		// 获取最小的超时时间的节点
        node = ngx_rbtree_min(root, sentinel);

        /* node->key > ngx_current_msec */

        // 判断它是否过期
        if ((ngx_msec_int_t) (node->key - ngx_current_msec) > 0) {
            return;
        }

        // 获取事件
        ev = (ngx_event_t *) ((char *) node - offsetof(ngx_event_t, timer));

        ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                       "event timer del: %d: %M",
                       ngx_event_ident(ev->data), ev->timer.key);

        // 移除事件
        ngx_rbtree_delete(&ngx_event_timer_rbtree, &ev->timer);

#if (NGX_DEBUG)
        ev->timer.left = NULL;
        ev->timer.right = NULL;
        ev->timer.parent = NULL;
#endif

        ev->timer_set = 0;

        ev->timedout = 1;

        // 定时器回调
        ev->handler(ev);
    }
}

6. 事件处理流程

以上所有的介绍,其实都是为了本节

创建新连接

在ngx_event_core_module的ngx_event_process_init中,将接收到连接之后回调函数设为了ngx_event_accept,在这里完成了新连接的建立,其流程如下:

惊群现象

何谓“惊群”?master进程开始监听Web端口,fork出多个worker子进程,这些子进程开始同时监听同一个Web端口。一般情况下,有多少CPU核心,就会配置多少个worker子进程,这样所有的worker子进程都在承担着Web服务器的角色。但下面假定这样一个场景:没有用户连入
服务器,某一时刻恰好所有的worker子进程都休眠且等待新连接的系统调用(如epoll_wait),这时有一个用户向服务器发起了连接,内核在收到TCP的SYN包时,会激活所有的休眠worker子进程,当然,此时只有最先开始执行accept的子进程可以成功建立新连接,而其他worker子进程都会accept失败。这些accept失败的子进程被内核唤醒是不必要的,它们被唤醒后的执行很可能也是多余的,那么这一时刻它们占用了本不需要占用的系统资源,引发了不必要的进程上下文切换,增加了系统开销。

ps:epoll的唤醒在linux内核中是有解决的,它提供了唤醒的参数,可以选择全唤醒,也可以只唤醒一个。linux自己也解决了这个问题,也就是利用accept_mutex,使同一时刻只能有唯一一个worker子进程监听Web端口,这样就不会发生“惊
群”了,此时新连接事件只能唤醒唯一正在监听端口的worker子进程。

先来看看accept_mutex的获取

ngx_int_t
ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
{
    // 获取锁
    if (ngx_shmtx_trylock(&ngx_accept_mutex)) {

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "accept mutex locked");

        if (ngx_accept_mutex_held && ngx_accept_events == 0) {
            return NGX_OK;
        }

        // 将所有监听连接的读事件添加到当前的epoll等事件驱动模块中
        if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
            ngx_shmtx_unlock(&ngx_accept_mutex);
            return NGX_ERROR;
        }

        ngx_accept_events = 0;
        ngx_accept_mutex_held = 1;

        return NGX_OK;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "accept mutex lock failed: %ui", ngx_accept_mutex_held);

    // 没有获取到,且ngx_accept_mutex_held,需要将所有监听连接的读事件从事件驱动模块中移除
    if (ngx_accept_mutex_held) {
        if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) {
            return NGX_ERROR;
        }

        ngx_accept_mutex_held = 0;
    }

    return NGX_OK;
}

如果ngx_trylock_accept_mutex方法没有获取到锁,接下来调用事件驱动模块的process_events方法时只能处理已有的连接上的事件;如果获取到了锁,调用process_events方法时就会既处理已有连接上的事件,也处理新连接的事件。

在worker进程的工作循环中,如果获取到了,则将flag加上NGX_POST_EVENTS标志。

if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}
if (ngx_accept_mutex_held) {
flags |= NGX_POST_EVENTS;
}

在process_events方法中,新连接事件全部放到ngx_posted_accept_events队列中,普通事件则放到ngx_posted_events队列中。

 if ((revents & EPOLLIN) && rev->active) {
     ...
            rev->ready = 1;
            rev->available = -1;

            if (flags & NGX_POST_EVENTS) {
                queue = rev->accept ? &ngx_posted_accept_events
                                    : &ngx_posted_events;

                ngx_post_event(rev, queue);

            } else {
                rev->handler(rev);
            }
        }

负载均衡

有一个全局变量ngx_accept_disabled,它就是负载均衡机制实现的关键阈值。ngx_accept_disabled的值就是一个负数,其值为连接总数的7/8。其实,ngx_accept_disabled的用法很简单,当它为负数时,不会进行触发负载均衡操作;而
当ngx_accept_disabled是正数时,就会触发Nginx进行负载均衡操作了。Nginx的做法也很简单,就是当ngx_accept_disabled是正数时当前进程将不再处理新连接事件,取而代之的仅仅是ngx_accept_disabled值减1。

初始话是在ngx_event_accept()中设置的

 ngx_accept_disabled = ngx_cycle->connection_n / 8
                              - ngx_cycle->free_connection_n;

是用是在worker主线程中中使用

if (ngx_accept_disabled > 0) {
            ngx_accept_disabled--;

        } else {
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                return;
            }
}

ngx_process_events_and_timers流程

这里就是worker的主循环

根据不论是获取锁还是负载均衡都在这里进行着控制,这里主要的事件处理包括第7步ngx_process_events(),也就是在epoll模块中定义的ngx_epoll_process_events(),再就是第8步处理新连接的ngx_event_process_posted,第10步的处理定时器事件ngx_event_expire_timers();,最后是第11步,它是处理普通的事件。

7. TCP协议

前边学习过TCP协议,从理论上进行了学习,这里讲TCP协议,是nginx中看看tcp协议的细节。

三次握手

  • 内核在我们调用listen方法时,就已经为这个监听端口建立了SYN队列和ACCEPT队列,
  • 当客户端使用connect方法向服务器发起TCP连接,随后图中1.1步骤客户端的SYN包到达了服务器后,内核会把这一信息放到SYN队列(即未完成握手队列)中,同时回一个SYN+ACK包给客户端。
  • 2.1步骤中客户端再次发来了针对服务器SYN包的ACK网络分组时,内核会把连接从SYN队列中取出,再把这个连接放到ACCEPT队列(即已完成握手队列)中。
  • 而服务器在第3步调用accept方法建立连接时,其实就是直接从ACCEPT队列中取出已经建好的连接而已。

如果大量连接同时到来,而应用程序不能及时地调用accept方法,就会导致以上两个队列满(ACCEPT队列满,进而也会导致SYN队列满),从而导致连接无法建立。这其实很常见,比如Nginx的每个worker进程都负责调用accept方法,如果一个Nginx模块在处理请求时长时间陷入了某个方法的执行中(如执行计算或者等待IO),就有可能导致新连接无法建立。

send流程

send流程如下:

这里的分片,TCP连接建立时,就可以判断出双方的网络间最适宜的、不会被再次切分的报文大小,TCP层把它叫做MSS最大报文段长度。假定待发送的内存将按照MSS被切分为3个报文,应用程序在第1步调用send方法、第10步send方法返回之间,内核的主要任务是把用户态的内存内容拷贝到内核态的TCP缓冲区上。从这里看到,send方法成功返回并不等于就把报文发送出去了。

recv流程

流程如下图:

当调用recv这样的方法接收报文时,Nginx是基于事件驱动的,也就是说只有epoll通知worker进程收到了网络报文,recv才会被调用。在第1~4步表示接收到了无序的报文后,内核是怎样重新排序的;第5步开始,应用程序调用了recv方法,内核开始把TCP读缓冲区的内容拷贝到应用程序的用户态内存中,第13步recv方法返回拷贝的字节数。

图中用到了linux内核中为TCP准备的2个队列:

receive队列是允许用户进程直接读取的,它是将已经接收到的TCP报文,去除了TCP头部、排好序放入的、用户进程可以直接按序读取的队列;

out_of_order队列存放乱序的报文。

8. 小结

本文一边看书,一边看代码,对事件的的流程进行了较为消息的整理。

这里看到了3个模块:ngx_event_module、ngx_event_core_module、ngx_epoll_module。ngx_event_module调用了各个模块的init_conf,ngx_event_core_module为各个事件模块分配了一些列资源、ngx_epoll_module是事件的驱动模块,事件的init、add、del、process等都在这里定义。

看到worker进程主循环中主要的事件处理循环:ngx_process_events_and_timers

模块之间的关系、模块与进程之间的关系都能清除的看到了。

# nginx 

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×