本来想着直接说线程池的, 不过在说线程池之前, 我们必须要知道并发安全队列; 因为一般情况下线程池中的线程数量是一定的, 肯定不会超过某个阈值, 那么当任务太多了的时候, 我们必须把多余的任务保存到并发安全队列中, 当线程池中的线程空闲下来了, 就会到并发安全队列中拿任务;
那么什么是并发安全队列呢? 其实可以简单看作是一个链表, 然后我们先办法去存取节点; 总的来说, 并发安全队列分为两种, 一种是阻塞的, 一种是非阻塞的, 前者是用锁来实现的, 后者用 CAS 实现的;
一. 简单介绍 ConcurrentLinkedQueue
这个队列用法没什么好说的, 就类似 LinkedList 的用法, 怎么对一个链表继续增删改查, 不多说, 我们就说一下其中几个关键的方法;
首先, 这个队列是一个线程安全的无界非阻塞队列, 其实就是一个单向链表, 无界的意思就是没有限制最大长度, 非阻塞表示用 CAS 实现入队和出队操作, 我们打开这个类就可以知道, 有一个内部类 Node, 其中重要的属性如下所示:
- // 用于存放节点的值
- volatile E item;
- // 指向下一个节点
- volatile Node<E> next;
- // 这里也是用的是 UNSAFE 类, 前面说过了, 这个类直接提供 CAS 操作
- private static final sun.misc.Unsafe UNSAFE;
- //item 字段的偏移量
- private static final long itemOffset;
- //next 的偏移量
- private static final long nextOffset;
然后 ConcurrentLinkedQueue 中几个重要的属性, 好像也没什么重要的, 就保存了头节点和尾节点, 注意, 默认情况下头节点和尾节点都是哨兵节点, 也就是一个存 null 的 Node 节点
- // 存放链表的头节点
- private transient volatile Node<E> head;
- // 存放链表的尾节点
- private transient volatile Node<E> tail;
- //UNSAFE 对象
- private static final sun.misc.Unsafe UNSAFE;
- //head 字段的偏移量
- private static final long headOffset;
- //tail 字段偏移量
- private static final long tailOffset;
下面我们直接看一些重要方法吧! 慢慢分析其中的算法才是关键的
二. offer 方法
这个方法的作用就是在队列末端添加一个节点, 如果传递的参数是 null, 就抛出空指针异常, 否则由于该队列是无界队列, 该方法会一直返回 true, 而且该方法使用 CAS 算法实现的, 所以不会阻塞线程;
- // 队列末端添加一个节点
- public boolean offer(E e) {
- // 如果 e 为空, 那么抛出空指针异常
- checkNotNull(e);
- // 将传进来的元素封装成一个节点, Node 的构造器中调用 UNSAFE.putObject(this, itemOffset, item)把 e 赋值给节点中的 item
- final Node<E> newNode = new Node<E>(e);
- //[1]
- // 这里的 for 循环从最后的节点开始
- for (Node<E> t = tail, p = t;;) {
- Node<E> q = p.next;
- //[2]如果 q 为 null, 说明 p 就是最后的节点了
- if (q == null) {
- //[3]CAS 更新: 如果 p 节点的下一个节点是 null, 就把写个节点更新为 newNode
- if (p.casNext(null, newNode)) {
- //[4]CAS 成功, 但是这时 p==t, 所以不会进入到这里的 if 里面, 直接返回 true
- // 那么什么时候会走到这里面来呢? 其实是要有另外一个线程也在调用 offer 方法的时候, 会进入到这里面来
- if (p != t)
- casTail(t, newNode);
- return true;
- }
- }
- else if (p == q) //[5]
- p = (t != (t = tail)) ? t : head;
- else //[6]
- p = (p != t && t != (t = tail)) ? t : q;
- }
- }
上面执行到 [3] 的时候, 由于头节点和尾节点默认都是指向哨兵节点的, 由于这个时候 p 的下一个节点为 null, 所以当前线程 A 执行 CAS 会成功, 下图所示;
如果此时还有一个线程 B 也来尝试 [3] 中 CAS, 由于此时 p 节点的下一个节点不是 null 了, 于是线程 B 会跳到 [1] 出进行第二次循环, 然后会到 [6] 中, 由于 p 和 t 此时是相等的, 所以这里是 false, 即 p=q, 下图所示:
然后线程 B 又会跳到 [1] 处进行第三次循环, 由于执行了 Node<E> q = p.next, 所以此时 q 指向最后的 null, 就到了 [3] 处进行 CAS, 这次是可以成功的, 成功之后如下图所示:
这个时候因为 p!=t, 所以可以进入到[4], 这里又会进行一个 CAS: 如果 tail 和 t 指向的节点一样, 那么就将 tail 指向新添加的节点, 如图所示, 这个时候线程 B 也就执行完了;
其实还有 [5] 没有走到, 这个是在 poll 操作之后才执行的, 我们先跳过, 等说完 poll 方法之后再回头看看; 另外说一下, add 方法其实就是调用的是 offer 方法, 就不多说了;
三. poll 方法
这个方法是获取头部的这个节点, 如果队列为空则返回 null;
- public E poll() {
- // 这里其实就是一个 goto 的标记, 用于跳出 for 循环
- restartFromHead:
- //[1]
- for (;;) {
- for (Node<E> h = head, p = h, q;;) {
- E item = p.item;
- //[2]如果当前节点中存的值不为空, 则 CAS 设置为 null
- if (item != null && p.casItem(item, null)) {
- //[3]CAS 成功就更新头节点的位置
- if (p != h)
- updateHead(h, ((q = p.next) != null) ? q : p);
- return item;
- }
- //[4]当前队列为空, 就返回 null
- else if ((q = p.next) == null) {
- updateHead(h, p);
- return null;
- }
- //[5]当前节点和下一个节点一样, 说明节点自引用, 则重新找头节点
- else if (p == q)
- continue restartFromHead;
- //[6]
- else
- p = q;
- }
- }
- }
- final void updateHead(Node<E> h, Node<E> p) {
- if (h != p && casHead(h, p))
- h.lazySetNext(h);
- }
分为几种情况, 第一种情况是线程 A 调用 poll 方法的时候, 发现队列是空的, 即头节点和尾节点都指向哨兵节点, 就会直接到[4], 返回 null
第二种情况, 线程 A 执行到了 [4], 此时有一个线程却调用 offer 方法添加了一个节点, 下图所示, 那么此时线程 A 就不会走[4] 了,[5]也不满足, 于是会到 [6] 这里来, 然后线程 A 又会跳到 [1] 处进行循环, 此时 p 指向的节点中 item 不为 null, 所以会到 [2] 中;
到了 [2] 中将 p 指向的节点中 item 用 CAS 设置为 null, 然后就到了 [3], 下面第一个图, 由于 p!=h,q=null, 所以最后调用的是 updateHead(h,p), 这方法: 如果头节点和 h 指向的是一样的, 就将头节点指向 p, 我们还能看到 updateHead 方法中 h.lazySetNext(h) 表示 h 的下一个节点指向自己, 下面图二
到了这里还没完, 还记不记得 offer 方法中有一个地方的代码没有执行的啊! 就是这种情况, 尾节点自己引用自己, 我们再调用 offer 会怎么样呢?
回到 offer 方法, 先会到[1], 然后 q 指向自己这个哨兵节点(注意, 此时虽然 p 指向的节点中存的是 null, 但是 p!=null}, 于是再到[5], 此时的图如下左图所示; 此时由于 t==tail, 所以 p=head;
再在 offer 方法循环一次, 此时 q 指向 null, 下面左图所示, 然后就可以进入 [2] 中进行 CAS,CAS 成功, 因为此时 p!=t, 所以还要进行 CAS 将 tail 指向新节点, 下面右图所示, 可以让 GC 回收那个垃圾!
妈耶, 这里比较绕! 哈哈哈哈哈哈哈哈哈哈哈
四. peek 方法
这个方法的作用就是获取队列头部的元素, 只获取不移除, 注意这个方法和上面的 poll 方法的区别啊!
- public E peek() {
- //[1]goto 标志
- restartFromHead:
- for (;;) {
- for (Node<E> h = head, p = h, q;;) {
- //[2]
- E item = p.item;
- //[3]
- if (item != null || (q = p.next) == null) {
- updateHead(h, p);
- return item;
- }
- //[4]
- else if (p == q)
- continue restartFromHead;
- else//[5]
- p = q;
- }
- }
- }
- final void updateHead(Node<E> h, Node<E> p) {
- if (h != p && casHead(h, p))
- h.lazySetNext(h);
- }
如果队列中为空的时候, 走到 [3] 的时候, 就会如下图所示, 由于 h==p, 所以 updateHead 方法啥也不做, 然后返回就返回 item 为 null
如果队列不为空, 那么如下左图所示, 此时进入循环内不满足条件, 会到 [5] 这里, 将 p 指向 q, 然后再进行一次循环到[3], 将 q 指向 p 的后一个节点, 下面右图所示;
然后调用 updateHead 方法, 用 CAS 将头节点指向 p 这里, 然后将 h 自己指向自己, 下图所示, 最后返回 item
五. 总结
其实还有几个方法没说, 但是感觉比较容易就不浪费篇幅了, 有兴趣的可以看看: size 方法用于计算队列中节点的数量, 可是由于没有加锁, 在并发的条件下不准确; remove 方法删除某个节点, 其实就是遍历然后用 equals 方法比较 item 是不是一样, 只不过如果存在多个符合条件的节点只删除第一个, 然后返回 true, 否则返回 false;contains 方法判断队列中是否包含指定 item 的节点, 也就是遍历, 很容易;
最麻烦的就是 offer 方法和 poll 方法, offer 方法是在队列的最后面添加节点, 而 poll 是获取头节点, 并且删除第一个真正的队列节点(注意, 节点分为两种, 一种是哨兵节点, 一种是真正的存了数据的节点啊), 还简单的说了一下 poll 方法和 peek 方法的区别, 后者只是获取, 而不删除啊! 用下面这个图帮助记忆一下;
来源: https://www.cnblogs.com/wyq1995/p/12271664.html