6. 进程间通信
进程间通信的手段,可以分成2大类:通信类与同步类。前者用于在进程间传递数据,后者用于协调进程间操作。下边来看看它们,包括管道、消息队列、信号量、mmap、共享内存等几种方式。
管道
-
简介
管道的作用是在有亲缘关系的进程之间传递消息,共同祖先调动pipe函数,打开管道文件就会在fork之后被各个后代进程所共享。一般是2个亲缘进程之间来通信,一个写另一个读,如果要双工,最好是2个管道来进行。
管道是一种文件,可以调用read、write、close等接口来操作,它属于一种特殊的文件系统pipefs。管道中数据是阅后即焚的,读取操作是一种消耗行为。
-
创建与读写
#include <unistd.h>
int pipe(int pipefd[2]); // 创建
int pipe2(int pipefd[2], int flags) // flag可以用O_NONBLOCK
write(pipefd[1],wbuf,count); // 写
read(pipefd[0],rbuf,count); // 读
pipe
函数返回2个文件描述符,一个是读描述符pipefd[0]
,一个是写描述符pipefd[1]
。示例代码如下:
int pipefd[2];
pipe(pipefd);
switch(fork())
{
case -1:
/*fork failed, error handler here*/
case 0: // 子进程
close(pipefd[1]) ; /* 关闭掉写入端对应的文件描述符,可以 pipefd[0] 调用 read*/
break ;
default: // 父进程
close(pipefd[0]); /* 关闭掉读取端文件描述符,可以 pipefd[1] 调用 write*/
break;
}
注意关闭未使用的管道文件描述符。如果所有读端已经关闭,写端再调用,会操作失败,errno被设置为EPIPE,同时内核会向写入进程发送SIGPIPE的信号。
-
管道对应内存区大小
管道本质是一片内存区域,默认大小是65536字节,可以通过fcntl来获取与修改。
int pipe_capacity = fcntl(fd, F_GETPIPE_SZ); // 获取管道大小
ret = fcntl(fd, F_SETPIPE_SZ, size); // 设置管道大小
可以执行cat /proc/sys/fs/pipe-max-size
查看
-
命名管道
#include <sys/types.h>
#include <sys/stat.h>
int mkfifo(const char *pathname, mode_t mode);
这个与文件I/O的create类似,mode文件的读写权限,pathname是目录。
文件IO的open、read、write、close等都可以用在FIFO文件上。但FIFO文件不应该以O_RDWR
模式打开。正确的使用方法是一个以O_RDONLY
方式打开,另外一个以O_WRONLY
打开。
通常打开FIFO文件会同步读写进程,读写都打开才会返回,否则会阻塞。可以通过设置O_NONBLOCK
来设置成异步方式。
POSIX消息队列
-
POSIX IPC简介
以下几种方式,有System V风格与POSIX风格,POSIX风格的出现的要晚,具有后发优势,总体来讲要用于SystemV风格。
| 消息队列 | 信号量 | 共享内存 |
头文件 | <mqueue.h> | <semaphore.h> | <sys/mman.h> |
创建或打开 | mq_open | sem_open | shm_open+mmap |
关闭 | mq_close | sem_close | munmap |
删除 | mq_unlink | sem_unlink | shm_unlink |
执行IPC | mq_send、mq_receive | sem_post、sem_wait、sem_getvalue | 共享内存区域操作数据 |
其他操作 | mq_getattr、mq_setattr、mq_notify | sem_init、sem_destroy | 无 |
创建出来的共享内存、信号量,Linux将这些对象了挂载在/dev/shm目录处的tmpfs文件系统中。
消息队列也可在文件系统中展示,需要手动挂载:
mkdir /dev/mqueue
mount -t mqueue none /dev/mqueue
POSIX消息队列的名字必须以/打头,后续字符不允许出现/,打头的/字符不计入长度,最大长度为NAME_MAX(255字符)。
POSIX信号量和共享内存可以以1个或多个/大豆,也可不以/打头,打头的/字符不计入长度,共享内存的最大长度是NAME_MAX,POSIX信号量的名字最大长度为NAME_MAX-4。
示例:
// 消息队列
mqd_t mqd = mq_open(argv[1],O_RDWR|O_CREAT|O_EXCL,S_IRUSR|S_IWUSR,NULL);
if(mqd == -1)
{
/*error handle*/
}
// 信号量
sem_t* sem = sem_open(argv[1],O_CREAT|O_EXCL,S_IRUSR|S_IWUSR,1);
if(sem == SEM_FAILED)
{
/*error handle*/
}
// 共享内存
int shm_fd= shm_open(argv[1],O_RDWR|O_CREAT|O_EXCL,S_IRUSR|S_IWUSR);
if(shm_fd == -1)
{
/*error handle*/
}
-
创建open
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
该函数与open
类似,第2个参数oflgas的标志位包括:O_RDONLY、O_WRONLY、O_RDWR、O_CREAT、O_EXCL、O_NONBLOCK等。如果打开消息队列时,没有设置O_NONBLOCK,后续调用mq_send、mq_receive就会进入阻塞,反之则不会阻塞。
第3个参数mode与第4个参数attr只有在创建消息队列时才有意义。mode是访问权限,attr设置了消息队列的属性,attr可以为NULL,表示默认属性。
struct mq_attr {
long mq_flags; // 0或设置O_NONBLOCK
long mq_maxmsg; // 队列中最大消息个数
long mq_msgsize; // 单条消息允许的最大字节数
long mq_curmsgs; // 队列中当前的消息个数
}
默认的配置如下查看如设置:
cat /proc/sys/fs/mqueue/msg_max
cat /proc/sys/fs/mqueue/msgsize_max
sysctl -w fs.mqueue.msg_max=4096
sysctl -w fs.mqueue.msgsize_max=65536
-
关闭close
#include <mqueue.h>
int mq_close(mqd_t mqdes);
POSIX IPC对象维护有引用计数,在调用相关的close函数后,引用计数-1.
-
销毁unlink
#include <mqueue.h>
int mq_unlink(const char *name);
即使引用计数为0,只要不显示调用mq_unlink,该队列及队列上的消息就依然存在。
-
调整消息队列的属性attr
#include <mqueue.h>
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, struct mq_attr *newattr,
struct mq_attr *oldattr);
消息队列创建以后,可以通过调用它们来修改属性,但mq_maxmgs、mg_msgsize属性在创建时已经确定下来,mq_setattr不能修改这2个属性,mq_curmsgs是根据消息自己变化的,也就是说这两个函数仅用于修改mq_flags。
mq_getattr(mqd,&attr);
attr.mq_flags |= O_NONBLOCK; /* 设置 O_NONBLOCK 性 */
//attr.mq_flags &=(~O_NONBLOCK);/* 取消 O_NONBLOCK 性 */
mq_setattr(mqd,&attr,NULL)
-
消息的发送send
#include <mqueue.h>
int mq_send(mqd_t mqdes, const char *msg_ptr,size_t msg_len, unsigned msg_prio);
第3个参数是消息的长度,不要超过mq_msgsize
第4个参数是消息的优先级,非负数。
如果消息队列已满,mq_send可能阻塞,如果设置了O_NONBLOCK,会立即返回失败,errno设置为EAGAIN。
-
消息的接收receive
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
对于POSIX消息队列而言,宗师取走优先级最高的消息中,最先到达的那个。
第4个参数msg_proi不是NULL,系统将取到的消息体的优先级复制到msg_prio指向的整型变量,如果NULL,表示不在乎消息体的优先级。如果mg_receive调用时,消息队列中没有消息,则函数陷入阻塞,如果设置了O_NONBLOCK,则立即返回失败,并设置errno为EAGIAN。
POSIX消息队列的本质就是个优先级队列,优先级高的总是被优先取出。
-
消息的通知notify
#include <mqueue.h>
int mq_notify(mqd_t mqdes, const struct sigevent *sevp);
以上接口与管道没有多大什么区别,消息队列的特点就是本接口。也就是发布-订阅的功能。当空的队列中收到一条新的消息,就给相应的进程发送通知,这是一种典型的异步机制。
POSI消息队列提供了2种异步的方法:1. 产生一个信号;2. 创建一个线程来执行一个事先制指定的函数。
第2个参数:
union sigval{
int sigval_int;
void *sigval_ptr;
}
struct sigevent {
int sigev_notify; /* 决定采用哪种通知方式 ,信号还是线程,SIGEV_SIGNAL或SIGEV_THREAD*/
int sigev_signo; /* 用 信号方式,决定发送哪个信号 */
union sigval sigev_value; /* 信号或线程方式含义不同 */
void (*sigev_notify_function)(union sigval);
void *sigev_notify_attributes;
}
信号方式示例:
mqd_t mqd;
struct mq_attr attr ;
sigset_t newmask ;
struct sigevent sigev;
mqd = mq_open(mq_filename,O_RDONLY|O_NONBLOCK);
mq_getattr(mqd,&attr);
buffer = malloc(attr.mq_msgsize) ; /* 保证buffer 足够大 */
sigemptyset(&newmask);;
sigaddset(&newmask,SIGUSR1);
sigprocmask(SIG_BLOCK,&newmask,NULL);/* 阻塞等待的信号 */
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
mq_notify(mqd,&sigev);
for( ; ; )
{
sigwait(&newmask,&signo);/* 待 SIGUSR1 信号 */
if(signo == SIGUSR1)
{
mq_notify(mqd,&sigev); /* 先重新注册 notify 函数 */
while( n = mq_receive(mqd,buffer,attr.mq_msgsize,NULL) >= 0)
{
/*process the message in buffer*/
}
if(errno != EAGAIN)
{
/*some error happened*/
}
}
}
Linux系统支持在消息队列上执行select/epoll,以上过程会变得更简单:
mqd = mq_open(argv[1],O_RDONLY | O_NONBLOCK);
if(mqd == -1)
{
fprintf(stderr,"failed to open mqueue (%d: %s)\n",errno,strerror(errno));
return 1;
}
mq_getattr(mqd,&attr);
buffer = malloc(attr.mq_msgsize);
FD_ZERO(&rset);
for(;;)
{
FD_SET(mqd,&rset);
nfds = select(mqd+1,&rset,NULL,NULL,NULL);
if(FD_ISSET(mqd,&rset))
{
while((n = mq_receive(mqd,buffer,attr.mq_msgsize,NULL)) >=0 )
{
/* 在此处处理本条消息 */
}
if(errno != EAGAIN)
{
/* 处理错误 */
}
}
}
POSIX信号量
-
简介
与信号量概念类似的是互斥量,它们都是为了同步多个进程的操作,信号量是与某个资源相关,信号量元素的值表示与之关联的资源的个数。内核维护信号量的值,并确保其值不小于0。互斥量保护临界区,可以看成资源个数是1,信号量的资源个数可以是多个,允许多个进程同时使用资源。
POSIX提供了2种信号量:有名信号量与无名信号量。无名信号量是基于内存的,由于没有名字,没法通过open操作直接找到对应的信号量,很难在多进程之间使用。无名信号量多用于线程之间同步。有名信号量反之,可以用于多进程同步。它们在使用上的相同,创建、消耗时的接口不同。
-
创建open
#include <fcntl.h>
#include <sys/stat.h>
#include <semaphore.h>
sem_t *sem_open(const char *name, int oflag);
sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);
第2个参数支持O_CREATE、O_EXCL,表示创建信号量。
最后一个value是新建信号的初始值,value的值在0和SEM_VALUE_MAX之间。
创建失败,返回SEM_FAILED,并设置errno,不要尝试创建sem_t结构体的副本。
-
关闭close
#include <semaphore.h>
int sem_close(sem_t *sem);
关闭信号量,同时信号量的进程数的引用计数-1。进程终止时,有名信号量会自动关闭,当进程执行exec系统函数时,也会关闭。
-
销毁unlink
#include <semaphore.h>
int sem_unlink(const char *name);
与其他POSIX对象相同,只有当引用计数为0,才会真正删除。
-
信号量使用wait、post
#include <semaphore.h>
// 等待信号,它们将会使信号量的值-1
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
// 发布信号量,归还资源,信号量值+1
int sem_post(sem_t *sem);
// 获取信号量的值
int sem_getvalue(sem_t *sem, int *sval);
-
无名信号量的创建init
int sem_init(sem_t *sem, int pshared, unsigned int value);
pshared参数用于声明信号量是在线程间共享还是在进程间共享。0表示在线程间,非0表示在进程间,想要在进程间使用,信号量必须位于共享内存区域内。
对于线程间共享的信号量,线程组退出,也就自动销毁了;对于进程间共享的信号量,信号量的持久性与所在共享内存的持久性一样。
-
无名信号量的销毁destroy
int sem_destroy(sem_t *sem);
至于所有进程都没有等待信号量时,才会被安全销毁。
内存映射mmap
-
简介
进程与其他进程共享物理内存,各个进程的页表条目指向RAM中相同的分页,如下图所示:
通过mmap系统调用创建的内存映射可以做到进程之间共享,同时要不要映射也是可以选择的,取决于映射是私有映射(MAP_PRIVATE),还是共享映射(MAP_SHARED)。
-
创建mmap
#include <sys/mman.h>
void *mmap(void *addr, size_t length, int prot, int flags,int fd, off_t offset);
fd、offset、length3个参数制定内存映射的源,即将fd对应的文件,从offset位置起,将长度为length的内容映射到进程地址空间。
第1个参数addr用于指定将文件对应的内存映射到进程地址空间的起始地址,一般来讲总是为NULL,表示内核自己选择。
第3个参数用于对内存映射的保护,它的合法值如下:
- PROT_EXEC: 映射的内容可以执行
- PROT_READ: 映射的内容可读
- PROT_WRITE: 映射的内容可写
- PROT_NONE: 映射的内容不能访问
第4个参数用于制定是共享映射还是私有映射:
- MAP_SHARED: 共享映射
- MAP_PRIVATE: 私有映射
- MAP_ANONYMOUS: 匿名映射fd参数必须是-1.
MAP_SHARED 与 MAP_PRIVATE标志位两者必选一个。
map系统调用的单元是页,参数addr和offset必须是页对齐,也就是页面大小的整数倍,linux下页面大小是4096字节(可通过getconf PAGESIZE
查看)。
返回值是内存映射起始地址
-
销毁munmap
int munmap(void *addr, size_t length);
addr是mmap返回值,length是内存映射区域的大小。注意关闭对应的文件并不会引发munmap,因此在没有其他需要的情况下,可以调用close关闭文件。
伪码示例:
fd = open(...);
addr = mmap(..., MAP_SHARED, fd, ...);
close(fd);
POSIX共享内存
-
简介
mmap系统调用做了大量的工作,POSIX共享内存就是建立在mmap之上的。
-
创建open
#include <sys/mman.h>
int shm_open(const char *name, int oflag, mode_t mode);
第2个参数与其他与其他的类似,要包含O_RDONLY或O_RDWR标志,创建时用0_CREAT | O_EXCL。
第3个参数共享内存的访问权限。
调用成功后,会返回一个文件描述符,与其POSIX IPC一样,内核会自动设置FD_CLOEXEC标志位。
共享内存是文件,可以调用文件相关的函数。新创建的共享内存,默认大小总是0,在调用mmap之前,需要先调用ftruncate函数,调节文件的大小。通过fstat接口可以获取共享内存区的大小。
示例
fd = shm_open(name,...); // 使用 mmap 映射进地址空间
addr = mmap(NULL,length,PROT_READ|PROT_WRITE,MAP_SHARED,fd,0);
-
删除unlink
int shm_unlink(const char *name);
删除一个共享内存对象,并不会影响既有的映射,内核维护引用计数,当所有进程全部munmap接触映射之后,共享内存对象才会被真正删除。
如果不显示调用shm_unlink,共享内存对象中的数据则具有持久性,哪怕所有进程都通过munmap解除映射,只要不调用它,其中数据不会丢失。当然系统重启,共享内存对象也就不存在了。
7. 网络通信
简介
主要对socket套接字进行详细的分析,包括连接的建立、数据的发送与接收等等。
连接的建立socket
#include <sys/types.h>
#include <sys/socket.h>
int socket(int domain, int type, int protocol);
第1个参数domain:用于表示协议族,如AF_INET为IPv4
第2个参数type: 用于指示类型,如基于流通信的SOCK_STREAM
第3个参数protocal: 对应这种socket的具体协议类型。
成功创建socket后,会返回一个套接字文件描述符。失败时,接口返回-1。
绑定IP地址bind
#include <sys/socket.h>
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
第1个参数sockfd:表示要绑定地址的套接字描述符
第2个参数addr:表示绑定到套接字的地址
struct sockadd {
sa_family_t sa_family;
char sa_data[14];
}
第3个参数addrlen: 表示绑定的地址长度
成功返回0,错误返回-1。
成功创建套接字后,该套接字仅仅是一个文件描述符,并没有任何地址与之关联,使用socket发送数据包时,由于该socket没有任何ip地址,内核会自动选择一个地址。一般对于服务端需要通过bind手动指定地址。
客户端连接connect
#include <sys/socket.h>
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
第1个参数 sockfd:套接字描述符
第2个参数 addr:要连接的地址
第3个参数addrlen:要连接的地址长度
对于TCP套接字来说,connect是3次握手,所以其默认是一个阻塞操作。也可以成为一个非阻塞的过程。
服务器监听listen
#include <sys/socket.h>
int listen(int sockfd, int backlog);
第1个参数sockfd:成功创建的TCP套接字
第2个参数backlog:定义TCP未处理连接的队列长度,该队列已经完成三次握手,但服务器端还没有执行accept的连接。
成功返回0,失败返回-1。
服务器连接accept
#include <sys/socket.h>
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
第1个参数sockfd:处于监听状态的套接字
第2个参数addr:保存客户端的地址信息
第3个参数addrlen:是一个输入输出值,调用者将其初始化为addr缓存的大小,accept返回时,设置为addr的大小。
第4个参数flags:accept4的标志,目前支持SOCK_NONBLOCK、SOCK_CLOEXEC。
成功返回一个非负的文件描述符,失败返回-1。
accept用于从指定套接字的连接队列中取出第一个连接,并返回一个新的套接字用于与客户端通信。
发送相关接口send
#include <sys/socket.h>
ssize_t send(int sockfd, const void *buf, size_t len, int flags);
ssize_t sendto(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen);
ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
send只能用于处理已经连接的套接字。
sendto可以在调用时指定目的地址,如果套接字已经是连接状态,那么目的地址的dest_addr与addrlen应该是NULL和0。
sendmsg无论要发送的数据还是目的地址都保存在msg中,msg.msg_name和msg.msg_len用于指明目的地址,而msg.msg_iov用于保存要发送的数据。
flags只是标志位(稍现代的系统调用,一般都会有一个标志参数,可以从容的为系统调用增加新功能,并同时兼容老版本)
由于socket同时还是文件描述符,所以为文件提供的读写函数,也可以被socket直接调用
接收相关接口recv
#include <sys/socket.h>
ssize_t recv(int sockfd, void *buf, size_t len, int flags);
ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen);
ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
与send对称的接口
recv也是处理已连接的套接字。
recvfrom通过额外的参数 src_addr与addrlen来获得发送方的地址,addrlen也是输入输出值。
recvmsg与sengmsg相同,把接受的数据都保存在msg中。
都支持设置标志位flags