libuv记录

调整一下学习的顺序,从先看libuv,再看node.js。

  • v8引擎
  • libuv <=
  • node.js

主要参考:官网

基础

libuv不仅仅是一个在不同I/O轮询机制之上的简单抽象,它为sockets提供更高层次的'handles'与'streams'抽象,跨平台的文件I/O,线程、进程等功能。如图:
概览

  • 事件循环(event loop)
    事件循环

    事件循环是libuv的核心,它与一个独立的线程绑定。所有的网络I/O都在非阻塞的socket上执行。作为循环迭代的一部分,等待socket上I/O活动时,循环会阻塞,回调函数会立即触发,并指示socket的状态(可读、可写等),这样handles可以执行读写操作。

    • 首先执行是到期的times
    • 其次执行的是被挂起的callbacks,这些callback在本次轮询的前面,为什么会再前面呢,因为它们是在一次轮询前,推迟到本次的。
    • 空闲的handles
    • prepare handles、I/O轮询、check handles,这三个可以是轮询的前中后,AOP编程的思维,像是Nest框架中的Interceptors

    在事件驱动的编程中,应用向表示对特定事件的兴趣,当事件发生时进行相应。从操作系统收集事件,监视其他资源的工作由libuv来完成。用户可以注册回调,当事件触发时被调用。

    #include <stdlib.h>
    #include <uv.h>
    
    int main() {
        uv_loop_t *loop = malloc(sizeof(uv_loop_t));
        uv_loop_init(loop);
    
        printf("Now quitting.\n");
        uv_run(loop, UV_RUN_DEFAULT);
    
        uv_loop_close(loop);
        free(loop);
        return 0;
    }
    
  • handles 与 requests

    • 定义
      libuv提供两个抽象:handle与request,它们与event loop结合使用。
      handles: 代表可执行某些操作的持久对象,例如tcp server handle,一有新连接到来,就会执行回调函数。
      requests: 代表可以行某些操作的短期对象,即可以在handle里执行,也可以直接再event loop里执行。例如getaddrinfo request

      前文讲到,需要向libuv表示对某些事件的兴趣,这个怎么表示呢?这通过创建一个相应的handle来完成,handle可以简单表示成:uv_TYPE_t.例如:

      /* Handle types. */
      typedef struct uv_stream_s uv_stream_t;
      typedef struct uv_tcp_s uv_tcp_t;
      typedef struct uv_udp_s uv_udp_t;
      typedef struct uv_pipe_s uv_pipe_t;
      typedef struct uv_tty_s uv_tty_t;
      typedef struct uv_poll_s uv_poll_t;
      typedef struct uv_timer_s uv_timer_t;
      typedef struct uv_prepare_s uv_prepare_t;
      typedef struct uv_check_s uv_check_t;
      typedef struct uv_idle_s uv_idle_t;
      
    • 示例:

      #include <stdio.h>
      #include <uv.h>
      
      int64_t counter = 0;
      
      void wait_for_a_while(uv_idle_t* handle) {
          counter++;
      
          if (counter >= 10e6)
              uv_idle_stop(handle);
      }
      
      int main() {
          uv_idle_t idler;
      
          uv_idle_init(uv_default_loop(), &idler);
          uv_idle_start(&idler, wait_for_a_while);
      
          printf("Idling...\n");
          uv_run(uv_default_loop(), UV_RUN_DEFAULT);
      
          uv_loop_close(uv_default_loop());
          return 0;
      }
      
  • Streams 与 Buffers

最基本的handle是stream(uv_stream_t),TCP socket、文件I/O的管道、IPC都是stream的子类。
初始化之后,stream的基本操作如下:

int uv_read_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read_cb read_cb);
int uv_read_stop(uv_stream_t*);
int uv_write(uv_write_t* req, uv_stream_t* handle,
            const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb);

当uv_read_start()调用后,libuv自动从stream中持续读取数据,直到uv_read_stop()调用。
每个单位的数据是buffer- uv_buf_t。两个基本属性:uv_buf_t.base,指向数据的指针;uv_buf_t.len,数据的长度。

Filesystem

在概览图中,文件I/O是建立再线程池基础之上的。Socket操作使用操作系统提供的非阻塞函数,而File操作内部使用是阻塞函数,通过线程池来触发该函数。

  • 主要函数

    int uv_fs_open(uv_loop_t* loop, uv_fs_t* req, const char* path, int flags, int mode, uv_fs_cb cb)
    int uv_fs_close(uv_loop_t* loop, uv_fs_t* req, uv_file file, uv_fs_cb cb)
    void callback(uv_fs_t* req);
    int uv_fs_read(uv_loop_t* loop, uv_fs_t* req, uv_file file, const uv_buf_t bufs[], unsigned int nbufs, int64_t offset,   uv_fs_cb cb);
    int uv_fs_write(uv_loop_t* loop, uv_fs_t* req, uv_file file, const uv_buf_t bufs[], unsigned int nbufs, int64_t offset,   uv_fs_cb cb);
    
  • 直接读取方式:

    void on_open(uv_fs_t *req) {
        // The request passed to the callback is the same as the one the call setup
        // function was passed.
        assert(req == &open_req);
        if (req->result >= 0) {
            iov = uv_buf_init(buffer, sizeof(buffer));
            uv_fs_read(uv_default_loop(), &read_req, req->result,
                       &iov, 1, -1, on_read);
        }
        else {
            fprintf(stderr, "error opening file: %s\n", uv_strerror((int)req->result));
        }
    }
    
    void on_read(uv_fs_t *req) {
        if (req->result < 0) {
            fprintf(stderr, "Read error: %s\n", uv_strerror(req->result));
        }
        else if (req->result == 0) {
            uv_fs_t close_req;
            // synchronous
            uv_fs_close(uv_default_loop(), &close_req, open_req.result, NULL);
        }
        else if (req->result > 0) {
            iov.len = req->result;
            uv_fs_write(uv_default_loop(), &write_req, 1, &iov, 1, -1, on_write);
        }
    }
    
    void on_write(uv_fs_t *req) {
        if (req->result < 0) {
            fprintf(stderr, "Write error: %s\n", uv_strerror((int)req->result));
        }
        else {
            uv_fs_read(uv_default_loop(), &read_req, open_req.result, &iov, 1, -1, on_read);
        }
    }
    
    int main(int argc, char **argv) {
        uv_fs_open(uv_default_loop(), &open_req, argv[1], O_RDONLY, 0, on_open);
        uv_run(uv_default_loop(), UV_RUN_DEFAULT);
    
        uv_fs_req_cleanup(&open_req);
        uv_fs_req_cleanup(&read_req);
        uv_fs_req_cleanup(&write_req);
        return 0;
    }
    
    

    这是一个边读边写的示例,再onCreate中,开始读一定的量,然后写到write_req中,再write_req的回调中,再触发读。
    libuv对操作系统的api进行了完整的封装,详细内容查看libuv的api

  • stream方式

    除了直接读取,还可以通过stream的方式来使用,通过将文件fd与pipe绑定来创建一个file pipe如:

    int main(int argc, char **argv) {
        loop = uv_default_loop();
    
        uv_pipe_init(loop, &stdin_pipe, 0);
        uv_pipe_open(&stdin_pipe, 0);
    
        uv_pipe_init(loop, &stdout_pipe, 0);
        uv_pipe_open(&stdout_pipe, 1);
    
        uv_fs_t file_req;
        int fd = uv_fs_open(loop, &file_req, argv[1], O_CREAT | O_RDWR, 0644, NULL);
        uv_pipe_init(loop, &file_pipe, 0);
        uv_pipe_open(&file_pipe, fd);
    
        uv_read_start((uv_stream_t*)&stdin_pipe, alloc_buffer, read_stdin);
    
        uv_run(loop, UV_RUN_DEFAULT);
        return 0;
    }
    
    void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
        *buf = uv_buf_init((char*) malloc(suggested_size), suggested_size);
    }
    
    void read_stdin(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
        if (nread < 0){
            if (nread == UV_EOF){
                // end of file
                uv_close((uv_handle_t *)&stdin_pipe, NULL);
                uv_close((uv_handle_t *)&stdout_pipe, NULL);
                uv_close((uv_handle_t *)&file_pipe, NULL);
            }
        } else if (nread > 0) {
            write_data((uv_stream_t *)&stdout_pipe, nread, *buf, on_stdout_write);
            write_data((uv_stream_t *)&file_pipe, nread, *buf, on_file_write);
        }
    
        // OK to free buffer as write_data copies it.
        if (buf->base)
            free(buf->base);
    }
    

Networking

tcp

这个过程与直接使用操作系统的过程类似

  • server

    服务器通过:init、bind、listen、accept来使用

    int main() {
        loop = uv_default_loop();
    
        uv_tcp_t server;
        uv_tcp_init(loop, &server);
    
        uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);
    
        uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
        int r = uv_listen((uv_stream_t*) &server, DEFAULT_BACKLOG, on_new_connection);
        if (r) {
            fprintf(stderr, "Listen error %s\n", uv_strerror(r));
            return 1;
        }
        return uv_run(loop, UV_RUN_DEFAULT);
    }
    
    void on_new_connection(uv_stream_t *server, int status) {
        if (status < 0) {
            fprintf(stderr, "New connection error %s\n", uv_strerror(status));
            // error!
            return;
        }
    
        uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
        uv_tcp_init(loop, client);
        if (uv_accept(server, (uv_stream_t*) client) == 0) {
            uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
        }
    

    注意这里read数据时,是使用stream来使用

  • client

    client端通过connect来使用

      uv_tcp_t* socket = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
      uv_tcp_init(loop, socket);
    
      uv_connect_t* connect = (uv_connect_t*)malloc(sizeof(uv_connect_t));
    
      struct sockaddr_in dest;
      uv_ip4_addr("127.0.0.1", 80, &dest);
    
      uv_tcp_connect(connect, socket, (const struct sockaddr*)&dest, on_connect);
    

udp

libuv并不为udp提供stream,使用非阻塞的uv_udp_t handle与 uv_udp_send_t request来使用。

uv_loop_t *loop;
uv_udp_t send_socket;
uv_udp_t recv_socket;

int main() {
    loop = uv_default_loop();

    uv_udp_init(loop, &recv_socket);
    struct sockaddr_in recv_addr;
    uv_ip4_addr("0.0.0.0", 68, &recv_addr);
    uv_udp_bind(&recv_socket, (const struct sockaddr *)&recv_addr, UV_UDP_REUSEADDR);
    uv_udp_recv_start(&recv_socket, alloc_buffer, on_read);

    uv_udp_init(loop, &send_socket);
    struct sockaddr_in broadcast_addr;
    uv_ip4_addr("0.0.0.0", 0, &broadcast_addr);
    uv_udp_bind(&send_socket, (const struct sockaddr *)&broadcast_addr, 0);
    uv_udp_set_broadcast(&send_socket, 1);

    uv_udp_send_t send_req;
    uv_buf_t discover_msg = make_discover_msg();

    struct sockaddr_in send_addr;
    uv_ip4_addr("255.255.255.255", 67, &send_addr);
    uv_udp_send(&send_req, &send_socket, &discover_msg, 1, (const struct sockaddr *)&send_addr, on_send);

    return uv_run(loop, UV_RUN_DEFAULT);
}
void on_read(uv_udp_t *req, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) {
    if (nread < 0) {
        fprintf(stderr, "Read error %s\n", uv_err_name(nread));
        uv_close((uv_handle_t*) req, NULL);
        free(buf->base);
        return;
    }

    char sender[17] = { 0 };
    uv_ip4_name((const struct sockaddr_in*) addr, sender, 16);
    fprintf(stderr, "Recv from %s\n", sender);

    // ... DHCP specific code
    unsigned int *as_integer = (unsigned int*)buf->base;
    unsigned int ipbin = ntohl(as_integer[4]);
    unsigned char ip[4] = {0};
    int i;
    for (i = 0; i < 4; i++)
        ip[i] = (ipbin >> i*8) & 0xff;
    fprintf(stderr, "Offered IP %d.%d.%d.%d\n", ip[3], ip[2], ip[1], ip[0]);

    free(buf->base);
    uv_udp_recv_stop(req);
}

其他

libuv中还对线程、进程、进程通信等进行了封装,我觉得这些不是libuv的核心,这里不做详细的介绍了。

  • 线程

    int main() {
        int tracklen = 10;
        uv_thread_t hare_id;
        uv_thread_t tortoise_id;
        uv_thread_create(&hare_id, hare, &tracklen);
        uv_thread_create(&tortoise_id, tortoise, &tracklen);
    
        uv_thread_join(&hare_id);
        uv_thread_join(&tortoise_id);
        return 0;
    }
    
  • 进程

    uv_loop_t *loop;
    uv_process_t child_req;
    uv_process_options_t options;
    int main() {
        loop = uv_default_loop();
    
        char* args[3];
        args[0] = "mkdir";
        args[1] = "test-dir";
        args[2] = NULL;
    
        options.exit_cb = on_exit;
        options.file = "mkdir";
        options.args = args;
    
        int r;
        if ((r = uv_spawn(loop, &child_req, &options))) {
            fprintf(stderr, "%s\n", uv_strerror(r));
            return 1;
        } else {
            fprintf(stderr, "Launched process with ID %d\n", child_req.pid);
        }
    
        return uv_run(loop, UV_RUN_DEFAULT);
    }
    

回顾

  • event loop
  • handle 与 request
  • stream
  • file
  • tcp && udp

问题

  • 为什么文件读取用的是线程池?

  • 为什么tcp是一个stream,而udp不是一个stream?

# Node.js 

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×