Semaphore 的作用: 限制线程并发的数量
位于 java.util.concurrent 下,
构造方法
- // 构造函数 代表同一时间, 最多允许 permits 执行 acquire() 和 release() 之间的代码.
- public Semaphore(int permits) {
- sync = new NonfairSync(permits);
- }
- //False: 表示非公平信号量, 即线程启动的顺序与调用 semaphore.acquire() 的顺序无关, 也就是线程先启动了并不代表先获得 许可
- //True: 公平信号量, 即线程启动的顺序与调用 semaphore.acquire() 的顺序有关, 也就是先启动的线程优先获得许可
- public Semaphore(int permits, boolean fair) {
- sync = fair ? new FairSync(permits) : new NonfairSync(permits);
- }
方法 acquire(n) 的功能是每调用 1 次此方法, 就消耗掉 n 个许可.
方法 release(n) 的功能是每调用 1 次此方法, 就动态添加 n 个许可.
方法 acquireUnnterruptibly() 作用是是等待进入 acquire() 方法的线程不允许被中断.
方法 availablePermits() 返回 Semaphore 对象中当前可以用的许可数.
方法 drainPermits() 获取并返回所有的许可个数, 并且将可用的许可重置为 0
方法 getQueueLength() 的作用是取得等待的许可的线程个数
方法 hasQueueThreads() 的作用是判断有没有线程在等待这个许可
方法 tryAcquire() 的作用是尝试获取 1 个许可. 如果获取不到则返回 false, 通常与 if 语句结合使用, 其具有无阻塞的特点. 无阻塞的特点可以使不至于在同步处于一直持续等待的状态.
方法 tryAcquire(n) 的作用是尝试获取 n 个许可, 如果获取不到则返回 false
方法 tryAcquire(long timeout,TimeUnit unit) 的作用是在指定的时间内尝试获取 1 个许可, 如果获取不到则返回 false
方法 tryAcquire(int permits,long timeout,TimeUnit unit) 的作用是在指定的时间内尝试获取 n 个许可, 如果获取不到则返回 false
使用:
多进路 - 多处理 - 多出路: 允许多个线程同时处理任务
- private static Semaphore semaphore = new Semaphore(4);
- public static void main(String[] args) {
- for (int i = 0; i <10; i++) {
- new Thread(new Runnable(){
- @Override
- public void run(){
- System.out.println(Thread.currentThread().getName()+"正在运行");
- try {
- semaphore.acquire();// // 请求获得许可, 如果有可获得的许可则继续往下执行, 许可数减 1. 否则进入阻塞状态
- System.out.println(Thread.currentThread().getName() + "进入, 并发数:" + (4-semaphore.availablePermits()));
- Thread.sleep(3000);
- System.out.println(Thread.currentThread().getName() +"即将离开, 并发数:" + (4-semaphore.availablePermits()));
- semaphore.release();// 释放许可, 许可数加 1
- }catch( Exception e){
- e.printStackTrace();
- } finally {
- }
- }
- }).start();
- }
- }
运行结果:
Thread-1 正在运行
Thread-6 正在运行
Thread-7 正在运行
Thread-5 正在运行
Thread-3 正在运行
Thread-2 正在运行
Thread-4 正在运行
Thread-0 正在运行
Thread-5 进入, 并发数: 4
Thread-9 正在运行
Thread-7 进入, 并发数: 3
Thread-8 正在运行
Thread-6 进入, 并发数: 2
Thread-1 进入, 并发数: 1
Thread-7 即将离开, 并发数: 4
Thread-5 即将离开, 并发数: 4
Thread-1 即将离开, 并发数: 4
Thread-2 进入, 并发数: 4
Thread-6 即将离开, 并发数: 4
Thread-4 进入, 并发数: 4
Thread-0 进入, 并发数: 4
Thread-3 进入, 并发数: 3
Thread-3 即将离开, 并发数: 4
Thread-4 即将离开, 并发数: 4
Thread-0 即将离开, 并发数: 4
Thread-2 即将离开, 并发数: 4
Thread-8 进入, 并发数: 4
Thread-9 进入, 并发数: 4
Thread-8 即将离开, 并发数: 2
Thread-9 即将离开, 并发数: 2
当使用: 公平锁时: private static Semaphore semaphore = new Semaphore(num,true);
仔细看结果, 就会发现先运行的线程会优先处理
Thread-1 正在运行
Thread-5 正在运行
Thread-5 进入, 并发数: 2
Thread-4 正在运行
Thread-4 进入, 并发数: 3
Thread-2 正在运行
Thread-2 进入, 并发数: 4
Thread-3 正在运行
Thread-0 正在运行
Thread-9 正在运行
Thread-8 正在运行
Thread-7 正在运行
Thread-6 正在运行
Thread-1 进入, 并发数: 1
Thread-4 即将离开, 并发数: 4
Thread-5 即将离开, 并发数: 4
Thread-3 进入, 并发数: 4
Thread-2 即将离开, 并发数: 4
Thread-0 进入, 并发数: 4
Thread-9 进入, 并发数: 4
Thread-1 即将离开, 并发数: 3
Thread-8 进入, 并发数: 4
Thread-3 即将离开, 并发数: 4
Thread-7 进入, 并发数: 4
Thread-9 即将离开, 并发数: 4
Thread-8 即将离开, 并发数: 4
Thread-0 即将离开, 并发数: 4
Thread-6 进入, 并发数: 4
Thread-7 即将离开, 并发数: 2
Thread-6 即将离开, 并发数: 1
View Code
2, 多进路 - 单处理 - 多出路: 允许多个线程同时处理任务, 但是顺序却是同步的, 也就是阻塞的. 所以也称单处理.
在代码中加入 ReentrantLock 对象 , 或者使用 synchronized 代码块, 保存代码的同步性
或者 上面代码 private static Semaphore semaphore = new Semaphore(1);
运行结果:
Thread-0 正在运行
Thread-7 正在运行
Thread-6 正在运行
Thread-2 正在运行
Thread-4 正在运行
Thread-3 正在运行
Thread-5 正在运行
Thread-1 正在运行
Thread-9 正在运行
Thread-0 进入, 并发数: 1
Thread-8 正在运行
Thread-0 即将离开, 并发数: 1
Thread-7 进入, 并发数: 1
Thread-7 即将离开, 并发数: 1
Thread-6 进入, 并发数: 1
Thread-6 即将离开, 并发数: 1
Thread-2 进入, 并发数: 1
Thread-2 即将离开, 并发数: 1
Thread-4 进入, 并发数: 1
Thread-4 即将离开, 并发数: 1
Thread-3 进入, 并发数: 1
Thread-3 即将离开, 并发数: 1
Thread-5 进入, 并发数: 1
Thread-5 即将离开, 并发数: 1
Thread-1 进入, 并发数: 1
Thread-1 即将离开, 并发数: 1
Thread-9 进入, 并发数: 1
Thread-9 即将离开, 并发数: 1
Thread-8 进入, 并发数: 1
Thread-8 即将离开, 并发数: 1
此时, 实现了互斥锁的功能.
源码解析
Semaphore 内部基于 AQS 的共享模式, 所以实现都委托给了 Sync 类.
这里就看一下 NonfairSync 的构造方法:
- NonfairSync(int permits) {
- super(permits);
- }
可以看到直接调用了父类的构造方法, Sync 的构造方法如下:
- Sync(int permits) {
- setState(permits);
- }
可以看到调用了 setState 方法, 也就是说 AQS 中的资源就是许可证的数量.
获取许可
先从获取一个许可看起, 并且先看非公平模式下的实现. 首先看 acquire 方法, acquire 方法有几个重载, 但主要是下面这个方法
- public void acquire(int permits) throws InterruptedException {
- if (permits < 0) throw new IllegalArgumentException();
- sync.acquireSharedInterruptibly(permits);
- }
从上面可以看到, 调用了 Sync 的 acquireSharedInterruptibly 方法, 该方法在父类 AQS 中, 如下:
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- // 如果线程被中断了, 抛出异常
- if (Thread.interrupted())
- throw new InterruptedException();
- // 获取许可失败, 将线程加入到等待队列中
- if (tryAcquireShared(arg) < 0)
- doAcquireSharedInterruptibly(arg);
- }
AQS 子类如果要使用共享模式的话, 需要实现 tryAcquireShared 方法, 下面看 NonfairSync 的该方法实现:
- protected int tryAcquireShared(int acquires) {
- return nonfairTryAcquireShared(acquires);
- }
该方法调用了父类中的 nonfairTyAcquireShared 方法, 如下:
- final int nonfairTryAcquireShared(int acquires) {
- for (;;) {
- // 获取剩余许可数量
- int available = getState();
- // 计算给完这次许可数量后的个数
- int remaining = available - acquires;
- // 如果许可不够或者可以将许可数量重置的话, 返回
- if (remaining < 0 ||
- compareAndSetState(available, remaining))
- return remaining;
- }
- }
从上面可以看到, 只有在许可不够时返回值才会小于 0, 其余返回的都是剩余许可数量, 这也就解释了, 一旦许可不够, 后面的线程将会阻塞. 看完了非公平的获取, 再看下公平的获取, 代码如下:
- protected int tryAcquireShared(int acquires) {
- for (;;) {
- // 如果前面有线程再等待, 直接返回 - 1
- if (hasQueuedPredecessors())
- return -1;
- // 后面与非公平一样
- int available = getState();
- int remaining = available - acquires;
- if (remaining < 0 ||
- compareAndSetState(available, remaining))
- return remaining;
- }
- }
从上面可以看到, FairSync 与 NonFairSync 的区别就在于会首先判断当前队列中有没有线程在等待, 如果有, 就老老实实进入到等待队列; 而不像 NonfairSync 一样首先试一把, 说不定就恰好获得了一个许可, 这样就可以插队了.
看完了获取许可后, 再看一下释放许可.
释放许可
释放许可也有几个重载方法, 但都会调用下面这个带参数的方法,
- public void release(int permits) {
- if (permits < 0) throw new IllegalArgumentException();
- sync.releaseShared(permits);
- }
releaseShared 方法在 AQS 中, 如下:
- public final boolean releaseShared(int arg) {
- // 如果改变许可数量成功
- if (tryReleaseShared(arg)) {
- doReleaseShared();
- return true;
- }
- return false;
- }
AQS 子类实现共享模式的类需要实现 tryReleaseShared 类来判断是否释放成功, 实现如下:
- protected final boolean tryReleaseShared(int releases) {
- for (;;) {
- // 获取当前许可数量
- int current = getState();
- // 计算回收后的数量
- int next = current + releases;
- if (next < current) // overflow
- throw new Error("Maximum permit count exceeded");
- //CAS 改变许可数量成功, 返回 true
- if (compareAndSetState(current, next))
- return true;
- }
- }
从上面可以看到, 一旦 CAS 改变许可数量成功, 那么就会调用 doReleaseShared() 方法释放阻塞的线
减小许可数量
Semaphore 还有减小许可数量的方法, 该方法可以用于用于当资源用完不能再用时, 这时就可以减小许可证. 代码如下:
- protected void reducePermits(int reduction) {
- if (reduction < 0) throw new IllegalArgumentException();
- sync.reducePermits(reduction);
- }
可以看到, 委托给了 Sync,Sync 的 reducePermits 方法如下:
- final void reducePermits(int reductions) {
- for (;;) {
- // 得到当前剩余许可数量
- int current = getState();
- // 得到减完之后的许可数量
- int next = current - reductions;
- if (next> current) // underflow
- throw new Error("Permit count underflow");
- // 如果 CAS 改变成功
- if (compareAndSetState(current, next))
- return;
- }
- }
从上面可以看到, 就是 CAS 改变 AQS 中的 state 变量, 因为该变量代表许可证的数量.
获取剩余许可数量
Semaphore 还可以一次将剩余的许可数量全部取走, 该方法是 drain 方法, 如下:
- public int drainPermits() {
- return sync.drainPermits();
- }
Sync 的实现如下:
- final int drainPermits() {
- for (;;) {
- int current = getState();
- if (current == 0 || compareAndSetState(current, 0))
- return current;
- }
- }
可以看到, 就是 CAS 将许可数量置为 0.
总结
Semaphore 是信号量, 用于管理一组资源. 其内部是基于 AQS 的共享模式, AQS 的状态表示许可证的数量, 在许可证数量不够时, 线程将会被挂起; 而一旦有一个线程释放一个资源, 那么就有可能重新唤醒等待队列中的线程继续执行.
参考: https://www.cnblogs.com/wujiaofen/p/11356436.html
----------------
来源: http://www.bubuko.com/infodetail-3382589.html