原文出处 http://cmsblogs.com/ 『chenssy'
ArrayBlockingQueue,一个由数组实现的有界阻塞队列。该队列采用 FIFO 的原则对元素进行排序添加的。
ArrayBlockingQueue 为有界且固定,其大小在构造时由构造函数来决定,确认之后就不能再改变了。ArrayBlockingQueue 支持对等待的生产者线程和使用者线程进行排序的可选公平策略,但是在默认情况下不保证线程公平的访问,在构造时可以选择公平策略(fair = true)。公平性通常会降低吞吐量,但是减少了可变性和避免了 "不平衡性"。
先看 ArrayBlockingQueue 的定义:
- public class ArrayBlockingQueue < E > extends AbstractQueue < E > implements BlockingQueue < E > ,
- Serializable {
- private static final long serialVersionUID = -817911632652898426L;
- final Object[] items;
- int takeIndex;
- int putIndex;
- int count;
- // 重入锁
- final ReentrantLock lock;
- // notEmpty condition
- private final Condition notEmpty;
- // notFull condition
- private final Condition notFull;
- transient ArrayBlockingQueue.Itrs itrs;
- }
可以清楚地看到 ArrayBlockingQueue 继承 AbstractQueue,实现 BlockingQueue 接口。看过 java.util 包源码的同学应该都认识 AbstractQueue,改类在 Queue 接口中扮演着非常重要的作用,该类提供了对 queue 操作的骨干实现(具体内容移驾其源码)。BlockingQueue 继承 java.util.Queue 为阻塞队列的核心接口,提供了在多线程环境下的出列、入列操作,作为使用者,则不需要关心队列在什么时候阻塞线程,什么时候唤醒线程,所有一切均由 BlockingQueue 来完成。
ArrayBlockingQueue 内部使用可重入锁 ReentrantLock + Condition 来完成多线程环境的并发操作。
ArrayBlockingQueue 提供了诸多方法,可以将元素加入队列尾部。
方法较多,我们就分析一个方法:add(E e):
- public boolean add(E e) {
- return super.add(e);
- }
- public boolean add(E e) {
- if (offer(e)) return true;
- else throw new IllegalStateException("Queue full");
- }
add 方法调用 offer(E e),如果返回 false,则直接抛出 IllegalStateException 异常。offer(E e) 为 ArrayBlockingQueue 实现:
- public boolean offer(E e) {
- checkNotNull(e);
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- if (count == items.length) return false;
- else {
- enqueue(e);
- return true;
- }
- } finally {
- lock.unlock();
- }
- }
方法首先检查是否为 null,然后获取 lock 锁。获取锁成功后,如果队列已满则直接返回 false,否则调用 enqueue(E e),enqueue(E e) 为入列的核心方法,所有入列的方法最终都将调用该方法在队列尾部插入元素:
- private void enqueue(E x) {
- // assert lock.getHoldCount() == 1;
- // assert items[putIndex] == null;
- final Object[] items = this.items;
- items[putIndex] = x;
- if (++putIndex == items.length) putIndex = 0;
- count++;
- notEmpty.signal();
- }
该方法就是在 putIndex(对尾)为知处添加元素,最后使用 notEmpty 的 signal() 方法通知阻塞在出列的线程(如果队列为空,则进行出列操作是会阻塞)。
ArrayBlockingQueue 提供的出队方法如下:
poll()
- public E poll() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- return (count == 0) ? null: dequeue();
- } finally {
- lock.unlock();
- }
- }
如果队列为空返回 null,否则调用 dequeue() 获取列头元素:
- private E dequeue() {
- final Object[] items = this.items;
- E x = (E) items[takeIndex];
- items[takeIndex] = null;
- if (++takeIndex == items.length) takeIndex = 0;
- count--;
- if (itrs != null) itrs.elementDequeued();
- notFull.signal();
- return x;
- }
该方法主要是从列头(takeIndex 位置)取出元素,同时如果迭代器 itrs 不为 null,则需要维护下该迭代器。最后调用 notFull.signal() 唤醒入列线程。
take()
- public E take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (count == 0) notEmpty.await();
- return dequeue();
- } finally {
- lock.unlock();
- }
- }
take() 与 poll() 存在一个区别就是 count == 0 时的处理,poll() 直接返回 null,而 take() 则是在 notEmpty 上面等待直到被入列的线程唤醒。
来源: http://blog.csdn.net/chenssy/article/details/75841740