Linux环境编程之二

3. 进程

getpid获取进程id

pid_t getpid(void);
pid_t getppid(void);

gitpid函数获取进程的pid;

getppid获取父进程的pid。

系统进程数的上限可以从以下获取:

cat /proc/sys/kernel/pid_max
sysctl kernel.pid_max

进程的层次

pid_t getpgrp(void);
pid_t getsid(pid_t pid);
int setpgid(pid_t pid, pid_t pgid);
pid_t setsid(void);

getpgrp获取进程组id,

getsid获取会话的id,

setpgid设置进程的进程组,通常用来创建一个新的进程组

setsid创建新的会话,调用进程不能是进程组组长

可以通过ps axjf查看进程的层次关系

进程组和会话是支持shell作业控制而引入的概念。shell进程会话的首进程,会话的首进程pid就是整个会话的id。登录shell后,用户可能会使用管道让多个进程完成一项工作,这一组进程属于同一个进程组。

进程的创建fork

pid_t fork(void);

fork函数向子进程返回0,向父进程返回子进程的id,如果失败,返回-1,并设置errno。

示例:

ret = fork();
if(ret == 0)
{
	// ... 此处是子进程
}
else if(ret > 0)
{
	// ... 此处是父进程
}
else
{
	// ... fork 失败,error handle
}

ps:注意这里的子进程对内存的写时拷贝技术。子进程执行exec时,关闭父进程打开的文件。

ps:task 与file的关系 4.3.3

daemon进程

int daemon(int nochdir, int noclose);

第1个参数nochir:为0则将当前目录切换到根目录/,为1保持当前目录不变

第2个参数noclose:为0 将标准输入、输出、错误重定向到/dev/null,为1保持不变

一般情况下,这两者都是0.

daemon进程是在后台执行,不予任何终端相关联,生命周期很久的进程。

ps:思路一般是先通过fork出子进程,然后用子进程执行setsid,以及相应的设置。这里有个double-fork magic。

进程的终止exit

void _exit(int status);
void exit(int status);

_exit函数中status参数定义进程的终止状态,父进程可以通过wait()来获取该状态值。

exit函数最后也会调用_exit,但它还会进行一些清理操作,关闭打开的流,所有缓存的数据写入,删除tmpfile等。

等待子进程wait

pid_t wait(int *status);
pid_t waitpid(pid_t pid, int *status, int options);
int waitid(idtype_t idtype, id_t id,siginfo_t *infop, int options);
  • wait

    pid_t wait(int *status);
    

    wait成功时,返回已退出子进程的进程id;失败时返回-1,并设置errno。常见的error如下:

    errno说明
    ECHILD调用进程时发现没有子进程需要等待
    EINTR函数被信号中断

    wait调用时,如果没有子进程退出,会陷入阻塞。

  • waitpid

    pid_t waitpid(pid_t pid, int *status, int options);
    

    waitpid与wait返回值相同,status相同,但多了pid,有了精准等待的能力,根据pid不同,其含义如下:

    pid说明
    pid > 0表示等待进程id为pid的子进程
    pid = 0表示等待与调用进程同一进程组的任意子进程
    pid = -1表示等待任意子进程,与wait类似
    pid < -1等待所有子进程中,进程组id与pid绝对值相等的所有子进程

    option标志位如下:

    options说明
    WUNTRACE关心终止子进程信息,也关心因信号而停止的子进程信息
    WCONTINUED关心终止子进程信息,也关心因信号而恢复执行的子进程信息
    WNOHANG指定的子进程未发生状态变化,立即返回,不会阻塞。这种情况返回0

    status是一个int类型的指针,可以传递NULL,表示不关心子进程的状态信息,如果不为空,则根据status值,可以获取到子进程的更多信息。子进程退出有:正常退出、被信号所杀、被信号停止、被信号恢复等几种形式。

    Linux提供了很多宏来判断

    退出方式说明
    WIFEXITED(status)正常退出如果正常退出,返回true、反之false
    WEXITSTATUS(status) 如果正常退出,获取进程的退出状态
    WIFSIGNALED(status)被信号杀死如果被信号杀死,返回true、反之为false
    WTREMSIG(status) 如果被信号杀死,返回该信号
    WCOREDUMP(status) 如果子进程产生了core dump,返回true
    WIFSTOPPED(status)被信号停止如果被信号暂停,返回ture,反之返回false
    WSTOPSIG(status) 如果处于停止状态,返回暂停它的信号
    WIFCONTINUED(status)被信号恢复如果被信号恢复,返回true
  • waitid

    int waitid(idtype_t idtype, id_t id,siginfo_t *infop, int options);
    

    waitid 是等待最重要的函数,waitpid无论用户是否关心子进程的终止,终止事件都会返回用户。waitid提供了更精准的等待。

    第1、2个参数用户选择用户等待的子进程。idtype=P_PID精准等待某个id;idtype=P_PGID等待所有子进程中进程组id为id的进程;idtype=P_ALL等待任意子进程。

    第4个参数也了改变:WEXITED等待子进程的终止事件,WSTOPPED等待信号暂停的子进程事件,WCONTINUED等待被恢复执行的子进程。这几个标志位或计算可以等待多个。除此之外还有WNOHANG(不阻塞)、WNOWAIT(只获取状态,不给子进程收尸)。

    第3个参数inop是一个返回值。如果成功获取,返回进程的pid、uid,以及进程的退出方式等等。

exec家族

int execl(const char *path, const char *arg, ...);
int execlp(const char *file, const char *arg, ...);
int execle(const char *path, const char *arg, ..., char * const envp[]);
int execv(const char *path, char *const argv[]);
int execvp(const char *file, char *const argv[]);
int execve(const char *path, char *const argv[],char *const envp[]);

exec的6个函数都是建立在ececve系统调用之上的,该系统调用的作用是,将新程序加载到进程的地址空间,丢弃旧有的程序,进程的栈、数据段、堆会被新程序替换。

名字中有l表示采用参数列表(list),名字中有v标志参数数组(vector),名字中有p表示可以使用环境变量PATH,名字中有e表示自己维护环境变量。

先来看看execve:

int execve(const char *path, char *const argv[],char *const envp[]);

path为要执行文件的路径,argv是参数,与main函数的第2个入参相同,字符串指针数组,envp也是字符串指针数组,指向的字符串的格式为:name=value。

一般来说execve()函数总数跟着fork函数之后,表示抛弃父进程的程序段,和父进程分道。

execve有个特点,如果成功,不会返回,如果没有成功返回-1,并设置errno。

execve第1个参数必须是绝对路径或者是相对当前目录的路径(带p可以使用PATH环境变量),比如ls,需要写/bin/ls,第3个参数是环境变量指针,需要书写大量的“key=value”(不带e可以使用环境变量)。

比较而言execlp调用起来会最方便一些。

system函数

int system(const char *command);

需要执行命令作为command参数传入,最大的好处是使用方便,不需要自己调用fork,exec、waitpid,也不需要自己处理错误、处理信号。缺点是效率低一些,它会创建shell进程,然后再执行相关命令进程。

4. 信号

信号是一种进程间的通信,举个例子很多应用都有配置文件,如果配置文件发生改变,需要通知进程重新加载配置。一般而言,系统会默认采用SIGHUP信号通知目标进程重新加载。

  • 首先,目标进程约定,只要收到SIGHUP,就执行重新加载配置文件的动作,这称为信号的安装。
  • 然后,在某个时间,管理员改变了配置文件,或是在终端执行了kill -SIGHUP,或是调用C的API,发送信号。
  • 接着,LInux内核收到的信号,就在目标进程描述符里记录一笔,收到SIGHUP。并在适当的时机将信号传递给进程。在内核收到但还没传递给目标进程的这段时间,信号处于挂起pending状态。
  • 最后,进程收到信号,会暂停当前的控制流,转而执行信号处理函数。

信号的产生主要有3种:

  • 硬件异常
  • 终端相关的信号
  • 软件事件相关的信号

信号分成2类:不可靠信号,信号值在[1,31]之间的信号;可靠信号[SIGRTMIN, SIGRTMAX],不可靠信号是从Unix集成而来,所谓不可靠指的是发送的信号,内核不一定能传递给目标进程,信号可能会丢失。

信号的默认处理函数包括:

  • 显示的忽略信号ignore
  • 终止进程terminate
  • 生成core文件并终止进程core
  • 停止进程stop
  • 恢复进程continue

信号的安装

typedef void (*sighandler_t)(int);
sighandler_t signal(int signum, sighandler_t handler);

int sigaction(int signum, const struct sigaction *act, struct sigaction *oldact);
struct sigaction {
void(*sa_handler)(int);
void(*sa_sigaction)(int, siginfo_t *, void *);
sigset_t sa_mask;
int sa_flags;
void(*sa_restorer)(void);
};

signal是对指定的信号signum,设置其回调函数handler,这个hander的入参就是信号值。

sigaction提供了更精准的控制

sa_handler是信号的处理函数,

sa_mask是信号处理函数期间的屏蔽信号集,

sa_flags标志如下:

  • SA_NOCLDSTOP,这个标志只用于SIGCHLD信号。父进程可以监控子进程状态3个事件:终止、停止、恢复,一旦父进程为SIGCHLD设置了这个标志位,那么子进程的停止、恢复就无须向父进程发送SIGCHLD信号了。
  • SA_NOCLDWAIT,这个标志也只用于SIGCHLD信号,如果父进程为SIGCHLD设置了这个信号,子进程退出就不会进入僵尸状态,而是直接自行了断。
  • SA_ONESHOT和SA_RESETHAND,表示信号处理函数是一次性的,信号处理完之后,就换成默认的处理函数了。
  • SA_NODEFER和SA_NOMASK,在信号处理函数执行期间,不阻塞当前信号。
  • SA_RESTART,如果系统中断被信号中断,则不返回错误,而是自动重启系统调用
  • SA_SIGINFO,表示信号发送者会提供额外的信息,这时候信号的处理函数应该是3个参数的void handle(int, siginfo_t *, void *)

信号的发送kill与sigqueue

int kill(pid_t pid, int sig);  	// 一般与signal连用
int tkill(int tid, int sig);		// 给线程发,系统调用
int tgkill(int tgid, int tid, int sig);	// 给线程发,系统调用
int raise(int sig);	
int sigqueue(pid_t pid, int sig, const union sigval value);		// 一般与sigaction连用
  • kill

    int kill(pid_t pid, int sig);  	// 一般与signal连用
    

    kill向指定进程、指定进程组发送信号。这个pid与waitpid有些类似

    pid说明
    pid > 0发送给进程id等于pid的进程
    pid = 0发送信号给调用进程所在同一个进程组的每一个进程
    pid = -1
    pid < -1向进程组-pid发送信号

    Kill的成功返回0, 失败返回-1,并设置errno。常见错误包括:

    errno说明
    EINVAL无效的信号值
    EPERM该进程没有权限发送信号给目标进程
    ESRCH目标进程或进程组不存在

    示例:

    if(kill(3423,SIGUSR1) == -1)
    {
    /*error handler*/
    }
    
  • tkill与tgkill

    tkilltgkill都是向某个线程发送,它俩都是系统调用,glibc没有封装,采用syscall方式调用

    ret = syscall(SYS_tkill,tid,sig)
    ret = syscall(SYS_tgkill,tgid,tid,sig)
    

    tkill相对tgkill是过时的,tgkill的第一参数tgid,为线程组中主线程的线程id(进程号),起到保护作用,防止向错误的线程发送信号(线程id复用)。

  • raise

    int raise(int sig);	
    

    向进程自身发送信号。对与进程相当于kill(getpid(), sig);,对于多线程相当于pthread_kill(pthread_self(),sig)。信号处理函数执行完毕后才返回。

  • sigqueue

    int sigqueue(pid_t pid, int sig, const union sigval value);		// 一般与sigaction连用
    

    与kill类似,但不能向进程组发送信号。但它有一个payload参数,能制定一点伴随的数据。所谓一点是sigval是一个联合体,如下

    union sigval {
    int sival_int;
    void *sival_ptr;
    };
    

    由于指针多个进程的地址空间各自独立,传递也就没有意义。

    sigqueue与sigaction是搭档,携带的sigval可以通过sigaction来获取到。

等待信号sigsuspend与sigwait

int pause (void);
int sigsuspend(const sigset_t *mask);

int sigwait(const sigset_t *set, int *sig);
int sigwaitinfo(const sigset_t *set, siginfo_t *info);
int sigtimedwait(const sigset_t *set, siginfo_t *info, const struct timespec *timeout);
  • pause

    pause函数将调用线程挂起,使进程进入可中断的睡眠状态,直到传递一个信号为止。信号若执行信号处理函数pause会在执行完后返回,若是终止进程,pause就不返回了,若是内核发出的信号被忽略,进程不会被唤醒。

    pause不能直接指定等待某个信号,即使先通过sigprocmask进行忽略,也存在非原子性,可能造成一致等待不到的情况。

  • sigsuspend

    sigsuspend 函数解决了这个问题,它通过mask指向的掩码设置进程阻塞掩码,并将进程挂起,直到捕捉到信号为止,一旦信号返回,sigsuspend函数会把进程的阻塞掩码恢复为调用之前老的阻塞掩码。

    示例:

    static volatile sig_atomic_t sig_received_flag = 0;
    sigset_t mask_all, mask_most, mask_old;
    
    int signum = SIGUSR1;
    sigfillset(&mask_all);
    sigfillset(&mask_most);
    sigdelset(&mask_most,signum);
    sigprocmask(SIG_SETMASK,&mask_all,&mask_old);
    /* 在 sigprocmask阻塞所有信号之前, SIGUSR1 可能已经 */
    if(sig_received_flag == 0)
    sigsuspend(&mask_most);
    sigprocmask(SIG_SETMASK,&mask_old,NULL);
    
  • sigwait

    sigwait可以更简单的等待某个特定信号的到来,信号集sigset是进程关心的信号。当调用sigwait系列函数时,内核会查看进程的信号挂起队列,检查set中是否有信号处于挂起状态,如果有就立即返回,没有就陷入阻塞。

    sigwait调用成功会返回0,失败-1。sigwaitinfo是加强版的sigwait,调用成功返回信号的值(signo),而不是0,失败返回-1.sigtimedwait与sigwaitinfo类似,它约定一个timeout时间,到时候后不来,就不再等待了。

    sigwait系列函数引入的竞争,正常信号处理流程,会从信号挂起队列中摘取信号传给进程,而sigwait也会从信号挂起队列中摘取信号。所以调用之前可以先将set中的信号阻塞,从而拿到防止竞争。

    示例:

    sigset_t mask_sigusr1;
    int sigusr1_count = 0;
    sigemptyset(&mask_sigusr1);
    sigaddset(&mask_sigusr1,SIGUSR1);
    sigprocmask(SIG_SETMASK,&mask_sigusr1,NULL);
    while(1)
    {
    sig = sigwaitinfo(&mask_sigusr1,&si);
    if(sig != -1)
     {
        sigusr1_count++;
     }
    }
    

通过fd等待信号signalfd

int signalfd(int fd, const sigset_t *mask, int flags);

描述:signalfdsigwaitinfo类似,属于同步等待信号范畴,都需要先调用sigprocmask将关注的信号屏蔽,以防止被进程的信号处理函数劫走。不同之处在于signalfd提供了文件系统的接口,可以通过select、poll、epoll来监控。

第1个参数fd,初次创建时,fd参数应该是-1,该函数会创建一个文件描述符,用于读取mask中到来的信号。如果fd不是-1,一般是修改mask的值,此时fd就是指点调用signalfd时返回的值。

第2个参数mask参数表示关注的信号集合,在调用前先用sigprocmask阻塞这些信号。

第3个参数flags用来控制行为,目前支持的标志如下:

  • SFD_CLOEXEC,与O_CLOEXEC一样,调用exec函数时,文件描述符会被关闭。
  • SFD_NONBLOCK,控制将来的读取操作,非阻塞读取。

创建完成后,使用read函数来读取到来的信号。提供的缓冲区大小一般要足以放下一个signalfd_siginfo结构体。

示例:

sigprocmask(SIG_BLOCK,&mask,NULL);
sfd = signalfd(-1,&mask,NULL);
for(;;)
{
n = read(sfd,&fd_siginfo,sizeof(struct signalfd_siginfo));
if(n != sizeof(struct signalfd_siginfo))
{
	/*error handle*/}
else{
	/*process the signal*/
 }
}

信号异步安全

信号的处理函数是对当前进程的一种中断,同一进程中出现2条执行流。

进程收到信号的执行流

这与嵌入编写回调函数相同,信号处理函数应该是轻量级的,快进快出的。比较常见的做法,信号处理函数非常短,基本就是设置标志位,然后由主程序根据标志位进程处理。另外,可以通过sigwait、signalfd变成同步方式。

异步的示例伪代码如下:

volatile sig_atomic_t get_SIGINT = 0;
/* 信号处理函数 */
void sigint_handler(int sig)
{
    switch(sig){
    case SIGINT:
            get_SIGINT = 1;
          	break;
	...
}
/* 主程序是循环 */
while(true)
{
    if(get_SIGINT==1)
    {
   		 /* 在主 序处理 SIGINT*/
    }
    job = get_next_job();
    do_single_job(job);
}

5. 线程

进程之间彼此地址空间是独立的,单线程会共享内存地址空间。

多线程的地址空间

POSIX线程库的接口:

POSIX函数函数功能描述
pthread_create创建一个线程
pthread_exit退出线程
pthread_self获取线程ID
pthread_equal检查两个线程ID是否相等
pthread_join等待线程退出
pthread_detach设置线程状态为分离状态
pthread_cancle线程的取消
pthread_cleanup_push线程退出,清理函数的注册
phtread_cleanup_pop线程退出,清理函数的执行

线程的id与pid类似的是tid

线程的创建pthread_create

#include <pthread.h>
int pthread_create(pthread_t *restrict thread,						//  创建成功,返回线程ID
										const pthread_attr_t *restrict attr,		// 定制线程的属性,栈大小、调度策略等
										void *(*start_routine)(void*),		// 线程要执行的函数
										void *restrict arg);			// start_routine的入参

成功返回0,不成功返回非0的错误码。

返回值描述
EAGAIN系统资源不够,或创建线程个数超限
EINVAL第二个参数attr不合法
EPERM没有合适的权限来设置调度策略

示例:

void * thread_worker(void *)
{
    printf( “ I am thread worker ” );
    pthread_exit(NULL)
}
pthread_t tid ;
int ret = 0;
ret = pthread_create(&tid,NULL,&thread_worker,NULL);
if(ret != 0)/*此处,不能用 ret < 0 作为出 判断 */
{
    /*ret is the errno*/
    /*error handler*/
}

这里的pthread_t不是线程的tid,它是NPTL线程库的范畴,线程库的后续操作,就是根据该线程ID来操作的。phread_t本质是分配在mmp区域的一个内存地址,可以通过pmap PID查看进程的地址空间情况。

可以给线程起一个有意义的名字,命名之后可以从procfs或者ps中看到线程的名字。通过proctl系统调用可以完成。

创建线线程时,第2个参数若传入NULL,则会创建默认属性的线程,这里可以设置的属性挺多,需要主要线程栈的大小(stack size)可以通过ulimit -s查看该大小,默认是8MB(8192)

线程的退出pthread_exit

#include <pthread.h>
void pthread_exit(void *value_ptr);

value_ptr存放线程的遗言,如果没有可以直接传递NULL。注意该指针不能指向线程自己栈上的变量。

线程的连接pthread_join

#include <pthread.h>
int pthread_join(pthread_t thread, void **retval);

第1个参数是要等待的线程id,第2个参数用来接收返回值。

既然线程退出可以有返回值,那么就需要去获取该值,这里就有pthread_join函数,用来等待某线程的退出,并接收它的返回值。

有点像wait子进程,线程被成为join。但wait进程只能是父进程等子进程,而线程的join可以是该进程组中任一指定线程。

返回值说明
ESRCH传入线程ID不存在,无此线程
EINVAL已有其他线程连接该线程了
EDEADLK死锁

如果不连接已经退出的线程,会导致资源无法释放(栈空间)。

线程的分离pthread_detach

#include <pthread.h>
int pthread_detach(pthread_t thread);

入参是线程id,处于detached状态的线程退出时,系统将负责回收线程的资源。可以是线程组内其他线程对目标线程进程分离,也可以是线程自己执行该函数进行分离。

另外可以在创建线程时,将线程属性设定为已分离状态。

#include <pthread.h>
int pthread_attr_setdetachstate(pthread_attr_t *attr,int detachstate);
int pthread_attr_getdetachstate(pthread_attr_t *attr,int *detachstate);

detachstate可以是PTHREAD_CREATE_DETACHED表示创建出来就是分离状态,模式情况下是PTHREAD_CREATE_JOINABLE 可连接状态。

互斥量mutex

说分不说合,等于耍流氓。多线程的合作就是线程的同步问题。

#include <pthread.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;		// 静态初始化方法
int pthread_mutex_init(pthread_mutex_t *restrict mutex,		// 动态初始化方法
const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);		// 互斥量销毁

int pthread_mutex_lock(pthread_mutex_t *mutex);			   // 加锁,阻塞
int pthread_mutex_trylock(pthread_mutex_t *mutex);		// 加锁,非阻塞
int pthread_mutex_unlock(pthread_mutex_t *mutex);		// 解锁

动态初始化方法第2个参数设置互斥量的属性,一般传入NULL,采用默认属性。调用之后处于未加锁状态。

对于已经持某个锁的线程,再次申请该锁,根据不同属性会有不同的处理,有时会死锁(PTHREAD_MUTEX_NORMAL),有时会返回错误(PTHREAD_MUTEX_ERRORCHECK),有时会申请成功(PTHREAD_MUTEX_RECURSIVE)。注意:互斥量并不能做到先锁的先得到锁。

这里还有一种互斥锁的类型PTHREAD_MUTEX_ADAPTIVE_NP,它是互斥锁与自旋锁的结合。所谓自旋锁,就是在没有申请到锁之后,并不阻塞,而是一致尝试加锁,知道成功为止,但这样也有缺点,所以有了结合普通互斥锁与自旋锁的这种锁。

读写锁rwlock

#include <pthread.h>
pthread_rwlock_t rwlock=PTHREAD_RWLOCK_INITIALIZER;		// 静态初始化方法
int pthread_rwlock_init(pthread_rwlock_t *rwlock,const pthread_rwlockattr_t *attr); // 动态初始化方法
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);		// 销毁

//  读锁接口
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_timedrdlock(pthread_rwlock_t *rwlock,const struct timespec *abstime);

// 写锁接口
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_timedwrlock(pthread_rwlock_t *rwlock,const struct timespec *abstime);

读写锁创建时,第2个参数是pthread_rwlockattr_t,它可以设置锁的竞争策略,默认是读者优先,意思是当前是状态是读锁,如果线程申请读锁,此时纵然有写锁在等待队列上,仍然允许申请者获取读锁,而不是被写锁阻塞。另外一种是写者优先,就是阻塞再次的读。

读锁是共享锁,可以与其他读共享,但不与写锁共享;写锁是独占锁,不允许其他读、写操作。

当前锁状态读锁请求写锁请求
无锁OKOK
读锁OK阻塞
写锁阻塞阻塞

比较适合读写锁的场景是:临界区比较大,大多数是读,少数是写。

条件等待cond

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
int pthread_cond_destroy(pthread_cond_t *cond);

// 等待
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);

// 发送
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

pthread_cond_init的第2个参数可以为NULL,表示采用默认属性。

生产者-消费者模型中,如果任务队列为空,则消费者线程应该停工等待,一直到队列不空为止。条件等待是这样:线程在条件不满足时,主动让出互斥量,并进入阻塞等待,一旦条件满足,线程就立即唤醒。线程依赖其他线程的操作,它确信有一个线程在发现条件满足后,将向它发信号,并让出互斥量。

条件等待是需要跟互斥量连用的,等待示例如下:

pthread_mutex_lock(&m);
while(condition_is_false)
	pthread_cond_wait(&v,&m);// 此处阻塞
/* 如果代码运行到此处,则表示等待的条件已经满足 ,
* 并且在此持有斥量 *
/* 在满足条件情况下,做你想做的事情*/
...
pthread_mutex_unlock(&m);

发送示例如下:

pthread_mutex_lock(&m);
/*一些对共享数据的操作,会捯饬另一个线程等待条件满足*/
...
// 此处也可以是 pthread_cond_broadcast 函数
pthread_cond_signal(&cond);
pthread_mutex_unlock(&m);

线程取消cancle、cleanup

int pthread_cancel(pthread_t thread);
int pthread_setcancelstate(int state, int *oldstate);

一个线程向另外一个线程发送取消请求,该函数非阻塞,成功后立即返回0,否则返回错误码。

线程可以设置本线程是否可以被取消,PTHREAD_CANCEL_ENABLE(默认),PTHREAD_CANCEL_DISABLE

void pthread_cleanup_push(void (*routine)(void *),void *arg);
void pthread_cleanup_pop(int execute);

线程可以设置一个或多个清理函数,线程取消或退出时,会自动执行这些清理函数,以确保资源安全。以上2个函数要同时使用,并且在一个代码块中。这两个函数是用宏来实现的。

示例:

void *thread(void *param)
{
int input = (int)param;
printf("thread start\n");
pthread_cleanup_push(clean,"first cleanup handler");
pthread_cleanup_push(clean,"second cleanup handler");
/*work logic here*/
if(input != 0){
/*pthread_exit 退出,清理函数总
*/
pthread_exit((void*)1);
}
pthread_cleanup_pop(0);
pthread_cleanup_pop(0);
/*return 返回, 如果上面 pop 函数的参数是0,则不执行清理函数 */
return ((void *)0);
}

线程与信号

前面说的线程共享信号处理函数,但独享自己的信号掩码。

#include <signal.h>
// 设置掩码
int pthread_sigmask(int how, const sigset_t *new, sigset_t *old);

//  想线程发送信号
int pthread_kill(pthread_t thread, int sig);
int pthread_sigqueue(pthread_t thread, int sig, const union sigval value);

pthread_sigmask的第1个参数:

  • SIG_BLOCK向当前掩码中添加新的掩码
  • SIG_UNBLOCK从当信号掩码中删除
  • SIG_SETMASK将当前掩码替换成new

2个发送函数跟相应的进程发送函数类似,这里不多解释了。

另外:尽量少在多线程中使用信号,更不要在多线程中使用fork。

6. 进程间通信

进程间通信的手段,可以分成2大类:通信类与同步类。前者用于在进程间传递数据,后者用于协调进程间操作。下边来看看它们,包括管道、消息队列、信号量、mmap、共享内存等几种方式。

管道

  • 简介

    管道的作用是在有亲缘关系的进程之间传递消息,共同祖先调动pipe函数,打开管道文件就会在fork之后被各个后代进程所共享。一般是2个亲缘进程之间来通信,一个写另一个读,如果要双工,最好是2个管道来进行。

    管道是一种文件,可以调用read、write、close等接口来操作,它属于一种特殊的文件系统pipefs。管道中数据是阅后即焚的,读取操作是一种消耗行为。

  • 创建与读写

    #include <unistd.h>
    int pipe(int pipefd[2]);		// 创建
    int pipe2(int pipefd[2], int flags) // flag可以用O_NONBLOCK
    
    write(pipefd[1],wbuf,count);	// 写
    read(pipefd[0],rbuf,count);  // 读
    

    pipe函数返回2个文件描述符,一个是读描述符pipefd[0],一个是写描述符pipefd[1]。示例代码如下:

    int pipefd[2];
    pipe(pipefd);
    switch(fork())
    {
    case -1:
    	/*fork failed, error handler here*/
    case 0:		// 子进程
    	close(pipefd[1]) ; /* 关闭掉写入端对应的文件描述符,可以 pipefd[0] 调用 read*/
    	break ;
    default:  // 父进程
    	close(pipefd[0]); /* 关闭掉读取端文件描述符,可以 pipefd[1] 调用 write*/
    	break;
    }
    

    注意关闭未使用的管道文件描述符。如果所有读端已经关闭,写端再调用,会操作失败,errno被设置为EPIPE,同时内核会向写入进程发送SIGPIPE的信号。

  • 管道对应内存区大小

    管道本质是一片内存区域,默认大小是65536字节,可以通过fcntl来获取与修改。

    int pipe_capacity = fcntl(fd, F_GETPIPE_SZ); 		// 获取管道大小
    ret = fcntl(fd, F_SETPIPE_SZ, size);		// 设置管道大小
    

    可以执行cat /proc/sys/fs/pipe-max-size查看

  • 命名管道

    #include <sys/types.h>
    #include <sys/stat.h>
    int mkfifo(const char *pathname, mode_t mode);
    

    这个与文件I/O的create类似,mode文件的读写权限,pathname是目录。

    文件IO的open、read、write、close等都可以用在FIFO文件上。但FIFO文件不应该以O_RDWR模式打开。正确的使用方法是一个以O_RDONLY方式打开,另外一个以O_WRONLY打开。

    通常打开FIFO文件会同步读写进程,读写都打开才会返回,否则会阻塞。可以通过设置O_NONBLOCK来设置成异步方式。

POSIX消息队列

  • POSIX IPC简介

    以下几种方式,有System V风格与POSIX风格,POSIX风格的出现的要晚,具有后发优势,总体来讲要用于SystemV风格。

    消息队列信号量共享内存
    头文件<mqueue.h><semaphore.h><sys/mman.h>
    创建或打开mq_opensem_openshm_open+mmap
    关闭mq_closesem_closemunmap
    删除mq_unlinksem_unlinkshm_unlink
    执行IPCmq_send、mq_receivesem_post、sem_wait、sem_getvalue共享内存区域操作数据
    其他操作mq_getattr、mq_setattr、mq_notifysem_init、sem_destroy

    创建出来的共享内存、信号量,Linux将这些对象了挂载在/dev/shm目录处的tmpfs文件系统中。

    消息队列也可在文件系统中展示,需要手动挂载:

    mkdir /dev/mqueue
    mount -t mqueue none /dev/mqueue
    

    POSIX消息队列的名字必须以/打头,后续字符不允许出现/,打头的/字符不计入长度,最大长度为NAME_MAX(255字符)。

    POSIX信号量和共享内存可以以1个或多个/大豆,也可不以/打头,打头的/字符不计入长度,共享内存的最大长度是NAME_MAX,POSIX信号量的名字最大长度为NAME_MAX-4。

    示例:

    // 消息队列
    mqd_t mqd = mq_open(argv[1],O_RDWR|O_CREAT|O_EXCL,S_IRUSR|S_IWUSR,NULL);
    if(mqd == -1)
    {
    /*error handle*/
    }
    
    // 信号量
    sem_t* sem = sem_open(argv[1],O_CREAT|O_EXCL,S_IRUSR|S_IWUSR,1);
    if(sem == SEM_FAILED)
    {
    /*error handle*/
    }
    
    // 共享内存
    int shm_fd= shm_open(argv[1],O_RDWR|O_CREAT|O_EXCL,S_IRUSR|S_IWUSR);
    if(shm_fd == -1)
    {
    /*error handle*/
    }
    
  • 创建open

    #include <fcntl.h>
    #include <sys/stat.h>
    #include <mqueue.h>
    mqd_t mq_open(const char *name, int oflag);
    mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
    

    该函数与open类似,第2个参数oflgas的标志位包括:O_RDONLY、O_WRONLY、O_RDWR、O_CREAT、O_EXCL、O_NONBLOCK等。如果打开消息队列时,没有设置O_NONBLOCK,后续调用mq_send、mq_receive就会进入阻塞,反之则不会阻塞。

    第3个参数mode与第4个参数attr只有在创建消息队列时才有意义。mode是访问权限,attr设置了消息队列的属性,attr可以为NULL,表示默认属性。

    struct mq_attr {
    long mq_flags;				// 0或设置O_NONBLOCK
    long mq_maxmsg;		// 队列中最大消息个数
    long mq_msgsize;	// 单条消息允许的最大字节数
    long mq_curmsgs;	// 队列中当前的消息个数
    }
    

    默认的配置如下查看如设置:

    cat /proc/sys/fs/mqueue/msg_max
    cat /proc/sys/fs/mqueue/msgsize_max
    
    sysctl -w fs.mqueue.msg_max=4096
    sysctl -w fs.mqueue.msgsize_max=65536
    
  • 关闭close

    #include <mqueue.h>
    int mq_close(mqd_t mqdes);
    

    POSIX IPC对象维护有引用计数,在调用相关的close函数后,引用计数-1.

  • 销毁unlink

    #include <mqueue.h>
    int mq_unlink(const char *name);
    

    即使引用计数为0,只要不显示调用mq_unlink,该队列及队列上的消息就依然存在。

  • 调整消息队列的属性attr

    #include <mqueue.h>
    int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
    int mq_setattr(mqd_t mqdes, struct mq_attr *newattr,
    struct mq_attr *oldattr);
    

    消息队列创建以后,可以通过调用它们来修改属性,但mq_maxmgs、mg_msgsize属性在创建时已经确定下来,mq_setattr不能修改这2个属性,mq_curmsgs是根据消息自己变化的,也就是说这两个函数仅用于修改mq_flags。

    mq_getattr(mqd,&attr);
    attr.mq_flags |= O_NONBLOCK; /* 设置 O_NONBLOCK 性 */
    //attr.mq_flags &=(~O_NONBLOCK);/* 取消 O_NONBLOCK 性 */
    mq_setattr(mqd,&attr,NULL)
    
  • 消息的发送send

    #include <mqueue.h>
    int mq_send(mqd_t mqdes, const char *msg_ptr,size_t msg_len, unsigned msg_prio);
    

    第3个参数是消息的长度,不要超过mq_msgsize

    第4个参数是消息的优先级,非负数。

    如果消息队列已满,mq_send可能阻塞,如果设置了O_NONBLOCK,会立即返回失败,errno设置为EAGAIN。

  • 消息的接收receive

    ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
    

    对于POSIX消息队列而言,宗师取走优先级最高的消息中,最先到达的那个。

    第4个参数msg_proi不是NULL,系统将取到的消息体的优先级复制到msg_prio指向的整型变量,如果NULL,表示不在乎消息体的优先级。如果mg_receive调用时,消息队列中没有消息,则函数陷入阻塞,如果设置了O_NONBLOCK,则立即返回失败,并设置errno为EAGIAN。

    POSIX消息队列的本质就是个优先级队列,优先级高的总是被优先取出。

  • 消息的通知notify

    #include <mqueue.h>
    int mq_notify(mqd_t mqdes, const struct sigevent *sevp);
    

    以上接口与管道没有多大什么区别,消息队列的特点就是本接口。也就是发布-订阅的功能。当空的队列中收到一条新的消息,就给相应的进程发送通知,这是一种典型的异步机制。

    POSI消息队列提供了2种异步的方法:1. 产生一个信号;2. 创建一个线程来执行一个事先制指定的函数。

    第2个参数:

    union sigval{
    int sigval_int;
    void *sigval_ptr;
    }
    struct sigevent {
    int sigev_notify; 	/* 决定采用哪种通知方式 ,信号还是线程,SIGEV_SIGNAL或SIGEV_THREAD*/
    int sigev_signo; 	/* 用 信号方式,决定发送哪个信号 */
    union sigval sigev_value; /* 信号或线程方式含义不同 */
    void (*sigev_notify_function)(union sigval);
    void *sigev_notify_attributes;
    }
    

    信号方式示例:

    mqd_t mqd;
    struct mq_attr attr ;
    sigset_t newmask ;
    struct sigevent sigev;
    mqd = mq_open(mq_filename,O_RDONLY|O_NONBLOCK);
    
    mq_getattr(mqd,&attr);
    buffer = malloc(attr.mq_msgsize) ; /* 保证buffer 足够大 */
    
    sigemptyset(&newmask);;
    sigaddset(&newmask,SIGUSR1);
    sigprocmask(SIG_BLOCK,&newmask,NULL);/* 阻塞等待的信号 */
    
    sigev.sigev_notify = SIGEV_SIGNAL;
    sigev.sigev_signo = SIGUSR1;
    mq_notify(mqd,&sigev);
    
    for( ; ; )
    {
        sigwait(&newmask,&signo);/* 待 SIGUSR1 信号 */
        if(signo == SIGUSR1)
        {
            mq_notify(mqd,&sigev); /* 先重新注册 notify 函数 */
            while( n = mq_receive(mqd,buffer,attr.mq_msgsize,NULL) >= 0)
            {
            	/*process the message in buffer*/
            }
            if(errno != EAGAIN)
            {
            	/*some error happened*/
            }
        }
    }
    

    Linux系统支持在消息队列上执行select/epoll,以上过程会变得更简单:

    mqd = mq_open(argv[1],O_RDONLY | O_NONBLOCK);
    if(mqd == -1)
    {
        fprintf(stderr,"failed to open mqueue (%d: %s)\n",errno,strerror(errno));
        return 1;
    }
    mq_getattr(mqd,&attr);
    buffer = malloc(attr.mq_msgsize);
    
    FD_ZERO(&rset);
    for(;;)
    {
        FD_SET(mqd,&rset);
        nfds = select(mqd+1,&rset,NULL,NULL,NULL);
        if(FD_ISSET(mqd,&rset))
        {
            while((n = mq_receive(mqd,buffer,attr.mq_msgsize,NULL)) >=0 )
            {
            /* 在此处处理本条消息 */
            }
            if(errno != EAGAIN)
            {
                /* 处理错误 */
            }
        }
    }
    

POSIX信号量

  • 简介

    与信号量概念类似的是互斥量,它们都是为了同步多个进程的操作,信号量是与某个资源相关,信号量元素的值表示与之关联的资源的个数。内核维护信号量的值,并确保其值不小于0。互斥量保护临界区,可以看成资源个数是1,信号量的资源个数可以是多个,允许多个进程同时使用资源。

    POSIX提供了2种信号量:有名信号量与无名信号量。无名信号量是基于内存的,由于没有名字,没法通过open操作直接找到对应的信号量,很难在多进程之间使用。无名信号量多用于线程之间同步。有名信号量反之,可以用于多进程同步。它们在使用上的相同,创建、消耗时的接口不同。

  • 创建open

    #include <fcntl.h>
    #include <sys/stat.h>
    #include <semaphore.h>
    sem_t *sem_open(const char *name, int oflag);
    sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);
    

    第2个参数支持O_CREATE、O_EXCL,表示创建信号量。

    最后一个value是新建信号的初始值,value的值在0和SEM_VALUE_MAX之间。

    创建失败,返回SEM_FAILED,并设置errno,不要尝试创建sem_t结构体的副本。

  • 关闭close

    #include <semaphore.h>
    int sem_close(sem_t *sem);
    

    关闭信号量,同时信号量的进程数的引用计数-1。进程终止时,有名信号量会自动关闭,当进程执行exec系统函数时,也会关闭。

  • 销毁unlink

    #include <semaphore.h>
    int sem_unlink(const char *name);
    

    与其他POSIX对象相同,只有当引用计数为0,才会真正删除。

  • 信号量使用wait、post

    #include <semaphore.h>
    // 等待信号,它们将会使信号量的值-1
    int sem_wait(sem_t *sem);
    int sem_trywait(sem_t *sem);
    int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
    
    // 发布信号量,归还资源,信号量值+1
    int sem_post(sem_t *sem);
    
    // 获取信号量的值
    int sem_getvalue(sem_t *sem, int *sval);
    
  • 无名信号量的创建init

    int sem_init(sem_t *sem, int pshared, unsigned int value);
    

    pshared参数用于声明信号量是在线程间共享还是在进程间共享。0表示在线程间,非0表示在进程间,想要在进程间使用,信号量必须位于共享内存区域内。

    对于线程间共享的信号量,线程组退出,也就自动销毁了;对于进程间共享的信号量,信号量的持久性与所在共享内存的持久性一样。

  • 无名信号量的销毁destroy

    int sem_destroy(sem_t *sem);
    

    至于所有进程都没有等待信号量时,才会被安全销毁。

内存映射mmap

  • 简介

    进程与其他进程共享物理内存,各个进程的页表条目指向RAM中相同的分页,如下图所示:

    通过mmap系统调用创建的内存映射可以做到进程之间共享,同时要不要映射也是可以选择的,取决于映射是私有映射(MAP_PRIVATE),还是共享映射(MAP_SHARED)。

  • 创建mmap

    #include <sys/mman.h>
    void *mmap(void *addr, size_t length, int prot, int flags,int fd, off_t offset);
    

    fd、offset、length3个参数制定内存映射的源,即将fd对应的文件,从offset位置起,将长度为length的内容映射到进程地址空间。

    第1个参数addr用于指定将文件对应的内存映射到进程地址空间的起始地址,一般来讲总是为NULL,表示内核自己选择。

    第3个参数用于对内存映射的保护,它的合法值如下:

    • PROT_EXEC: 映射的内容可以执行
    • PROT_READ: 映射的内容可读
    • PROT_WRITE: 映射的内容可写
    • PROT_NONE: 映射的内容不能访问

    第4个参数用于制定是共享映射还是私有映射:

    • MAP_SHARED: 共享映射
    • MAP_PRIVATE: 私有映射
    • MAP_ANONYMOUS: 匿名映射fd参数必须是-1.

    MAP_SHARED 与 MAP_PRIVATE标志位两者必选一个。

    map系统调用的单元是页,参数addr和offset必须是页对齐,也就是页面大小的整数倍,linux下页面大小是4096字节(可通过getconf PAGESIZE查看)。

    返回值是内存映射起始地址

  • 销毁munmap

    int munmap(void *addr, size_t length);
    

    addr是mmap返回值,length是内存映射区域的大小。注意关闭对应的文件并不会引发munmap,因此在没有其他需要的情况下,可以调用close关闭文件。

    伪码示例:

    fd = open(...);
    addr = mmap(..., MAP_SHARED, fd, ...);
    close(fd);
    

POSIX共享内存

  • 简介

    mmap系统调用做了大量的工作,POSIX共享内存就是建立在mmap之上的。

  • 创建open

    #include <sys/mman.h>
    int shm_open(const char *name, int oflag, mode_t mode);
    

    第2个参数与其他与其他的类似,要包含O_RDONLY或O_RDWR标志,创建时用0_CREAT | O_EXCL。

    第3个参数共享内存的访问权限。

    调用成功后,会返回一个文件描述符,与其POSIX IPC一样,内核会自动设置FD_CLOEXEC标志位。

    共享内存是文件,可以调用文件相关的函数。新创建的共享内存,默认大小总是0,在调用mmap之前,需要先调用ftruncate函数,调节文件的大小。通过fstat接口可以获取共享内存区的大小。

    示例

    fd = shm_open(name,...); 		// 使用 mmap 映射进地址空间
    addr = mmap(NULL,length,PROT_READ|PROT_WRITE,MAP_SHARED,fd,0);
    
  • 删除unlink

    int shm_unlink(const char *name);
    

    删除一个共享内存对象,并不会影响既有的映射,内核维护引用计数,当所有进程全部munmap接触映射之后,共享内存对象才会被真正删除。

    如果不显示调用shm_unlink,共享内存对象中的数据则具有持久性,哪怕所有进程都通过munmap解除映射,只要不调用它,其中数据不会丢失。当然系统重启,共享内存对象也就不存在了。

# Linux 

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×