调整一下学习的顺序,从先看libuv,再看node.js。
主要参考:官网
基础
libuv不仅仅是一个在不同I/O轮询机制之上的简单抽象,它为sockets提供更高层次的'handles'与'streams'抽象,跨平台的文件I/O,线程、进程等功能。如图:

-
事件循环(event loop)

事件循环是libuv的核心,它与一个独立的线程绑定。所有的网络I/O都在非阻塞的socket上执行。作为循环迭代的一部分,等待socket上I/O活动时,循环会阻塞,回调函数会立即触发,并指示socket的状态(可读、可写等),这样handles可以执行读写操作。
- 首先执行是到期的times
- 其次执行的是被挂起的callbacks,这些callback在本次轮询的前面,为什么会再前面呢,因为它们是在一次轮询前,推迟到本次的。
- 空闲的handles
- prepare handles、I/O轮询、check handles,这三个可以是轮询的前中后,AOP编程的思维,像是Nest框架中的Interceptors
在事件驱动的编程中,应用向表示对特定事件的兴趣,当事件发生时进行相应。从操作系统收集事件,监视其他资源的工作由libuv来完成。用户可以注册回调,当事件触发时被调用。
#include <stdlib.h>
#include <uv.h>
int main() {
uv_loop_t *loop = malloc(sizeof(uv_loop_t));
uv_loop_init(loop);
printf("Now quitting.\n");
uv_run(loop, UV_RUN_DEFAULT);
uv_loop_close(loop);
free(loop);
return 0;
}
-
handles 与 requests
-
定义
libuv提供两个抽象:handle与request,它们与event loop结合使用。
handles: 代表可执行某些操作的持久对象,例如tcp server handle,一有新连接到来,就会执行回调函数。
requests: 代表可以行某些操作的短期对象,即可以在handle里执行,也可以直接再event loop里执行。例如getaddrinfo request
前文讲到,需要向libuv表示对某些事件的兴趣,这个怎么表示呢?这通过创建一个相应的handle来完成,handle可以简单表示成:uv_TYPE_t.例如:
/* Handle types. */
typedef struct uv_stream_s uv_stream_t;
typedef struct uv_tcp_s uv_tcp_t;
typedef struct uv_udp_s uv_udp_t;
typedef struct uv_pipe_s uv_pipe_t;
typedef struct uv_tty_s uv_tty_t;
typedef struct uv_poll_s uv_poll_t;
typedef struct uv_timer_s uv_timer_t;
typedef struct uv_prepare_s uv_prepare_t;
typedef struct uv_check_s uv_check_t;
typedef struct uv_idle_s uv_idle_t;
-
示例:
#include <stdio.h>
#include <uv.h>
int64_t counter = 0;
void wait_for_a_while(uv_idle_t* handle) {
counter++;
if (counter >= 10e6)
uv_idle_stop(handle);
}
int main() {
uv_idle_t idler;
uv_idle_init(uv_default_loop(), &idler);
uv_idle_start(&idler, wait_for_a_while);
printf("Idling...\n");
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
uv_loop_close(uv_default_loop());
return 0;
}
-
Streams 与 Buffers
最基本的handle是stream(uv_stream_t),TCP socket、文件I/O的管道、IPC都是stream的子类。
初始化之后,stream的基本操作如下:
int uv_read_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read_cb read_cb);
int uv_read_stop(uv_stream_t*);
int uv_write(uv_write_t* req, uv_stream_t* handle,
const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb);
当uv_read_start()调用后,libuv自动从stream中持续读取数据,直到uv_read_stop()调用。
每个单位的数据是buffer- uv_buf_t。两个基本属性:uv_buf_t.base,指向数据的指针;uv_buf_t.len,数据的长度。
Filesystem
在概览图中,文件I/O是建立再线程池基础之上的。Socket操作使用操作系统提供的非阻塞函数,而File操作内部使用是阻塞函数,通过线程池来触发该函数。
-
主要函数
int uv_fs_open(uv_loop_t* loop, uv_fs_t* req, const char* path, int flags, int mode, uv_fs_cb cb)
int uv_fs_close(uv_loop_t* loop, uv_fs_t* req, uv_file file, uv_fs_cb cb)
void callback(uv_fs_t* req);
int uv_fs_read(uv_loop_t* loop, uv_fs_t* req, uv_file file, const uv_buf_t bufs[], unsigned int nbufs, int64_t offset, uv_fs_cb cb);
int uv_fs_write(uv_loop_t* loop, uv_fs_t* req, uv_file file, const uv_buf_t bufs[], unsigned int nbufs, int64_t offset, uv_fs_cb cb);
-
直接读取方式:
void on_open(uv_fs_t *req) {
// The request passed to the callback is the same as the one the call setup
// function was passed.
assert(req == &open_req);
if (req->result >= 0) {
iov = uv_buf_init(buffer, sizeof(buffer));
uv_fs_read(uv_default_loop(), &read_req, req->result,
&iov, 1, -1, on_read);
}
else {
fprintf(stderr, "error opening file: %s\n", uv_strerror((int)req->result));
}
}
void on_read(uv_fs_t *req) {
if (req->result < 0) {
fprintf(stderr, "Read error: %s\n", uv_strerror(req->result));
}
else if (req->result == 0) {
uv_fs_t close_req;
// synchronous
uv_fs_close(uv_default_loop(), &close_req, open_req.result, NULL);
}
else if (req->result > 0) {
iov.len = req->result;
uv_fs_write(uv_default_loop(), &write_req, 1, &iov, 1, -1, on_write);
}
}
void on_write(uv_fs_t *req) {
if (req->result < 0) {
fprintf(stderr, "Write error: %s\n", uv_strerror((int)req->result));
}
else {
uv_fs_read(uv_default_loop(), &read_req, open_req.result, &iov, 1, -1, on_read);
}
}
int main(int argc, char **argv) {
uv_fs_open(uv_default_loop(), &open_req, argv[1], O_RDONLY, 0, on_open);
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
uv_fs_req_cleanup(&open_req);
uv_fs_req_cleanup(&read_req);
uv_fs_req_cleanup(&write_req);
return 0;
}
这是一个边读边写的示例,再onCreate中,开始读一定的量,然后写到write_req中,再write_req的回调中,再触发读。
libuv对操作系统的api进行了完整的封装,详细内容查看libuv的api
-
stream方式
除了直接读取,还可以通过stream的方式来使用,通过将文件fd与pipe绑定来创建一个file pipe如:
int main(int argc, char **argv) {
loop = uv_default_loop();
uv_pipe_init(loop, &stdin_pipe, 0);
uv_pipe_open(&stdin_pipe, 0);
uv_pipe_init(loop, &stdout_pipe, 0);
uv_pipe_open(&stdout_pipe, 1);
uv_fs_t file_req;
int fd = uv_fs_open(loop, &file_req, argv[1], O_CREAT | O_RDWR, 0644, NULL);
uv_pipe_init(loop, &file_pipe, 0);
uv_pipe_open(&file_pipe, fd);
uv_read_start((uv_stream_t*)&stdin_pipe, alloc_buffer, read_stdin);
uv_run(loop, UV_RUN_DEFAULT);
return 0;
}
void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
*buf = uv_buf_init((char*) malloc(suggested_size), suggested_size);
}
void read_stdin(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
if (nread < 0){
if (nread == UV_EOF){
// end of file
uv_close((uv_handle_t *)&stdin_pipe, NULL);
uv_close((uv_handle_t *)&stdout_pipe, NULL);
uv_close((uv_handle_t *)&file_pipe, NULL);
}
} else if (nread > 0) {
write_data((uv_stream_t *)&stdout_pipe, nread, *buf, on_stdout_write);
write_data((uv_stream_t *)&file_pipe, nread, *buf, on_file_write);
}
// OK to free buffer as write_data copies it.
if (buf->base)
free(buf->base);
}
Networking
tcp
这个过程与直接使用操作系统的过程类似
-
server
服务器通过:init、bind、listen、accept来使用
int main() {
loop = uv_default_loop();
uv_tcp_t server;
uv_tcp_init(loop, &server);
uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);
uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
int r = uv_listen((uv_stream_t*) &server, DEFAULT_BACKLOG, on_new_connection);
if (r) {
fprintf(stderr, "Listen error %s\n", uv_strerror(r));
return 1;
}
return uv_run(loop, UV_RUN_DEFAULT);
}
void on_new_connection(uv_stream_t *server, int status) {
if (status < 0) {
fprintf(stderr, "New connection error %s\n", uv_strerror(status));
// error!
return;
}
uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
uv_tcp_init(loop, client);
if (uv_accept(server, (uv_stream_t*) client) == 0) {
uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
}
注意这里read数据时,是使用stream来使用
-
client
client端通过connect来使用
uv_tcp_t* socket = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(loop, socket);
uv_connect_t* connect = (uv_connect_t*)malloc(sizeof(uv_connect_t));
struct sockaddr_in dest;
uv_ip4_addr("127.0.0.1", 80, &dest);
uv_tcp_connect(connect, socket, (const struct sockaddr*)&dest, on_connect);
udp
libuv并不为udp提供stream,使用非阻塞的uv_udp_t handle与 uv_udp_send_t request来使用。
uv_loop_t *loop;
uv_udp_t send_socket;
uv_udp_t recv_socket;
int main() {
loop = uv_default_loop();
uv_udp_init(loop, &recv_socket);
struct sockaddr_in recv_addr;
uv_ip4_addr("0.0.0.0", 68, &recv_addr);
uv_udp_bind(&recv_socket, (const struct sockaddr *)&recv_addr, UV_UDP_REUSEADDR);
uv_udp_recv_start(&recv_socket, alloc_buffer, on_read);
uv_udp_init(loop, &send_socket);
struct sockaddr_in broadcast_addr;
uv_ip4_addr("0.0.0.0", 0, &broadcast_addr);
uv_udp_bind(&send_socket, (const struct sockaddr *)&broadcast_addr, 0);
uv_udp_set_broadcast(&send_socket, 1);
uv_udp_send_t send_req;
uv_buf_t discover_msg = make_discover_msg();
struct sockaddr_in send_addr;
uv_ip4_addr("255.255.255.255", 67, &send_addr);
uv_udp_send(&send_req, &send_socket, &discover_msg, 1, (const struct sockaddr *)&send_addr, on_send);
return uv_run(loop, UV_RUN_DEFAULT);
}
void on_read(uv_udp_t *req, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) {
if (nread < 0) {
fprintf(stderr, "Read error %s\n", uv_err_name(nread));
uv_close((uv_handle_t*) req, NULL);
free(buf->base);
return;
}
char sender[17] = { 0 };
uv_ip4_name((const struct sockaddr_in*) addr, sender, 16);
fprintf(stderr, "Recv from %s\n", sender);
// ... DHCP specific code
unsigned int *as_integer = (unsigned int*)buf->base;
unsigned int ipbin = ntohl(as_integer[4]);
unsigned char ip[4] = {0};
int i;
for (i = 0; i < 4; i++)
ip[i] = (ipbin >> i*8) & 0xff;
fprintf(stderr, "Offered IP %d.%d.%d.%d\n", ip[3], ip[2], ip[1], ip[0]);
free(buf->base);
uv_udp_recv_stop(req);
}
其他
libuv中还对线程、进程、进程通信等进行了封装,我觉得这些不是libuv的核心,这里不做详细的介绍了。
-
线程
int main() {
int tracklen = 10;
uv_thread_t hare_id;
uv_thread_t tortoise_id;
uv_thread_create(&hare_id, hare, &tracklen);
uv_thread_create(&tortoise_id, tortoise, &tracklen);
uv_thread_join(&hare_id);
uv_thread_join(&tortoise_id);
return 0;
}
-
进程
uv_loop_t *loop;
uv_process_t child_req;
uv_process_options_t options;
int main() {
loop = uv_default_loop();
char* args[3];
args[0] = "mkdir";
args[1] = "test-dir";
args[2] = NULL;
options.exit_cb = on_exit;
options.file = "mkdir";
options.args = args;
int r;
if ((r = uv_spawn(loop, &child_req, &options))) {
fprintf(stderr, "%s\n", uv_strerror(r));
return 1;
} else {
fprintf(stderr, "Launched process with ID %d\n", child_req.pid);
}
return uv_run(loop, UV_RUN_DEFAULT);
}
回顾
- event loop
- handle 与 request
- stream
- file
- tcp && udp
问题