LinkedBlockingQueue 的基础
LinkedBlockingQueue 是一个基于链表的阻塞队列, 实际使用上与 ArrayBlockingQueue 完全一样, 我们只需要把之前烤鸡的例子中的 Queue 对象替换一下即可. 如果对于 ArrayBlockingQueue 不熟悉, 可以去看看 https://juejin.im/post/5c0f79f3f265da61561f1bec
LinkedBlockingQueue 源码分析
源码在 node 上注释写明了, 它是基于一个 "two lock queue" 算法实现的, 感兴趣的同学可以参考这篇 paper: 这篇文章为了提升在多处理器的机器上的更好性能的并发而提出了这个算法, 其中心思想是: 通过两把锁分别控制并发, 入队时: 只需要锁 Tail Node, 出队时, 只需要锁 Head Node. 回到 LinkedBlockingQueue, 先看看内部成员变量:
- public class LinkedBlockingQueue<E> extends AbstractQueue<E>
- implements BlockingQueue<E>, java.io.Serializable {
- private static final long serialVersionUID = -6903933977591709194L;
- /**
- * Linked list node class
- */
- static class Node<E> {
- E item;
- /**
- * One of:
- * - the real successor Node
- * - this Node, meaning the successor is head.next
- * - null, meaning there is no successor (this is the last node)
- */
- Node<E> next;
- Node(E x) { item = x; }
- }
- /** The capacity bound, or Integer.MAX_VALUE if none */
- private final int capacity;
- /** Current number of elements */
- private final AtomicInteger count = new AtomicInteger();
- /**
- * Head of linked list.
- * Invariant: head.item == null
- */
- transient Node<E> head;
- /**
- * Tail of linked list.
- * Invariant: last.next == null
- */
- private transient Node<E> last;
- /** Lock held by take, poll, etc */
- private final ReentrantLock takeLock = new ReentrantLock();
- /** Wait queue for waiting takes */
- private final Condition notEmpty = takeLock.newCondition();
- /** Lock held by put, offer, etc */
- private final ReentrantLock putLock = new ReentrantLock();
- /** Wait queue for waiting puts */
- private final Condition notFull = putLock.newCondition();
每个添加到 LinkedBlockingQueue 队列中的数据都将被封装成 Node 节点(这个 node 不同于 AQS 中的 node, 它是一个单向链表), 其中 head 和 last 分别指向队列的头结点和尾结点. 与 ArrayBlockingQueue 不同的是, LinkedBlockingQueue 内部分别使用了 takeLock 和 putLock 对并发进行控制, 也就是说, 添加和删除操作并不是互斥操作, 可以同时进行, 这样也就可以大大提高吞吐量. 这里再次强调如果没有给 LinkedBlockingQueue 指定容量大小, 其默认值将是 Integer.MAX_VALUE, 如果存在添加速度大于删除速度时候, 有可能会内存溢出, 这点在使用前希望慎重考虑. 至于 LinkedBlockingQueue 的实现原理图与 ArrayBlockingQueue 是类似的, 除了对添加和移除方法使用单独的锁控制外, 两者都使用了不同的 Condition 条件对象作为等待队列, 用于挂起 take 线程和 put 线程. 总结如下图:
LinkedBlockingQueue 的阻塞添加
同样的, 添加的方法主要有: add offer 和 put. 我们先看看非阻塞添加的 add 和 offer 方法, 这两个方法的区别同样是添加失败时, add 方法是抛异常, offer 方法是返回 false
- public boolean add(E e) {
- if (offer(e))
- return true;
- else
- throw new IllegalStateException("Queue full");
- }
- public boolean offer(E e) {
- if (e == null) throw new NullPointerException();
- // 因为存在并发操作移出和入队互不冲突, 与 arrayBlockingQueue 不同, count 被声明为 Atomic
- final AtomicInteger count = this.count;
- // 队列满了直接返回
- if (count.get() == capacity)
- return false;
- int c = -1;
- Node<E> node = new Node<E>(e);
- final ReentrantLock putLock = this.putLock;
- putLock.lock();
- try {
- // 因为存在并发问题, 加锁之后再次判断一下队列有没有满
- if (count.get() <capacity) {
- // 入队
- enqueue(node);
- // 容量 + 1 返回旧值
- c = count.getAndIncrement();
- // 因为在入队时可能同时有出队的线程同时把元素移除, 所以在入队后做一个补偿,
- // 如果队列还有空间, 那么唤醒一个如归的线程执行添加操作
- if (c + 1 < capacity)
- notFull.signal();
- }
- } finally {
- putLock.unlock();
- }
- //c==0, 只有可能最开始就是一个空队列 (注意上面的 c 返回的是旧值) 此时因为刚好添加了一个元素,
- // 所以唤醒消费的线程去取移出元素
- if (c == 0)
- signalNotEmpty();
- return c>= 0;
- }
- // 入队操作
- private void enqueue(Node<E> node) {
- // 队列尾节点指向新的 node 节点
- last = last.next = node;
- }
- //signalNotEmpty 方法去唤醒移出元素的线程, 为什么要先获取锁才能 signal 呢? 不懂的同学回去看看 AQS:
- // 因为条件队列是基于 AQS 的锁存在的, 用法上必须要这么用, 否则会抛出异常
- private void signalNotEmpty() {
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lock();
- // 唤醒获取并删除元素的线程
- notEmpty.signal();
- } finally {
- takeLock.unlock();
- }
- }
这里的 Offer()方法做了两件事:
第一件事是判断队列是否满, 满了就直接释放锁, 没满就将节点封装成 Node 入队, 然后加锁后再次判断队列添加完成后是否已满, 不满就继续唤醒等到在条件对象 notFull 上的添加线程.
第二件事是, 判断是否需要唤醒等到在 notEmpty 条件对象上的消费线程.
接下来看看 put 方法, 与 offer 方法如出一辙:
- public void put(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- int c = -1;
- Node<E> node = new Node<E>(e);
- final ReentrantLock putLock = this.putLock;
- final AtomicInteger count = this.count;
- // 锁可被中断
- putLock.lockInterruptibly();
- try {
- // 队列满时加入 notFull 条件队列
- while (count.get() == capacity) {
- notFull.await();
- }
- enqueue(node);
- c = count.getAndIncrement();
- // 队列还没有满时, 继续唤醒添加线程
- if (c + 1 <capacity)
- notFull.signal();
- } finally {
- putLock.unlock();
- }
- //c==0, 只有可能最开始就是一个空队列 (注意上面的 c 返回的是旧值) 此时因为刚好添加了一个元素,
- // 所以唤醒消费的线程去取移出元素
- if (c == 0)
- signalNotEmpty();
- }
这里有几个问题:
问题 1:
为什么添加完成后是继续唤醒在条件队列 notFull 上的添加线程而不是像 ArrayBlockingQueue 那样直接唤醒 notEmpty 条件对象上的消费线程?
分析 1: 先回想一下 ArrayBlockingQueue: 它内部只有一个锁, 在内部完成添加元素操作后直接唤醒消费线程去消费. 如果 ArrayBlockingQueue 在添加元素之后再唤醒添加线程的话, 消费的线程就可能一直被 block, 无法执行. 而为了避免这种情况, 对于 LinkedBlockingQueue 来说, 他有两个锁, 添加和删除元素不是互斥的, 添加的过程中可能已经删除好几个元素了, 所以他在设计上要尽可能的去唤醒两个条件队列. 添加线程在队列没有满时自己直接唤醒自己的其他添加线程, 如果没有等待的添加线程, 直接结束了. 如果有就直到队列元素已满才结束挂起. 注意消费线程的执行过程也是如此. 这也是为什么 LinkedBlockingQueue 的吞吐量要相对大些的原因.
问题 2: 为什么 if (c == 0)时才去唤醒消费线程呢
分析 2: 什么情况下 c 等于 0 呢? c 值是添加元素前队列的大小, 也就是说, 之前是空队列, 空队列时会有什么情况呢, 空队列会阻塞所有的 take 进程, 将其封装到 notEmpty 的条件队列中. 这个时候, c 之前是 0, 现在在执行了 enqueue 方法后, 队列中有元素了, 所以他需要立即唤醒阻塞的 take 进程, 否则阻塞的 take 进程就一直 block 在队列里, 一直沉睡下去. 为什么 c>0 时, 就不会唤醒呢? 因为 take 方法和 put 方法一样, take 方法每次 take 完元素后, 如果队列还有值, 它会继续唤醒 take 队列, 也就是说他只要没有被 await()阻塞, 他就会一直不断的唤醒 take 线程, 而不需要再添加的时候再去唤醒, 造成不必要的性能浪费
LinkedBlockingQueue 的阻塞移出
相对的, 我们再看看 take 方法:
- public E take() throws InterruptedException {
- E x;
- int c = -1;
- // 获取当前队列大小
- final AtomicInteger count = this.count;
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lockInterruptibly();// 可中断
- try {
- // 如果队列没有数据, 当前 take 线程到条件队列中
- while (count.get() == 0) {
- notEmpty.await();
- }
- // 如果存在数据直接删除并返回该数据
- x = dequeue();
- c = count.getAndDecrement();// 队列大小减 1, 返回之前的值
- if (c> 1)
- notEmpty.signal();// 还有数据就唤醒后续的消费线程
- } finally {
- takeLock.unlock();
- }
- // 满足条件(之前队列是满的, 现在刚刚执行 dequeue 拿出了一个),
- // 唤醒条件对象上等待队列中的添加线程
- if (c == capacity)
- signalNotFull();
- return x;
- }
- private E dequeue() {
- Node<E> h = head;// 获取头结点
- Node<E> first = h.next; // 获取头结的下一个节点(要删除的节点)
- h.next = h; // help GC// 自己 next 指向自己, 即被删除
- head = first;// 更新头结点
- E x = first.item;// 获取删除节点的值
- first.item = null;// 清空数据, 因为 first 变成头结点是不能带数据的, 这样也就删除队列的带数据的第一个节点
- return x;
- }
take 方法是一个可阻塞可中断的移除方法, 主要做了两件事:
如果队列没有数据就挂起当前线程到 notEmpty 条件对象的等待队列中一直等待, 如果有数据就删除节点并返回数据项, 同时唤醒后续消费线程;
尝试唤醒条件对象 notFull 上等待队列中的添加线程: 假设之前队列中满员了, 那么新来的 put 进程将会被阻塞进 notFull 条件队列, 然后 await 挂起沉睡. 这个时候有线程通过 take 方法拿出了一个元素, 如果此时不唤醒 notFull 条件队列, 那么之前满员时队列中的线程就会一直睡死过去
总结
LinkedBlockingQueue 的两个队列:
notFull 条件队列(队列满时阻塞的 put 线程): await 的时机: 队列满了 signal 的时机: 一是 put 方法放入元素后, 如果队列还有空位, 会 singal 线程继续添加; 二是如果队列最开始满员, take 方法移出了一个元素后, 队列还有一个空位时也会唤醒它.
notEmpty 条件队列(队列空时候阻塞的 take 线程): await 的时机: 队列空了 signal 的时机: 一是 take 方法移出元素后, 如果队列还有空位, 会 singal 线程继续移出; 二是如果队列最开始空的, put 方法放入了一个元素后, 队列还有一个元素时也会唤醒它.
这种算法就是 "two lock queue" 的设计思想, 这也是 LinkedBlockingQueue 的吞吐量较高的本质原因
ArrayBlockingQueue 和 LinkedBlockingQueue 的比较总结
通过上述的分析, 对于 LinkedBlockingQueue 和 ArrayBlockingQueue 的基本使用以及内部实现原理我们已较为熟悉了, 这里我们就对它们两间的区别来个小结
1. 队列大小和构造方法有所不同, ArrayBlockingQueue 是有界的初始化必须指定大小, 而 LinkedBlockingQueue 可以是有界的也可以是无界的(Integer.MAX_VALUE), 对于后者而言, 当添加速度大于移除速度时, 在无界的情况下, 可能会造成内存溢出等问题, 有坑.
2. 数据存储容器不同, ArrayBlockingQueue 采用的是数组作为数据存储容器, 而 LinkedBlockingQueue 采用的则是以 Node 节点作为连接对象的单向链表.
3. 从 GC 的角度分析: 由于 ArrayBlockingQueue 采用的是数组的存储容器, 因此在插入或删除元素时不会产生或销毁任何额外的对象实例, 而 LinkedBlockingQueue 则会生成一个额外的 Node 对象. 这可能在长时间内需要高效并发地处理大批量数据的时, 对于 GC 可能存在较大影响.
4. 两者的实现队列添加或移除的锁不一样, ArrayBlockingQueue 实现的队列中的锁是没有分离的, 即添加操作和移除操作采用的同一个 ReenterLock 锁, 而 LinkedBlockingQueue 实现的队列中的锁是分离的, 其添加采用的是 putLock, 移除采用的则是 takeLock, 这样能大大提高队列的吞吐量, 也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据, 以此来提高整个队列的并发性能.
来源: https://juejin.im/post/5c0fcb35e51d453b304796c7