Semaphore(信号量) 用来控制同时访问特定资源的线程数量, 它通过协调各个线程, 以保证合理的使用公共资源.
Semaphore 提供了一个许可证的概念, 可以把这个许可证看作公共汽车车票, 只有成功获取车票的人才能够上车, 并且车票是有一定数量的, 不可能毫无限制的发下去, 这样就会导致公交车超载. 所以当车票发完的时候 (公交车以满载), 其他人就只能等下一趟车了. 如果中途有人下车, 那么他的位置将会空闲出来, 因此如果这时其他人想要上车的话就又可以获得车票了.
构造函数
Semaphore 类提供了 2 种构造函数, 分别如下:
- public Semaphore(int permits) {
- sync = new NonfairSync(permits);
- }
- public Semaphore(int permits, boolean fair) {
- sync = fair ? new FairSync(permits) : new NonfairSync(permits);
- }
这两个构造方法, 都必须提供许可的数量, 第二个构造方法可以指定是公平模式还是非公平模式, 默认非公平模式.
在 ReentrantLock 中公平锁和非公平锁获取锁机制的差别: 对于公平锁而言, 如果当前线程不在 CLH 队列的头部, 则需要排队等候, 而非公平锁则不同, 它无论当前线程处于 CLH 队列的何处都会直接获取锁. 所以公平信号量和非公平信号量的区别也一样.
获取许可证
Semaphore 类提供了 4 种获取许可证的方法, 分别如下:
- // 获取一个许可证 (响应中断), 在没有可用的许可证时当前线程被阻塞.
- public void acquire() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
- // 获取一个许可证 (不响应中断)
- public void acquireUninterruptibly() {
- sync.acquireShared(1);
- }
- // 尝试获取许可证 (非公平获取), 立即返回结果 (非阻塞).
- public boolean tryAcquire() {
- return sync.nonfairTryAcquireShared(1)>= 0;
- }
- // 尝试获取许可证 (定时获取)
- public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
- return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
- }
下面我们一起看看, acquire() 是如何获取许可证的? 其源码如下:
- public void acquire() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- if (tryAcquireShared(arg) <0)
- doAcquireSharedInterruptibly(arg);
- }
acquireSharedInterruptibly 方法首先就是去调用 tryAcquireShared 方法去尝试获取, tryAcquireShared 在 AQS 里面是抽象方法, FairSync 和 NonfairSync 这两个派生类实现了该方法的逻辑. FairSync 实现的是公平获取的逻辑, 而 NonfairSync 实现的非公平获取的逻辑.
- abstract static class Sync extends AbstractQueuedSynchronizer {
- private static final long serialVersionUID = 1192457210091910933L;
- Sync(int permits) {
- setState(permits);
- }
- final int getPermits() {
- return getState();
- }
- // 非公平方式尝试获取
- final int nonfairTryAcquireShared(int acquires) {
- for (;;) {
- // 获取可用许可证
- int available = getState();
- // 获取剩余许可证
- int remaining = available - acquires;
- // 1. 如果 remaining 小于 0 则直接返回 remaining
- // 2. 如果 remaining 大于 0 则先更新同步状态再返回 remaining
- if (remaining < 0 ||
- compareAndSetState(available, remaining))
- return remaining;
- }
- }
- protected final boolean tryReleaseShared(int releases) {
- for (;;) {
- int current = getState();
- int next = current + releases;
- if (next < current) // overflow
- throw new Error("Maximum permit count exceeded");
- if (compareAndSetState(current, next))
- return true;
- }
- }
- final void reducePermits(int reductions) {
- for (;;) {
- int current = getState();
- int next = current - reductions;
- if (next> current) // underflow
- throw new Error("Permit count underflow");
- if (compareAndSetState(current, next))
- return;
- }
- }
- final int drainPermits() {
- for (;;) {
- int current = getState();
- if (current == 0 || compareAndSetState(current, 0))
- return current;
- }
- }
- }
- // 非公平同步器
- static final class NonfairSync extends Sync {
- private static final long serialVersionUID = -2694183684443567898L;
- NonfairSync(int permits) {
- super(permits);
- }
- // 尝试获取许可证
- protected int tryAcquireShared(int acquires) {
- return nonfairTryAcquireShared(acquires);
- }
- }
- // 公平同步器
- static final class FairSync extends Sync {
- private static final long serialVersionUID = 2014338818796000944L;
- FairSync(int permits) {
- super(permits);
- }
- // 尝试获取许可证
- protected int tryAcquireShared(int acquires) {
- for (;;) {
- // 判断同步队列前面存在排队
- if (hasQueuedPredecessors())
- return -1;
- int available = getState();
- int remaining = available - acquires;
- if (remaining < 0 ||
- compareAndSetState(available, remaining))
- return remaining;
- }
- }
- }
非公平获取锁的逻辑是先取出当前同步状态 (同步状态表示许可证个数), 将当前同步状态减去参入的参数, 如果结果不小于 0 的话证明还有可用的许可证, 那么就直接使用 CAS 操作更新同步状态的值, 最后不管结果是否小于 0 都会返回该结果值.
公平获取锁的逻辑仅仅是在此之前先去调用 hasQueuedPredecessors 方法判断同步队列是否存在排队, 如果有的话就直接返回 - 1 表示获取失败, 否则继续执行和非公平获取一样的步骤.
释放许可证
Semaphore 类提供了 2 种释放许可证方法, 分别如下:
- public void release() {
- sync.releaseShared(1);
- }
- public void release(int permits) {
- if (permits < 0) throw new IllegalArgumentException();
- sync.releaseShared(permits);
- }
调用 release 方法是释放一个许可证, 它的操作很简单, 就调用了 AQS 的 releaseShared 方法.
- // 释放锁的操作 (共享模式)
- public final boolean releaseShared(int arg) {
- //1. 尝试去释放锁
- if (tryReleaseShared(arg)) {
- //2. 如果释放成功就唤醒其他线程
- doReleaseShared();
- return true;
- }
- return false;
- }
AQS 的 releaseShared 方法首先调用 tryReleaseShared 方法尝试释放锁, 其实现逻辑在子类 Sync 里面.
- // 尝试释放操作
- protected final boolean tryReleaseShared(int releases) {
- for (;;) {
- // 获取当前同步状态
- int current = getState();
- // 将当前同步状态加上传入的参数
- int next = current + releases;
- if (next < current) // overflow
- throw new Error("Maximum permit count exceeded");
- // 以 CAS 方式更新同步状态的值, 更新成功则返回 true, 否则继续循环
- if (compareAndSetState(current, next))
- return true;
- }
- }
tryReleaseShared 方法里面采用 for 循环进行自旋, 首先获取同步状态, 将同步状态加上传入的参数, 然后以 CAS 方式更新同步状态, 更新成功就返回 true 并跳出方法, 否则就继续循环直到成功为止.
动手写个连接池
下面我们就来利用 Semaphore 实现一个简单的数据库连接池, 通过这个例子希望读者们能更加深入的掌握 Semaphore 的运用.
- public class ConnectPool {
- // 连接池大小
- private int size;
- // 数据库连接集合
- private Connect[] connects;
- // 连接状态标志
- private boolean[] connectFlag;
- // 剩余可用连接数
- private volatile int available;
- // 信号量
- private Semaphore semaphore;
- // 构造器
- public ConnectPool(int size) {
- this.size = size;
- this.available = size;
- semaphore = new Semaphore(size, true);
- connects = new Connect[size];
- connectFlag = new boolean[size];
- initConnects();
- }
- // 初始化连接
- private void initConnects() {
- // 生成指定数量的数据库连接
- for(int i = 0; i < this.size; i++) {
- connects[i] = new Connect();
- }
- }
- // 获取数据库连接
- private synchronized Connect getConnect(){
- for(int i = 0; i < connectFlag.length; i++) {
- // 遍历集合找到未使用的连接
- if(!connectFlag[i]) {
- // 将连接设置为使用中
- connectFlag[i] = true;
- // 可用连接数减 1
- available--;
- System.out.println("["+Thread.currentThread().getName()+"] 以获取连接 剩余连接数:" + available);
- // 返回连接引用
- return connects[i];
- }
- }
- return null;
- }
- // 获取一个连接
- public Connect openConnect() throws InterruptedException {
- // 获取许可证
- semaphore.acquire();
- // 获取数据库连接
- return getConnect();
- }
- // 释放一个连接
- public synchronized void release(Connect connect) {
- for(int i = 0; i < this.size; i++) {
- if(connect == connects[i]){
- // 将连接设置为未使用
- connectFlag[i] = false;
- // 可用连接数加 1
- available++;
- System.out.println("["+Thread.currentThread().getName()+"] 以释放连接 剩余连接数:" + available);
- // 释放许可证
- semaphore.release();
- }
- }
- }
- // 剩余可用连接数
- public int available() {
- return available;
- }
- }
测试代码:
- public class TestThread extends Thread {
- private static ConnectPool pool = new ConnectPool(3);
- @Override
- public void run() {
- try {
- Connect connect = pool.openConnect();
- Thread.sleep(100); // 休息一下
- pool.release(connect);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- public static void main(String[] args) {
- for(int i = 0; i < 10; i++) {
- new TestThread().start();
- }
- }
- }
使用一个数组来存放数据库连接的引用, 在初始化连接池的时候会调用 initConnects 方法创建指定数量的数据库连接, 并将它们的引用存放到数组中, 此外还有一个相同大小的数组来记录连接是否可用.
每当外部线程请求获取一个连接时, 首先调用 semaphore.acquire() 方法获取一个许可证, 然后将连接状态设置为使用中, 最后返回该连接的引用. 许可证的数量由构造时传入的参数决定, 每调用一次 semaphore.acquire() 方法许可证数量减 1, 当数量减为 0 时说明已经没有连接可以使用了, 这时如果其他线程再来获取就会被阻塞. 每当线程释放一个连接的时候会调用 semaphore.release() 将许可证释放, 此时许可证的总量又会增加, 代表可用的连接数增加了, 那么之前被阻塞的线程将会醒来继续获取连接, 这时再次获取就能够成功获取连接了.
来源: https://juejin.im/post/5afa3fe56fb9a07acb3d0186