经典生产者与消费者问题
1. 问题的引入
如同日常生活中一样,我们每天所消费的东西,都是生产部门进行生产,我们才有商品来进行消费。
那么就有一个很现实的问题,生产部门如果没有生产,我们是没有任何商品来消费的。
同时商品过多又会导致相对商品过剩,造成经济危机。
所以生产合适数量的商品来供我们消费,才可以维持动态平衡。
2.简单的例子实现
要模拟同时进行生产和消费,就必须至少有一个生产者与消费者来同时运作。
我们知道程序一般是顺序执行的,所以必须用到Java技术中的多线程。
也就是开两个线程分别来进行生产以及消费。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 Stack<Character> stack = new Stack<>(); Thread producerThread = new Thread(() -> { while (true ){ producer.produce(stack); } }) ;producerThread .start () ;Thread consumerThread = new Thread (() -> { while (true ){ consumer.consume(stack); } }) ;consumerThread .start () ;
这里我们用stack作为容器的载体,通过stack的push( )与pop( )方法来实现随机字符(即Character)的产生以及消耗。
我们来运行这个程序(当然producer以及consumer我只是用了简单的描述来代替,下面会有两个类的具体实现)。
我们会发现一个非常严重的问题。
众所周知,Java多线程中,每个线程运行时,都在争夺CPU的资源,而且一个线程不可能长时间占用。
那么就会产生一种很糟糕的情况,即生产者一段时间内没有争夺到CPU的资源,而消费者一直消费,但是我们的容器 stack里面已经空了,stack无法pop,就会产生异常。
3.简单修改生产者以及消费者
那么设置边际条件,使当stack容器中的内容为0时,消费者停止消费,而等生产者生产后在开始消费。
运用多线程中的wait( )与notify( )方法,当stack容器中的内容为0时,让消费者wait( ),等待生产者生产之后再消费。
修改完的例子运行之后,发现一切正常,消费者消费,生产者生产,一切祥和稳定。
但真的是这样吗?
4.再次深入
如果我们不只有一个生产者与消费者,那么我们刚刚的程序会发生什么?
我们会发现,会产生多个消费者之间会发生一种复杂的现象,我用下面的图来解释这种现象:
可以看到,由于每个线程之间在争夺CPU的资源,所以会出现 consumer1 在对数据1 进行操作的时候,consumer2 也争夺到了CPU资源,而 consumer1 的操作还未完成,数据1 还未被改动,导致 consumer2 读取到的还时数据1,那么数据1 就会被操作两次,出现错误。
怎么解决呢,Java技术中,为了解决这种问题,为大家提供了关键字 synchronized 来解决,该关键字保证了当进行被 synchronized 关键字修饰的操作时,保证只有当前对象可以访问数据,其他的线程只能等待。
下面放上经过修改后的 Consumer 和 Producer 类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class Consumer { private int count; public Consumer (int count) { this .count = count; } public void consume (Stack<Character> stack) throws InterruptedException { String consumeChar; synchronized (stack){ if (stack.size () == 0 ){ stack.wait (); }else { consumeChar = String .valueOf (stack.pop ()); assert consumeChar != null; System.out.println ("[ consumer" +count+" ] : consume " +consumeChar+"\n stack : " +stack.size ()+"\n right now content : " +stack.toString ()); stack.notify (); } } } }public class Producer { private int count; public Producer (int count) { this .count = count; } public void produceRandomChar (Stack<Character> stack) throws InterruptedException { Random random = new Random (); char randomChar = (char )(random.nextInt (25 )+65 ); synchronized (stack){ int MAX_SIZE = 10 ; if (stack.size () == MAX_SIZE){ stack.wait (); }else { stack.push (randomChar); System.out.println ("[ producer" +count+" ] : produce " +randomChar+"\n stack : " +stack.size ()+"\n right now content : " +stack.toString ()); stack.notify (); } } } }
5.生产者与消费者之间进行交互的Interaction类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public class Interaction { public static void main(String [] args) throws InterruptedException { int count; Scanner scanner = new Scanner(System.in ); System.out.println("[ Please input the number of thread ]" ); count = scanner.nextInt(); Stack<Character> stack = new Stack<>(); Consumer[] consumers = new Consumer[count]; Producer[] producers = new Producer[count]; for (int i = 0 ; i < count; i++) { consumers[i] = new Consumer(i+1 ); producers[i] = new Producer(i+1 ); } Thread[] consumersThread = new Thread[count]; Thread[] producersThread = new Thread[count]; final boolean[] flag = {true ,true }; for (int i = 0 ; i < count; i++) { int finalI = i; Thread produce = new Thread(() -> { try { while (flag[0 ]){ producers[finalI].produceRandomChar(stack); Thread.sleep(500 ); if (stack.size() == 10 ){ flag[0 ] = false ; } } } catch (InterruptedException e) { e.printStackTrace(); } }) ; produce .start () ; producersThread [i ] = produce ; Thread consume = new Thread (() -> { try { while (flag[1 ]){ consumers[finalI].consume(stack); Thread.sleep(500 ); if (stack.size() == 0 && !flag[1 ]){ flag[0 ] = false ; } } } catch (InterruptedException e) { e.printStackTrace(); } }) ; consume .start () ; consumersThread [i ] = consume ; } for (Thread thread : consumersThread) { thread .join () ; } for (Thread thread: producersThread) { thread .join () ; } } }