Matrix Wall

进程间通信

进程经常需要与其他进程通信,所以我们就来讨论一些关于进程间通信(Inter Process Communication, IPC)的问题。

一、竞争状态(race condition)

  • 两个或多个进程对同一共享数据同时进行读写操作,而最后的结果是不可预测的,它取决于各个进程具体运行情况。
  • 在同一时刻,只允许一个进程访问该共享数据,即如果当前已有一个进程正在使用该数据,那么其他进程暂时不能访问。这就是互斥的概念。

那么为了避免竞争状态,我们就需要互斥,也就是当一个进程在使用一个共享变量或文件时,其他进程不能做同样的操作。这样就引入了临界区的概念。

二、临界区

临界区的定义:

每个进程中访问临界资源的那段代码称为临界区。
临界资源指的是一次只允许一个进程使用的资源称为临界资源,例如打印机、变量。

所以如果我们能通过适当的安排,使得两个进程不可能同时处于临界区中,就能避免竞争状态。

尽管这样的要求避免了竞争状态,但它还不能保证使用共享数据的并发进程能够正确和高效地进行协作。所以对于一个好的解决方案,需要满足一下4个:

  1. 任何两个进程都不能同时进入临界区;
  2. 不应对CPU的速度和数量做任何假设;
  3. 临界区外运行的进程不得阻塞其他进程;
  4. 不得使进程无限期等待进入临界区。

三、同步机制应遵循的准则

1.空闲让进

  • 临界自愿处于空闲状态,允许进程进入临界区
  • 临界区内仅有一个进程执行

2.忙则等待

  • 临界区有进程正在执行其中的代码,所有其他进程则不可以进入临界区

3.有限等待

  • 对要求访问临界区的进程 应在保证在有限时间内进入自己的临界区,避免死等。

4.让权等待

  • 当进程不能进入自己的临界区时,应立即释放处理机,避免忙等。

四、实现互斥的方案

1.屏蔽中断

在单处理器系统中,最简单的方法是使每个进程在刚刚进入临界区后立即屏蔽所有中断,并在就要离开之前打开中断。
屏蔽中断后,时钟中断也会被屏蔽。
CPU只有在发生时钟中断或其他中断时才会进行进程切换,这样,在屏蔽中断后CPU就不会被切换到其他进程。

2.锁变量

假设有一个共享变量,初始值为0。当一个进程想进入临界区时,它就会首先测试这把锁。
如果锁的值为0,则该进程就将其设置为1并进入临界区。
如果锁的值已经为1,那么进程就一直等待直到锁的值为0.
所以0就表示临界区内没有进程,1表示已经有某个进程进入临界区。

3.严格轮换法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
while(TRUE){
while(turn != 0)
critical_region(); /*循环*/
turn = 1;
noncritical_region();
}
```

>a)

```C
while(TRUE){
while(turn != 1)
critical_region(); /*循环*/
turn = 0;
noncritical_region();
}

```

>b)

### 4.Peterson解法
在进入临界区前,各个进程使用其进程号01作为参数来调用enter_region。该调用在需要时将使进程等待,直到能安全地进入临界区。在完成对共享变量的操作后,进程将调用leave_region,表示操作已完成,若其他进程希望进入临界区,则现在就可以进入。

```C
#define FALSE 0
#define TRUE 1
#define N 2 /*进程数量*/

int turn; /*现在轮到谁?*/
int interested[N]; /*所有值初始化为0(FALSE)*/

void enter_region(int process) /*进程是0或1*/
{
int other; /*其他进程号*/

other = 1 - process; /*另一方进程*/
interested[process] = TRUE; /*表明所感兴趣的*/
turn = process; /*设置标志*/
while(turn == process && interested[other] == TRUE); /*空语句*/
}

void leave_region(int process) /*进程:谁离开?*/
{
interested[process] = FALSE; /*表示离开临界区*/
}

五、生产者-消费者问题

生产者-消费者问题又称为有界缓冲区问题。两个进程共享一个公共的固定大小的缓冲区。

其中一个是生产者,将消息放入缓冲区;另一个是消费者,从缓冲区中取出消息。

而问题在于当缓冲区已满,而此时生产者还想向其中放入一个新的数据项的情况。其解决方法是让生产者睡眠,待消费者从缓存区中取出一个或多个数据项时再唤醒它。

当消费者试图从缓冲区中取数据而发现缓冲区为空时,消费者就睡眠,直到生产者向其中放入一些数据时再将其唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#define N 100                                      /*缓冲区中的槽数目*/
int count = 0; /*缓冲区中的数据项数目*/
void producer(void)
{
int item;

while(TRUE){ /*无限循环*/
item = produce_item(); /*产生下一新数据项*/
if(count == N) sleep(); /*如果缓冲区满了,就进入休眠状态*/
insert_item(item); /*将新数据项放入缓冲区中*/
count = count + 1; /*将缓冲区的数据项计数器增1*/
if(count == 1) wakeup(consumer);
}
}

void consumer(void)
{
int item;

while(TRUE){ /*无限循环*/
if(count == 0) sleep(); /*如果缓冲区空,则进入休眠状态*/
item = remove_item(); /*从缓冲区中取出一个数据项*/
count = count - 1; /*将缓冲区的数据项计数器减1*/
if(count == N - 1) wakeup(producer); /*缓冲区满吗?*/
consume_item(item); /*打印数据项*/
}
}
```
这里还是有可能会出现竞争状态,其原因是对count的访问未加限制。

我们来看这样一个情况:缓冲区为空,消费者刚刚读取count的值发现它为0,此时调度程序决定暂停消费者而唤醒生产者。生产者向缓冲区中加入一个数据项,count加1。现在count的值变成1了,它推断由于count刚刚为0,所以消费者一定在睡眠,于是生产者调用**wakeup**来唤醒消费者。

但此时消费者并没有睡眠,所以这个**wakeup**信号就会丢失,当消费者下次运行时,它将测试先前读到的count值,发现它为0,于是睡眠。而生产者迟早会填满整个缓冲区,然后睡眠,这样一来,两个进程将永远睡眠下去。

所以现在问题的实质在于一个wakeup信号的丢失。虽然在这里我们可以加上一个**唤醒等待位**,但当问题中有三个或者更多进程时一个唤醒等待位就不够了,所以这并没有从本质上解决问题。

## 六、信号量
E.W.Dijkstra提出一种方法,它使用一个整型变量来累计唤醒次数,供以后使用。

这个引入的新变量类型就叫做**信号量(semaphore)**。

一个信号量的取值可以为0(表示没有保存下来的唤醒操作)或正值(表示有一个或多个唤醒操作)。

Dijkstra建议设立两种操作:down(P)和up(V)。


- down
如果信号量的值大于0,则将其值减1并继续;
若该值为0,则进程将睡眠。并将继续down操作。

- up
对信号量的值增1


### 用信号量解决生产者-消费者问题
#### 该方案使用了三个信号量:
- **full** 用来记录充满的缓冲槽数目;
- **empty** 记录空的缓冲槽总数;
- **mutex** 确保生产者和消费者不会同时访问缓冲区。

full的初值为0,empty的初值为缓冲区中槽的数目,mutex初值为1。供两个或多个进程使用的信号量,其初值为1,保证同时只有一个进程可以进入临界区,称为二元信号量。如果每个进程在进入临界区前都执行一个down操作,并在刚刚退出时执行一个up操作,就能够实现互斥。

```C
#define N 100 /*缓冲区中的槽数目*/
typedef int semaphore; /*信号量是一种特殊的整型数据*/
semaphore mutex = 1; /*控制对临界区的访问*/
semaphore empty = N; /*计数缓冲区的空槽数目*/
semaphore full = 0; /*计数缓冲区的满槽数目*/

void producer(void)
{
int item;

while(TRUE){ /*TRUE是常量1*/
item = produce_item(); /*产生放在缓冲区中的一些数据*/
down(&empty); /*将空槽数目减1*/
down(&mutex); /*进入临界区*/
insert_item(item); /*将新数据项放到缓冲区中*/
up(&mutex); /*离开临界区*/
up(&full); /*将满槽的数目加1*/
}
}

void consumer(void)
{
int item;

while(TRUE){ /*无限循环*/
down(&full); /*将满槽数目减1*/
down(&mutex); /*进入临界区*/
item = remove(); /*从缓冲区中取出数据项*/
up(&mutex); /*离开临界区*/
up(&empty); /*将空槽数加1*/
consumer_item(item); /*处理数据项*/
}
}