前言
在之前的文章中,已经对 ArrayBlockingQueue , LinkedBlockingQueue 这两个比较常用的阻塞队列做了源码分析,我们知道其内部都是通过 ReentrantLock 来保证数据读写的线程安全,通过 Condition 来完成线程等待和唤醒,只不过 ArrayBlockingQueue 在读写时使用了一把锁所完成,而 LinkedBlockingQueue 对于读和写分别使用了两把锁来进行处理,从而达到读写分离的效果.
然而,通过锁机制来实现一个线程安全的队列,在并发不是特别高的情况下并不是非常合适,因为在大多数情况下都只有几个线程同时访问,而每次执行都需要去加一次锁,从而导致线程进行上下文切换,影响整体性能.因此,JDK 还为我们提供了一个无锁线程安全的队列--ConcurrentLinkedQueue,其底层使用 CAS 来实现无阻塞的并发控制.本文将该队列的实现机制和源码做一个分析,让我们共同看看 Doug Lea 大神是如何巧妙地通过无锁机制来实现一个线程安全的队列.
1.ConcurrentLinkedQueue 介绍
首先让我们看看 JDK 文档对该类的描述:
ConcurrentLinkedQueue 的 API 描述. png
ConcurrentLinkedQueue 是一个基于链表,无界,线程安全的队列.这个队列将元素按照先进先出的顺序进行存储.队列的头节点是在队列中存在时间最久的节点,队列的尾节点是在队列中存在时间最短的节点.新的元素会被插入到队列的尾部,而队列的元素的获取操作则会从队列的头部去获取元素.ConcurrentLinekedQueue 适合作为多个线程共享访问的集合.与大多数并发集合的实现类似,该类也不允许添加 null 元素,该类的实现使用了一个高效的无锁算法,其算法的是基于 podc-1996.pdf 所改进的.
除了上面所描述的基本特性之外,ConcurrentLinkedQueue 中还有一些其他的特点:
队列中的最后一个元素的 next 属性总是为 null,并且最后一个节点可以通过 tail 节点以时间复杂度为 o(1) 的方式到达,也可以通过 head 以时间复杂度 o(n) 的方式到达.即队列中的最后一个元素,其总是可达的.
节点出队列,是通过执行 CAS 操作,将其 item 设置为 null.这里与我们传统想象的节点删除不太一样,它是通过将一个节点的 item 标记为 null,标记这个节点已经出队列,在下次操作的时候,如果一旦发现节点的 item 为 null 时,就将该节点的 next 设置为自己,从而真正完成节点从队列的移除,因此节点的删除是延迟处理的. (下面我们在分析节点出队列的过程时可以更加清楚地看到其实现细节)
如果队列中的节点其 item 不为 null, 它们从是可以从 head 开始,逐个查找到对应的节点.
ConcurrentLinkedQueue 的 size 方法不是一个常量级别的操作,每一次获取队列的大小,都需要整体地对队列做一次遍历操作,并且得到值也是一个非精确的值,因为在遍历的过程中,队列的结构可能也会被其他线程所改变.
2. ConcurrentLinkedQueue 内部结构分析
由于是基于链表的实现方式,与其他的并发队列类似,都会在内部定义一个节点类,ConcurrentLinedQueu 亦是如此,首先我们看一下节点的定义:
//该节点是一个静态内部类,因此其只能作用于该队列内部
private static class Node<E> {
/*
* 当前节点存储的元素,注意到这里使用volatile关键字来对节点进行
* 修饰,其目的是在并发读的时候保证内存的可见性
*/
volatile E item;
//当前节点的下一个节点
volatile Node<E> next;
/**
* 构建新的节点,这里没有使用volatile的方式来对节点的元素值进行设置,而是使用普通的写方式
* 因为对于一个新增的节点,只有在其被成功插入到队列尾部才对外可见,因此在这里没有对数据可见性的强制要求
*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
/*
* 通过Unsafe来完成对当前节点元素的CAS操作
*/
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/*
* 使用普通的方式来设置当前节点的下一个节点
*/
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
/*
* 通过Unsafe来完成对当前节点的下一个节点的CAS操作
*/
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = Node.class;
//通过Unsafe来获取一个Node节点的item属性在内存中相对该对象的位置偏移量
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
//通过Unsafe来获取一个Node节点的next节点属性在内存中相对该对象的位置偏移量
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
上面节点的定义与之前分析的 SynchronousQueue 中的内部定义的节点非常类似,这里不再过多阐述.
队列的属性定义
为了提高快速查找队列中第一个节点和最后一个节点,因此 ConcurrentLinkedQueue 中分别定义了一个 head 节点和 tail 节点来快速定位.
/**
* 不变性:
* - 队列中所有未删除的节点都可以通过head节点的succ方法查找到
* - head节点一定不可能等于null
* - (tmp = head).next != tmp,即head的next不能指向自己.
*
* 可变性:
* - head的item可能为null,也可能不为null
* - tail节点可能会滞后于head节点,因此从head节点未必一定可以找到tail节点
*
*/
private transient volatile Node<E> head;
/**
* 不变性:
* - 节点中的最后一个元素总是可以通过tail的succ方法来获取
* - tail节点不等于null
*
* 可变性:
* - head的item可能为null,也可能不为null
* - tail 节点的next可能指向自己,也可能不指向自己
* - tail节点可能会滞后于head节点,因此从head节点未必一定可以找到tail节点
*/
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
ConcurrentLinkedQueue 源码分析
通过上面的描述我们知道了该队列是通过一个头节点和一个尾节点,然后将中间链接节点之间两两相连接构成一个队列,下面让我们分析一下 ConcurrentLinkedQueue 的内部的具体实现.(在这里需要说明一下, 由于其节点是完全地基于 Unsafe 来完成 CAS 的操作,如果你对该内容还不是很熟悉的话,可以参考我的 深入分析 Java 中的原子操作 这篇文章,里面对原子操作有一个比较细致的描述.
入队列源码分析
在分析源码之前,我们先通过几张图来说明一下 ConcurrentLinkedQueue 的入队列的整体过程.在了解完整体的过程后,再结合源码去分析细节会更加容易理解:
队列初始化
队列初始化状态
在队列刚创建时,head 和 tail 同时指向空节点,我们也称其为 dummy 节点.
dummy 节点的说明
添加元素 a
添加元素 a
向队列中添加元素 a,此时只是新节点追加到第一个节点的后面,但是 tail 节点并未发生改变.
添加元素 b
添加元素 b
向队列中添加元素 b,此时新节点与原先的 tail 节点之间的距离大于 1,因此 tail 节点在这个时候会更新,真正的指向了最后一个节点
添加元素 c
添加元素 c
由于新节点与 tail 节点的距离没有大于 1,因此此时 tail 节点同样不会发生更新.
添加元素 d
添加元素 d
通过上面的图示,我们可以看到 ConcurrentLinkedQueue 在入队列过程中非常明显的一个特点就是 tail 指针不是实时更新的,即 tail 节点可能会滞后于队列中真正的最后一个节点,只有当最后的一个节点与 tail 节点之前的距离大于 1 时才会更新,而这样设计的目的就是为了减少避免每增加一个节点,tail 节点都需要去执行一次 CAS 操作的情况发生.
public boolean offer(E e) {
// 由于元素不允许为null,因此对元素做一个检查
checkNotNull(e);
// 生成待插入的节点
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
// 当q等于null时,则p就是队列中的最后一个元素
if (q == null) {
// 执行cas操作,如果执行成功,则newNode成为队列的最后一个元素,但它未必是tail指向的元素
if (p.casNext(null, newNode)) {
// 通过判断p!=t,从而确定当前新节点与tail指向的节点之间的距离是否大于1
// 如果大于1,则需要更新tail指针
if (p != t)
casTail(t, newNode);
return true;
}
}
// 如果p==q,则意味着当前p节点已经被从队列中移除(如果单纯从入队列看是看不出来的,后面结合出队列再回头分析)
else if (p == q)
/* 判断在执行过程中tail是否发生变化,如果未发生变化,则tail也已经脱落队列
* 因为 p = t = tail,而p已经脱离队列,从而推断出tail也脱离了队列
* 那么此时只能从head开始,重新查找队列的最后一个元素
* 如果tail发生了变化,则直接从当前队列的tail开始查找队列的最后一个元素
*/
p = (t != (t = tail)) ? t : head;
else
/**
* 由于p节点的next不为null,并且p节点并未从队列中删除,因此需要继续查找队列的最后一个节点
* 判断执行过程中tail节点是否发生了变化
* 如果发生了变化,则让p执行当前的tail,否则就让p直接指向它的next节点q
*/
p = (p != t && t != (t = tail)) ? t : q;
}
}
可能会有不少朋友对上面 t != (t = tail) 的处理感到疑惑,疑惑的原因可能会觉得一个变量自己和自己比较,那不是一定为 true 嘛,怎么还会出现等于 false 的可能呢.为了理解这个问题,我们一起看一下下面的这段代码:
public class VarCompareTest {
static volatile int b = 2;
public static void main(String[] args) {
int a = b;
int c = a != (a = b) ? 5 : 4;
System.out.println(a);
System.out.println(c);
}
}
这段代码的也用到了上面类似的 t != (t = tail),但是在编译完成后,我们通过 IDEA 本身的反编译工具来查看一下对应的 Class 文件的结果 (目前由于对字节码指令不熟悉,因此不从那个角度去解读):
反编译 Class 文件后的结果
通过上面的代码,我们可以看到,一开始 a 的值就等于 b 的值,其值为 2;紧接着 a 的值首先通过一个局部变量记录,然后再将 b 的值赋值给 a,由于 b 可能存在多线程修改的可能,此时 b 的值可能被其他线程改成了 3,因此 a 的值也会变为 3,最后再拿 2 与 3 进行比较,即 var10000 != b 进行比较,就会存在等于 false 的情况发生了.对于 t != (t = tail) 的情况也是如此,相信通过这个例子说明大家应该明白其中的原理了!
出队列源码分析
在分析出队列源码之前,我们也结合上面的图来看一下出队列的整体过程:
队列当前状态
这里假设队列是基于上面入队列之后的状态进行的.
移除第 1 个节点
移除第一个节点
原本指向 head 的节点,此时的 next 指向了自己,因此它从队列中真正的移除.存储 a 的节点,其 item 置为了 null,并且 head 节点发生变更,真正指向了队列中第一个有效 (真正存储数据) 的节点.
移除第 2 个节点
移除第 2 个节点
此时只是仅仅将节点的 item 设置为了 null,但是 head 节点没有发生变更,这样做的目的也是为了减少一次 CAS 操作,它会等到下一次才去变更.
移除第 3 个节点
移除第 3 个节点
移除第 4 个节点
移除第 4 个节点
看完上图的分析,相信大家对 ConcurrentLinkedQueue 出队列的操作应该有一个直观的理解了,下面我们看下源码的具体实现:
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// 如果元素的item不为null,则说明p节点是当前队列中的第一个未被删除的节点
// 此时也说明head指向的节点确实是队列中的第一个元素
// 通过CAS操作,将item设置为null,来标记其已经被删除
if (item != null && p.casItem(item, null)) {
// 判断p节点与head指向的节点是否是同一个,如果不是则需要将head节点向前移动
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 节点的next为null,则说明队列为空,只有一个dummy节点
else if ((q = p.next) == null) {
// 尝试将dummy节点p设置为新的head
updateHead(h, p);
return null;
}
// 如果p节点被删除,只能从队列的head从头开始再次查找
else if (p == q)
// 跳回到最外层的循环,重新执行一次Node<E> h = head, p = h, q;操作
continue restartFromHead;
else
// 由于head指向的节点其item为null,即head指向的节点不是一个有效节点,因此继续通过head的next继续查找
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
// 判断新设置的节点与原来的head节点是否是同一个,如果不是则将新节点设置为新的head节点,
// 并且将原来的head节点的next指向自己.
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
tail 节点滞后于 head 节点的场景分析
一般来说,head 节点都在队列的左边,而 tail 节点在队列的右边.然而,在 ConcurrentLinkedQueue 中,可能存在 tail 节点在左边,而 head 节点却跑到了右边的情况,这种场景我们将其称为
tail lag behind head
.下面我们分析一下在什么样的场景下会发生这样的情况,这对于真正理解 ConcurrentLinkedQueue 非常重要!
添加一个元素
从上图可以看到,在添加完一个元素后,tail 节点并有发生改变.此时,假设我们去获取队列中的元素,队列的结构就变成如下的样子.
取出队列元素
从上面的结构我们可以看到,此时 tail 节点滞后于 head 节点,并且我们此时通过 head 节点也无法查找到 tail 节点,因为该节点已经从队列中移除.当下一次添加元素的时候,就会出现 tail 节点自己指向自己的情况,此时就需要重新获取到 head,将新增的元素追加到 head 后面.
总结
至此,ConcurrentLinkedQueue 的实现我们已经分析完成了.该类的核心设计就在于 CAS 的无阻塞以及 head/tail 节点的延迟更新.尽量我们在实际的开发中基本不会去实现一个如此复杂的队列,但是通过分析一个经典无阻塞队列,可以更加好地帮助我们理解并发编程.如果存在分析不对的地方,还望大神指出.
来源: http://www.jianshu.com/p/32d6526494fd