1,CountDownLatch 实现
CountDownLatch 的实现基于 AQS 的共享模式, 其 Sync 实现如下:
- private static final class Sync extends AbstractQueuedSynchronizer {
- private static final long serialVersionUID = 4982264981922014374L;
- // 初始的 count 值, state 中保存此 count 值
- Sync(int count) {
- setState(count);
- }
- int getCount() {
- return getState();
- }
- // 尝试获取共享锁, 只有当 state 为 0 时, 即计数值为 0 时才能获取到共享锁
- protected int tryAcquireShared(int acquires) {
- return (getState() == 0) ? 1 : -1;
- }
- // 尝试释放共享锁, 通过 CAS 的方式将 state 即 count 值减一
- protected boolean tryReleaseShared(int releases) {
- // Decrement count; signal when transition to zero
- for (;;) {
- int c = getState();
- if (c == 0)
- return false;
- int nextc = c-1;
- if (compareAndSetState(c, nextc))
- return nextc == 0;
- }
- }
- }
CountDownLatch 总体流程分三部:
锁计数值初始化: 即初始化 state 值为锁的 count 值;
await 过程: await 时会调用 tryAcquireShared()获取共享锁, 若此时 state 值为大于 0, 则会将当前线程节点插入同步队列尾部, 并阻塞当前线程;
countDown 过程: countDown 时会 tryReleaseShared()无阻塞地减少锁计数值, 当锁计数值减少到 0 时, 就会唤醒同步队列中阻塞的线程节点.
2,CyclicBarrier 实现
CyclicBarrier 是基于 ReentrantLock 和 Condition 实现的锁工具.
实现源码:
- // 构造函数, parties 为初始化的计数器, barrierAction 为当 parties 计数减到 0 时的回调
- public CyclicBarrier(int parties, Runnable barrierAction) {
- if (parties <= 0) throw new IllegalArgumentException();
- // 初始计数
- this.parties = parties;
- // 本轮计数值
- this.count = parties;
- // 计数为 0 时的回调
- this.barrierCommand = barrierAction;
- }
- // 等待
- public int await() throws InterruptedException, BrokenBarrierException {
- try {
- return dowait(false, 0L);
- } catch (TimeoutException toe) {
- throw new Error(toe); // cannot happen
- }
- }
- private int dowait(boolean timed, long nanos)
- throws InterruptedException, BrokenBarrierException,
- TimeoutException {
- //h 获取锁
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- // 获取当前分代
- final Generation g = generation;
- // 此次分代已被停止?
- if (g.broken)
- throw new BrokenBarrierException();
- // 当前线程被终止?
- if (Thread.interrupted()) {
- breakBarrier();
- throw new InterruptedException();
- }
- // 分代计数值减 1, 若计数值减到 0, 则运行回调, 并调用 nextGeneration()
- // 唤醒所有在等待的线程, 同时更新分代信息
- int index = --count;
- if (index == 0) { // tripped
- boolean ranAction = false;
- try {
- final Runnable command = barrierCommand;
- if (command != null)
- command.run();
- ranAction = true;
- nextGeneration();
- return 0;
- } finally {
- if (!ranAction)
- breakBarrier();
- }
- }
- // loop until tripped, broken, interrupted, or timed out
- // 计数值未减到 0, 则需要等待
- for (;;) {
- try {
- // 无时间条件等待
- if (!timed)
- trip.await();
- // 设置了超时时间等待
- else if (nanos> 0L)
- nanos = trip.awaitNanos(nanos);
- } catch (InterruptedException IE) {
- if (g == generation && ! g.broken) {
- breakBarrier();
- throw IE;
- } else {
- // We're about to finish waiting even if we had not
- // been interrupted, so this interrupt is deemed to
- // "belong" to subsequent execution.
- Thread.currentThread().interrupt();
- }
- }
- // 等待的过程中分代已经停止, 则抛出异常
- // 原可能是某个线程在等待的过程中线程被中断
- if (g.broken)
- throw new BrokenBarrierException();
- // 分代已经更新? 说明是被 signal 信号唤醒的
- if (g != generation)
- return index;
- // 等待超时?
- if (timed && nanos <= 0L) {
- breakBarrier();
- throw new TimeoutException();
- }
- }
- } finally {
- lock.unlock();
- }
- }
3,Semaphore 实现
Semaphore 的实现是基于 AQS 的共享锁, 有公平和非公平两种模式.
基本的 Sync 实现:
- abstract static class Sync extends AbstractQueuedSynchronizer {
- private static final long serialVersionUID = 1192457210091910933L;
- // 初始的许可数量, 同步锁的 state 保存许可的数量
- Sync(int permits) {
- setState(permits);
- }
- // 获取许可数量
- final int getPermits() {
- return getState();
- }
- // 非公平的方式获取共享锁
- final int nonfairTryAcquireShared(int acquires) {
- for (;;) {
- int available = getState();
- // 可用许可数减去需求的许可数
- int remaining = available - acquires;
- // 许可数大于 0 时, CAS 获取许可
- if (remaining <0 ||
- compareAndSetState(available, remaining))
- return remaining;
- }
- }
- // 释放许可
- protected final boolean tryReleaseShared(int releases) {
- for (;;) {
- int current = getState();
- // 许可数加锁释放的许可数
- int next = current + releases;
- if (next < current) // overflow
- throw new Error("Maximum permit count exceeded");
- //CAS 更新许可, 直到成功
- if (compareAndSetState(current, next))
- return true;
- }
- }
- // 减少许可
- final void reducePermits(int reductions) {
- for (;;) {
- int current = getState();
- int next = current - reductions;
- if (next> current) // underflow
- throw new Error("Permit count underflow");
- //
- if (compareAndSetState(current, next))
- return;
- }
- }
- // 将许可消耗完
- final int drainPermits() {
- for (;;) {
- int current = getState();
- if (current == 0 || compareAndSetState(current, 0))
- return current;
- }
- }
- }
公平的 FairSync 实现::
- static final class FairSync extends Sync {
- private static final long serialVersionUID = 2014338818796000944L;
- FairSync(int permits) {
- super(permits);
- }
- // 公平方式获取许可
- protected int tryAcquireShared(int acquires) {
- for (;;) {
- // 当前节点有前驱节点, 则获取失败
- if (hasQueuedPredecessors())
- return -1;
- // 无前驱节点时, CAS 方式获取许可
- int available = getState();
- int remaining = available - acquires;
- if (remaining < 0 ||
- compareAndSetState(available, remaining))
- return remaining;
- }
- }
- }
非公平的 Sync 实现:
- static final class NonfairSync extends Sync {
- private static final long serialVersionUID = -2694183684443567898L;
- NonfairSync(int permits) {
- super(permits);
- }
- // 直接使用 Sync 的 nonfairTryAcquireShared()实现, 非公平方式获取许可
- protected int tryAcquireShared(int acquires) {
- return nonfairTryAcquireShared(acquires);
- }
- }
相关对外接口实现:
- // 信号量初始化, permits: 初始许可数; fire: 是否公平
- public Semaphore(int permits, boolean fair) {
- sync = fair ? new FairSync(permits) : new NonfairSync(permits);
- }
- // 获取许可
- public void acquire() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
- // 释放许可
- public void release() {
- sync.releaseShared(1);
- }
4, 特性对比
区别:
CountDownLatch 使一个线程 A 或是组线程 A 等待其它线程执行完毕后, 线程 A 或是组线程 A 才继续执行. CyclicBarrier: 一组线程使用 await()指定 barrier, 所有线程都到达各自的 barrier 后, 再同时执行各自 barrier 下面的代码. Semaphore: 是用来控制同时访问特定资源的线程数量, 它通过协调各个线程, 以保证合理的使用公共资源.
CountDownLatch 是减计数方式, 计数 ==0 时释放所有等待的线程; CyclicBarrier 是加计数方式, 计数达到构造方法中参数指定的值时释放所有等待的线程. Semaphore, 每次 semaphore.acquire(), 获取一个资源, 每次 semaphore.acquire(n), 获取 n 个资源, 当达到 semaphore 指定资源数量时就不能再访问线程处于阻塞, 必须等其它线程释放资源, semaphore.relase()每次资源一个资源, semaphore.relase(n)每次资源 n 个资源.
CountDownLatch 当计数到 0 时, 计数无法被重置; CyclicBarrier 计数达到指定值时, 计数置为 0 重新开始.
CountDownLatch 每次调用 countDown()方法计数减一, 调用 await()方法只进行阻塞, 对计数没任何影响; CyclicBarrier 只有一个 await()方法, 调用 await()方法计数加 1, 若加 1 后的值不等于构造方法的值, 则线程阻塞.
CountDownLatch,CyclikBarrier,Semaphore 都有一个 int 类型参数的构造方法. CountDownLatch,CyclikBarrier 这个值作为计数用, 达到该次数即释放等待的线程, 而 Semaphore 中所有 acquire 获取到的资源达到这个数, 会使得其它线程阻塞.
共同:
CountDownLatch 与 CyclikBarrier 两者的共同点是都具有 await()方法, 并且执行此方法会引起线程的阻塞, 达到某种条件才能继续执行 (这种条件也是两者的不同).Semaphore,acquire 方获取的资源达到最大数量时, 线程再次 acquire 获取资源时, 也会使线程处于阻塞状态. CountDownLatch 与 CyclikBarrier 两者的共同点是都具有 await() 方法, 并且执行此方法会引起线程的阻塞, 达到某种条件才能继续执行(这种条件也是两者的不同).Semaphore,acquire 方获取的资源达到最大数量时, 线程再次 acquire 获取资源时, 也会使线程处于阻塞状态. CountDownLatch,CyclikBarrier,Semaphore 都有一个 int 类型参数的构造方法.
CountDownLatch,CyclikBarrier,Semaphore 都有一个 int 类型参数的构造方法.
CountdownLatch 和 CyclicBarrier 的区别:
CountDownLatch 简单的说就是一个线程等待, 直到他所等待的其他线程都执行完成并且调用 countDown()方法发出通知后, 当前线程才可以继续执行.
CyclicBarrier 是所有线程都进行等待, 直到所有线程都准备好进入 await()方法之后, 所有线程同时开始执行!
CountDownLatch 的计数器只能使用一次. 而 CyclicBarrier 的计数器可以使用 reset() 方法重置. 所以 CyclicBarrier 能处理更为复杂的业务场景, 比如如果计算发生错误, 可以重置计数器, 并让线程们重新执行一次.
Semaphore:
Semaphore 翻译成字面意思为 信号量, Semaphore 可以控同时访问的线程个数, 通过 acquire() 获取一个许可, 如果没有就等待, 而 release() 释放一个许可.
来源: http://www.jianshu.com/p/7a71c35dc5ce