此篇博客所有源码均来自 JDK 1.8
在没有 Lock 之前,我们使用 synchronized 来控制同步,配合 Object 的 wait()、notify() 系列方法可以实现等待 / 通知模式。在 Java SE5 后,Java 提供了 Lock 接口,相对于 Synchronized 而言,Lock 提供了条件 Condition,对线程的等待、唤醒操作更加详细和灵活。下图是 Condition 与 Object 的监视器方法的对比(摘自《Java 并发编程的艺术》):
Condition 提供了一系列的方法来对阻塞和唤醒线程:
Condition 是一种广义上的条件队列。他为线程提供了一种更为灵活的等待 / 通知模式,线程在调用 await 方法后执行挂起操作,直到线程等待的某个条件为真时才会被唤醒。Condition 必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个 Condition 的实例必须与一个 Lock 绑定,因此 Condition 一般都是作为 Lock 的内部实现。
获取一个 Condition 必须要通过 Lock 的 newCondition() 方法。该方法定义在接口 Lock 下面,返回的结果是绑定到此 Lock 实例的新 Condition 实例。Condition 为一个接口,其下仅有一个实现类 ConditionObject,由于 Condition 的操作需要获取相关的锁,而 AQS 则是同步锁的实现基础,所以 ConditionObject 则定义为 AQS 的内部类。定义如下:
- public class ConditionObject implements Condition,java.io.Serializable{}
每个 Condition 对象都包含着一个 FIFO 队列,该队列是 Condition 对象通知 / 等待功能的关键。在队列中每一个节点都包含着一个线程引用,该线程就是在该 Condition 对象上等待的线程。我们看 Condition 的定义就明白了:
- public class ConditionObject implements Condition,java.io.Serializable{
- private static final longserialVersionUID =1173984872572414699L;//头节点
- private transientNode firstWaiter;//尾节点
- private transientNode lastWaiter;public ConditionObject() {
- }/** 省略方法 **/}
从上面代码可以看出 Condition 拥有首节点(firstWaiter),尾节点(lastWaiter)。当前线程调用 await() 方法,将会以当前线程构造成一个节点(Node),并将节点加入到该队列的尾部。结构如下:
Node 里面包含了当前线程的引用。Node 定义与 AQS 的 CLH 同步队列的节点使用的都是同一个类(AbstractQueuedSynchronized.Node 静态内部类)。
Condition 的队列结构比 CLH 同步队列的结构简单些,新增过程较为简单只需要将原尾节点的 nextWaiter 指向新增节点,然后更新 lastWaiter 即可。
调用 Condition 的 await() 方法会使当前线程进入等待状态,同时会加入到 Condition 等待队列同时释放锁。当从 await() 方法返回时,当前线程一定是获取了 Condition 相关连的锁。
- public final void await()throwsInterruptedException {// 当前线程中断
- if(Thread.interrupted())throw newInterruptedException();//当前线程加入等待队列Node node = addConditionWaiter();//释放锁
- longsavedState = fullyRelease(node);intinterruptMode =0;/**
- * 检测此节点的线程是否在同步队上,如果不在,则说明该线程还不具备竞争锁的资格,则继续等待
- * 直到检测到此节点在同步队列上
- */
- while(!isOnSyncQueue(node)) {//线程挂起LockSupport.park(this);//如果已经中断了,则退出
- if((interruptMode = checkInterruptWhileWaiting(node)) !=0)break;
- }//竞争同步状态
- if(acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;//清理下条件队列中的不是在等待条件的节点
- if(node.nextWaiter !=null)// clean up if cancelledunlinkCancelledWaiters();if(interruptMode !=0)
- reportInterruptAfterWait(interruptMode);
- }
此段代码的逻辑是:首先将当前线程新建一个节点同时加入到条件队列中,然后释放当前线程持有的同步状态。然后则是不断检测该节点代表的线程释放出现在 CLH 同步队列中(收到 signal 信号之后就会在 AQS 队列中检测到),如果不存在则一直挂起,否则参与竞争同步状态。
加入条件队列(addConditionWaiter())源码如下:
- privateNodeaddConditionWaiter() {
- Node t = lastWaiter;//尾节点
- //Node的节点状态如果不为CONDITION,则表示该节点不处于等待状态,需要清除节点
- if(t !=null&& t.waitStatus != Node.CONDITION) {//清除条件队列中所有状态不为Condition的节点unlinkCancelledWaiters();
- t = lastWaiter;
- }//当前线程新建节点,状态CONDITIONNode node =newNode(Thread.currentThread(), Node.CONDITION);/**
- * 将该节点加入到条件队列中最后一个位置
- */
- if(t ==null)
- firstWaiter = node;elset.nextWaiter = node;
- lastWaiter = node;returnnode;
- }
该方法主要是将当前线程加入到 Condition 条件队列中。当然在加入到尾节点之前会清楚所有状态不为 Condition 的节点。
fullyRelease(Node node),负责释放该线程持有的锁。
- final longfullyRelease(Node node) {booleanfailed =true;try{//节点状态--其实就是持有锁的数量
- longsavedState = getState();//释放锁
- if(release(savedState)) {
- failed =false;returnsavedState;
- }else{throw newIllegalMonitorStateException();
- }
- }finally{if(failed)
- node.waitStatus = Node.CANCELLED;
- }
- }
isOnSyncQueue(Node node):如果一个节点刚开始在条件队列上,现在在同步队列上获取锁则返回 true
- final booleanisOnSyncQueue(Node node) {//状态为Condition,获取前驱节点为null,返回false
- if(node.waitStatus == Node.CONDITION || node.prev ==null)return false;//后继节点不为null,肯定在CLH同步队列中
- if(node.next !=null)return true;returnfindNodeFromTail(node);
- }
unlinkCancelledWaiters():负责将条件队列中状态不为 Condition 的节点删除
- private void unlinkCancelledWaiters() {
- Node t = firstWaiter;
- Node trail =null;while(t !=null) {
- Node next = t.nextWaiter;if(t.waitStatus != Node.CONDITION) {
- t.nextWaiter =null;if(trail ==null)
- firstWaiter = next;elsetrail.nextWaiter = next;if(next ==null)
- lastWaiter = trail;
- }elsetrail = t;
- t = next;
- }
- }
调用 Condition 的 signal() 方法,将会唤醒在等待队列中等待最长时间的节点(条件队列里的首节点),在唤醒节点前,会将节点移到 CLH 同步队列中。
- public final void signal() {//检测当前线程是否为拥有锁的独
- if(!isHeldExclusively())throw newIllegalMonitorStateException();//头节点,唤醒条件队列中的第一个节点Node first = firstWaiter;if(first !=null)
- doSignal(first);//唤醒}
该方法首先会判断当前线程是否已经获得了锁,这是前置条件。然后唤醒条件队列中的头节点。
doSignal(Node first):唤醒头节点
- private void doSignal(Node first) {
- do {//修改头结点,完成旧头结点的移出工作
- if( (firstWaiter = first.nextWaiter) ==null)
- lastWaiter =null;
- first.nextWaiter =null;
- }while(!transferForSignal(first) &&
- (first = firstWaiter) !=null);
- }
doSignal(Node first) 主要是做两件事:1. 修改头节点,2. 调用 transferForSignal(Node first) 方法将节点移动到 CLH 同步队列中。transferForSignal(Node first) 源码如下:
- final booleantransferForSignal(Node node) {//将该节点从状态CONDITION改变为初始状态0,
- if(!compareAndSetWaitStatus(node, Node.CONDITION,0))return false;//将节点加入到syn队列中去,返回的是syn队列中node节点前面的一个节点Node p = enq(node);intws = p.waitStatus;//如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒
- if(ws >0|| !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
- LockSupport.unpark(node.thread);return true;
- }
整个通知的流程如下:
一个线程获取锁后,通过调用 Condition 的 await() 方法,会将当前线程先加入到条件队列中,然后释放锁,最后通过 isOnSyncQueue(Node node) 方法不断自检看节点是否已经在 CLH 同步队列了,如果是则尝试获取锁,否则一直挂起。当线程调用 signal() 方法后,程序首先检查当前线程是否获取了锁,然后通过 doSignal(Node first) 方法唤醒 CLH 同步队列的首节点。被唤醒的线程,将从 await() 方法中的 while 循环中退出来,然后调用 acquireQueued() 方法竞争同步状态。
只知道原理,如果不知道使用那就坑爹了,下面是用 Condition 实现的生产者消费者问题:
- public class ConditionTest{
- privateLinkedList buffer; //容器
- private intmaxSize ;//容器最大
- privateLock lock;privateCondition fullCondition;privateCondition notFullCondition;
- ConditionTest(intmaxSize){this.maxSize = maxSize;
- buffer =newLinkedList();
- lock = newReentrantLock();
- fullCondition = lock.newCondition();
- notFullCondition = lock.newCondition();
- }public void set(String string)throwsInterruptedException {
- lock.lock();//获取锁
- try{while(maxSize == buffer.size()){
- notFullCondition.await();//满了,添加的线程进入等待状态}
- buffer.add(string);
- fullCondition.signal();
- }finally{
- lock.unlock();//记得释放锁}
- }publicStringget()throwsInterruptedException {
- String string;
- lock.lock();try{while(buffer.size() ==0){
- fullCondition.await();
- }
- string = buffer.poll();
- notFullCondition.signal();
- }finally{
- lock.unlock();
- }returnstring;
- }
- }
来源: http://blog.csdn.net/chenssy/article/details/69279356