生产者消费者模式并不是(四人帮)GOF提出的23种设计模式之一,因为GOF那经典的23种模式主要是基于OO的。但生产者消费者模式是我们编程过程中最常用的一种设计模式之一。
什么是生产者消费者模式?
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过容器来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔到容器中,消费者不找生产者要数据,而是直接从容器里取,容器相当于一个缓冲区,平衡了生产者和消费者的处理能力。
生产者消费者模式可以用在线程间,也可以用在进程之间,系统之间等。在多线程编程中使用生产者和消费者模式能够解决绝大多数并发问题。在进程或系统中使用生产者和消费者模式能够解决进程或系统间的耦合问题,该模式通过平衡生产者和消费者的工作能力来提高程序或的整体处理数据的速度。
拿线程间使用来举例:在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。
生产者和消费者模式带来的好处?
解耦:
由于有缓冲区的存在,生产者和消费者之间不直接依赖,耦合度降低,便于程序的扩展和维护。假设生产者和消费者分别是两个类,如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化, 可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。其实生产者只需要生产数据,并把相关数据放在缓冲区,并不知道消费者对其做什么处理,甚至可以不知道消费者对存在。
支持并发,异步处理:
随着软件业的发展,互联网用户的日渐增多,动辄亿级别的PV,必须通过异步处理方式解决。缓冲区将生产模块和消费模块的分隔,他们之间仅通过缓冲区作为桥梁连接,可使生产模块的运行不再依赖消费模块的执行,由同步改为异步执行并且都可支持并发,从而大大提高了程序的效率。
削峰填谷:
如果生产者产生数据的速度时快时慢,缓冲区的好处就体现出来了。当数据产生快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中,等生产者的制造速度慢下来,消费者再慢慢处理掉。这样可以有效避免消费模块任务执行负载不均的问题。
数据沉余:
有时在处理数据的时候处理过程会失败。除非数据被持久化,否则将永远丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。
可恢复性:
当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别。
扩展性:
处理能力的扩展:因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。增加功能的扩展:如果对用户的某个行为作为生产者生产数据的依据,而消费者消费数据后的动作是用来处理对应可异步的操作,那么在这基础上增加一个操作只需要增加此数据的消费者即可。
保护核心资源:
对于生产者生产数据频率不可控的场景,可以控制消费者消费的频率,达到对下层资源对保护。比如说消费数据过程中对Mysql对消耗比较严重,或者为了防止生产数据对频率过高传导到消费频率高,最后影响到Mysql对安全,可以人为限制消费者的消费频率,不受生产者影响从而保证Mysql安全。
代码层面上:
可以它简化的开发,你可以独立地或并发的编写消费者和生产者,它仅仅只需知道共享对象是谁。分离的消费者和生产者在功能上能写出更简洁、可读、易维护的代码。
使用场景?
异步处理:
应用解耦:
流量消峰:
资源保护:
日志处理:
消息通讯:
线程间使用的实现方式有多种,例如:同步synchronized,wait,notify方式;可重入锁ReentrantLock,Condition方式;阻塞队列方式等。
public class Storage implements IStorage {
private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<Object>(10);
public void put(Object obj) { try { list.put(obj); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("进入缓冲区");
}public Object get() { Object obj = null; try { obj = list.take(); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("离开缓冲区");
return obj; } }