synchronized 的使用
synchronized 关键字是 Java 中解决并发问题的一种常用方法, 也是最简单的一种方法, 其作用有三个:(1)互斥性: 确保线程互斥的访问同步代码 (2) 可见性: 保证共享变量的修改能够及时可见 (3) 有序性: 有效解决重排序问题, 其用法也有三个:
修饰实例方法
修饰静态方法
修饰代码块
修饰实例方法
- public class Thread1 implements Runnable {
- // 共享资源(临界资源)
- static int i = 0;
- // 如果没有 synchronized 关键字, 输出小于 20000
- public synchronized void increase() {
- i++;
- }
- public void run() {
- for (int j = 0; j < 10000; j++) {
- increase();
- }
- }
- public static void main(String[] args) throws InterruptedException {
- Thread1 t = new Thread1();
- Thread t1 = new Thread(t);
- Thread t2 = new Thread(t);
- t1.start();
- t2.start();
- t1.join(); // 主线程等待 t1 执行完毕
- t2.join(); // 主线程等待 t2 执行完毕
- System.out.println(i);
- }
- /**
- * 输出结果:
- * 20000
- */
- }
修饰静态方法
- public class Thread1 {
- // 共享资源(临界资源)
- static int i = 0;
- // 如果没有 synchronized 关键字, 输出小于 20000
- public static synchronized void increase() {
- i++;
- }
- public static void main(String[] args) throws InterruptedException {
- Thread t1 = new Thread(new Runnable() {
- public void run() {
- for (int j = 0; j < 10000; j++) {
- increase();
- }
- }
- });
- Thread t2 = new Thread(new Runnable() {@Override public void run() {
- for (int j = 0; j < 10000; j++) {
- increase();
- }
- }
- });
- t1.start();
- t2.start();
- t1.join(); // 主线程等待 t1 执行完毕
- t2.join(); // 主线程等待 t2 执行完毕
- System.out.println(i);
- }
- /**
- * 输出结果:
- * 20000
- */
- }
修饰代码块
- public class Thread1 implements Runnable {
- // 共享资源(临界资源)
- static int i = 0;@Override public void run() {
- for (int j = 0; j < 10000; j++) {
- // 获得了 String 的类锁
- synchronized(String.class) {
- i++;
- }
- }
- }
- public static void main(String[] args) throws InterruptedException {
- Thread1 t = new Thread1();
- Thread t1 = new Thread(t);
- Thread t2 = new Thread(t);
- t1.start();
- t2.start();
- t1.join();
- t2.join();
- System.out.println(i);
- }
- /**
- * 输出结果:
- * 20000
- */
- }
总结
synchronized 修饰的实例方法, 多线程并发访问时, 只能有一个线程进入, 获得对象内置锁, 其他线程阻塞等待, 但在此期间线程仍然可以访问其他方法
synchronized 修饰的静态方法, 多线程并发访问时, 只能有一个线程进入, 获得类锁, 其他线程阻塞等待, 但在此期间线程仍然可以访问其他方法
synchronized 修饰的代码块, 多线程并发访问时, 只能有一个线程进入, 根据括号中的对象或者是类, 获得相应的对象内置锁或者是类锁
每个类都有一个类锁, 类的每个对象也有一个内置锁, 它们是互不干扰的, 也就是说一个线程可以同时获得类锁和该类实例化对象的内置锁, 当线程访问非 synchronzied 修饰的方法时, 并不需要获得锁, 因此不会产生阻塞
Synchronzied 的底层原理
对象头和内置锁(ObjectMonitor)
根据 jvm 的分区, 对象分配在堆内存中, 可以用下图表示:
对象头
Hotspot 虚拟机的对象头包括两部分信息, 第一部分用于储存对象自身的运行时数据, 如哈希码, GC 分代年龄, 锁状态标志, 锁指针等, 这部分数据在 32bit 和 64bit 的虚拟机中大小分别为 32bit 和 64bit, 官方称它为 "Mark word", 考虑到虚拟机的空间效率, Mark Word 被设计成一个非固定的数据结构以便在极小的空间中存储尽量多的信息, 它会根据对象的状态复用自己的存储空间, 详细情况如下图:
对象头的另外一部分是类型指针, 即对象指向它的类元数据的指针, 如果对象访问定位方式是句柄访问, 那么该部分没有, 如果是直接访问, 该部分保留句柄访问方式如下图:
直接访问如下图:
内置锁(ObjectMonitor)
通常所说的对象的内置锁, 是对象头 Mark Word 中的重量级锁指针指向的 monitor 对象, 该对象是在 HotSpot 底层 C++ 语言编写的(openjdk 里面看), 简单看一下代码:
- // 结构体如下
- ObjectMonitor: :ObjectMonitor() {
- _header = NULL;
- _count = 0;
- _waiters = 0,
- _recursions = 0; // 线程的重入次数
- _object = NULL;
- _owner = NULL; // 标识拥有该 monitor 的线程
- _WaitSet = NULL; // 等待线程组成的双向循环链表,_WaitSet 是第一个节点
- _WaitSetLock = 0;
- _Responsible = NULL;
- _succ = NULL;
- _cxq = NULL; // 多线程竞争锁进入时的单向链表
- FreeNext = NULL;
- _EntryList = NULL; //_owner 从该双向循环链表中唤醒线程结点,_EntryList 是第一个节点
- _SpinFreq = 0;
- _SpinClock = 0;
- OwnerIsThread = 0;
- }
ObjectMonitor 队列之间的关系转换可以用下图表示:
既然提到了_waitSet 和_EntryList(_cxq 队列后面会说), 那就看一下底层的 wait 和 notify 方法
wait 方法的实现过程:
- //1. 调用 ObjectSynchronizer::wait 方法
- void ObjectSynchronizer: :wait(Handle obj, jlong millis, TRAPS) {
- /* 省略 */
- //2. 获得 Object 的 monitor 对象(即内置锁)
- ObjectMonitor * monitor = ObjectSynchronizer: :inflate(THREAD, obj());
- DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);
- //3. 调用 monitor 的 wait 方法
- monitor - >wait(millis, true, THREAD);
- /* 省略 */
- }
- //4. 在 wait 方法中调用 addWaiter 方法
- inline void ObjectMonitor: :AddWaiter(ObjectWaiter * node) {
- /* 省略 */
- if (_WaitSet == NULL) {
- //_WaitSet 为 null, 就初始化_waitSet
- _WaitSet = node;
- node - >_prev = node;
- node - >_next = node;
- } else {
- // 否则就尾插
- ObjectWaiter * head = _WaitSet;
- ObjectWaiter * tail = head - >_prev;
- assert(tail - >_next == head, "invariant check");
- tail - >_next = node;
- head - >_prev = node;
- node - >_next = head;
- node - >_prev = tail;
- }
- }
- //5. 然后在 ObjectMonitor::exit 释放锁, 接着 thread_ParkEvent->park 也就是 wait
总结: 通过 object 获得内置锁(objectMonitor), 通过内置锁将 Thread 封装成 OjectWaiter 对象, 然后 addWaiter 将它插入以_waitSet 为首结点的等待线程链表中去, 最后释放锁
notify 方法的底层实现
- //1. 调用 ObjectSynchronizer::notify 方法
- void ObjectSynchronizer: :notify(Handle obj, TRAPS) {
- /* 省略 */
- //2. 调用 ObjectSynchronizer::inflate 方法
- ObjectSynchronizer: :inflate(THREAD, obj()) - >notify(THREAD);
- }
- //3. 通过 inflate 方法得到 ObjectMonitor 对象
- ObjectMonitor * ATTR ObjectSynchronizer: :inflate(Thread * Self, oop object) {
- /* 省略 */
- if (mark - >has_monitor()) {
- ObjectMonitor * inf = mark - >monitor();
- assert(inf - >header() - >is_neutral(), "invariant");
- assert(inf - >object() == object, "invariant");
- assert(ObjectSynchronizer: :verify_objmon_isinpool(inf), "monitor is inva;lid");
- return inf
- }
- /* 省略 */
- }
- //4. 调用 ObjectMonitor 的 notify 方法
- void ObjectMonitor: :notify(TRAPS) {
- /* 省略 */
- //5. 调用 DequeueWaiter 方法移出_waiterSet 第一个结点
- ObjectWaiter * iterator = DequeueWaiter();
- //6. 后面省略是将上面 DequeueWaiter 尾插入_EntrySet 的操作
- /** 省略 */
- }
总结: 通过 object 获得内置锁(objectMonitor), 调用内置锁的 notify 方法, 通过_waitset 结点移出等待链表中的首结点, 将它置于_EntrySet 中去, 等待获取锁注意: notifyAll 根据 policy 不同可能移入_EntryList 或者_cxq 队列中, 此处不详谈
在了解对象头和 ObjectMonitor 后, 接下来我们结合分析 synchronzied 的底层实现
synchronzied 的底层原理
synchronized 修饰代码块
通过下列简介的代码来分析:
- public class test{
- public void testSyn(){
- synchronized(this){
- }
- }
- }
javac 编译, javap -verbose 反编译, 结果如下:
- /**
- * ...
- **/
- public void testSyn();
- descriptor: ()V
- flags: ACC_PUBLIC
- Code:
- stack=2, locals=3, args_size=1
- 0: aload_0
- 1: dup
- 2: astore_1
- 3: monitorenter // 申请获得对象的内置锁
- 4: aload_1
- 5: monitorexit // 释放对象内置锁
- 6: goto 14
- 9: astore_2
- 10: aload_1
- 11: monitorexit // 释放对象内置锁
- 12: aload_2
- 13: athrow
- 14: return
此处我们只讨论了重量级锁 (ObjectMonitor) 的获取情况, 其他锁的获取放在后面 synchronzied 的优化中进行说明源码如下:
- void ATTR ObjectMonitor: :enter(TRAPS) {
- Thread * const Self = THREAD;
- void * cur;
- // 通过 CAS 操作尝试把 monitor 的_owner 字段设置为当前线程
- cur = Atomic: :cmpxchg_ptr(Self, &_owner, NULL);
- // 获取锁失败
- if (cur == NULL) {
- assert(_recursions == 0, "invariant");
- assert(_owner == Self, "invariant");
- return;
- }
- // 如果之前的_owner 指向该 THREAD, 那么该线程是重入,_recursions++
- if (cur == Self) {
- _recursions++;
- return;
- }
- // 如果当前线程是第一次进入该 monitor, 设置_recursions 为 1,_owner 为当前线程
- if (Self - >is_lock_owned((address) cur)) {
- assert(_recursions == 0, "internal state error");
- _recursions = 1; //_recursions 标记为 1
- _owner = Self; // 设置 owner
- OwnerIsThread = 1;
- return;
- }
- /**
- * 此处省略锁的自旋优化等操作, 统一放在后面 synchronzied 优化中说
- **/
总结:
如果 monitor 的进入数为 0, 则该线程进入 monitor, 然后将进入数设置为 1, 该线程即为 monitor 的 owner
如果线程已经占有该 monitor, 只是重新进入, 则进入 monitor 的进入数加 1.
如果其他线程已经占用了 monitor, 则该线程进入阻塞状态, 直到 monitor 的进入数为 0, 再重新尝试获取 monitor 的所有权
synchronized 修饰方法
还是从简洁的代码来分析:
- public class test{
- public synchronized void testSyn(){
- }
- }
javac 编译, javap -verbose 反编译, 结果如下:
- /**
- * ...
- **/
- public synchronized void testSyn();
- descriptor: ()V
- flags: ACC_PUBLIC, ACC_SYNCHRONIZED
- Code:
- stack=0, locals=1, args_size=1
- 0: return
- LineNumberTable:
- line 3: 0
结果和 synchronized 修饰代码块的情况不同, 仔细比较会发现多了 ACC_SYNCHRONIZED 这个标识, test.java 通过 javac 编译形成的 test.class 文件, 在该文件中包含了 testSyn 方法的方法表, 其中 ACC_SYNCHRONIZED 标志位是 1, 当线程执行方法的时候会检查该标志位, 如果为 1, 就自动的在该方法前后添加 monitorenter 和 monitorexit 指令, 可以称为 monitor 指令的隐式调用
上面所介绍的通过 synchronzied 实现同步用到了对象的内置锁(ObjectMonitor), 而在 ObjectMonitor 的函数调用中会涉及到 Mutex lock 等特权指令, 那么这个时候就存在操作系统用户态和核心态的转换, 这种切换会消耗大量的系统资源, 因为用户态与内核态都有各自专用的内存空间, 专用的寄存器等, 用户态切换至内核态需要传递给许多变量参数给内核, 内核也需要保护好用户态在切换时的一些寄存器值变量等, 这也是为什么早期的 synchronized 效率低的原因在 jdk1.6 之后, 从 jvm 层面做了很大的优化, 下面主要介绍做了哪些优化
synchronized 的优化
在了解了 synchronized 重量级锁效率特别低之后, jdk 自然做了一些优化, 出现了偏向锁, 轻量级锁, 重量级锁, 自旋等优化, 我们应该改正 monitorenter 指令就是获取对象重量级锁的错误认识, 很显然, 优化之后, 锁的获取判断次序是偏向锁 ->轻量级锁 ->重量级锁
偏向锁
源码如下:
- // 偏向锁入口
- void ObjectSynchronizer: :fast_enter(Handle obj, BasicLock * lock, bool attempt_rebias, TRAPS) {
- //UseBiasedLocking 判断是否开启偏向锁
- if (UseBiasedLocking) {
- if (!SafepointSynchronize: :is_at_safepoint()) {
- // 获取偏向锁的函数调用
- BiasedLocking: :Condition cond = BiasedLocking: :revoke_and_rebias(obj, attempt_rebias, THREAD);
- if (cond == BiasedLocking: :BIAS_REVOKED_AND_REBIASED) {
- return;
- }
- } else {
- assert(!attempt_rebias, "can not rebias toward VM thread");
- BiasedLocking: :revoke_at_safepoint(obj);
- }
- }
- // 不能偏向, 就获取轻量级锁
- slow_enter(obj, lock, THREAD);
- }
BiasedLocking::revoke_and_rebias 调用过程如下流程图:
偏向锁的撤销过程如下:
轻量级锁
轻量级锁获取源码:
- // 轻量级锁入口
- void ObjectSynchronizer: :slow_enter(Handle obj, BasicLock * lock, TRAPS) {
- markOop mark = obj - >mark(); // 获得 Mark Word
- assert(!mark - >has_bias_pattern(), "should not see bias pattern here");
- // 是否无锁不可偏向, 标志 001
- if (mark - >is_neutral()) {
- // 图 A 步骤 1
- lock - >set_displaced_header(mark);
- // 图 A 步骤 2
- if (mark == (markOop) Atomic: :cmpxchg_ptr(lock, obj() - >mark_addr(), mark)) {
- TEVENT(slow_enter: release stacklock);
- return;
- }
- // Fall through to inflate() ...
- } else if (mark - >has_locker() && THREAD - >is_lock_owned((address) mark - >locker())) { // 如果 Mark Word 指向本地栈帧, 线程重入
- assert(lock != mark - >locker(), "must not re-lock the same lock");
- assert(lock != (BasicLock * ) obj - >mark(), "don't relock with same BasicLock");
- lock - >set_displaced_header(NULL); //header 设置为 null
- return;
- }
- lock - >set_displace d_header(markOopDesc: :unused_mark());
- // 轻量级锁膨胀, 膨胀完成之后尝试获取重量级锁
- ObjectSynchronizer: :inflate(THREAD, obj()) - >enter(THREAD);
- }
轻量级锁获取流程如下:
轻量级锁撤销源码:
- void ObjectSynchronizer: :fast_exit(oop object, BasicLock * lock, TRAPS) {
- assert(!object - >mark() - >has_bias_pattern(), "should not see bias pattern here");
- markOop dhw = lock - >displaced_header();
- markOop mark;
- if (dhw == NULL) { // 如果 header 为 null, 说明这是线程重入的栈帧, 直接返回, 不用回写
- mark = object - >mark();
- assert(!mark - >is_neutral(), "invariant");
- if (mark - >has_locker() && mark != markOopDesc: :INFLATING()) {
- assert(THREAD - >is_lock_owned((address) mark - >locker()), "invariant");
- }
- if (mark - >has_monitor()) {
- ObjectMonitor * m = mark - >monitor();
- }
- return;
- }
- mark = object - >mark();
- if (mark == (markOop) lock) {
- assert(dhw - >is_neutral(), "invariant");
- //CAS 将 Mark Word 内容写回
- if ((markOop) Atomic: :cmpxchg_ptr(dhw, object - >mark_addr(), mark) == mark) {
- TEVENT(fast_exit: release stacklock);
- return;
- }
- }
- //CAS 操作失败, 轻量级锁膨胀, 为什么在撤销锁的时候会有失败的可能?
- ObjectSynchronizer: :inflate(THREAD, object) - >exit(THREAD);
- }
轻量级锁撤销流程如下:
轻量级锁膨胀
源代码:
- ObjectMonitor * ATTR ObjectSynchronizer: :inflate(Thread * Self, oop object) {
- assert(Universe: :verify_in_progress() || !SafepointSynchronize: :is_at_safepoint(), "invariant");
- for (;;) { // 为后面的 continue 操作提供自旋
- const markOop mark = object - >mark(); // 获得 Mark Word 结构
- assert(!mark - >has_bias_pattern(), "invariant");
- //Mark Word 可能有以下几种状态:
- // * Inflated(膨胀完成) - just return
- // * Stack-locked(轻量级锁) - coerce it to inflated
- // * INFLATING(膨胀中) - busy wait for conversion to complete
- // * Neutral(无锁) - aggressively inflate the object.
- // * BIASED(偏向锁) - Illegal. We should never see this
- if (mark - >has_monitor()) { // 判断是否是重量级锁
- ObjectMonitor * inf = mark - >monitor();
- assert(inf - >header() - >is_neutral(), "invariant");
- assert(inf - >object() == object, "invariant");
- assert(ObjectSynchronizer: :verify_objmon_isinpool(inf), "monitor is invalid");
- //Mark->has_monitor()为 true, 说明已经是重量级锁了, 膨胀过程已经完成, 返回
- return inf;
- }
- if (mark == markOopDesc: :INFLATING()) { // 判断是否在膨胀
- TEVENT(Inflate: spin
- while INFLATING);
- ReadStableMark(object);
- continue; // 如果正在膨胀, 自旋等待膨胀完成
- }
- if (mark - >has_locker()) { // 如果当前是轻量级锁
- ObjectMonitor * m = omAlloc(Self); // 返回一个对象的内置 ObjectMonitor 对象
- m - >Recycle();
- m - >_Responsible = NULL;
- m - >OwnerIsThread = 0;
- m - >_recursions = 0;
- m - >_SpinDuration = ObjectMonitor: :Knob_SpinLimit; // 设置自旋获取重量级锁的次数
- //CAS 操作标识 Mark Word 正在膨胀
- markOop cmp = (markOop) Atomic: :cmpxchg_ptr(markOopDesc: :INFLATING(), object - >mark_addr(), mark);
- if (cmp != mark) {
- omRelease(Self, m, true);
- continue; // 如果上述 CAS 操作失败, 自旋等待膨胀完成
- }
- m - >set_header(dmw);
- m - >set_owner(mark - >locker()); // 设置 ObjectMonitor 的_owner 为拥有对象轻量级锁的线程, 而不是当前正在 inflate 的线程
- m - >set_object(object);
- /**
- * 省略了部分代码
- **/
- return m;
- }
- }
- }
轻量级锁膨胀流程图:
现在来回答下之前提出的问题: 为什么在撤销轻量级锁的时候会有失败的可能?
假设 thread1 拥有了轻量级锁, Mark Word 指向 thread1 栈帧, thread2 请求锁的时候, 就会膨胀初始化 ObjectMonitor 对象, 将 Mark Word 更新为指向 ObjectMonitor 的指针, 那么在 thread1 退出的时候, CAS 操作会失败, 因为 Mark Word 不再指向 thread1 的栈帧, 这个时候 thread1 自旋等待 infalte 完毕, 执行重量级锁的退出操作
重量级锁
重量级锁的获取入口:
- void ATTR ObjectMonitor: :enter(TRAPS) {
- Thread * const Self = THREAD;
- void * cur;
- cur = Atomic: :cmpxchg_ptr(Self, &_owner, NULL);
- if (cur == NULL) {
- assert(_recursions == 0, "invariant");
- assert(_owner == Self, "invariant");
- return;
- }
- if (cur == Self) {
- _recursions++;
- return;
- }
- if (Self - >is_lock_owned((address) cur)) {
- assert(_recursions == 0, "internal state error");
- _recursions = 1;
- // Commute owner from a thread-specific on-stack BasicLockObject address to
- // a full-fledged "Thread *".
- _owner = Self;
- OwnerIsThread = 1;
- return;
- }
- /**
- * 上述部分在前面已经分析过, 不再累述
- **/
- Self - >_Stalled = intptr_t(this);
- //TrySpin 是一个自旋获取锁的操作, 此处就不列出源码了
- if (Knob_SpinEarly && TrySpin(Self) > 0) {
- Self - >_Stalled = 0;
- return;
- }
- /*
- * 省略部分代码
- */
- for (;;) {
- EnterI(THREAD);
- /**
- * 省略了部分代码
- **/
- }
- }
进入 EnterI (TRAPS)方法(这段代码个人觉得很有意思):
- void ATTR ObjectMonitor: :EnterI(TRAPS) {
- Thread * Self = THREAD;
- if (TryLock(Self) > 0) {
- // 这下不自旋了, 我就默默的 TryLock 一下
- return;
- }
- DeferredInitialize();
- // 此处又有自旋获取锁的操作
- if (TrySpin(Self) > 0) {
- return;
- }
- /**
- * 到此, 自旋终于全失败了, 要入队挂起了
- **/
- ObjectWaiter node(Self); // 将 Thread 封装成 ObjectWaiter 结点
- Self - >_ParkEvent - >reset();
- node._prev = (ObjectWaiter * ) 0xBAD;
- node.TState = ObjectWaiter: :TS_CXQ;
- ObjectWaiter * nxt;
- for (;;) { // 循环, 保证将 node 插入队列
- node._next = nxt = _cxq; // 将 node 插入到_cxq 队列的首部
- //CAS 修改_cxq 指向 node
- if (Atomic: :cmpxchg_ptr( & node, &_cxq, nxt) == nxt) break;
- if (TryLock(Self) > 0) { // 我再默默的 TryLock 一下, 真的是不想挂起呀!
- return;
- }
- }
- if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
- // Try to assume the role of responsible thread for the monitor.
- // CONSIDER: ST vs CAS vs { if (Responsible==null) Responsible=Self }
- Atomic: :cmpxchg_ptr(Self, &_Responsible, NULL);
- }
- TEVENT(Inflated enter - Contention);
- int nWakeups = 0;
- int RecheckInterval = 1;
- for (;;) {
- if (TryLock(Self) > 0) break; // 临死之前, 我再 TryLock 下
- if ((SyncFlags & 2) && _Responsible == NULL) {
- Atomic: :cmpxchg_ptr(Self, &_Responsible, NULL);
- }
- if (_Responsible == Self || (SyncFlags & 1)) {
- TEVENT(Inflated enter - park TIMED);
- Self - >_ParkEvent - >park((jlong) RecheckInterval);
- RecheckInterval *= 8;
- if (RecheckInterval > 1000) RecheckInterval = 1000;
- } else {
- TEVENT(Inflated enter - park UNTIMED);
- Self - >_ParkEvent - >park(); // 终于挂起了
- }
- if (TryLock(Self) > 0) break;
- /**
- * 后面代码省略
- **/
- }
try 了那么多次 lock, 接下来看下 TryLock:
- int ObjectMonitor: :TryLock(Thread * Self) {
- for (;;) {
- void * own = _owner;
- if (own != NULL) return 0; // 如果有线程还拥有着重量级锁, 退出
- //CAS 操作将_owner 修改为当前线程, 操作成功 return>0
- if (Atomic: :cmpxchg_ptr(Self, &_owner, NULL) == NULL) {
- return 1;
- }
- //CAS 更新失败 return<0
- if (true) return - 1;
- }
- }
重量级锁获取入口流程图:
重量级锁的出口:
- void ATTR ObjectMonitor: :exit(TRAPS) {
- Thread * Self = THREAD;
- if (THREAD != _owner) {
- if (THREAD - >is_lock_owned((address) _owner)) {
- _owner = THREAD;
- _recursions = 0;
- OwnerIsThread = 1;
- } else {
- TEVENT(Exit - Throw IMSX);
- if (false) {
- THROW(vmSymbols: :java_lang_IllegalMonitorStateException());
- }
- return;
- }
- }
- if (_recursions != 0) {
- _recursions--; // 如果_recursions 次数不为 0. 自减
- TEVENT(Inflated exit - recursive);
- return;
- }
- if ((SyncFlags & 4) == 0) {
- _Responsible = NULL;
- }
- for (;;) {
- if (Knob_ExitPolicy == 0) {
- OrderAccess: :release_store_ptr( & _owner, NULL); // drop the lock
- OrderAccess: :storeload();
- if ((intptr_t(_EntryList) | intptr_t(_cxq)) == 0 || _succ != NULL) {
- TEVENT(Inflated exit - simple egress);
- return;
- }
- TEVENT(Inflated exit - complex egress);
- if (Atomic: :cmpxchg_ptr(THREAD, &_owner, NULL) != NULL) {
- return;
- }
- TEVENT(Exit - Reacquired);
- } else {
- if ((intptr_t(_EntryList) | intptr_t(_cxq)) == 0 || _succ != NULL) {
- OrderAccess: :release_store_ptr( & _owner, NULL);
- OrderAccess: :storeload();
- if (_cxq == NULL || _succ != NULL) {
- TEVENT(Inflated exit - simple egress);
- return;
- }
- if (Atomic: :cmpxchg_ptr(THREAD, &_owner, NULL) != NULL) {
- TEVENT(Inflated exit - reacquired succeeded);
- return;
- }
- TEVENT(Inflated exit - reacquired failed);
- } else {
- TEVENT(Inflated exit - complex egress);
- }
- }
- ObjectWaiter * w = NULL;
- int QMode = Knob_QMode;
- if (QMode == 2 && _cxq != NULL) {
- /**
- * 模式 2:cxq 队列的优先权大于 EntryList, 直接从 cxq 队列中取出一个线程结点, 准备唤醒
- **/
- w = _cxq;
- ExitEpilog(Self, w);
- return;
- }
- if (QMode == 3 && _cxq != NULL) {
- /**
- * 模式 3: 将 cxq 队列插入到_EntryList 尾部
- **/
- w = _cxq;
- for (;;) {
- //CAS 操作取出 cxq 队列首结点
- ObjectWaiter * u = (ObjectWaiter * ) Atomic: :cmpxchg_ptr(NULL, &_cxq, w);
- if (u == w) break;
- w = u; // 更新 w, 自旋
- }
- ObjectWaiter * q = NULL;
- ObjectWaiter * p;
- for (p = w; p != NULL; p = p - >_next) {
- guarantee(p - >TState == ObjectWaiter: :TS_CXQ, "Invariant");
- p - >TState = ObjectWaiter: :TS_ENTER; // 改变 ObjectWaiter 状态
- // 下面两句为 cxq 队列反向构造一条链, 即将 cxq 变成双向链表
- p - >_prev = q;
- q = p;
- }
- ObjectWaiter * Tail;
- // 获得_EntryList 尾结点
- for (Tail = _EntryList; Tail != NULL && Tail - >_next != NULL; Tail = Tail - >_next);
- if (Tail == NULL) {
- _EntryList = w; //_EntryList 为空,_EntryList=w
- } else {
- // 将 w 插入_EntryList 队列尾部
- Tail - >_next = w;
- w - >_prev = Tail;
- }
- }
- if (QMode == 4 && _cxq != NULL) {
- /**
- * 模式四: 将 cxq 队列插入到_EntryList 头部
- **/
- w = _cxq;
- for (;;) {
- ObjectWaiter * u = (ObjectWaiter * ) Atomic: :cmpxchg_ptr(NULL, &_cxq, w);
- if (u == w) break;
- w = u;
- }
- ObjectWaiter * q = NULL;
- ObjectWaiter * p;
- for (p = w; p != NULL; p = p - >_next) {
- guarantee(p - >TState == ObjectWaiter: :TS_CXQ, "Invariant");
- p - >TState = ObjectWaiter: :TS_ENTER;
- p - >_prev = q;
- q = p;
- }
- if (_EntryList != NULL) {
- //q 为 cxq 队列最后一个结点
- q - >_next = _EntryList;
- _EntryList - >_prev = q;
- }
- _EntryList = w;
- }
- w = _EntryList;
- if (w != NULL) {
- ExitEpilog(Self, w); // 从_EntryList 中唤醒线程
- return;
- }
- w = _cxq;
- if (w == NULL) continue; // 如果_cxq 和_EntryList 队列都为空, 自旋
- for (;;) {
- // 自旋再获得 cxq 首结点
- ObjectWaiter * u = (ObjectWaiter * ) Atomic: :cmpxchg_ptr(NULL, &_cxq, w);
- if (u == w) break;
- w = u;
- }
- /**
- * 下面执行的是: cxq 不为空,_EntryList 为空的情况
- **/
- if (QMode == 1) { // 结合前面的代码, 如果 QMode == 1,_EntryList 不为空, 直接从_EntryList 中唤醒线程
- // QMode == 1 : drain cxq to EntryList, reversing order
- // We also reverse the order of the list.
- ObjectWaiter * s = NULL;
- ObjectWaiter * t = w;
- ObjectWaiter * u = NULL;
- while (t != NULL) {
- guarantee(t - >TState == ObjectWaiter: :TS_CXQ, "invariant");
- t - >TState = ObjectWaiter: :TS_ENTER;
- // 下面的操作是双向链表的倒置
- u = t - >_next;
- t - >_prev = u;
- t - >_next = s;
- s = t;
- t = u;
- }
- _EntryList = s; //_EntryList 为倒置后的 cxq 队列
- } else {
- // QMode == 0 or QMode == 2
- _EntryList = w;
- ObjectWaiter * q = NULL;
- ObjectWaiter * p;
- for (p = w; p != NULL; p = p - >_next) {
- guarantee(p - >TState == ObjectWaiter: :TS_CXQ, "Invariant");
- p - >TState = ObjectWaiter: :TS_ENTER;
- // 构造成双向的
- p - >_prev = q;
- q = p;
- }
- }
- if (_succ != NULL) continue;
- w = _EntryList;
- if (w != NULL) {
- ExitEpilog(Self, w); // 从_EntryList 中唤醒线程
- return;
- }
- }
- }
ExitEpilog 用来唤醒线程, 代码如下:
- void ObjectMonitor: :ExitEpilog(Thread * Self, ObjectWaiter * Wakee) {
- assert(_owner == Self, "invariant");
- _succ = Knob_SuccEnabled ? Wakee - >_thread: NULL;
- ParkEvent * Trigger = Wakee - >_event;
- Wakee = NULL;
- OrderAccess: :release_store_ptr( & _owner, NULL);
- OrderAccess: :fence();
- if (SafepointSynchronize: :do_call_back()) {
- TEVENT(unpark before SAFEPOINT);
- }
- DTRACE_MONITOR_PROBE(contended__exit, this, object(), Self);
- Trigger - >unpark(); // 唤醒线程
- // Maintain stats and report events to JVMTI
- if (ObjectMonitor: :_sync_Parks != NULL) {
- ObjectMonitor: :_sync_Parks - >inc();
- }
- }
重量级锁出口流程图:
自旋
通过对源码的分析, 发现多处存在自旋和 tryLock 操作, 那么这些操作好不好, 如果 tryLock 过少, 大部分线程都会挂起, 因为在拥有对象锁的线程释放锁后不能及时感知, 导致用户态和核心态状态转换较多, 效率低下, 极限思维就是: 没有自旋, 所有线程挂起, 如果 tryLock 过多, 存在两个问题: 1. 即使自旋避免了挂起, 但是自旋的代价超过了挂起, 得不偿失, 那我还不如不要自旋了 2. 如果自旋仍然不能避免大部分挂起的话, 那就是又自旋又挂起, 效率太低极限思维就是: 无限自旋, 白白浪费了 cpu 资源, 所以在代码中每个自旋和 tryLock 的插入应该都是经过测试后决定的
编译期间锁优化
锁消除
还是先看一下简洁的代码
- public class test {
- public String test(String s1, String s2) {
- return s1 + s2;
- }
- }
javac javap 后:
- public class test {
- public test();
- Code:
- 0: aload_0
- 1: invokespecial #1 // Method java/lang/Object."<init>":()V
- 4: return
- public java.lang.String test(java.lang.String, java.lang.String);
- Code:
- 0: new #2 // class java/lang/StringBuilder
- 3: dup
- 4: invokespecial #3 // Method java/lang/StringBuilder."<init>":()V
- 7: aload_1
- 8: invokevirtual #4 // Method java/lang/StringBuilder.append:(Ljava/lang/String;)Ljava/lang/StringBuilder;
- 11: aload_2
- 12: invokevirtual #4 // Method java/lang/StringBuilder.append:(Ljava/lang/String;)Ljava/lang/StringBuilder;
- 15: invokevirtual #5 // Method java/lang/StringBuilder.toString:()Ljava/lang/String;
- 18: areturn
- }
上述字节码等价成 java 代码为:
- public class test {
- public String test(String s1, String s2) {
- StringBuilder sb = new StringBuilder();
- sb.append(s1);
- sb.append(s2);
- return sb.toString();
- }
- }
sb 的 append 方法是同步的, 但是 sb 是在方法内部, 每个运行的线程都会实例化一个 StringBuilder 对象, 在私有栈持有该对象引用(其他线程无法得到), 也就是说 sb 不存在多线程访问, 那么在 jvm 运行期间, 即时编译器就会将锁消除
锁粗化
将前面的代码稍微变一下:
- public class test {
- StringBuilder sb = new StringBuilder();
- public String test(String s1, String s2) {
- sb.append(s1);
- sb.append(s2);
- return sb.toString();
- }
- }
首先可以确定的是这段代码不能锁消除优化, 因为 sb 是类的实例变量, 会被多线程访问, 存在线程安全问题, 那么访问 test 方法的时候就会对 sb 对象, 加锁, 解锁, 加锁, 解锁, 很显然这一过程将会大大降低效率, 因此在即时编译的时候会进行锁粗化, 在 sb.appends(s1)之前加锁, 在 sb.append(s2)执行完后释放锁
总结
引入偏向锁的目的: 在只有单线程执行情况下, 尽量减少不必要的轻量级锁执行路径, 轻量级锁的获取及释放依赖多次 CAS 原子指令, 而偏向锁只依赖一次 CAS 原子指令置换 ThreadID, 之后只要判断线程 ID 为当前线程即可, 偏向锁使用了一种等到竞争出现才释放锁的机制, 消除偏向锁的开销还是蛮大的如果同步资源或代码一直都是多线程访问的, 那么消除偏向锁这一步骤对你来说就是多余的, 可以通过 - XX:-UseBiasedLocking=false 来关闭
引入轻量级锁的目的: 在多线程交替执行同步块的情况下, 尽量避免重量级锁引起的性能消耗(用户态和核心态转换), 但是如果多个线程在同一时刻进入临界区, 会导致轻量级锁膨胀升级重量级锁, 所以轻量级锁的出现并非是要替代重量级锁
重入: 对于不同级别的锁都有重入策略, 偏向锁: 单线程独占, 重入只用检查 threadId 等于该线程; 轻量级锁: 重入将栈帧中 lock record 的 header 设置为 null, 重入退出, 只用弹出栈帧, 直到最后一个重入退出 CAS 写回数据释放锁; 重量级锁: 重入_recursions++, 重入退出_recursions--,_recursions=0 时释放锁
最后放一张摘自网上的一张大图:
ps: 源码解读与流程图都是原创, 可能会有贻误, 欢迎指正, 过程不易, 如果觉得帮到了你, 就点个推荐吧!
参考资料
- http: //blog.csdn.net/javazejian/article/details/72828483
- http: //www.cnblogs.com/paddix/p/5405678.html
- http: //www.cnblogs.com/paddix/p/5367116.html
- https: //www.jianshu.com/p/c5058b6fe8e5
- http: //blog.csdn.net/zqz_zqz/article/details/70233767
- http: //blog.csdn.net/u012465296/article/details/53022317
- https: //www.zhihu.com/question/39009953?sort=created
- http: //blog.sina.com.cn/s/blog_c038e9930102v2hs.html
- http: //hg.openjdk.java.net/jdk7/jdk7/hotspot/file/9b0ca45cd756/src/share/vm/runtime/synchronizer.cpp
- http: //hg.openjdk.java.net/jdk7/jdk7/hotspot/file/9b0ca45cd756/src/share/vm/runtime/objectMonitor.cpp
来源: https://www.cnblogs.com/kundeg/p/8422557.html