进程间通信(InterProcess Communication, IPC):匿名管道,有名管道,信号,消息队列,共享内存,信号量,套接字,etc.

管道

匿名管道

  • 历史上,管道是半双工的(即数据只能在一个方向上流动)。现在某些系统提供全双工管道,但是为了最佳可移植性,不应预先假定系统支持全双工管道。
  • 匿名管道只能在具有公共祖先的两个进程之间使用。通常,一个管道由一个进程创建,在进程调用fork之后,这个管道就能在父进程和子进程之间使用了。

pipe函数创建管道,fd[0]为读而打开,fd[1]为写而打开。

1
#include<unsitd.h>
2
int pipe(int fd[2]);

通常,进程会先调用pipe,接着调用fork,从而创建从父进程到子进程的IPC通道。

fork之后做什么取决于我们想要的数据流的方向。对于从父进程到子进程的管道,父进程关闭管道的读端(fd[0]),子进程关闭写端(fd[1])。状态结构如下图所示。

  • 当读(read)一个写端已被关闭的管道时,在所有数据都被读取后,read返回0,表示文件结束。
  • 如果写(write)一个读端已被关闭的管道,则产生信号SIGPIPE。如果忽略该信号或者捕捉该信号并从其处理程序返回,则write返回-1,errno设置为EPIPE。
1
#include "apue.h"
2
int main(void)
3
{
4
    int     n;
5
    int     fd[2];
6
    pid_t   pid;
7
    char    line[MAXLINE];
8
9
    if (pipe(fd) < 0)
10
        err_sys("pipe error");
11
    if ((pid = fork()) < 0) {
12
        err_sys("fork error");
13
    } else if (pid > 0) {       /* parent */
14
        close(fd[0]);
15
        write(fd[1], "hello world\n", 12);
16
    } else {                /* child */
17
        close(fd[1]);
18
        n = read(fd[0], line, MAXLINE);
19
        write(STDOUT_FILENO, line, n);
20
    }
21
    exit(0);
22
}

popen函数:创建一个管道,fork一个子进程,关闭未使用的管道端,执行一个shell运行命令,然后等待命令终止。

1
#include<stdio.h>
2
FILE *popen(const char *cmdstring, const char *type);
3
int pclose(FILE *fp);
1
#include "apue.h"
2
#include <sys/wait.h>
3
4
#define PAGER   "${PAGER:-more}" /* environment variable, or default */
5
6
int main(int argc, char *argv[])
7
{
8
    char    line[MAXLINE];
9
    FILE    *fpin, *fpout;
10
11
    if (argc != 2)
12
        err_quit("usage: a.out <pathname>");
13
    if ((fpin = fopen(argv[1], "r")) == NULL)
14
        err_sys("can't open %s", argv[1]);
15
16
    if ((fpout = popen(PAGER, "w")) == NULL)
17
        err_sys("popen error");
18
19
    /* copy argv[1] to pager */
20
    while (fgets(line, MAXLINE, fpin) != NULL) {
21
        if (fputs(line, fpout) == EOF)
22
            err_sys("fputs error to pipe");
23
    }
24
    if (ferror(fpin))
25
        err_sys("fgets error");
26
    if (pclose(fpout) == -1)
27
        err_sys("pclose error");
28
    exit(0);
29
}

FIFO(命名管道)

FIFO可以在不相关的进程间交换数据。

1
#include<sys/stat.h>
2
int mkfifo(const char *path, mode_t mode);
3
int mkfifoat(int fd, const char *path, mode_t mode);

当使用mkfifo创建一个FIFO时,需要用open来打开它。当open一个FIFO时,非阻塞标志(O_NONBLOCK)会产生如下影响:

  • 没指定O_NONBLOCK是,只读open要阻塞到某个其它进程为写而打开这个FIFO为止。
  • 指定了O_NONBLOCK时,只读open立即返回。如果没有进程为读而打开一个FIFO,那么只写open将返回-1,并将errno设置成ENXIO。

若write一个尚无进程为读而打开的FIFO,则产生信号SIGPIPE。若某个FIFO的最后一个写进程关闭了该FIFO,则将为该FIFO的读进程产生一个文件结束标志。

XSI IPC

每个内核中的IPC结构(消息队列,信号量或共享内存)都用一个非负整数的标识符(identifier)加以引用。

ftok函数通关一个路径名和项目id产生一个键。

1
#include<sys/ipc.h>
2
key_t ftok(const char *path, int id);

XSI IPC为每一个IPC结构关联了一个ipc_perm结构。该结构规定了权限和所有者,它至少包含下列成员:

1
struct ipc_perm {
2
    uid_t  uid;  /* owner's effective user id */
3
    gid_t  gid;  /* owner's effective group id */
4
    uid_t  cuid; /* creator's effective user id */
5
    gid_t  cgid; /* creator's effective group id */
6
    mode_t mode; /* access modes */
7
    //...
8
};
权限
用户读 0400
用户写(更改) 0200
组读 0040
组写(更改) 0020
其它读 0004
其它写(更改) 0002

在linux中可以运行ipcs -l来显示IPC相关的限制。

消息队列

消息队列是消息的链接表,存储在内核中,由消息队列标识符标识。

smgget用于创建一个新队列或打开一个现有队列。msgsnd将新消息添加到队列尾端。每个消息包含一个正的长整型类型的字段,一个非负的长度以及实际数据字节数,所有这些都在将消息添加到队列时,传递给msgsndmsgrcv用于从队列中读取消息。

1
#include<sys/msg.h>
2
3
struct msqid_ds {
4
    struct ipc_perm  msg_perm;     /* see Section 15.6.2 */
5
    msgqnum_t        msg_qnum;     /* # of messages on queue */
6
    msglen_t         msg_qbytes;   /* max # of bytes on queue */
7
    pid_t            msg_lspid;    /* pid of last msgsnd() */
8
    pid_t            msg_lrpid;    /* pid of last msgrcv() */
9
    time_t           msg_stime;    /* last-msgsnd() time */
10
    time_t           msg_rtime;    /* last-msgrcv() time */
11
    time_t           msg_ctime;    /* last-change time */
12
    //...
13
};
14
15
int msgget(key_t key, int flag);
16
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
17
int msgsnd(int msqid, const void *ptr, size_t nbytes, int flag);
18
ssize_t msgrcv(int msqid, void *ptr, size_t nbytes, long type, int flag);

信号量

信号量是一个计数器,用于为多个进程提供对共享数据对象的访问。

1
#include<sys/sem.h>
2
int semget(key_t key, int nsems, int flag);
3
int semctl(int semid, int semnum, int cmd, .../* union semun arg */);
4
5
union semun{
6
    int                 val;
7
    struct semid_ds     *buf;
8
    unsigned short      *array;
9
};
10
11
int semop(int semid, struct sembuf semoparray[], size_t nops);
12
13
struct sembuf{
14
    unsigned short  sem_num;
15
    short           sem_op;
16
    short           sem_flg;
17
};

共享内存

1
#include<sys/shm.h>
2
int shmget(key_t key, size_t size, int flag);
3
int shmctl(int shmid, int cmd, struct shmid_ds* buf);
4
void *shmat(int shmid, const void *addr, int flag);
5
int shmdt(const void *addr);
1
#include "apue.h"
2
#include <sys/shm.h>
3
4
#define ARRAY_SIZE  40000
5
#define MALLOC_SIZE 100000
6
#define SHM_SIZE    100000
7
#define SHM_MODE    0600    /* user read/write */
8
9
char    array[ARRAY_SIZE];  /* uninitialized data = bss */
10
11
int main(void)
12
{
13
    int     shmid;
14
    char    *ptr, *shmptr;
15
16
    printf("array[] from %lx to %lx\n", (unsigned long)&array[0],
17
                    (unsigned long)&array[ARRAY_SIZE]);
18
    printf("stack around %lx\n", (unsigned long)&shmid);
19
20
    if ((ptr = malloc(MALLOC_SIZE)) == NULL)
21
        err_sys("malloc error");
22
    printf("malloced from %lx to %lx\n", (unsigned long)ptr,
23
                    (unsigned long)ptr+MALLOC_SIZE);
24
25
    if ((shmid = shmget(IPC_PRIVATE, SHM_SIZE, SHM_MODE)) < 0)
26
        err_sys("shmget error");
27
    if ((shmptr = shmat(shmid, 0, 0)) == (void *)-1)
28
        err_sys("shmat error");
29
    printf("shared memory attached from %lx to %lx\n",
30
        (unsigned long)shmptr, (unsigned long)shmptr+SHM_SIZE);
31
32
    if (shmctl(shmid, IPC_RMID, 0) < 0)
33
        err_sys("shmctl error");
34
35
    exit(0);
36
}

POSIX信号量

1
#include<semaphore.h>
2
sem_t *sem_open(const char *name, int oflag, .../* mode_t mode,
3
        unsigned int value */);
4
int sem_close(sem_t *sem);
5
int sem_unlink(const char *name);
6
int sem_trywait(sem_t *sem);
7
int sem_wait(sem_t *sem);
8
#include<time.h>
9
int sem_timedwait(sem_t *restrict sem,
10
        const struct timespec *restrict tsptr);
11
int sem_post(sem_t *sem);
12
int sem_init(sem_t *sem, int pshared, unsigned int value);
13
int sem_destroy(sem_t *sem);
14
int sem_getvalue(sem_t *restrict sem, int *restrict valp);