Semaphore 信号量通常做为控制线程并发个数的工具来使用, 它可以用来限制同时并发访问资源的线程个数.
一, Semaphore 使用
下面我们通过一个简单的例子来看下 Semaphore 的具体使用, 我们同时执行 10 个计数线程, 并定义一个 Semaphore 变量用来控制并发值, 同一时间只允许两个线程并发执行;
- public static void main(String[] args) {
- Semaphore semaphore = new Semaphore(2);
- // 启动计数线程
- for (int i = 1; i <= 10; i++) {
- new SemaphoreThread(semaphore).start();
- }
- }
计数线程
- public class SemaphoreThread extends Thread {
- private Semaphore semaphore;
- public SemaphoreThread(Semaphore semaphore) {
- this.semaphore = semaphore;
- }
- public void run() {
- try {
- semaphore.acquire();// 获取执行许可
- Thread.sleep(2000);
- System.out.println(this.getName() + "线程," + "开始进行计数");
- // 模拟计数时长
- Thread.sleep(2000);
- // 一个线程完成, 允许下一个线程开始计数
- System.out.println(this.getName() + "线程," + "计数完毕");
- semaphore.release();// 归还许可
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
输出结果
Thread-0 线程, 开始进行计数
Thread-1 线程, 开始进行计数
Thread-1 线程, 计数完毕
Thread-0 线程, 计数完毕
Thread-2 线程, 开始进行计数
Thread-3 线程, 开始进行计数
Thread-2 线程, 计数完毕
Thread-3 线程, 计数完毕
Thread-4 线程, 开始进行计数
Thread-5 线程, 开始进行计数
Thread-5 线程, 计数完毕
Thread-4 线程, 计数完毕
Thread-6 线程, 开始进行计数
Thread-7 线程, 开始进行计数
Thread-6 线程, 计数完毕
Thread-7 线程, 计数完毕
Thread-8 线程, 开始进行计数
Thread-9 线程, 开始进行计数
Thread-8 线程, 计数完毕
Thread-9 线程, 计数完毕
通过输出结果可以看出, Semaphore 根据我们设定的并发值限制了线程同时执行的个数, 每次只运行两个线程进行计数.
二, Semaphore 源码分析
接下来我们对 Semaphore 具体的内部实现进行分析与总结
1,Semaphore 的构造
- public Semaphore(int permits) {
- sync = new NonfairSync(permits);
- }
- static final class NonfairSync extends Sync {
- private static final long serialVersionUID = -2694183684443567898L;
- NonfairSync(int permits) {
- super(permits);
- }
- protected int tryAcquireShared(int acquires) {
- return nonfairTryAcquireShared(acquires);
- }
- }
- abstract static class Sync extends AbstractQueuedSynchronizer {
- private static final long serialVersionUID = 1192457210091910933L;
- /**
- 1, 设置 AbstractQueuedSynchronizer 中同步状态的值 state, 也就是计数器的值.
- 2, 这个值 volatile 变量, 必须保证线程间的可见性;
- **/
- Sync(int permits) {
- setState(permits);
- }
- // 获取 state 的值
- final int getPermits() {
- return getState();
- }
- // 通过 CAS 方式减少 state 值, 对应 Semaphore 的 acquire 获取许可
- final int nonfairTryAcquireShared(int acquires) {
- for (;;) {
- int available = getState();
- int remaining = available - acquires;
- if (remaining <0 ||
- compareAndSetState(available, remaining))
- return remaining;
- }
- }
- // 通过 CAS 方式增加 state 值, 对应 Semaphore 的 release 归还许可
- protected final boolean tryReleaseShared(int releases) {
- for (;;) {
- int current = getState();
- int next = current + releases;
- if (next < current) // overflow
- throw new Error("Maximum permit count exceeded");
- if (compareAndSetState(current, next))
- return true;
- }
- }
- // 减少许可
- final void reducePermits(int reductions) {
- for (;;) {
- int current = getState();
- int next = current - reductions;
- if (next> current) // underflow
- throw new Error("Permit count underflow");
- if (compareAndSetState(current, next))
- return;
- }
- }
- // 许可置 0
- final int drainPermits() {
- for (;;) {
- int current = getState();
- if (current == 0 || compareAndSetState(current, 0))
- return current;
- }
- }
- }
通过代码可以看出 Semaphore 也是基于 AbstractQueuedSynchronizer 类来实现的, 它会根据你传入的并发线程数量来构造一个继承自 AbstractQueuedSynchronizer 的 Syc 实现类;
2,acquire 方法
Semaphore 的 acquire 方法实现获取执行许可, acquire 方法底层调用的其实是 AbstractQueuedSynchronizer 的 acquireSharedInterruptibly 方法, 我们看下具体代码
- public void acquire() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- //tryAcquireShared 由 Semaphore 的 Sync 类的 nonfairTryAcquireShared 方法具体实现
- if (tryAcquireShared(arg) < 0)
- doAcquireSharedInterruptibly(arg);
- }
从上面我们已经知道 nonfairTryAcquireShared 方法内部其实是一个针对 state 值减法操作, 并通过 CAS 操作改变同步状态 State 的值, 直到要获取的许可线程超过设置的并发值, tryAcquireShared(arg) 返回值小于 0, 执行 doAcquireSharedInterruptibly 方法开始尝试获取锁, 并进入阻塞;
3,release 方法
Semaphore 的 release 方法对应释放执行许可
- public void release() {
- sync.releaseShared(1);
- }
- public final boolean releaseShared(int arg) {
- //tryAcquireShared 由 Semaphore 的 Sync 类的 tryReleaseShared 方法具体实现, 执行归还许可操作;
- if (tryReleaseShared(arg)) {
- // 释放锁状态, 唤醒阻塞线程
- doReleaseShared();
- return true;
- }
- return false;
- }
执行 tryReleaseShared 方法归还归许可, 对 state 值做加法操作, 没有问题的话返回 true 值, 执行 doReleaseShared 方法释放锁, 唤醒阻塞线程.
三, 总结
线程并发个数控制工具 Semaphore 类与 CountDownLatch 类似, 都是基于 AbstractQueuedSynchronizer 类实现的, 通过操作同步状态 state 值结合共享锁的模式控制一个或多个线程的执行从而实现具体的功能. 以上就是对 Semaphore 类使用与源码进行的分析与总结, 其中如有不足与不正确的地方还望指出与海涵.
来源: https://www.cnblogs.com/dafanjoy/p/11143086.html