死磕 Synchronized 底层实现 -- 轻量级锁
本文为死磕 Synchronized 底层实现第三篇文章, 内容为轻量级锁实现.
轻量级锁并不复杂, 其中很多内容在 偏向锁 https://github.com/farmerjohngit/myblog/issues/13 一文中已提及过, 与本文内容会有部分重叠 .
另外轻量级锁的背景和基本流程在 概论 https://github.com/farmerjohngit/myblog/issues/12 中已有讲解. 强烈建议在看过两篇文章的基础下阅读本文 .
本系列文章将对 HotSpot 的 synchronized 锁实现进行全面分析, 内容包括偏向锁, 轻量级锁, 重量级锁的加锁, 解锁, 锁升级流程的原理及源码分析, 希望给在研究 synchronized 路上的同学一些帮助. 主要包括以下几篇文章:
死磕 Synchronized 底层实现 -- 概论 https://github.com/farmerjohngit/myblog/issues/12
死磕 Synchronized 底层实现 -- 偏向锁 https://github.com/farmerjohngit/myblog/issues/13
死磕 Synchronized 底层实现 -- 轻量级锁 https://github.com/farmerjohngit/myblog/issues/14
死磕 Synchronized 底层实现 -- 重量级锁 (待更新)
更多文章见个人博客: https://github.com/farmerjohngit/myblog
本文分为两个部分:
1. 轻量级锁获取流程
2. 轻量级锁释放流程
本人看的 JVM 版本是 jdk8u, 具体版本号以及代码可以在 这里 看到.
轻量级锁获取流程
下面开始轻量级锁获取流程分析, 代码在 bytecodeInterpreter.cpp#1816 .
- CASE(_monitorenter): {
- oop lockee = STACK_OBJECT(-1);
- ...
- if (entry != NULL) {
- ...
- // 上面省略的代码中如果 CAS 操作失败也会调用到 InterpreterRuntime::monitorenter
- // traditional lightweight locking
- if (!success) {
- // 构建一个无锁状态的 Displaced Mark Word
- markOop displaced = lockee->mark()->set_unlocked();
- // 设置到 Lock Record 中去
- entry->lock()->set_displaced_header(displaced);
- bool call_vm = UseHeavyMonitors;
- if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
- // 如果 CAS 替换不成功, 代表锁对象不是无锁状态, 这时候判断下是不是锁重入
- // Is it simple recursive case?
- if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
- entry->lock()->set_displaced_header(NULL);
- } else {
- // CAS 操作失败则调用 monitorenter
- CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
- }
- }
- }
- UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
- } else {
- istate->set_msg(more_monitors);
- UPDATE_PC_AND_RETURN(0); // Re-execute
- }
- }
如果锁对象不是偏向模式或已经偏向其他线程, 则 success 为 false . 这时候会构建一个无锁状态的 mark Word 设置到 Lock Record 中去, 我们称 Lock Record 中存储对象 mark Word 的字段叫 Displaced Mark Word .
如果当前锁的状态不是无锁状态, 则 CAS 失败. 如果这是一次锁重入, 那直接将 Lock Record 的 Displaced Mark Word 设置为 null .
我们看个 demo, 在该 demo 中重复 3 次获得锁,
- synchronized(obj){
- synchronized(obj){
- synchronized(obj){
- }
- }
- }
假设锁的状态是轻量级锁, 下图反应了 mark Word 和线程栈中 Lock Record 的状态, 可以看到右边线程栈中包含 3 个指向当前锁对象的 Lock Record . 其中栈中最高位的 Lock Record 为第一次获取锁时分配的. 其 Displaced Mark Word 的值为锁对象的加锁前的 mark Word , 之后的锁重入会在线程栈中分配一个 Displaced Mark Word 为 null 的 Lock Record .
为什么 JVM 选择在线程栈中添加 Displaced Mark Word 为 null 的 Lock Record 来表示重入计数呢? 首先锁重入次数是一定要记录下来的, 因为每次解锁都需要对应一次加锁, 解锁次数等于加锁次数时, 该锁才真正的被释放, 也就是在解锁时需要用到说锁重入次数的. 一个简单的方案是将锁重入次数记录在对象头的 mark Word 中, 但 mark Word 的大小是有限的, 已经存放不下该信息了. 另一个方案是只创建一个 Lock Record 并在其中记录重入次数, Hotspot 没有这样做的原因我猜是考虑到效率有影响: 每次重入获得锁都需要遍历该线程的栈找到对应的 Lock Record , 然后修改它的值.
所以最终 Hotspot 选择每次获得锁都添加一个 Lock Record 来表示锁的重入.
接下来看看 InterpreterRuntime::monitorenter 方法
- IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
- ...
- Handle h_obj(thread, elem->obj());
- assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
- "must be NULL or an object");
- if (UseBiasedLocking) {
- // Retry fast entry if bias is revoked to avoid unnecessary inflation
- ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
- } else {
- ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
- }
- ...
- IRT_END
fast_enter 的流程在偏向锁一文已经分析过, 如果当前是偏向模式且偏向的线程还在使用锁, 那会将锁的 mark Word 改为轻量级锁的状态, 同时会将偏向的线程栈中的 Lock Record 修改为轻量级锁对应的形式. 代码位置在 biasedLocking.cpp#212 .
- // 线程还存活则遍历线程栈中所有的 Lock Record
- GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(biased_thread);
- BasicLock* highest_lock = NULL;
- for (int i = 0; i <cached_monitor_info->length(); i++) {
- MonitorInfo* mon_info = cached_monitor_info->at(i);
- // 如果能找到对应的 Lock Record 说明偏向的线程还在执行同步代码块中的代码
- if (mon_info->owner() == obj) {
- ...
- // 需要升级为轻量级锁, 直接修改偏向线程栈中的 Lock Record. 为了处理锁重入的 case, 在这里将 Lock Record 的 Displaced Mark Word 设置为 null, 第一个 Lock Record 会在下面的代码中再处理
- markOop mark = markOopDesc::encode((BasicLock*) NULL);
- highest_lock = mon_info->lock();
- highest_lock->set_displaced_header(mark);
- } else {
- ...
- }
- }
- if (highest_lock != NULL) {
- // 修改第一个 Lock Record 为无锁状态, 然后将 obj 的 mark Word 设置为执行该 Lock Record 的指针
- highest_lock->set_displaced_header(unbiased_prototype);
- obj->release_set_mark(markOopDesc::encode(highest_lock));
- ...
- } else {
- ...
- }
我们看 slow_enter 的流程.
- void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
- markOop mark = obj->mark();
- assert(!mark->has_bias_pattern(), "should not see bias pattern here");
- // 如果是无锁状态
- if (mark->is_neutral()) {
- // 设置 Displaced Mark Word 并替换对象头的 mark Word
- lock->set_displaced_header(mark);
- if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
- TEVENT (slow_enter: release stacklock) ;
- return ;
- }
- } else
- if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
- assert(lock != mark->locker(), "must not re-lock the same lock");
- assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
- // 如果是重入, 则设置 Displaced Mark Word 为 null
- lock->set_displaced_header(NULL);
- return;
- }
- ...
- // 走到这一步说明已经是存在多个线程竞争锁了 需要膨胀为重量级锁
- lock->set_displaced_header(markOopDesc::unused_mark());
- ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
- }
轻量级锁释放流程
- CASE(_monitorexit): {
- oop lockee = STACK_OBJECT(-1);
- CHECK_NULL(lockee);
- // derefing's lockee ought to provoke implicit null check
- // find our monitor slot
- BasicObjectLock* limit = istate->monitor_base();
- BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
- // 从低往高遍历栈的 Lock Record
- while (most_recent != limit ) {
- // 如果 Lock Record 关联的是该锁对象
- if ((most_recent)->obj() == lockee) {
- BasicLock* lock = most_recent->lock();
- markOop header = lock->displaced_header();
- // 释放 Lock Record
- most_recent->set_obj(NULL);
- // 如果是偏向模式, 仅仅释放 Lock Record 就好了. 否则要走轻量级锁 or 重量级锁的释放流程
- if (!lockee->mark()->has_bias_pattern()) {
- bool call_vm = UseHeavyMonitors;
- // header!=NULL 说明不是重入, 则需要将 Displaced Mark Word CAS 到对象头的 Mark Word
- if (header != NULL || call_vm) {
- if (call_vm || Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
- // CAS 失败或者是重量级锁则会走到这里, 先将 obj 还原, 然后调用 monitorexit 方法
- most_recent->set_obj(lockee);
- CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);
- }
- }
- }
- // 执行下一条命令
- UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
- }
- // 处理下一条 Lock Record
- most_recent++;
- }
- // Need to throw illegal monitor state exception
- CALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception);
- ShouldNotReachHere();
- }
轻量级锁释放时需要将 Displaced Mark Word 替换到对象头的 mark Word 中. 如果 CAS 失败或者是重量级锁则进入到 InterpreterRuntime::monitorexit 方法中.
- //%note monitor_1
- IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem))
- Handle h_obj(thread, elem->obj());
- ...
- ObjectSynchronizer::slow_exit(h_obj(), elem->lock(), thread);
- // Free entry. This must be done here, since a pending exception might be installed on
- // 释放 Lock Record
- elem->set_obj(NULL);
- ...
- IRT_END
monitorexit 调用完 slow_exit 方法后, 就释放 Lock Record .
- void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
- fast_exit (object, lock, THREAD) ;
- }
- void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
- ...
- markOop dhw = lock->displaced_header();
- markOop mark ;
- if (dhw == NULL) {
- // 重入锁, 什么也不做
- ...
- return ;
- }
- mark = object->mark() ;
- // 如果是 mark Word==Displaced Mark Word 即轻量级锁, CAS 替换对象头的 mark Word
- if (mark == (markOop) lock) {
- assert (dhw->is_neutral(), "invariant") ;
- if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
- TEVENT (fast_exit: release stacklock) ;
- return;
- }
- }
- // 走到这里说明是重量级锁或者解锁时发生了竞争, 膨胀后调用重量级锁的 exit 方法.
- ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;
- }
该方法中先判断是不是轻量级锁, 如果是轻量级锁则将替换 mark Word , 否则膨胀为重量级锁并调用 exit 方法, 相关逻辑将在重量级锁的文章中讲解.
来源: http://www.tuicool.com/articles/UjuyI3V