进程间通信(InterProcess Communication, IPC):匿名管道,有名管道,信号,消息队列,共享内存,信号量,套接字,etc.
管道
匿名管道
- 历史上,管道是半双工的(即数据只能在一个方向上流动)。现在某些系统提供全双工管道,但是为了最佳可移植性,不应预先假定系统支持全双工管道。
- 匿名管道只能在具有公共祖先的两个进程之间使用。通常,一个管道由一个进程创建,在进程调用fork之后,这个管道就能在父进程和子进程之间使用了。
pipe
函数创建管道,fd[0]
为读而打开,fd[1]
为写而打开。
1 |
|
2 | int pipe(int fd[2]); |
通常,进程会先调用pipe,接着调用fork,从而创建从父进程到子进程的IPC通道。
fork之后做什么取决于我们想要的数据流的方向。对于从父进程到子进程的管道,父进程关闭管道的读端(fd[0]),子进程关闭写端(fd[1])。状态结构如下图所示。
- 当读(read)一个写端已被关闭的管道时,在所有数据都被读取后,read返回0,表示文件结束。
- 如果写(write)一个读端已被关闭的管道,则产生信号SIGPIPE。如果忽略该信号或者捕捉该信号并从其处理程序返回,则write返回-1,errno设置为EPIPE。
1 |
|
2 | int main(void) |
3 | { |
4 | int n; |
5 | int fd[2]; |
6 | pid_t pid; |
7 | char line[MAXLINE]; |
8 | |
9 | if (pipe(fd) < 0) |
10 | err_sys("pipe error"); |
11 | if ((pid = fork()) < 0) { |
12 | err_sys("fork error"); |
13 | } else if (pid > 0) { /* parent */ |
14 | close(fd[0]); |
15 | write(fd[1], "hello world\n", 12); |
16 | } else { /* child */ |
17 | close(fd[1]); |
18 | n = read(fd[0], line, MAXLINE); |
19 | write(STDOUT_FILENO, line, n); |
20 | } |
21 | exit(0); |
22 | } |
popen
函数:创建一个管道,fork一个子进程,关闭未使用的管道端,执行一个shell运行命令,然后等待命令终止。
1 |
|
2 | FILE *popen(const char *cmdstring, const char *type); |
3 | int pclose(FILE *fp); |
1 |
|
2 |
|
3 | |
4 |
|
5 | |
6 | int main(int argc, char *argv[]) |
7 | { |
8 | char line[MAXLINE]; |
9 | FILE *fpin, *fpout; |
10 | |
11 | if (argc != 2) |
12 | err_quit("usage: a.out <pathname>"); |
13 | if ((fpin = fopen(argv[1], "r")) == NULL) |
14 | err_sys("can't open %s", argv[1]); |
15 | |
16 | if ((fpout = popen(PAGER, "w")) == NULL) |
17 | err_sys("popen error"); |
18 | |
19 | /* copy argv[1] to pager */ |
20 | while (fgets(line, MAXLINE, fpin) != NULL) { |
21 | if (fputs(line, fpout) == EOF) |
22 | err_sys("fputs error to pipe"); |
23 | } |
24 | if (ferror(fpin)) |
25 | err_sys("fgets error"); |
26 | if (pclose(fpout) == -1) |
27 | err_sys("pclose error"); |
28 | exit(0); |
29 | } |
FIFO(命名管道)
FIFO可以在不相关的进程间交换数据。
1 |
|
2 | int mkfifo(const char *path, mode_t mode); |
3 | int mkfifoat(int fd, const char *path, mode_t mode); |
当使用mkfifo
创建一个FIFO时,需要用open
来打开它。当open
一个FIFO时,非阻塞标志(O_NONBLOCK)会产生如下影响:
- 没指定O_NONBLOCK是,只读open要阻塞到某个其它进程为写而打开这个FIFO为止。
- 指定了O_NONBLOCK时,只读open立即返回。如果没有进程为读而打开一个FIFO,那么只写open将返回-1,并将errno设置成ENXIO。
若write一个尚无进程为读而打开的FIFO,则产生信号SIGPIPE。若某个FIFO的最后一个写进程关闭了该FIFO,则将为该FIFO的读进程产生一个文件结束标志。
XSI IPC
每个内核中的IPC结构(消息队列,信号量或共享内存)都用一个非负整数的标识符(identifier)加以引用。
ftok
函数通关一个路径名和项目id产生一个键。
1 |
|
2 | key_t ftok(const char *path, int id); |
XSI IPC为每一个IPC结构关联了一个ipc_perm结构。该结构规定了权限和所有者,它至少包含下列成员:
1 | struct ipc_perm { |
2 | uid_t uid; /* owner's effective user id */ |
3 | gid_t gid; /* owner's effective group id */ |
4 | uid_t cuid; /* creator's effective user id */ |
5 | gid_t cgid; /* creator's effective group id */ |
6 | mode_t mode; /* access modes */ |
7 | //... |
8 | }; |
权限 | 位 |
---|---|
用户读 | 0400 |
用户写(更改) | 0200 |
组读 | 0040 |
组写(更改) | 0020 |
其它读 | 0004 |
其它写(更改) | 0002 |
在linux中可以运行ipcs -l
来显示IPC相关的限制。
消息队列
消息队列是消息的链接表,存储在内核中,由消息队列标识符标识。
smgget
用于创建一个新队列或打开一个现有队列。msgsnd
将新消息添加到队列尾端。每个消息包含一个正的长整型类型的字段,一个非负的长度以及实际数据字节数,所有这些都在将消息添加到队列时,传递给msgsnd
。msgrcv
用于从队列中读取消息。
1 |
|
2 | |
3 | struct msqid_ds { |
4 | struct ipc_perm msg_perm; /* see Section 15.6.2 */ |
5 | msgqnum_t msg_qnum; /* # of messages on queue */ |
6 | msglen_t msg_qbytes; /* max # of bytes on queue */ |
7 | pid_t msg_lspid; /* pid of last msgsnd() */ |
8 | pid_t msg_lrpid; /* pid of last msgrcv() */ |
9 | time_t msg_stime; /* last-msgsnd() time */ |
10 | time_t msg_rtime; /* last-msgrcv() time */ |
11 | time_t msg_ctime; /* last-change time */ |
12 | //... |
13 | }; |
14 | |
15 | int msgget(key_t key, int flag); |
16 | int msgctl(int msqid, int cmd, struct msqid_ds *buf); |
17 | int msgsnd(int msqid, const void *ptr, size_t nbytes, int flag); |
18 | ssize_t msgrcv(int msqid, void *ptr, size_t nbytes, long type, int flag); |
信号量
信号量是一个计数器,用于为多个进程提供对共享数据对象的访问。
1 |
|
2 | int semget(key_t key, int nsems, int flag); |
3 | int semctl(int semid, int semnum, int cmd, .../* union semun arg */); |
4 | |
5 | union semun{ |
6 | int val; |
7 | struct semid_ds *buf; |
8 | unsigned short *array; |
9 | }; |
10 | |
11 | int semop(int semid, struct sembuf semoparray[], size_t nops); |
12 | |
13 | struct sembuf{ |
14 | unsigned short sem_num; |
15 | short sem_op; |
16 | short sem_flg; |
17 | }; |
共享内存
1 |
|
2 | int shmget(key_t key, size_t size, int flag); |
3 | int shmctl(int shmid, int cmd, struct shmid_ds* buf); |
4 | void *shmat(int shmid, const void *addr, int flag); |
5 | int shmdt(const void *addr); |
1 |
|
2 |
|
3 | |
4 |
|
5 |
|
6 |
|
7 |
|
8 | |
9 | char array[ARRAY_SIZE]; /* uninitialized data = bss */ |
10 | |
11 | int main(void) |
12 | { |
13 | int shmid; |
14 | char *ptr, *shmptr; |
15 | |
16 | printf("array[] from %lx to %lx\n", (unsigned long)&array[0], |
17 | (unsigned long)&array[ARRAY_SIZE]); |
18 | printf("stack around %lx\n", (unsigned long)&shmid); |
19 | |
20 | if ((ptr = malloc(MALLOC_SIZE)) == NULL) |
21 | err_sys("malloc error"); |
22 | printf("malloced from %lx to %lx\n", (unsigned long)ptr, |
23 | (unsigned long)ptr+MALLOC_SIZE); |
24 | |
25 | if ((shmid = shmget(IPC_PRIVATE, SHM_SIZE, SHM_MODE)) < 0) |
26 | err_sys("shmget error"); |
27 | if ((shmptr = shmat(shmid, 0, 0)) == (void *)-1) |
28 | err_sys("shmat error"); |
29 | printf("shared memory attached from %lx to %lx\n", |
30 | (unsigned long)shmptr, (unsigned long)shmptr+SHM_SIZE); |
31 | |
32 | if (shmctl(shmid, IPC_RMID, 0) < 0) |
33 | err_sys("shmctl error"); |
34 | |
35 | exit(0); |
36 | } |
POSIX信号量
1 |
|
2 | sem_t *sem_open(const char *name, int oflag, .../* mode_t mode, |
3 | unsigned int value */); |
4 | int sem_close(sem_t *sem); |
5 | int sem_unlink(const char *name); |
6 | int sem_trywait(sem_t *sem); |
7 | int sem_wait(sem_t *sem); |
8 |
|
9 | int sem_timedwait(sem_t *restrict sem, |
10 | const struct timespec *restrict tsptr); |
11 | int sem_post(sem_t *sem); |
12 | int sem_init(sem_t *sem, int pshared, unsigned int value); |
13 | int sem_destroy(sem_t *sem); |
14 | int sem_getvalue(sem_t *restrict sem, int *restrict valp); |