作为同步组件的基础, AQS 做了太多的工作, 自定义同步组件只需要简单地实现自定义方法, 然后加上 AQS 提供的模板方法, 就可以实现强大的自定义同步组件, 理解了 AQS, 其他理解起来就容易很多了.
Semaphore 介绍
信号量 Semaphore 是一个控制访问多个共享资源的计数器, 和 CountDownLatch 一样, 其本质上是一个 "共享锁".
Semaphore, 在 API 是这么介绍的:
一个计数信号量. 从概念上讲, 信号量维护了一个许可集. 如有必要, 在许可可用前会阻塞每一个 acquire(), 然后再获取该许可. 每个 release() 添加一个许可, 从而可能释放一个正在阻塞的获取者. 但是, 不使用实际的许可对象, Semaphore 只对可用许可的号码进行计数, 并采取相应的行动.
Semaphore 通常用于限制可以访问某些资源 (物理或逻辑的) 的线程数目.
下面我们就一个停车场的简单例子来阐述 Semaphore: 为了简单起见我们假设停车场仅有 5 个停车位, 一开始停车场没有车辆所有车位全部空着, 然后先后到来三辆车, 停车场车位够, 安排进去停车, 然后又来三辆, 这个时候由于只有两个停车位, 所有只能停两辆, 其余一辆必须在外面候着, 直到停车场有空车位, 当然以后每来一辆都需要在外面候着. 当停车场有车开出去, 里面有空位了, 则安排一辆车进去(至于是哪辆 要看选择的机制是公平还是非公平).
从程序角度看, 停车场就相当于信号量 Semaphore, 其中许可数为 5, 车辆就相对线程. 当来一辆车时, 许可数就会减 1 , 当停车场没有车位了(许可书 == 0 ), 其他来的车辆需要在外面等候着. 如果有一辆车开出停车场, 许可数 + 1, 然后放进来一辆车.
号量 Semaphore 是一个非负整数(>=1). 当一个线程想要访问某个共享资源时, 它必须要先获取 Semaphore, 当 Semaphore>0 时, 获取该资源并使 Semaphore - 1. 如果 Semaphore 值 = 0, 则表示全部的共享资源已经被其他线程全部占用, 线程必须要等待其他线程释放资源. 当线程释放资源时, Semaphore 则 + 1
实现分析
Semaphore 结构如下:
从上图可以看出 Semaphore 内部包含公平锁 (FairSync) 和非公平锁(NonfairSync), 继承内部类 Sync, 其中 Sync 继承 AQS(再一次阐述 AQS 的重要性).
Semaphore 提供了两个构造函数:
Semaphore(int permits) : 创建具有给定的许可数和非公平的公平设置的 Semaphore.
Semaphore(int permits, boolean fair) : 创建具有给定的许可数和给定的公平设置的 Semaphore.
实现如下:
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
Semaphore 默认选择非公平锁.
当信号量 Semaphore = 1 时, 它可以当作互斥锁使用. 其中 0,1 就相当于它的状态, 当 = 1 时表示其他线程可以获取, 当 = 0 时, 排他, 即其他线程必须要等待.
信号量获取
Semaphore 提供了 acquire()方法来获取一个许可.
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
内部调用 AQS 的 acquireSharedInterruptibly(int arg), 该方法以共享模式获取同步状态:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
在 acquireSharedInterruptibly(int arg)中, tryAcquireShared(int arg)由子类来实现, 对于 Semaphore 而言, 如果我们选择非公平模式, 则调用 NonfairSync 的 tryAcquireShared(int arg)方法, 否则调用 FairSync 的 tryAcquireShared(int arg)方法.
公平
protected int tryAcquireShared(int acquires) { for (;;) { // 判断该线程是否位于 CLH 队列的列头 if (hasQueuedPredecessors()) return -1; // 获取当前的信号量许可 int available = getState(); // 设置 "获得 acquires 个信号量许可之后, 剩余的信号量许可数" int remaining = available - acquires; //CAS 设置信号量 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
非公平
对于非公平而言, 因为它不需要判断当前线程是否位于 CLH 同步队列列头, 所以相对而言会简单些.
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
信号量释放
获取了许可, 当用完之后就需要释放, Semaphore 提供 release()来释放许可.
public void release() { sync.releaseShared(1); }
内部调用 AQS 的 releaseShared(int arg):
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
releaseShared(int arg)调用 Semaphore 内部类 Sync 的 tryReleaseShared(int arg):
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); // 信号量的许可数 = 当前信号许可数 + 待释放的信号许可数 int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); // 设置可获取的信号许可数为 next if (compareAndSetState(current, next)) return true; } }
应用示例
我们已停车为示例:
public class SemaphoreTest { static class Parking{ // 信号量 private Semaphore semaphore; Parking(int count){ semaphore = new Semaphore(count); } public void park(){ try { // 获取信号量 semaphore.acquire(); long time = (long) (Math.random() * 10); System.out.println(Thread.currentThread().getName() + "进入停车场, 停车" + time + "秒..." ); Thread.sleep(time); System.out.println(Thread.currentThread().getName() + "开出停车场..."); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } } } static class Car extends Thread { Parking parking ; Car(Parking parking){ this.parking = parking; } @Override public void run() { parking.park(); // 进入停车场 } } public static void main(String[] args){ Parking parking = new Parking(3); for(int i = 0 ; i < 5 ; i++){ new Car(parking).start(); } }}
运行结果如下:
来源: https://juejin.im/entry/5aed2e9e6fb9a07acb3cc038