阻塞队列(BlockingQueue)是 Java 5 并发新特性中的内容,阻塞队列的接口是 java.util.concurrent.BlockingQueue,它提供了两个附加操作:当队列中为空时,从队列中获取元素的操作将被阻塞;当队列满时,向队列中添加元素的操作将被阻塞。
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器。
阻塞队列提供了四种操作方法:
JDK7 提供了 7 个阻塞队列。分别是
下面分别简单介绍一下:
Java 中线程安全的内置队列还有两个:ConcurrentLinkedQueue 和 LinkedTransferQueue,它们使用了 CAS 这种无锁的方式来实现了线程安全的队列。无锁的方式性能好,但是队列是无界的,用在生产系统中,生产者生产速度过快,可能导致内存溢出。有界的阻塞队列 ArrayBlockingQueue 和 LinkedBlockingQueue,为了减少 Java 的垃圾回收对系统性能的影响,会尽量选择 array/heap 格式的数据结构。这样的话就只剩下 ArrayBlockingQueue。(先埋个坑在这儿,近来接触到了 disruptor,感觉妙不可言。disruptor)
这里分析下 ArrayBlockingQueue 的实现原理。
构造方法:
- ArrayBlockingQueue(intcapacity);
- ArrayBlockingQueue(intcapacity,booleanfair);
- ArrayBlockingQueue(intcapacity,booleanfair, CollectionextendsE> c)
ArrayBlockingQueue 提供了三种构造方法,参数含义如下:
插入元素:
- public void put(E e)throwsInterruptedException {checkNotNull(e);finalReentrantLock lock =this.lock;
- lock.lockInterruptibly();try{while(count == items.length)
- notFull.await();enqueue(e);
- }finally{
- lock.unlock();
- }
- }
从源码可以看出,生产者首先获得锁 lock,然后判断队列是否已经满了,如果满了,则等待,直到被唤醒,然后调用 enqueue 插入元素。
- private void enqueue(E x) {// assert lock.getHoldCount() == 1;
- // assert items[putIndex] == null;
- finalObject[] items =this.items;
- items[putIndex] = x;if(++putIndex == items.length)
- putIndex =0;
- count++;
- notEmpty.signal();
- }
以上是 enqueue 的实现,实现的操作是插入元素到一个环形数组,然后唤醒 notEmpty 上阻塞的线程。
获取元素:
- publicEtake()throwsInterruptedException {finalReentrantLock lock =this.lock;
- lock.lockInterruptibly();try{while(count ==0)
- notEmpty.await();return dequeue();
- }finally{
- lock.unlock();
- }
- }
从源码可以看出,消费者首先获得锁,然后判断队列是否为空,为空,则等待,直到被唤醒,然后调用 dequeue 获取元素。
- privateEdequeue() {// assert lock.getHoldCount() == 1;
- // assert items[takeIndex] != null;
- finalObject[] items =this.items;@SuppressWarnings("unchecked")
- E x = (E) items[takeIndex];
- items[takeIndex] =null;if(++takeIndex == items.length)
- takeIndex =0;
- count--;if(itrs !=null)
- itrs.elementDequeued();
- notFull.signal();returnx;
- }
以上是 dequeue 的实现,获取环形数组当前 takeIndex 的元素,并及时将当前元素置为 null,设置下一次 takeIndex 的值 takeIndex++,然后唤醒 notFull 上阻塞的线程。
还有其他方法
、
- offer(E e)
、
- poll()
、
- add(E e)
、
- remove()
等的实现,因为常用 take 和 put,这些方法就不一一赘述了。
- offer(E e, long timeout, TimeUnit unit)
使用阻塞队列实现生产者 - 消费者模式:
- /**
- * Created by noly on 2017/5/19.
- */
- public classBlockingQueueTest {public static void main(String[] args) {
- ArrayBlockingQueue queue = newArrayBlockingQueue(10);
- Consumer consumer =new Consumer(queue);
- Producer producer =new Producer(queue);
- producer.start();
- consumer.start();
- }
- }classConsumerextendsThread {privateArrayBlockingQueue queue;
- public Consumer(ArrayBlockingQueue queue){
- this.queue= queue;
- }@Override
- public void run() {while(true) {try{
- Integer i = queue.take();
- System.out.println("消费者从队列取出元素:"+ i);
- }catch(InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }classProducerextendsThread {privateArrayBlockingQueue queue;
- public Producer(ArrayBlockingQueue queue){
- this.queue= queue;
- }@Override
- public void run() {for(inti =0; i <100; i++) {try{
- queue.put(i);
- System.out.println("生产者向队列插入元素:"+ i);
- }catch(InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
如果不使用阻塞队列,使用 Object.wait() 和 Object.notify()、非阻塞队列实现生产者 - 消费者模式,考虑线程间的通讯,会非常麻烦。
参考资料:
聊聊并发(七)——Java 中的阻塞队列
阻塞队列和 ArrayBlockingQueue 源码解析
高性能队列——Disruptor
来源: http://www.cnblogs.com/aheizi/p/6871636.html