Linux环境编程之二

6. 进程间通信

进程间通信的手段,可以分成2大类:通信类与同步类。前者用于在进程间传递数据,后者用于协调进程间操作。下边来看看它们,包括管道、消息队列、信号量、mmap、共享内存等几种方式。

管道

  • 简介

    管道的作用是在有亲缘关系的进程之间传递消息,共同祖先调动pipe函数,打开管道文件就会在fork之后被各个后代进程所共享。一般是2个亲缘进程之间来通信,一个写另一个读,如果要双工,最好是2个管道来进行。

    管道是一种文件,可以调用read、write、close等接口来操作,它属于一种特殊的文件系统pipefs。管道中数据是阅后即焚的,读取操作是一种消耗行为。

  • 创建与读写

    #include <unistd.h>
    int pipe(int pipefd[2]);		// 创建
    int pipe2(int pipefd[2], int flags) // flag可以用O_NONBLOCK
    
    write(pipefd[1],wbuf,count);	// 写
    read(pipefd[0],rbuf,count);  // 读
    

    pipe函数返回2个文件描述符,一个是读描述符pipefd[0],一个是写描述符pipefd[1]。示例代码如下:

    int pipefd[2];
    pipe(pipefd);
    switch(fork())
    {
    case -1:
    	/*fork failed, error handler here*/
    case 0:		// 子进程
    	close(pipefd[1]) ; /* 关闭掉写入端对应的文件描述符,可以 pipefd[0] 调用 read*/
    	break ;
    default:  // 父进程
    	close(pipefd[0]); /* 关闭掉读取端文件描述符,可以 pipefd[1] 调用 write*/
    	break;
    }
    

    注意关闭未使用的管道文件描述符。如果所有读端已经关闭,写端再调用,会操作失败,errno被设置为EPIPE,同时内核会向写入进程发送SIGPIPE的信号。

  • 管道对应内存区大小

    管道本质是一片内存区域,默认大小是65536字节,可以通过fcntl来获取与修改。

    int pipe_capacity = fcntl(fd, F_GETPIPE_SZ); 		// 获取管道大小
    ret = fcntl(fd, F_SETPIPE_SZ, size);		// 设置管道大小
    

    可以执行cat /proc/sys/fs/pipe-max-size查看

  • 命名管道

    #include <sys/types.h>
    #include <sys/stat.h>
    int mkfifo(const char *pathname, mode_t mode);
    

    这个与文件I/O的create类似,mode文件的读写权限,pathname是目录。

    文件IO的open、read、write、close等都可以用在FIFO文件上。但FIFO文件不应该以O_RDWR模式打开。正确的使用方法是一个以O_RDONLY方式打开,另外一个以O_WRONLY打开。

    通常打开FIFO文件会同步读写进程,读写都打开才会返回,否则会阻塞。可以通过设置O_NONBLOCK来设置成异步方式。

POSIX消息队列

  • POSIX IPC简介

    以下几种方式,有System V风格与POSIX风格,POSIX风格的出现的要晚,具有后发优势,总体来讲要用于SystemV风格。

    消息队列信号量共享内存
    头文件<mqueue.h><semaphore.h><sys/mman.h>
    创建或打开mq_opensem_openshm_open+mmap
    关闭mq_closesem_closemunmap
    删除mq_unlinksem_unlinkshm_unlink
    执行IPCmq_send、mq_receivesem_post、sem_wait、sem_getvalue共享内存区域操作数据
    其他操作mq_getattr、mq_setattr、mq_notifysem_init、sem_destroy

    创建出来的共享内存、信号量,Linux将这些对象了挂载在/dev/shm目录处的tmpfs文件系统中。

    消息队列也可在文件系统中展示,需要手动挂载:

    mkdir /dev/mqueue
    mount -t mqueue none /dev/mqueue
    

    POSIX消息队列的名字必须以/打头,后续字符不允许出现/,打头的/字符不计入长度,最大长度为NAME_MAX(255字符)。

    POSIX信号量和共享内存可以以1个或多个/大豆,也可不以/打头,打头的/字符不计入长度,共享内存的最大长度是NAME_MAX,POSIX信号量的名字最大长度为NAME_MAX-4。

    示例:

    // 消息队列
    mqd_t mqd = mq_open(argv[1],O_RDWR|O_CREAT|O_EXCL,S_IRUSR|S_IWUSR,NULL);
    if(mqd == -1)
    {
    /*error handle*/
    }
    
    // 信号量
    sem_t* sem = sem_open(argv[1],O_CREAT|O_EXCL,S_IRUSR|S_IWUSR,1);
    if(sem == SEM_FAILED)
    {
    /*error handle*/
    }
    
    // 共享内存
    int shm_fd= shm_open(argv[1],O_RDWR|O_CREAT|O_EXCL,S_IRUSR|S_IWUSR);
    if(shm_fd == -1)
    {
    /*error handle*/
    }
    
  • 创建open

    #include <fcntl.h>
    #include <sys/stat.h>
    #include <mqueue.h>
    mqd_t mq_open(const char *name, int oflag);
    mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
    

    该函数与open类似,第2个参数oflgas的标志位包括:O_RDONLY、O_WRONLY、O_RDWR、O_CREAT、O_EXCL、O_NONBLOCK等。如果打开消息队列时,没有设置O_NONBLOCK,后续调用mq_send、mq_receive就会进入阻塞,反之则不会阻塞。

    第3个参数mode与第4个参数attr只有在创建消息队列时才有意义。mode是访问权限,attr设置了消息队列的属性,attr可以为NULL,表示默认属性。

    struct mq_attr {
    long mq_flags;				// 0或设置O_NONBLOCK
    long mq_maxmsg;		// 队列中最大消息个数
    long mq_msgsize;	// 单条消息允许的最大字节数
    long mq_curmsgs;	// 队列中当前的消息个数
    }
    

    默认的配置如下查看如设置:

    cat /proc/sys/fs/mqueue/msg_max
    cat /proc/sys/fs/mqueue/msgsize_max
    
    sysctl -w fs.mqueue.msg_max=4096
    sysctl -w fs.mqueue.msgsize_max=65536
    
  • 关闭close

    #include <mqueue.h>
    int mq_close(mqd_t mqdes);
    

    POSIX IPC对象维护有引用计数,在调用相关的close函数后,引用计数-1.

  • 销毁unlink

    #include <mqueue.h>
    int mq_unlink(const char *name);
    

    即使引用计数为0,只要不显示调用mq_unlink,该队列及队列上的消息就依然存在。

  • 调整消息队列的属性attr

    #include <mqueue.h>
    int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
    int mq_setattr(mqd_t mqdes, struct mq_attr *newattr,
    struct mq_attr *oldattr);
    

    消息队列创建以后,可以通过调用它们来修改属性,但mq_maxmgs、mg_msgsize属性在创建时已经确定下来,mq_setattr不能修改这2个属性,mq_curmsgs是根据消息自己变化的,也就是说这两个函数仅用于修改mq_flags。

    mq_getattr(mqd,&attr);
    attr.mq_flags |= O_NONBLOCK; /* 设置 O_NONBLOCK 性 */
    //attr.mq_flags &=(~O_NONBLOCK);/* 取消 O_NONBLOCK 性 */
    mq_setattr(mqd,&attr,NULL)
    
  • 消息的发送send

    #include <mqueue.h>
    int mq_send(mqd_t mqdes, const char *msg_ptr,size_t msg_len, unsigned msg_prio);
    

    第3个参数是消息的长度,不要超过mq_msgsize

    第4个参数是消息的优先级,非负数。

    如果消息队列已满,mq_send可能阻塞,如果设置了O_NONBLOCK,会立即返回失败,errno设置为EAGAIN。

  • 消息的接收receive

    ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
    

    对于POSIX消息队列而言,宗师取走优先级最高的消息中,最先到达的那个。

    第4个参数msg_proi不是NULL,系统将取到的消息体的优先级复制到msg_prio指向的整型变量,如果NULL,表示不在乎消息体的优先级。如果mg_receive调用时,消息队列中没有消息,则函数陷入阻塞,如果设置了O_NONBLOCK,则立即返回失败,并设置errno为EAGIAN。

    POSIX消息队列的本质就是个优先级队列,优先级高的总是被优先取出。

  • 消息的通知notify

    #include <mqueue.h>
    int mq_notify(mqd_t mqdes, const struct sigevent *sevp);
    

    以上接口与管道没有多大什么区别,消息队列的特点就是本接口。也就是发布-订阅的功能。当空的队列中收到一条新的消息,就给相应的进程发送通知,这是一种典型的异步机制。

    POSI消息队列提供了2种异步的方法:1. 产生一个信号;2. 创建一个线程来执行一个事先制指定的函数。

    第2个参数:

    union sigval{
    int sigval_int;
    void *sigval_ptr;
    }
    struct sigevent {
    int sigev_notify; 	/* 决定采用哪种通知方式 ,信号还是线程,SIGEV_SIGNAL或SIGEV_THREAD*/
    int sigev_signo; 	/* 用 信号方式,决定发送哪个信号 */
    union sigval sigev_value; /* 信号或线程方式含义不同 */
    void (*sigev_notify_function)(union sigval);
    void *sigev_notify_attributes;
    }
    

    信号方式示例:

    mqd_t mqd;
    struct mq_attr attr ;
    sigset_t newmask ;
    struct sigevent sigev;
    mqd = mq_open(mq_filename,O_RDONLY|O_NONBLOCK);
    
    mq_getattr(mqd,&attr);
    buffer = malloc(attr.mq_msgsize) ; /* 保证buffer 足够大 */
    
    sigemptyset(&newmask);;
    sigaddset(&newmask,SIGUSR1);
    sigprocmask(SIG_BLOCK,&newmask,NULL);/* 阻塞等待的信号 */
    
    sigev.sigev_notify = SIGEV_SIGNAL;
    sigev.sigev_signo = SIGUSR1;
    mq_notify(mqd,&sigev);
    
    for( ; ; )
    {
        sigwait(&newmask,&signo);/* 待 SIGUSR1 信号 */
        if(signo == SIGUSR1)
        {
            mq_notify(mqd,&sigev); /* 先重新注册 notify 函数 */
            while( n = mq_receive(mqd,buffer,attr.mq_msgsize,NULL) >= 0)
            {
            	/*process the message in buffer*/
            }
            if(errno != EAGAIN)
            {
            	/*some error happened*/
            }
        }
    }
    

    Linux系统支持在消息队列上执行select/epoll,以上过程会变得更简单:

    mqd = mq_open(argv[1],O_RDONLY | O_NONBLOCK);
    if(mqd == -1)
    {
        fprintf(stderr,"failed to open mqueue (%d: %s)\n",errno,strerror(errno));
        return 1;
    }
    mq_getattr(mqd,&attr);
    buffer = malloc(attr.mq_msgsize);
    
    FD_ZERO(&rset);
    for(;;)
    {
        FD_SET(mqd,&rset);
        nfds = select(mqd+1,&rset,NULL,NULL,NULL);
        if(FD_ISSET(mqd,&rset))
        {
            while((n = mq_receive(mqd,buffer,attr.mq_msgsize,NULL)) >=0 )
            {
            /* 在此处处理本条消息 */
            }
            if(errno != EAGAIN)
            {
                /* 处理错误 */
            }
        }
    }
    

POSIX信号量

  • 简介

    与信号量概念类似的是互斥量,它们都是为了同步多个进程的操作,信号量是与某个资源相关,信号量元素的值表示与之关联的资源的个数。内核维护信号量的值,并确保其值不小于0。互斥量保护临界区,可以看成资源个数是1,信号量的资源个数可以是多个,允许多个进程同时使用资源。

    POSIX提供了2种信号量:有名信号量与无名信号量。无名信号量是基于内存的,由于没有名字,没法通过open操作直接找到对应的信号量,很难在多进程之间使用。无名信号量多用于线程之间同步。有名信号量反之,可以用于多进程同步。它们在使用上的相同,创建、消耗时的接口不同。

  • 创建open

    #include <fcntl.h>
    #include <sys/stat.h>
    #include <semaphore.h>
    sem_t *sem_open(const char *name, int oflag);
    sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);
    

    第2个参数支持O_CREATE、O_EXCL,表示创建信号量。

    最后一个value是新建信号的初始值,value的值在0和SEM_VALUE_MAX之间。

    创建失败,返回SEM_FAILED,并设置errno,不要尝试创建sem_t结构体的副本。

  • 关闭close

    #include <semaphore.h>
    int sem_close(sem_t *sem);
    

    关闭信号量,同时信号量的进程数的引用计数-1。进程终止时,有名信号量会自动关闭,当进程执行exec系统函数时,也会关闭。

  • 销毁unlink

    #include <semaphore.h>
    int sem_unlink(const char *name);
    

    与其他POSIX对象相同,只有当引用计数为0,才会真正删除。

  • 信号量使用wait、post

    #include <semaphore.h>
    // 等待信号,它们将会使信号量的值-1
    int sem_wait(sem_t *sem);
    int sem_trywait(sem_t *sem);
    int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
    
    // 发布信号量,归还资源,信号量值+1
    int sem_post(sem_t *sem);
    
    // 获取信号量的值
    int sem_getvalue(sem_t *sem, int *sval);
    
  • 无名信号量的创建init

    int sem_init(sem_t *sem, int pshared, unsigned int value);
    

    pshared参数用于声明信号量是在线程间共享还是在进程间共享。0表示在线程间,非0表示在进程间,想要在进程间使用,信号量必须位于共享内存区域内。

    对于线程间共享的信号量,线程组退出,也就自动销毁了;对于进程间共享的信号量,信号量的持久性与所在共享内存的持久性一样。

  • 无名信号量的销毁destroy

    int sem_destroy(sem_t *sem);
    

    至于所有进程都没有等待信号量时,才会被安全销毁。

内存映射mmap

  • 简介

    进程与其他进程共享物理内存,各个进程的页表条目指向RAM中相同的分页,如下图所示:

    通过mmap系统调用创建的内存映射可以做到进程之间共享,同时要不要映射也是可以选择的,取决于映射是私有映射(MAP_PRIVATE),还是共享映射(MAP_SHARED)。

  • 创建mmap

    #include <sys/mman.h>
    void *mmap(void *addr, size_t length, int prot, int flags,int fd, off_t offset);
    

    fd、offset、length3个参数制定内存映射的源,即将fd对应的文件,从offset位置起,将长度为length的内容映射到进程地址空间。

    第1个参数addr用于指定将文件对应的内存映射到进程地址空间的起始地址,一般来讲总是为NULL,表示内核自己选择。

    第3个参数用于对内存映射的保护,它的合法值如下:

    • PROT_EXEC: 映射的内容可以执行
    • PROT_READ: 映射的内容可读
    • PROT_WRITE: 映射的内容可写
    • PROT_NONE: 映射的内容不能访问

    第4个参数用于制定是共享映射还是私有映射:

    • MAP_SHARED: 共享映射
    • MAP_PRIVATE: 私有映射
    • MAP_ANONYMOUS: 匿名映射fd参数必须是-1.

    MAP_SHARED 与 MAP_PRIVATE标志位两者必选一个。

    map系统调用的单元是页,参数addr和offset必须是页对齐,也就是页面大小的整数倍,linux下页面大小是4096字节(可通过getconf PAGESIZE查看)。

    返回值是内存映射起始地址

  • 销毁munmap

    int munmap(void *addr, size_t length);
    

    addr是mmap返回值,length是内存映射区域的大小。注意关闭对应的文件并不会引发munmap,因此在没有其他需要的情况下,可以调用close关闭文件。

    伪码示例:

    fd = open(...);
    addr = mmap(..., MAP_SHARED, fd, ...);
    close(fd);
    

POSIX共享内存

  • 简介

    mmap系统调用做了大量的工作,POSIX共享内存就是建立在mmap之上的。

  • 创建open

    #include <sys/mman.h>
    int shm_open(const char *name, int oflag, mode_t mode);
    

    第2个参数与其他与其他的类似,要包含O_RDONLY或O_RDWR标志,创建时用0_CREAT | O_EXCL。

    第3个参数共享内存的访问权限。

    调用成功后,会返回一个文件描述符,与其POSIX IPC一样,内核会自动设置FD_CLOEXEC标志位。

    共享内存是文件,可以调用文件相关的函数。新创建的共享内存,默认大小总是0,在调用mmap之前,需要先调用ftruncate函数,调节文件的大小。通过fstat接口可以获取共享内存区的大小。

    示例

    fd = shm_open(name,...); 		// 使用 mmap 映射进地址空间
    addr = mmap(NULL,length,PROT_READ|PROT_WRITE,MAP_SHARED,fd,0);
    
  • 删除unlink

    int shm_unlink(const char *name);
    

    删除一个共享内存对象,并不会影响既有的映射,内核维护引用计数,当所有进程全部munmap接触映射之后,共享内存对象才会被真正删除。

    如果不显示调用shm_unlink,共享内存对象中的数据则具有持久性,哪怕所有进程都通过munmap解除映射,只要不调用它,其中数据不会丢失。当然系统重启,共享内存对象也就不存在了。

7. 网络通信

简介

主要对socket套接字进行详细的分析,包括连接的建立、数据的发送与接收等等。

连接的建立socket

#include <sys/types.h>
#include <sys/socket.h>
int socket(int domain, int type, int protocol);

第1个参数domain:用于表示协议族,如AF_INET为IPv4

第2个参数type: 用于指示类型,如基于流通信的SOCK_STREAM

第3个参数protocal: 对应这种socket的具体协议类型。

成功创建socket后,会返回一个套接字文件描述符。失败时,接口返回-1。

绑定IP地址bind

#include <sys/socket.h>
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

第1个参数sockfd:表示要绑定地址的套接字描述符

第2个参数addr:表示绑定到套接字的地址

struct sockadd {
    sa_family_t sa_family;
    char sa_data[14];
}

第3个参数addrlen: 表示绑定的地址长度

成功返回0,错误返回-1。

成功创建套接字后,该套接字仅仅是一个文件描述符,并没有任何地址与之关联,使用socket发送数据包时,由于该socket没有任何ip地址,内核会自动选择一个地址。一般对于服务端需要通过bind手动指定地址。

客户端连接connect

#include <sys/socket.h>
int connect(int sockfd, const struct sockaddr *addr,  socklen_t addrlen);

第1个参数 sockfd:套接字描述符

第2个参数 addr:要连接的地址

第3个参数addrlen:要连接的地址长度

对于TCP套接字来说,connect是3次握手,所以其默认是一个阻塞操作。也可以成为一个非阻塞的过程。

服务器监听listen

#include <sys/socket.h>
int listen(int sockfd, int backlog);

第1个参数sockfd:成功创建的TCP套接字

第2个参数backlog:定义TCP未处理连接的队列长度,该队列已经完成三次握手,但服务器端还没有执行accept的连接。

成功返回0,失败返回-1。

服务器连接accept

#include <sys/socket.h>
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);

第1个参数sockfd:处于监听状态的套接字

第2个参数addr:保存客户端的地址信息

第3个参数addrlen:是一个输入输出值,调用者将其初始化为addr缓存的大小,accept返回时,设置为addr的大小。

第4个参数flags:accept4的标志,目前支持SOCK_NONBLOCK、SOCK_CLOEXEC。

成功返回一个非负的文件描述符,失败返回-1。

accept用于从指定套接字的连接队列中取出第一个连接,并返回一个新的套接字用于与客户端通信。

发送相关接口send

#include <sys/socket.h>
ssize_t send(int sockfd, const void *buf, size_t len, int flags);
ssize_t sendto(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen);
ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);

send只能用于处理已经连接的套接字。

sendto可以在调用时指定目的地址,如果套接字已经是连接状态,那么目的地址的dest_addr与addrlen应该是NULL和0。

sendmsg无论要发送的数据还是目的地址都保存在msg中,msg.msg_name和msg.msg_len用于指明目的地址,而msg.msg_iov用于保存要发送的数据。

flags只是标志位(稍现代的系统调用,一般都会有一个标志参数,可以从容的为系统调用增加新功能,并同时兼容老版本)

由于socket同时还是文件描述符,所以为文件提供的读写函数,也可以被socket直接调用

接收相关接口recv

#include <sys/socket.h>
ssize_t recv(int sockfd, void *buf, size_t len, int flags);
ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen);
ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);

与send对称的接口

recv也是处理已连接的套接字。

recvfrom通过额外的参数 src_addr与addrlen来获得发送方的地址,addrlen也是输入输出值。

recvmsg与sengmsg相同,把接受的数据都保存在msg中。

都支持设置标志位flags

# Linux 

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×