LinkedBlockingQueue 的实现是使用独占锁实现的阻塞队列. 首先看一下 LinkedBlockingQueue 的类图结构, 如下图所示:
如类图所示: LinkedBlockingQueue 是使用单向链表实现, 有两个 Node 分别来存放首尾节点, 并且里面有个初始值为 0 的原子变量 count, 它用来记录队列元素个数.
另外里面有两个 ReentrantLock 的实例, 分别用来控制元素入队和出队的原子性, 其中 takeLock 用来控制同时只有一个线程可以从队列获取元素, 其他线程必须等待,
putLock 控制同时只能有一个线程可以获取锁去添加元素, 其他线程必须等待. 另外 notEmpty 和 notFull 是信号量, 内部分别有一个条件队列用来存放进队和出队的时候被阻塞的线程,
说白了, 这其实就是一个生产者 - 消费者模型.
我们首先看一下独占锁的源码, 如下所示:
- /** 执行 take, poll 等操作时候需要获取该锁 */
- private final ReentrantLock takeLock = new ReentrantLock();
- /** 当队列为空时候执行出队操作 (比如 take) 的线程会被放入这个条件队列进行等待 */
- private final Condition notEmpty = takeLock.newCondition();
- /** 执行 put, offer 等操作时候需要获取该锁 */
- private final ReentrantLock putLock = new ReentrantLock();
- /** 当队列满时候执行进队操作 (比如 put) 的线程会被放入这个条件队列进行等待 */
- private final Condition notFull = putLock.newCondition();
- /** 当前队列元素个数 */
- private final AtomicInteger count = new AtomicInteger(0);
接着我们要进入 LinkedBlockingQueue 无参构造函数, 源码如下:
- public static final int MAX_VALUE = 0x7fffffff;
- public LinkedBlockingQueue() {
- this(Integer.MAX_VALUE);
- }
- public LinkedBlockingQueue(int capacity) {
- if (capacity <= 0) throw new IllegalArgumentException();
- this.capacity = capacity;
- // 初始化首尾节点, 指向哨兵节点
- last = head = new Node<E>(null);
- }
从源码中可以看到, 默认队列的容量为 0x7fffffff; 用户也可以自己指定容量, 所以一定程度上 LinkedBlockingQueue 可以说是有界阻塞队列.
接下来我们主要看 LinkedBlockingQueue 的几个主要方法的源码, 如下:
1.offer 操作, 向队列尾部插入一个元素, 如果队列有空闲容量则插入成功后返回 true, 如果队列已满则丢弃当前元素然后返回 false, 如果 e 元素为 null, 则抛出空指针异常(NullPointerException ), 还有一点就是, 该方法是非阻塞的. 源码如下:
- public boolean offer(E e) {
- //(1)空元素抛空指针异常
- if (e == null) throw new NullPointerException();
- //(2) 如果当前队列满了则丢弃将要放入的元素, 然后返回 false
- final AtomicInteger count = this.count;
- if (count.get() == capacity)
- return false;
- //(3) 构造新节点, 获取 putLock 独占锁
- int c = -1;
- Node<E> node = new Node<E>(e);
- final ReentrantLock putLock = this.putLock;
- putLock.lock();
- try {
- //(4)如果队列不满则进队列, 并递增元素计数
- if (count.get() <capacity) {
- enqueue(node);
- c = count.getAndIncrement();
- //(5)
- if (c + 1 < capacity)
- notFull.signal();
- }
- } finally {
- //(6)释放锁
- putLock.unlock();
- }
- //(7)
- if (c == 0)
- signalNotEmpty();
- //(8)
- return c>= 0;
- }
- private void enqueue(Node<E> node) {
- last = last.next = node;
- }
代码 (2) 判断的是如果当前队列已满则丢弃当前元素并返回 false.
代码 (3) 获取到 putLock 锁, 当前线程获取到该锁后, 则其他调用 put 和 offer 的线程将会被阻塞(阻塞的线程被放到 putLock 锁的 AQS 阻塞队列).
代码 (4) 这里又重新判断了一下当前队列是否满了, 这是因为在执行代码 (2) 和获取到 putLock 锁期间, 有可能其他线程通过 put 或者 offer 方法想队列里面添加了新的元素. 重新判断队列确实不满则新元素入队, 并递增计数器.
代码 (5) 判断的是如果新元素入队后还有空闲空间, 则唤醒 notFull 的条件队列里面因为调用了 notFull 的 await 操作 (比如执行 put 方法而队列满了的时候) 而被阻塞的一个线程, 因为队列现在有空闲, 所以这里可以提前唤醒一个入队线程.
代码 (6) 则释放获取的 putLock 锁, 这里要注意锁的释放一定要在 finally 里面做, 因为即使 try 块抛出异常了, finally 也是会被执行到的. 另外释放锁后其他因为调用 put 和 offer 而被阻塞的线程将会有一个获取到改锁.
代码 (7)c == 0 说明在执行代码(6) 释放锁的时候队列里面至少有一个元素, 队列里面有元素则执行 signalNotEmpty,signalNotEmpty 的源码如下:
- private void signalNotEmpty() {
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lock();
- try {
- notEmpty.signal();
- } finally {
- takeLock.unlock();
- }
- }
通过上面代码可以看到其作用是激活 notEmpty 的条件队列中因为调用 notEmpty 的 await 方法 (比如调用 take 方法并且队列为空的时候) 而被阻塞的一个线程, 这里也说明了调用条件变量的方法前, 要首先获取对应的锁.
offer 的总结: offer 方法中通过使用 putLock 锁保证了在队尾新增元素的原子性和队列元素个数的比较和递增操作的原子性.
2.put 操作, 向队列尾部插入一个元素, 如果队列有空闲则插入后直接返回 true, 如果队列已经满则阻塞当前线程知道队列有空闲插入成功后返回 true, 如果在阻塞的时候被其他线程设置了中断标志,
则被阻塞线程会抛出 InterruptedException 异常而返回, 另外如果 e 元素为 null 则抛出 NullPointerException 异常. 源码如下:
- public void put(E e) throws InterruptedException {
- //(1)空元素抛空指针异常
- if (e == null) throw new NullPointerException();
- //(2) 构建新节点, 并获取独占锁 putLock
- int c = -1;
- Node<E> node = new Node<E>(e);
- final ReentrantLock putLock = this.putLock;
- final AtomicInteger count = this.count;
- putLock.lockInterruptibly();
- try {
- //(3)如果队列满则等待
- while (count.get() == capacity) {
- notFull.await();
- }
- //(4)进队列并递增计数
- enqueue(node);
- c = count.getAndIncrement();
- //(5)
- if (c + 1 <capacity)
- notFull.signal();
- } finally {
- //(6)
- putLock.unlock();
- }
- //(7)
- if (c == 0)
- signalNotEmpty();
- }
代码 (2) 中使用 putLock.lockInterruptibly() 获取独占锁, 相比 offer 方法中这个获取独占锁方法意味着可以被中断, 具体说是当前线程在获取锁的过程中, 如果被其它线程设置了中断标志则当前线程会抛出 InterruptedException 异常,
所以 put 操作在获取 锁过程中是可被中断的.
代码 (3) 如果当前队列已经满, 则 notFull 的 await() 把当前线程放入 notFull 的条件队列, 当前线程被阻塞挂起并释放获取到的 putLock 锁, 由于 putLock 锁被释放了, 所以现在其他线程就有机会获取到 putLock 锁了.
代码 (3) 判断队列是否为空为何使用 while 循环而不是 if 语句呢?
这是因为考虑到当前线程被虚假唤醒的问题, 也就是其它线程没有调用 notFull 的 singal 方法时候, notFull.await() 在某种情况下会自动返回.
如果使用 if 语句简单判断一下, 那么虚假唤醒后会执行代码(4), 元素入队, 并且递增计数器, 而这时候队列已经是满了的, 导致队列元素个数大于了队列设置的容量, 导致程序出错.
而使用使用 while 循环假如 notFull.await() 被虚假唤醒了, 那么循环在检查一下当前队列是否是满的, 如果是则再次进行等待.
3.poll 操作, 从队列头部获取并移除一个元素, 如果队列为空则返回 null, 该方法是不阻塞的. 源码如下:
- public E poll() {
- //(1)队列为空则返回 null
- final AtomicInteger count = this.count;
- if (count.get() == 0)
- return null;
- //(2)获取独占锁
- E x = null;
- int c = -1;
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lock();
- try {
- //(3)队列不空则出队并递减计数
- if (count.get()> 0) {//3.1
- x = dequeue();//3.2
- c = count.getAndDecrement();//3.3
- //(4)
- if (c> 1)
- notEmpty.signal();
- }
- } finally {
- //(5)
- takeLock.unlock();
- }
- //(6)
- if (c == capacity)
- signalNotFull();
- //(7)返回
- return x;
- }
- private E dequeue() {
- Node<E> h = head;
- Node<E> first = h.next;
- h.next = h; // help GC
- head = first;
- E x = first.item;
- first.item = null;
- return x;
- }
代码(1) 如果当前队列为空, 则直接返回 null.
代码 (2) 获取独占锁 takeLock, 当前线程获取该锁后, 其它线程在调用 poll 或者 take 方法会被阻塞挂起.
代码 (3) 如果当前队列不为空则进行出队操作, 然后递减计数器.
代码 (4) 如果 c>1 则说明当前线程移除掉队列里面的一个元素后队列不为空(c 是删除元素前队列元素个数), 那么这时候就可以激活因为调用 poll 或者 take 方法而被阻塞到 notEmpty 的条件队列里面的一个线程.
代码 (5) 释放锁, 一定要在 finally 里面释放锁.
代码 (6) 说明当前线程移除队头元素前当前队列是满的, 移除队头元素后队列当前至少有一个空闲位置, 那么这时候就可以调用 signalNotFull 激活因为调用 put 或者 offer 而被阻塞放到 notFull 的条件队列里的一个线程, signalNotFull 源码如下:
- private void signalNotFull() {
- final ReentrantLock putLock = this.putLock;
- putLock.lock();
- try {
- notFull.signal();
- } finally {
- putLock.unlock();
- }
- }
poll 代码逻辑比较简单, 值得注意的是获取元素时候只操作了队列的头节点.
4.peek 操作, 获取队列头部元素但是不从队列里面移除, 如果队列为空则返回 null, 该方法是不阻塞的. 源码如下:
- public E peek() {
- //(1)
- if (count.get() == 0)
- return null;
- //(2)
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lock();
- try {
- Node<E> first = head.next;
- //(3)
- if (first == null)
- return null;
- else
- //(4)
- return first.item;
- } finally {
- //(5)
- takeLock.unlock();
- }
- }
可以看到代码 (3) 这里还是需要判断下 first 是否为 null 的, 不能直接执行代码(4).
正常情况下执行到代码 (2) 说明队列不为空, 但是代码 (1) 和(2)不是原子性操作, 也就是在执行代码 (1) 判断队列不为空后,
在代码 (2) 获取到锁前, 有可能其他线程执行了 poll 或者 take 操作导致队列变为了空, 然后当前线程获取锁后, 直接执行 first.item 会抛出空指针异常.
5.take 操作, 获取当前队列头部元素并从队列里面移除, 如果队列为空则阻塞调用线程. 如果队列为空则阻塞当前线程知道队列不为空, 然后返回元素, 如果在阻塞的时候被其他线程设置了中断标志, 则被阻塞线程会抛出 InterruptedException 异常而返回. 源码如下:
- public E take() throws InterruptedException {
- E x;
- int c = -1;
- final AtomicInteger count = this.count;
- //(1)获取锁
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lockInterruptibly();
- try {
- //(2)当前队列为空则阻塞挂起
- while (count.get() == 0) {
- notEmpty.await();
- }
- //(3)出队并递减计数
- x = dequeue();
- c = count.getAndDecrement();
- //(4)
- if (c> 1)
- notEmpty.signal();
- } finally {
- //(5)
- takeLock.unlock();
- }
- //(6)
- if (c == capacity)
- signalNotFull();
- //(7)
- return x;
- }
代码 (1) 当前线程获取到独占锁, 其他调用 take 或者 poll 的线程将会被阻塞挂起.
代码 (2) 如果队列为空则阻塞挂起当前线程, 并把当前线程放入 notEmpty 的条件队列.
代码 (3) 进行出队操作并递减计数.
代码 (4) 如果 c> 1 说明当前队列不为空, 则唤醒 notEmpty 的条件队列的条件队列里面的一个因为调用 take 或者 poll 而被阻塞的线程.
代码 (5) 释放锁.
代码 (6) 如果 c == capacity 则说明当前队列至少有一个空闲位置, 则激活条件变量 notFull 的条件队列里面的一个因为调用 put 或者 offer 而被阻塞的线程.
6.remove 操作, 删除队列里面指定元素, 有则删除返回 true, 没有则返回 false, 源码如下:
- public boolean remove(Object o) {
- if (o == null) return false;
- //(1)双重加锁
- fullyLock();
- try {
- //(2)遍历队列找则删除返回 true
- for (Node<E> trail = head, p = trail.next;
- p != null;
- trail = p, p = p.next) {
- //(3)
- if (o.equals(p.item)) {
- unlink(p, trail);
- return true;
- }
- }
- //(4)找不到返回 false
- return false;
- } finally {
- //(5)解锁
- fullyUnlock();
- }
- }
代码 (1) 通过 fullyLock 获取双重锁, 当前线程获取后, 其他线程进行入队或者出队的操作就会被阻塞挂起. 双重锁方法 fullyLock 的源码如下:
- void fullyLock() {
- putLock.lock();
- takeLock.lock();
- }
代码 (2) 遍历队列寻找要删除的元素, 找不到则直接返回 false, 找到则执行 unlink 操作, unlink 的源码如下:
- void unlink(Node<E> p, Node<E> trail) {
- p.item = null;
- trail.next = p.next;
- if (last == p)
- last = trail;
- 如果当前队列满, 删除后, 也不忘记唤醒等待的线程
- if (count.getAndDecrement() == capacity)
- notFull.signal();
- }
可以看到删除元素后, 如果发现当前队列有空闲空间, 则唤醒 notFull 的条件队列中一个因为调 用 put 或者 offer 方法而被阻塞的线程.
代码 (5) 调用 fullyUnlock 方法使用与加锁顺序相反的顺序释放双重锁, 源码如下:
- void fullyUnlock() {
- takeLock.unlock();
- putLock.unlock();
- }
7.size 操作, 获取当前队列元素个数. 源码如下:
- public int size() {
- return count.get();
- }
总结: 由于在操作出队入队的时候操作 Count 的时候加了锁, 因此相比 ConcurrentLinkedQueue 的 size 方法比较准确.
最后用一张图来加深 LinkedBlockingQueue 的理解, 如下图:
因此我们要思考一个问题: 为何 ConcurrentLinkedQueue 中需要遍历链表来获取 size 而不适用一个原子变量呢?
这是因为使用原子变量保存队列元素个数需要保证入队出队操作和操作原子变量是原子操作, 而 ConcurrentLinkedQueue 是使用 CAS 无锁算法的, 所以无法做到这个.
来源: https://www.cnblogs.com/huangjuncong/p/9218194.html