上一篇我们看了一下这个队列 ConcurrentLinkedQueue, 那就是一个无界非阻塞链表, 我们这次来看看 LinkedBlockingQueue, 这个队列看名字就知道是一个阻塞式队列 (也就是一个单向链表), 基于独占锁实现的, 比较简单;
一. LinkedBlockingQueue 基本结构
内部也是有一个 Node 类, 下图所示, item 存 实际数据, next 指向下一个节点, 一个有参构造器, 没啥好说的;
我们可以看看这个队列有的一些属性, 其实大概能猜出来就是生产者消费者模型:
- // 队列实际容量
- private final int capacity;
- // 这个原子变量记录节点数量
- private final AtomicInteger count = new AtomicInteger();
- // 头节点
- transient Node<E> head;
- // 尾节点
- private transient Node<E> last;
- // 这个锁用于控制多个线程从队列头部获取元素
- private final ReentrantLock takeLock = new ReentrantLock();
- // 当队列为空, 执行出队操作的线程就放到这条件变量里来
- private final Condition notEmpty = takeLock.newCondition();
- // 用于控制多个线程往队列尾部添加元素
- private final ReentrantLock putLock = new ReentrantLock();
- // 如果队列满了, 执行入队操作的线程就丢到这里面来
- private final Condition notFull = putLock.newCondition();
构造器中可以看到, 默认最大数量就是 65536 个, 虽然说也可以指定大小, 我们一定程度上可以说这是一个有界阻塞队列;
- // 默认队列最大的数量就是 65536 个
- public LinkedBlockingQueue() {
- this(Integer.MAX_VALUE);
- }
- // 也可以指定队列大小, 默认头节点和尾节点都是指向哨兵节点
- public LinkedBlockingQueue(int capacity) {
- if (capacity <= 0) throw new IllegalArgumentException();
- this.capacity = capacity;
- last = head = new Node<E>(null);
- }
- // 也可以传一个实现 Collection 接口的类, 比如 List, 然后遍历将其中的元素封装成节点丢到队列中去
- // 注意这里获取锁和释放锁
- public LinkedBlockingQueue(Collection<? extends E> c) {
- this(Integer.MAX_VALUE);
- final ReentrantLock putLock = this.putLock;
- putLock.lock(); // Never contended, but necessary for visibility
- try {
- int n = 0;
- for (E e : c) {
- if (e == null)
- throw new NullPointerException();
- if (n == capacity)
- throw new IllegalStateException("Queue full");
- enqueue(new Node<E>(e));
- ++n;
- }
- count.set(n);
- } finally {
- putLock.unlock();
- }
- }
我们简单的介绍了这个队列的基本结构, 现在我们可以看看一些重要的方法;
二. offer 方法
这个方法向队列最后添加一个元素, 插入成功返回 true, 如果队列满了, 就抛弃当前元素返回 false;
- public boolean offer(E e) {
- // 如果为 null, 就直接抛错
- if (e == null) throw new NullPointerException();
- //count 表示队列中实际数量
- final AtomicInteger count = this.count;
- // 如果实际数量和队列最大容量相同, 那么就不能再添加了, 返回 false
- if (count.get() == capacity)
- return false;
- int c = -1;
- // 将元素封装成 Node 节点
- Node<E> node = new Node<E>(e);
- final ReentrantLock putLock = this.putLock;
- // 获取锁
- putLock.lock();
- try {
- // 队列没有满, 就把新的节点丢进去, 递增计数器, 为什么这里要进行判断呢? 上面不是进行判断了么?
- // 还是由于并发, 如果在上面那里判断容量之后, 当前线程还没有获取锁, 此时一个其他线程先获取锁然后执行 offer 方法然后释放锁, 那么
- // 这里需要再判断一次
- if (count.get() <capacity) {
- enqueue(node);
- // 注意 getAndIncrement 和 incrementAndGet 方法的区别, 前者是返回自增之前的值, 后者是返回自增之后的值
- c = count.getAndIncrement();
- // 这里判断如果队列还没有满, 就唤醒之前 notFully 条件队列中的线程, 前面说过了 notfull 中存放的是什么线程
- if (c + 1 < capacity)
- notFull.signal();
- }
- } finally {
- // 释放锁
- putLock.unlock();
- }
- // 如果 c==0, 表示队列中在添加节点之前就已经有一个节点了, 就唤醒条件变量 notEmpty 中的线程, 这些线程就会从队列中去取数据
- if (c == 0)
- signalNotEmpty();
- return c>= 0;
- }
- private void signalNotEmpty() {
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lock();
- try {
- notEmpty.signal();
- } finally {
- takeLock.unlock();
- }
- }
三. put 方法
这个方法在队列尾部插入一个元素, 如果队列有空闲则插入后直接返回, 否则就阻塞当前线程直到队列空闲再插入; 而且当前线程在阻塞的时候被其他线程调用了中断方法, 就会抛异常;
- public void put(E e) throws InterruptedException {
- // 如果插入的元素为 null, 直接抛异常
- if (e == null) throw new NullPointerException();
- int c = -1;
- // 封装成一个节点
- Node<E> node = new Node<E>(e);
- final ReentrantLock putLock = this.putLock;
- // 原子变量记录队列中节点数量
- final AtomicInteger count = this.count;
- // 这个方法后面带有 Interruptibly, 说明当前线程获取锁后是可中断的
- putLock.lockInterruptibly();
- try {
- // 节点数量到达了最大容量, 就将当前线程放到条件变量 notFull 的队列中
- while (count.get() == capacity) {
- notFull.await();
- }
- // 节点数量没有到最大, 就在链表最后添加节点
- enqueue(node);
- // 计数器加一, 注意如果 count 等于 4, 那么 c 还是等于 4, 这个方法是原子自增, 返回原来的值, 注意和 incrementAndGet 方法的区别
- c = count.getAndIncrement();
- // 这里如果 c+1<capacity 的话, 说明队列还没有满, 就唤醒 notFull 中的线程可以往队列中添加元素
- if (c + 1 <capacity)
- notFull.signal();
- } finally {
- // 释放锁
- putLock.unlock();
- }
- // 如果 c==0 表示队列中添加节点之前就已经有了一个节点, 唤醒 notEmpty 中的线程去队列中取数据
- if (c == 0)
- signalNotEmpty();
- }
- private void signalNotEmpty() {
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lock();
- try {
- notEmpty.signal();
- } finally {
- takeLock.unlock();
- }
- }
四. poll 方法
这个方法是从头部移除一个元素, 如果队列为空就返回 null, 这个方法不阻塞;
- public E poll() {
- // 记录队列中节点数量
- final AtomicInteger count = this.count;
- // 如果队列为空就返回 null
- if (count.get() == 0)
- return null;
- E x = null;
- int c = -1;
- final ReentrantLock takeLock = this.takeLock;
- // 获取锁
- takeLock.lock();
- try {
- // 这里再判断一次也是为了防止在获取锁之前其他线程调用了 poll 方法取了节点
- // 如果队列不为空, 计数器减一
- if (count.get()> 0) {
- // 删除第一个有数据的节点 (由于第一个节点是哨兵节点, 所以相当于删除的是第二个节点), 方法实现在下面
- x = dequeue();
- c = count.getAndDecrement();
- // 如果 c>1 说明移除头节点之后, 队列不为空, 就唤醒 notEmpty 中条件队列中的线程去队列中取数据
- if (c> 1)
- notEmpty.signal();
- }
- } finally {
- // 释放锁
- takeLock.unlock();
- }
- // 这里这个判断, 注意一下在原子类 AtomicInteger 中两个方法, 比如初始值为 5, 那么调用 decrementAndGet 方法返回的事 4,
- // 而调用 getAndDecrement 方法返回的是 5, 我们这里额 c 调用的是后者, 所以表示删除节点之前队列的数量
- // 所以这里的意思就是: 如果删除队列之前的数量等于队列最大容量, 那么删除之后队列肯定有空位置, 于是就唤醒 notFull 条件队列中的线程
- // 往队列中添加新的节点
- if (c == capacity)
- signalNotFull();
- return x;
- }
- // 这个方法很简单稍微提一下, 就是将第一个哨兵节点自己引用自己, 更好的被 gc 收集
- // 将 head 指向第二个节点, 取出该节点的值, 然后将该节点内的 item 置为 null, 此节点就变成了一个哨兵节点
- private E dequeue() {
- // 删除头节点
- Node<E> h = head;
- Node<E> first = h.next;
- h.next = h; // help GC
- head = first;
- E x = first.item;
- first.item = null;
- return x;
- }
- private void signalNotFull() {
- final ReentrantLock putLock = this.putLock;
- putLock.lock();
- try {
- notFull.signal();
- } finally {
- putLock.unlock();
- }
- }
五. peek 方法
这个方法获取队列头部元素但是不移除该节点;
- public E peek() {
- // 队列为空, 返回 null
- if (count.get() == 0)
- return null;
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lock();
- try {
- // 如果头节点的下一个节点是空, 就返回 null, 因为此时头节点指的是哨兵节点啊
- Node<E> first = head.next;
- if (first == null)
- return null;
- else
- // 头节点的下一个节点不为空, 那么下一个节点肯定有数据, 拿过来就行了
- return first.item;
- } finally {
- takeLock.unlock();
- }
- }
六. take 方法
当前方法跟 peek 方法基本一样, 只不过这个方法是阻塞的: 从队列头删除一个节点, 如果队列为空则阻塞当前线程直到队列不为空再执行操作, 如果在阻塞的时候其他线程修改了中断标志, 那么该线程就抛错;
- // 这个方法其实和 poll 方法基本一样, 没什么好说的, 注意可以抛异常
- public E take() throws InterruptedException {
- E x;
- int c = -1;
- final AtomicInteger count = this.count;
- final ReentrantLock takeLock = this.takeLock;
- // 注意这里获取锁的方式
- takeLock.lockInterruptibly();
- try {
- while (count.get() == 0) {
- notEmpty.await();
- }
- x = dequeue();
- c = count.getAndDecrement();
- if (c> 1)
- notEmpty.signal();
- } finally {
- takeLock.unlock();
- }
- if (c == capacity)
- signalNotFull();
- return x;
- }
七. remove 方法
删除队列中某个指定的元素, 删除成功返回 true, 失败返回 false;
- public boolean remove(Object o) {
- // 传入的是 null, 直接返回 false
- if (o == null) return false;
- // 获取两个锁, 方法实现如下, 在删除节点的时候, 不能进行入队和出队操作, 那些线程会被阻塞丢到 AQS 队列里
- fullyLock();
- try {
- // 遍历找到对应的节点, 删除掉, 没找到返回 false
- for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) {
- if (o.equals(p.item)) {
- // 删除节点的具体方法, 实现在下面
- unlink(p, trail);
- return true;
- }
- }
- return false;
- } finally {
- // 释放两个锁, 注意释放锁的顺序要和获取锁的顺序相反哟
- fullyUnlock();
- }
- }
- void fullyLock() {
- putLock.lock();
- takeLock.lock();
- }
- void fullyUnlock() {
- takeLock.unlock();
- putLock.unlock();
- }
- // 删除其实很容易, 就是跟普通链表的删除一样, 就是把当前要删除的节点前面的节点指向后面的节点
- void unlink(Node<E> p, Node<E> trail) {
- p.item = null;
- trail.next = p.next;
- if (last == p)
- last = trail;
- // 还是注意 getAndDecrement 方法返回的是减一之前的值, 如果减一之后队列不是满的, 就唤醒 notFull 中条件队列中的线程往队列中添加节点
- if (count.getAndDecrement() == capacity)
- notFull.signal();
- }
八. 总结
我们简单的看了看 LinkedBlockingQueue 中一些比较重要的方法, 比上一篇的 ConcurrentLinkedQueue 容易好多呀!
其中 ConcurrentLinkedQueue 是无界非阻塞队列, 底层是用单向链表实现, 入队和出队使用 CAS 实现; 而 LinkedBlockingQueue 是有界阻塞队列, 底层是用单向链表实现, 入队和出队分别用独占锁的方式去处理, 所以入队和出队是可以同时进行的, 而且还为两个独占锁配置了两个条件队列, 用于存放被阻塞的线层, 注意, 这里涉及到好几个队列, 一个是独占锁的 AQS 队列, 一个是条件队列, 一个是存放数据的队列, 不要弄混淆了啊!
用下面这个图强化记忆:
来源: https://www.cnblogs.com/wyq1995/p/12285317.html