1,Lock 接口
1.1,Lock 玉 synchronized 的不同
Lock 不是 Java 语言内置的, synchronized 是 Java 语言的关键字, 因此是内置特性. Lock 是一个类, 通过这个类可以实现同步访问;
Lock 和 synchronized 有一点非常大的不同, 采用 synchronized 不需要用户去手动释放锁, 当 synchronized 方法或者 synchronized 代码块执行完之后, 系统会自动让线程释放对锁的占用; 而 Lock 则必须要用户去手动释放锁, 如果没有主动释放锁, 就有可能导致出现死锁现象.
Lock 在使用时需要显式地获取和释放锁, 虽然其缺少隐式获取锁的便捷性, 但却拥有了锁获取与释放的可操作性, 可中断的获取锁及超时获取锁等多种 synchronized 关键字所不具备的同步特性.
1.2,Lock 接口实现
- public interface Lock {
- // 获取锁, 若锁不可用, 则当前线程将会阻塞, 直到获得锁
- void lock();
- // 获取锁, 当被中断或获取到锁才返回;
- // 若锁不可用, 则当前线程被阻塞, 直到获取锁或被中断
- void lockInterruptibly() throws InterruptedException;
- // 尝试获取锁, 并立即返回; true: 获取锁成功; false: 获取锁失败
- boolean tryLock();
- // 尝试在指定的超时时间获取锁, 当获取到锁时返回 true; 当超时或被中断时返回 false
- boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
- // 释放锁
- void unlock();
- // 返回一个和锁绑定的条件队列;
- // 在等待条件之前线程必须先获取当前锁, 同时 await()方法会原子地释放锁
- // 并在返回之前重新获取到锁
- Condition newCondition();
- }
2,LockSupport 工具类
LockSupport 定义了一组的公共静态方法, 这些方法提供了最基本的线程阻塞和唤醒功能, 而 LockSupport 也成为构建同步组件的基础工具. LockSupport 定义了一组以 park 开头的方法用来阻塞当前线程, 以及 unpark(Thread thread)方法来唤醒一个被阻塞的线程.
2.1,LockSupport 的基本特征
重入性:
LockSupport 是非重入锁;
面向线程锁:
这是 LockSupport 很重要的一个特征, 也是与 synchronized,object,reentrantlock 最大的不同, 这样也就没有公平锁和非公平的区别, 所以无法只是依靠 LockSupport 实现单例模式. 同时面向线程锁的特征在一定程度上降低代码的耦合度.
锁 park 与解锁 unpark 顺序可颠倒性:
这个特征虽然是 Java 锁中比较独特是特征, 其实这个特征也是基于 LockSupport 的状态, 类似二元信号量 (0 和 1) 实现的.
解锁 unpark 的重复性:
unpark 可重复是指在解锁的时候可以重复调用 unpark; 同样因为 LockSupport 基于二元锁状态重复调用 unpark 并不会影响到下一次 park 操作;
2.2,LockSupport 与其他锁的比较
基于 LockSupport 提供的核心的功能, 即: park 阻塞与 unpark 恢复运行状态; 与此功能类似的有 Object 的 wait/notify 和 ReentrantLock.Condition 的 await(),signal()等;
锁的实现机制不同:
LockSupport 面向的是线程, 而 Object 和 ReentrantLock.Condition 都是典型的依赖一个对象实现锁机制;
锁的监视器依赖:
LockSupport 不需要依赖监视器, 在源码中可以发现 LockSupport 并没有提供 pubic 的构造器, 它的所有方法都是静态的; 在 Object 和 ReentrantLock.Condition 都是需要 new 一个自身对象作为监视器介质;
粒度:
粒度上很显然 LockSupport 粒度更细;
使用灵活度:
LockSupport 不需要依赖监视器一定程度上降低耦合而且解锁 unpark 和锁 park 顺序灵活;
2.3,LockSupport 的实现
- public class LockSupport {
- private LockSupport() {} // Cannot be instantiated.
- // 设置阻塞当前线程的操作者, 一般保存阻塞操作的线程 Thread 对象
- //parkBlocker 对象为线程 Thread.class 的属性 parkBlocker
- private static void setBlocker(Thread t, Object arg) {
- // Even though volatile, hotspot doesn't need a write barrier here.
- UNSAFE.putObject(t, parkBlockerOffset, arg);
- }
- // 唤醒给定线程;
- //1, 若给定线程的许可 (permit) 不可用, 则赋予给定线程许可;
- //2, 若给定线程被 park 阻塞, 则唤醒给定线程;
- //3, 若给定线程正常运行, 则本次 unpark 会保证下次针对该线程的 park 不会阻塞该线程
- //4, 当线程还未启动, umpark 操作不会有任何作用
- public static void unpark(Thread thread) {
- if (thread != null)
- UNSAFE.unpark(thread);
- }
- // 阻塞当前当前线程, 直到获得许可;
- // 当许可可用时, park 会立即返回;
- // 当许可不可用时, 当前线程将会阻塞, 直到以下事件发生
- //1, 其他线程调用 unpark 唤醒此线程
- //2, 其他线程调用 Thread#interrupt 中断线程
- //3, 调用应不可知错误返回
- //park 方法不会报告是什么原因导致的调用返回
- // 调用者需在返回时自行检查是什么条件导致调用返回
- public static void park(Object blocker) {
- Thread t = Thread.currentThread();
- setBlocker(t, blocker);
- UNSAFE.park(false, 0L);
- setBlocker(t, null);
- }
- // 功能与 park 基本相同, 此方法添加阻塞时间参数
- public static void parkNanos(Object blocker, long nanos) {
- if (nanos> 0) {
- Thread t = Thread.currentThread();
- setBlocker(t, blocker);
- UNSAFE.park(false, nanos);
- setBlocker(t, null);
- }
- }
- // 功能与 park 基本相同, 此方法添加阻塞到某个时间点的参数
- public static void parkUntil(Object blocker, long deadline) {
- Thread t = Thread.currentThread();
- setBlocker(t, blocker);
- UNSAFE.park(true, deadline);
- setBlocker(t, null);
- }
- // 返回给定线程的 parkBlocker 参数
- public static Object getBlocker(Thread t) {
- if (t == null)
- throw new NullPointerException();
- return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
- }
- //
- public static void park() {
- UNSAFE.park(false, 0L);
- }
- public static void parkNanos(long nanos) {
- if (nanos> 0)
- UNSAFE.park(false, nanos);
- }
- public static void parkUntil(long deadline) {
- UNSAFE.park(true, deadline);
- }
- //unsafe 对象
- private static final sun.misc.Unsafe UNSAFE;
- // 保存 parkBlocker 属性在 Thread 中的偏移量
- private static final long parkBlockerOffset;
- private static final long SEED;
- private static final long PROBE;
- private static final long SECONDARY;
- static {
- try {
- UNSAFE = sun.misc.Unsafe.getUnsafe();
- Class<?> tk = Thread.class;
- parkBlockerOffset = UNSAFE.objectFieldOffset
- (tk.getDeclaredField("parkBlocker"));
- SEED = UNSAFE.objectFieldOffset
- (tk.getDeclaredField("threadLocalRandomSeed"));
- PROBE = UNSAFE.objectFieldOffset
- (tk.getDeclaredField("threadLocalRandomProbe"));
- SECONDARY = UNSAFE.objectFieldOffset
- (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
- } catch (Exception ex) { throw new Error(ex); }
- }
- }
2.5,LockSupport 底层实现
LockSupport 的 Park 及 Unpark 是通过 Unsafe 的对应类实现的, 而 Unsafe 的相关实现在 jvm 中, 以下是 jvm 中 Unsafe.Park()及 Unsafe.Unpark()的 cpp 实现.
- UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) {
- HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time);
- EventThreadPark event;
- JavaThreadParkedState jtps(thread, time != 0);
- thread->parker()->park(isAbsolute != 0, time);
- if (event.should_commit()) {
- const oop obj = thread->current_park_blocker();
- if (time == 0) {
- post_thread_park_event(&event, obj, min_jlong, min_jlong);
- } else {
- if (isAbsolute != 0) {
- post_thread_park_event(&event, obj, min_jlong, time);
- } else {
- post_thread_park_event(&event, obj, time, min_jlong);
- }
- }
- }
- HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker());
- } UNSAFE_END
- UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread)) {
- Parker* p = NULL;
- if (jthread != NULL) {
- ThreadsListHandle tlh;
- JavaThread* thr = NULL;
- oop java_thread = NULL;
- (void) tlh.cv_internal_thread_to_JavaThread(jthread, &thr, &java_thread);
- if (java_thread != NULL) {
- // This is a valid oop.
- if (thr != NULL) {
- // The JavaThread is alive.
- p = thr->parker();
- }
- }
- } // ThreadsListHandle is destroyed here.
- // 'p' points to type-stable-memory if non-NULL. If the target
- // thread terminates before we get here the new user of this
- // Parker will get a 'spurious' unpark - which is perfectly valid.
- if (p != NULL) {
- HOTSPOT_THREAD_UNPARK((uintptr_t) p);
- p->unpark();
- }
- } UNSAFE_END
而 Unsafe 中的相关实现是通过 os_posix.cpp 中的 Parker::park()及 Parker::unpark()实现的.
- void Parker::park(bool isAbsolute, jlong time) {
- // Optional fast-path check:
- // Return immediately if a permit is available.
- // We depend on Atomic::xchg() having full barrier semantics
- // since we are doing a lock-free update to _counter.
- if (Atomic::xchg(0, &_counter)> 0) return;
- Thread* thread = Thread::current();
- assert(thread->is_Java_thread(), "Must be JavaThread");
- JavaThread *jt = (JavaThread *)thread;
- // Optional optimization -- avoid state transitions if there's
- // an interrupt pending.
- if (Thread::is_interrupted(thread, false)) {
- return;
- }
- // Next, demultiplex/decode time arguments
- struct timespec absTime;
- if (time <0 || (isAbsolute && time == 0)) { // don't wait at all
- return;
- }
- if (time> 0) {
- to_abstime(&absTime, time, isAbsolute, false);
- }
- // Enter safepoint region
- // Beware of deadlocks such as 6317397.
- // The per-thread Parker:: mutex is a classic leaf-lock.
- // In particular a thread must never block on the Threads_lock while
- // holding the Parker:: mutex. If safepoints are pending both the
- // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
- ThreadBlockInVM tbivm(jt);
- // Don't wait if cannot get lock since interference arises from
- // unparking. Also re-check interrupt before trying wait.
- if (Thread::is_interrupted(thread, false) ||
- pthread_mutex_trylock(_mutex) != 0) {
- return;
- }
- int status;
- if (_counter> 0) { // no wait needed
- _counter = 0;
- status = pthread_mutex_unlock(_mutex);
- assert_status(status == 0, status, "invariant");
- // Paranoia to ensure our locked and lock-free paths interact
- // correctly with each other and Java-level accesses.
- OrderAccess::fence();
- return;
- }
- OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
- jt->set_suspend_equivalent();
- // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
- assert(_cur_index == -1, "invariant");
- if (time == 0) {
- _cur_index = REL_INDEX; // arbitrary choice when not timed
- status = pthread_cond_wait(&_cond[_cur_index], _mutex);
- assert_status(status == 0, status, "cond_timedwait");
- }
- else {
- _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
- status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
- assert_status(status == 0 || status == ETIMEDOUT,
- status, "cond_timedwait");
- }
- _cur_index = -1;
- _counter = 0;
- status = pthread_mutex_unlock(_mutex);
- assert_status(status == 0, status, "invariant");
- // Paranoia to ensure our locked and lock-free paths interact
- // correctly with each other and Java-level accesses.
- OrderAccess::fence();
- // If externally suspended while waiting, re-suspend
- if (jt->handle_special_suspend_equivalent_condition()) {
- jt->java_suspend_self();
- }
- }
- void Parker::unpark() {
- int status = pthread_mutex_lock(_mutex);
- assert_status(status == 0, status, "invariant");
- const int s = _counter;
- _counter = 1;
- // must capture correct index before unlocking
- int index = _cur_index;
- status = pthread_mutex_unlock(_mutex);
- assert_status(status == 0, status, "invariant");
- // Note that we signal() *after* dropping the lock for "immortal" Events.
- // This is safe and avoids a common class of futile wakeups. In rare
- // circumstances this can cause a thread to return prematurely from
- // cond_{timed}wait() but the spurious wakeup is benign and the victim
- // will simply re-test the condition and re-park itself.
- // This provides particular benefit if the underlying platform does not
- // provide wait morphing.
- if (s <1 && index != -1) {
- // thread is definitely parked
- status = pthread_cond_signal(&_cond[index]);
- assert_status(status == 0, status, "invariant");
- }
- }
由以上源码可知, 实现 park 及 unpark 的关键是 Mutex 互斥体, Condition 信号量,_counter 计数器.
park 主要流程:
当_counter> 0, 则直接调用 pthread_mutex_unlock 解锁并返回;
当超时时间 time>0, 则调用 pthread_cond_timedwait 进行超时等待, 直到超时时间到达;
当超时时间 time=0, 则调用 pthread_cond_wait 等待;
当 wait 返回时设置_counter = 0, 并调用 pthread_mutex_unlock 解锁;
unpark 主要流程:
调用 pthread_mutex_lock 获取锁, 设置_counter=1, 调用 pthread_mutex_unlock 解锁;
若_counter 的原值等于 0, 则调用 pthread_cond_signal 进行通知处理;
来源: http://www.jianshu.com/p/f32f941335aa