经典生产者与消费者问题

经典生产者与消费者问题

1. 问题的引入

​ 如同日常生活中一样,我们每天所消费的东西,都是生产部门进行生产,我们才有商品来进行消费。

​ 那么就有一个很现实的问题,生产部门如果没有生产,我们是没有任何商品来消费的。

​ 同时商品过多又会导致相对商品过剩,造成经济危机。

​ 所以生产合适数量的商品来供我们消费,才可以维持动态平衡。

2.简单的例子实现

​ 要模拟同时进行生产和消费,就必须至少有一个生产者与消费者来同时运作。

​ 我们知道程序一般是顺序执行的,所以必须用到Java技术中的多线程。

​ 也就是开两个线程分别来进行生产以及消费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Stack<Character> stack = new Stack<>();

Thread producerThread = new Thread(() -> {
while(true){
producer.produce(stack);
}
});
producerThread.start();

Thread consumerThread = new Thread(() -> {
while(true){
consumer.consume(stack);
}
});
consumerThread.start();

​ 这里我们用stack作为容器的载体,通过stack的push( )与pop( )方法来实现随机字符(即Character)的产生以及消耗。

​ 我们来运行这个程序(当然producer以及consumer我只是用了简单的描述来代替,下面会有两个类的具体实现)。

​ 我们会发现一个非常严重的问题。

​ 众所周知,Java多线程中,每个线程运行时,都在争夺CPU的资源,而且一个线程不可能长时间占用。

​ 那么就会产生一种很糟糕的情况,即生产者一段时间内没有争夺到CPU的资源,而消费者一直消费,但是我们的容器 stack里面已经空了,stack无法pop,就会产生异常。

3.简单修改生产者以及消费者

​ 那么设置边际条件,使当stack容器中的内容为0时,消费者停止消费,而等生产者生产后在开始消费。

​ 运用多线程中的wait( )与notify( )方法,当stack容器中的内容为0时,让消费者wait( ),等待生产者生产之后再消费。

​ 修改完的例子运行之后,发现一切正常,消费者消费,生产者生产,一切祥和稳定。

​ 但真的是这样吗?

4.再次深入

​ 如果我们不只有一个生产者与消费者,那么我们刚刚的程序会发生什么?

​ 我们会发现,会产生多个消费者之间会发生一种复杂的现象,我用下面的图来解释这种现象:

1577242149195

​ 可以看到,由于每个线程之间在争夺CPU的资源,所以会出现 consumer1 在对数据1 进行操作的时候,consumer2 也争夺到了CPU资源,而 consumer1 的操作还未完成,数据1 还未被改动,导致 consumer2 读取到的还时数据1,那么数据1 就会被操作两次,出现错误。

​ 怎么解决呢,Java技术中,为了解决这种问题,为大家提供了关键字 synchronized 来解决,该关键字保证了当进行被 synchronized 关键字修饰的操作时,保证只有当前对象可以访问数据,其他的线程只能等待。

​ 下面放上经过修改后的 ConsumerProducer 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class Consumer {
private int count;

public Consumer(int count){
this.count = count;
}

public void consume(Stack<Character> stack) throws InterruptedException{
String consumeChar;
synchronized (stack){
if(stack.size() == 0){
stack.wait();
}else{
consumeChar = String.valueOf(stack.pop());
assert consumeChar != null;
System.out.println("[ consumer"+count+" ] : consume "+consumeChar+"\n stack : "+stack.size()+"\n right now content : "+stack.toString());
stack.notify();
}
}
}
}
public class Producer {
private int count;

public Producer(int count){
this.count = count;
}

public void produceRandomChar(Stack<Character> stack) throws InterruptedException {
Random random = new Random();
char randomChar = (char)(random.nextInt(25)+65);
synchronized (stack){
int MAX_SIZE = 10;
if(stack.size() == MAX_SIZE){
stack.wait();
}else{
stack.push(randomChar);
System.out.println("[ producer"+count+" ] : produce "+randomChar+"\n stack : "+stack.size()+"\n right now content : "+stack.toString());
stack.notify();
}
}
}
}

5.生产者与消费者之间进行交互的Interaction类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class Interaction {
public static void main(String[] args) throws InterruptedException {
int count;
Scanner scanner = new Scanner(System.in);
System.out.println("[ Please input the number of thread ]");
count = scanner.nextInt();
Stack<Character> stack = new Stack<>();
Consumer[] consumers = new Consumer[count];
Producer[] producers = new Producer[count];
for (int i = 0; i < count; i++) {
consumers[i] = new Consumer(i+1);
producers[i] = new Producer(i+1);
}
Thread[] consumersThread = new Thread[count];
Thread[] producersThread = new Thread[count];
final boolean[] flag = {true,true};
for (int i = 0; i < count; i++) {
int finalI = i;
Thread produce = new Thread(() -> {
try {
while(flag[0]){
producers[finalI].produceRandomChar(stack);
Thread.sleep(500);
if(stack.size() == 10){
flag[0] = false;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
produce.start();
producersThread[i] = produce;

Thread consume = new Thread(() -> {
try {
while(flag[1]){
consumers[finalI].consume(stack);
Thread.sleep(500);
if(stack.size() == 0 && !flag[1]){
flag[0] = false;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
consume.start();
consumersThread[i] = consume;
}

for(Thread thread : consumersThread){
thread.join();
}
for (Thread thread: producersThread) {
thread.join();
}
}
}