开篇先介绍下 BlockingQueue 这个接口的规则, 后面再看其实现.
阻塞队列概要
阻塞队列与我们平常接触的普通队列 (LinkedList 或 ArrayList 等) 的最大不同点, 在于阻塞队列的阻塞添加和阻塞删除方法.
阻塞添加
所谓的阻塞添加是指当阻塞队列元素已满时, 队列会阻塞加入元素的线程, 直队列元素不满时才重新唤醒线程执行元素加入操作.
阻塞删除
阻塞删除是指在队列元素为空时, 删除队列元素的线程将被阻塞, 直到队列不为空再执行删除操作(一般都会返回被删除的元素).
由于 Java 中的阻塞队列接口 BlockingQueue 继承自 Queue 接口, 因此先来看看阻塞队列接口为我们提供的主要方法
- public interface BlockingQueue<E> extends Queue<E> {
- // 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量)
- // 在成功时返回 true, 如果此队列已满, 则抛 IllegalStateException.
- boolean add(E e);
- // 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量)
- // 将指定的元素插入此队列的尾部, 如果该队列已满,
- // 则在到达指定的等待时间之前等待可用的空间, 该方法可中断
- boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
- // 将指定的元素插入此队列的尾部, 如果该队列已满, 则一直等到(阻塞).
- void put(E e) throws InterruptedException;
- // 获取并移除此队列的头部, 如果没有元素则等待(阻塞),
- // 直到有元素将唤醒等待线程执行该操作
- E take() throws InterruptedException;
- // 获取并移除此队列的头部, 在指定的等待时间前一直等到获取元素, // 超过时间方法将结束
- E poll(long timeout, TimeUnit unit) throws InterruptedException;
- // 从此队列中移除指定元素的单个实例(如果存在).
- boolean remove(Object o);
- }
这里我们把上述操作进行分类
插入方法:
add(E e) : 添加成功返回 true, 失败抛 IllegalStateException 异常
offer(E e) : 成功返回 true, 如果此队列已满, 则返回 false.
put(E e) : 将元素插入此队列的尾部, 如果该队列已满, 则一直阻塞
删除方法:
remove(Object o) : 移除指定元素, 成功返回 true, 失败返回 false
poll() : 获取并移除此队列的头元素, 若队列为空, 则返回 null
take(): 获取并移除此队列头元素, 若没有元素则一直阻塞.
阻塞队列的对元素的增删查操作主要就是上述的三类方法, 通常情况下我们都是通过这 3 类方法操作阻塞队列, 了解完阻塞队列的基本方法后, 下面我们将分析阻塞队列中的两个实现类 ArrayBlockingQueue 和 LinkedBlockingQueue 的简单使用和实现原理, 其中实现原理是这篇文章重点分析的内容.
ArrayBlockingQueue
在看源码之前, 通过查询 API 发现对 ArrayBlockingQueue 特点的简单介绍:
1, 一个由数组支持的有界队列, 此队列按 FIFO(先进先出)原则对元素进行排序.
2, 新元素插入到队列的尾部, 队列获取操作则是从队列头部开始获得元素
3, 这是一个简单的 "有界缓存区", 一旦创建, 就不能在增加其容量
4, 在向已满队列中添加元素会导致操作阻塞, 从空队列中提取元素也将导致阻塞
5, 此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略. 默认情况下, 不保证是这种排序的. 然而通过将公平性 (fairness) 设置为 true, 而构造的队列允许按照 FIFO 顺序访问线程. 公平性通常会降低吞吐量, 但也减少了可变性和避免了 "不平衡性".
简单的来说, ArrayBlockingQueue 是一个用数组实现的有界阻塞队列, 其内部按先进先出的原则对元素进行排序, 其中 put 方法和 take 方法为添加和删除的阻塞方法, 下面我们通过 ArrayBlockingQueue 队列实现一个生产者消费者的案例, 通过该案例简单了解其使用方式
使用示例
Consumer 消费者和 Producer 生产者, 通过 ArrayBlockingQueue 队列获取和添加元素, 其中消费者调用了 take()方法获取元素当队列没有元素就阻塞, 生产者调用 put()方法添加元素, 当队列满时就阻塞, 通过这种方式便实现生产者消费者模式. 比直接使用等待唤醒机制或者 Condition 条件队列来得更加简单.
- package com.zejian.concurrencys.Queue;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.TimeUnit;
- /**
- * Created by chenhao on 2018/01/07
- */
- public class ArrayBlockingQueueDemo {
- private final static ArrayBlockingQueue<Apple> queue= new ArrayBlockingQueue<>(1);
- public static void main(String[] args){
- new Thread(new Producer(queue)).start();
- new Thread(new Producer(queue)).start();
- new Thread(new Consumer(queue)).start();
- new Thread(new Consumer(queue)).start();
- }
- }
- class Apple {
- public Apple(){
- }
- }
- /**
- * 生产者线程
- */
- class Producer implements Runnable{
- private final ArrayBlockingQueue<Apple> mAbq;
- Producer(ArrayBlockingQueue<Apple> arrayBlockingQueue){
- this.mAbq = arrayBlockingQueue;
- }
- @Override
- public void run() {
- while (true) {
- Produce();
- }
- }
- private void Produce(){
- try {
- Apple apple = new Apple();
- mAbq.put(apple);
- System.out.println("生产:"+apple);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 消费者线程
- */
- class Consumer implements Runnable{
- private ArrayBlockingQueue<Apple> mAbq;
- Consumer(ArrayBlockingQueue<Apple> arrayBlockingQueue){
- this.mAbq = arrayBlockingQueue;
- }
- @Override
- public void run() {
- while (true){
- try {
- TimeUnit.MILLISECONDS.sleep(1000);
- comsume();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- private void comsume() throws InterruptedException {
- Apple apple = mAbq.take();
- System.out.println("消费 Apple="+apple);
- }
- }
输出:
生产: com.zejian.concurrencys.Queue.Apple@109967f
消费 Apple=com.zejian.concurrencys.Queue.Apple@109967f
生产: com.zejian.concurrencys.Queue.Apple@269a77
生产: com.zejian.concurrencys.Queue.Apple@1ce746e
消费 Apple=com.zejian.concurrencys.Queue.Apple@269a77
消费 Apple=com.zejian.concurrencys.Queue.Apple@1ce746e
........
源码剖析
ArrayBlockingQueue 内部的阻塞队列是通过重入锁 ReenterLock 和 Condition 条件队列实现的, 所以 ArrayBlockingQueue 中的元素存在公平访问与非公平访问的区别, 对于公平访问队列, 被阻塞的线程可以按照阻塞的先后顺序访问队列, 即先阻塞的线程先访问队列. 而非公平队列, 当队列可用时, 阻塞的线程将进入争夺访问资源的竞争中, 也就是说谁先抢到谁就执行, 没有固定的先后顺序. 创建公平与非公平阻塞队列代码如下:
- // 默认非公平阻塞队列
- ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
- // 公平阻塞队列
- ArrayBlockingQueue queue1 = new ArrayBlockingQueue(2,true);
- // 构造方法源码
- public ArrayBlockingQueue(int capacity) {
- this(capacity, false);
- }
- public ArrayBlockingQueue(int capacity, boolean fair) {
- if (capacity <= 0)
- throw new IllegalArgumentException();
- this.items = new Object[capacity];
- lock = new ReentrantLock(fair);
- notEmpty = lock.newCondition();
- notFull = lock.newCondition();
- }
ArrayBlockingQueue 的内部是通过一个可重入锁 ReentrantLock 和两个 Condition 条件对象来实现阻塞, 这里先看看其内部成员变量
- public class ArrayBlockingQueue<E> extends AbstractQueue<E>
- implements BlockingQueue<E>, java.io.Serializable {
- /** 存储数据的数组 */
- final Object[] items;
- /** 获取数据的索引, 主要用于 take,poll,peek,remove 方法 */
- int takeIndex;
- /** 添加数据的索引, 主要用于 put, offer, or add 方法 */
- int putIndex;
- /** 队列元素的个数 */
- int count;
- /** 控制并非访问的锁 */
- final ReentrantLock lock;
- /**notEmpty 条件对象, 用于通知 take 方法队列已有元素, 可执行获取操作 */
- private final Condition notEmpty;
- /**notFull 条件对象, 用于通知 put 方法队列未满, 可执行添加操作 */
- private final Condition notFull;
- /**
- 迭代器
- */
- transient Itrs itrs = null;
- }
从成员变量可看出, ArrayBlockingQueue 内部确实是通过数组对象 items 来存储所有的数据, 值得注意的是 ArrayBlockingQueue 通过一个 ReentrantLock 来同时控制添加线程与移除线程的并非访问, 这点与 LinkedBlockingQueue 区别很大 (稍后会分析). 而对于 notEmpty 条件对象则是用于存放等待或唤醒调用 take 方法的线程, 告诉他们队列已有元素, 可以执行获取操作. 同理 notFull 条件对象是用于等待或唤醒调用 put 方法的线程, 告诉它们, 队列未满, 可以执行添加元素的操作. takeIndex 代表的是下一个方法(take,poll,peek,remove) 被调用时获取数组元素的索引, putIndex 则代表下一个方法 (put, offer, or add) 被调用时元素添加到数组中的索引. 图示如下
添加
- //add 方法实现, 间接调用了 offer(e)
- public boolean add(E e) {
- if (offer(e))
- return true;
- else
- throw new IllegalStateException("Queue full");
- }
- //offer 方法
- public boolean offer(E e) {
- checkNotNull(e);// 检查元素是否为 null
- final ReentrantLock lock = this.lock;
- lock.lock();// 加锁
- try {
- if (count == items.length)// 判断队列是否满
- return false;
- else {
- enqueue(e);// 添加元素到队列
- return true;
- }
- } finally {
- lock.unlock();
- }
- }
- // 入队操作
- private void enqueue(E x) {
- // 获取当前数组
- final Object[] items = this.items;
- // 通过 putIndex 索引对数组进行赋值
- items[putIndex] = x;
- // 索引自增, 如果已是最后一个位置, 重新设置 putIndex = 0;
- if (++putIndex == items.length)
- putIndex = 0;
- count++;// 队列中元素数量加 1
- // 唤醒调用 take()方法的线程, 执行元素获取操作.
- notEmpty.signal();
- }
这里的 add 方法和 offer 方法实现比较简单, 其中需要注意的是 enqueue(E x)方法, 当 putIndex 索引大小等于数组长度时, 需要将 putIndex 重新设置为 0, 因为后面讲到的取值也是从数组中第一个开始依次往后面取, 取了之后会将原位置的值设置为 null, 方便循环 put 操作, 这里要注意并不是每次都是取数组中的第一个值, takeIndex 也会增加. 因为做了添加操作, 数组中肯定不会空, 则 notEmpty 条件会唤醒 take()方法取值.
ok~, 接着看 put 方法, 它是一个阻塞添加的方法:
- //put 方法, 阻塞时可中断
- public void put(E e) throws InterruptedException {
- checkNotNull(e);
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();// 该方法可中断
- try {
- // 当队列元素个数与数组长度相等时, 无法添加元素
- while (count == items.length)
- // 将当前调用线程挂起, 添加到 notFull 条件队列中等待唤醒
- notFull.await();
- enqueue(e);// 如果队列没有满直接添加..
- } finally {
- lock.unlock();
- }
- }
put 方法是一个阻塞的方法, 如果队列元素已满, 那么当前线程将会被 notFull 条件对象挂起加到等待队列中, 直到队列有空档才会唤醒执行添加操作. 但如果队列没有满, 那么就直接调用 enqueue(e)方法将元素加入到数组队列中. 到此我们对三个添加方法即 put,offer,add 都分析完毕, 其中 offer,add 在正常情况下都是无阻塞的添加, 而 put 方法是阻塞添加.
(获取)删除
关于删除先看 poll 方法, 该方法获取并移除此队列的头元素, 若队列为空, 则返回 null.
- public E poll() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- // 判断队列是否为 null, 不为 null 执行 dequeue()方法, 否则返回 null
- return (count == 0) ? null : dequeue();
- } finally {
- lock.unlock();
- }
- }
- // 删除队列头元素并返回
- private E dequeue() {
- // 拿到当前数组的数据
- final Object[] items = this.items;
- @SuppressWarnings("unchecked")
- // 获取要删除的对象
- E x = (E) items[takeIndex];
将数组中 takeIndex 索引位置设置为 null
- items[takeIndex] = null;
- //takeIndex 索引加 1 并判断是否与数组长度相等,
- // 如果相等说明已到尽头, 恢复为 0
- if (++takeIndex == items.length)
- takeIndex = 0;
- count--;// 队列个数减 1
- if (itrs != null)
- itrs.elementDequeued();// 同时更新迭代器中的元素数据
- // 删除了元素说明队列有空位, 唤醒 notFull 条件对象添加线程, 执行添加操作
- notFull.signal();
- return x;
- }
接着看 take()方法, 是一个阻塞方法, 获取队列头元素并删除.
- // 从队列头部删除, 队列没有元素就阻塞, 可中断
- public E take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();// 中断
- try {
- // 如果队列没有元素
- while (count == 0)
- // 执行阻塞操作
- notEmpty.await();
- return dequeue();// 如果队列有元素执行删除操作
- } finally {
- lock.unlock();
- }
- }
take 和 poll 的区别是, 队列为空时, poll 返回 null,take 则被挂起阻塞, 直到有元素添加进来, take 线程被唤醒, 然后获取第一个元素并删除.
peek 方法非常简单, 直接返回当前队列的头元素但不删除任何元素.
- public E peek() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- // 直接返回当前队列的头元素, 但不删除
- return itemAt(takeIndex); // null when queue is empty
- } finally {
- lock.unlock();
- }
- }
- final E itemAt(int i) {
- return (E) items[i];
- }
ok~, 到此对于 ArrayBlockingQueue 的主要方法就分析完了.
来源: https://www.cnblogs.com/java-chen-hao/p/10234149.html