前言
Semaphore 也是 JUC 包中一个用于并发控制的工具类, 举个常用场景的例子: 有三台电脑五个人, 每个人都要用电脑注册一个自己的账户, 这时最开始只能同时有三个人操作电脑注册账户, 这三个人中有人操作完了剩下的两个人才能占用电脑注册自己的账户. 这就是 Semaphore 的经典使用场景, 跟并发加锁有点像, 只是我们的并发加锁同一时间只让有一个线程执行, 而 Semaphore 的加锁控制是允许同一时间有指定数量的线程同时执行, 超过这个数量就加锁控制.
一, 使用样例
- public static void main(String[] args) {
- Semaphore semaphore = new Semaphore(3); // 对比上面例子中的 3 台电脑
- for (int i = 0; i <5; i++) { // 对比上面例子中的 5 个人
- new Thread(() -> {
- try {
- semaphore.acquire(1); // 注意 acquire 中的值可以传任意值 >=0 的整数
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(Thread.currentThread().getName() + "acquire 1");
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(Thread.currentThread().getName() + "release 1");
- semaphore.release(1);
- }).start();
- }
- }
执行结果为:
- Thread-0 acquire 1
- Thread-2 acquire 1
- Thread-1 acquire 1
- Thread-1release 1
- Thread-2release 1
- Thread-0release 1
- Thread-4 acquire 1
- Thread-3 acquire 1
- Thread-4release 1
- Thread-3release 1
可以看到同一时间只有三个线程获取到了锁, 这三个执行完释放了之后, 剩下两个菜获取锁执行. 下面看看源码是如何实现的.
二, 源码实现
1,Semaphore 构造器
- public Semaphore(int permits) {
- sync = new NonfairSync(permits);
- }
- public Semaphore(int permits, boolean fair) {
- sync = fair ? new FairSync(permits) : new NonfairSync(permits);
- }
可以看到, Semaphore 有两个构造器, 一个是只传数值默认非公平锁, 另一个可指定用公平锁还是非公平锁. permits 最终还是赋值给了 AQS 中的 state 变量.
2,acquire(1) 方法
- public void acquire(int permits) throws InterruptedException {
- if (permits < 0) throw new IllegalArgumentException();
- sync.acquireSharedInterruptibly(permits);
- }
此方法同样调用了 AQS 中的模板方法:
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- if (tryAcquireShared(arg) < 0)
- doAcquireSharedInterruptibly(arg);
- }
1), 查看 tryAcquireShared 的实现方法
先看非公平锁的获取:
- final int nonfairTryAcquireShared(int acquires) {
- for (;;) {
- int available = getState();
- int remaining = available - acquires; // 如果 remaining 是负的, 说明当前剩余的信号量不够了, 需要阻塞
- if (remaining < 0 ||
- compareAndSetState(available, remaining)) // 如果 remaining<0 则直接 return, 不会走 CAS; 如果大于 0, 说明信号量还够, 可走 CAS 将信号量减掉, 成功则返回大于 0 的 remaining
- return remaining;
- }
- }
再看公平锁的获取:
- protected int tryAcquireShared(int acquires) {
- for (;;) {
- if (hasQueuedPredecessors()) // 判断是不是在队首, 不是的话直接返回 - 1
- return -1;
- int available = getState(); // 后面逻辑通非公平锁的获取逻辑
- int remaining = available - acquires;
- if (remaining < 0 ||
- compareAndSetState(available, remaining))
- return remaining;
- }
- }
可以看到, 不管非公平锁和公平锁, 加锁时都是先判断当前 state 够不够减的, 如果减出负数返回获取锁失败, 是正数才走 CAS 将原信号量扣掉, 返回获取锁成功. 加锁时一个减 state 的过程.
2),doAcquireSharedInterruptibly
此方法还是 AQS 中的实现, 逻辑重复, 就不再说明了.
3,release(1) 方法
- public void release(int permits) {
- if (permits < 0) throw new IllegalArgumentException();
- sync.releaseShared(permits);
- }
同样调用了 AQS 中的模板方法 releaseShared:
- public final boolean releaseShared(int arg) {
- if (tryReleaseShared(arg)) {
- doReleaseShared();
- return true;
- }
- return false;
- }
其中 tryReleaseShared 的实现在 Semaphore 类的 Sync 中, 如下所示:
- protected final boolean tryReleaseShared(int releases) {
- for (;;) {
- int current = getState();
- int next = current + releases; // 用当前 state 加上要释放的 releases
- if (next < current) // overflow
- throw new Error("Maximum permit count exceeded");
- if (compareAndSetState(current, next)) // 用 CAS 将 state 加上
- return true;
- }
- }
另一个方法 doReleaseShared 之前看过, 此处就不赘述了.
三, 小结
Semaphore 信号量类基于 AQS 的共享锁实现, 有公平锁和非公平锁两个版本. 它的加锁与释放锁的不同之处在于和普通的加锁释放锁反着, ReentrantLock 和 ReentrantReadWriteLock 中都是加锁时 state+1, 释放锁时 state-1, 而 Semaphore 中是加锁时 state 减, 释放锁时 state 加.
另外, 如果它还可以 acquire(2) ,release(1), 即获取的和释放的信号量可以不一致, 只是需要注意别释放的信号量太少导致后续任务获取不到足够的量而永久阻塞.
posted on 2019-12-22 10:31 张曾经 阅读 (...) 评论 (...) 编辑 收藏
来源: https://www.cnblogs.com/zzq6032010/p/12076687.html