概述
对象锁
一般指 synchronized, 和对象有关
每个对象都有个隐形的监视器, 用于线程的同步
线程状态: 创建 (new)-> 就绪 (start)-> 运行 (run)-> 阻塞 (lock/wait/join/sleep)-> 销毁
ReentrantLock
互斥锁
可重入
Condition
实现 wait,notify,notifyAll 的功能
ReadWriteLock - ReentrantReadWriteLock
共享锁: 读读共享
ReentrantLock
synchronized 的功能增强版; JDK 1.5 之前性能优于 synchronized; 现在性能相差不多
- // 简单用法
- public static ReentrantLock lock = new ReentrantLock();
- try {
- lock.lock();
- i++;
- } finally {
- lock.unlock(); // 保证异常情况下也释放; synchronized 由 JVM 释放
- }
可重入
获取计数器, 锁几次, 就需要释放几次
可中断
lock.lock()
不能响应中断
在 park 时被中断, park 会解除, 设置 interrupt 标识, 但是不抛异常, 线程继续
lock.lockInterruptibly()
能响应中断
被 lock.lockInterruptibly() block 的线程, 收到 interrupt 信号时, 会抛出 InterruptedException, 进而退出
可限时
lock.tryLock(long timeout, TimeUnit unit), 超时返回 false
公平锁
默认非公平
先尝试抢占, 再排队
公平
直接排队
public ReentrantLock(boolean fair)
synchronized 是非公平锁
- Condition
- public static ReentrantLock lock = new ReentrantLock();
- public static Condition condition = lock.newCondition();
lock 内部维护一个同步队列
同步队列头的类型是 AbstractQueuedSynchronizer
condition 内部维护一个条件队列
条件队列头的类型为 AbstractQueuedSynchronizer$ConditionObject
一个 lock 可以有多个 condition, 即多个条件队列
持锁者才可以执行 await
因为 await 中有 release 的逻辑, 会抛 IllegalMonitorStateException
condition.await(): 类似 Object.wait()
持锁线程将自己加入条件队列
release 同步队列中后继节点
unparkSuccessor(), 这步一定要做, 不然同步队列里的线程就死在那里了
调用 LockSupport.park()将自己阻塞
当有别的线程调用了 signal(), 实际就是调用 LockSupport.unpark()把该线程唤醒
被唤醒线程继续 await()方法中的剩余逻辑
acquireQueued, 这是 lock.lock()中的逻辑, 即获取锁
condition.signal():Object.notify()
唤醒条件队列头节点
线程进入同步队列
- condition.signalAll():Object.notifyAll()
- awaitUninterruptibly()
与 await()不同: 不会在等待过程中响应中断
Semaphore
概述
- Semaphore semaphore = new Semaphore(5);
- public Semaphore(int permits) {
- sync = new NonfairSync(permits);
- }
- NonfairSync(int permits) {
- super(permits);
- }
- Sync(int permits) {
- setState(permits);
- }
- AbstractQueuedSynchronizer:
- protected final void setState(int newState) {
- state = newState;
- }
- // 公平锁
- public Semaphore(int permits, boolean fair) {
- sync = fair ? new FairSync(permits) : new NonfairSync(permits);
- }
默认非公平
构造方法参数即许可数, 被设置到了 AbstractQueuedSynchronizer 的 state 属性
线程先尝试获得许可, 如果成功, 许可数 - 1; 线程运行结束后释放许可, 许可数 + 1
许可数为 0, 则获取失败; 线程进入 AQS 的同步队列, 被其他释放许可的线程唤醒
- acquire
- public void acquire() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- if (tryAcquireShared(arg) < 0)
- doAcquireSharedInterruptibly(arg);
- }
- protected int tryAcquireShared(int acquires) {
- return nonfairTryAcquireShared(acquires);
- }
- final int nonfairTryAcquireShared(int acquires) {
- for (;;) {
- int available = getState();
- int remaining = available - acquires;
- if (remaining < 0 || compareAndSetState(available, remaining))
- return remaining;
- }
- }
比较 available(其实就是 state 属性)和 acquires
如果拿掉许可之后不小于 0, 就可以拿许可
若是小于 0, 则表示拿许可失败, 在 doAcquireSharedInterruptibly(arg)中将自己阻塞, 即进入同步队列
阻塞: 在 parkAndCheckInterrupt 中通过 LockSupport.park(this)进行自我阻塞
- release
- public void release() {
- sync.releaseShared(1);
- }
- public final boolean releaseShared(int arg) {
- if (tryReleaseShared(arg)) {
- doReleaseShared();
- return true;
- }
- return false;
- }
- protected final boolean tryReleaseShared(int releases) {
- for (;;) {
- int current = getState();
- int next = current + releases;
- if (next < current) // overflow
- throw new Error("Maximum permit count exceeded");
- if (compareAndSetState(current, next))
- return true;
- }
- }
将 AQS 中的 state 值 + 1
通过 doReleaseShared()唤醒同步队列中的第一个节点
场景
CountDownLatch
概述
CountDownLatch countDownLatch = new CountDownLatch(10);
有点高级 join 的感觉, 主线程调用 countDownLatch.await(), 进入等待
countDownLatch.countDown()执行 10 次, 把那个 10 消耗光
主线程继续
机制
CountDownLatch 持有一个内部类 Sync, 这个 10 会传给 AbstractQueuedSynchronizer 的 state 属性
执行 await 时, 只要 state 不等于 0, 线程就会进入阻塞
队列头是 AbstractQueuedSynchronizer 类型的, 貌似只有 Condition 例外, 其余的队列头应该都是 AQS 类型
countDown 方法中, 当 state 变为 0 时(一定要是变为 0 哦, 不能本来就是 0)
执行 doReleaseShared 把同步队列里的第一个有效 Node 给 unpark, 其实就是上面被阻塞的那个主线程
- CyclicBarrier
- public CyclicBarrier(int parties) {
- this(parties, null);
- }
- // parties: 栅栏放开前需要调用的线程数
- // count: 当前剩余需要调用的线程数
- // barrierCommand: 调用线程数满足后, 栅栏放开前执行的命令
- public CyclicBarrier(int parties, Runnable barrierAction) {
- if (parties <= 0) throw new IllegalArgumentException();
- this.parties = parties;
- this.count = parties;
- this.barrierCommand = barrierAction;
- }
- // 用法, 每个线程调用 await, 耗完 parties 即可往下走
- CyclicBarrier.await
- final ReentrantLock lock = this.lock; // this 就是 CyclicBarrier
- lock.lock(); // 先拿锁
- int index = --count;
- if (index == 0) { // 判断 index, 为 0 表示线程数已达到
- boolean ranAction = false;
- try {
- final Runnable command = barrierCommand;
- if (command != null) // 可能是空的, 看构造函数了
- command.run();
- ranAction = true;
- nextGeneration(); // signalAll, 重置 count, 重置 generation
- return 0; // await 结束
- } finally {
- if (!ranAction) // command.run 异常时, 打破栅栏, 释放线程
- breakBarrier(); // signalAll, 重置 count, 设置 broken 属性
- }
- }
- trip.await // 自我阻塞
- private void nextGeneration() {
- // signal completion of last generation
- trip.signalAll(); // trip:lock 生成的 Condition
- // set up next generation
- count = parties;
- generation = new Generation();
- }
- private void breakBarrier() {
- generation.broken = true;
- count = parties;
- trip.signalAll();
- }
- LockSupport
- /**
- * Makes available the permit for the given thread, if it
- * was not already available. If the thread was blocked on
- * {@code park} then it will unblock. Otherwise, its next call
- * to {@code park} is guaranteed not to block. This operation
- * is not guaranteed to have any effect at all if the given
- * thread has not been started.
- *
- * @param thread the thread to unpark, or {@code null}, in which case
- * this operation has no effect
- */
- public static void unpark(Thread thread) {
- if (thread != null)
- UNSAFE.unpark(thread);
- }
- // 从上面注释可以看出, unpack 可以释放 park 状态线程, 或者将执行 park 的线程; 不过对于还没开始执行的线程, unpark 并不保证效果
- /**
- * Disables the current thread for thread scheduling purposes unless the
- * permit is available.
- *
- * <p>If the permit is available then it is consumed and the call
- * returns immediately; otherwise the current thread becomes disabled
- * for thread scheduling purposes and lies dormant until one of three
- * things happens:
- *
- * <ul>
- *
- * <li>Some other thread invokes {@link #unpark unpark} with the
- * current thread as the target; or
- *
- * <li>Some other thread {@linkplain Thread#interrupt interrupts}
- * the current thread; or
- *
- * <li>The call spuriously (that is, for no reason) returns.
- * </ul>
- *
- * <p>This method does <em>not</em> report which of these caused the
- * method to return. Callers should re-check the conditions which caused
- * the thread to park in the first place. Callers may also determine,
- * for example, the interrupt status of the thread upon return.
- */
- public static void park() {
- UNSAFE.park(false, 0L);
- }
测试 park 的线程能不能被 interrupt
可以的, 而且不抛异常
来源: http://www.jianshu.com/p/5a8ed6ab2d14