基于 AQS 的前世今生, 来学习并发工具类 Semaphore. 本文将从 Semaphore 的应用场景, 源码原理解析来学习这个并发工具类.
1, 应用场景
Semaphore 用来控制同时访问某个特定资源的操作数量, 或者同时执行某个指定操作的数量. 还可以用来实现某种资源池限制, 或者对容器施加边界.
1.1 当成锁使用
控制同时访问某个特定资源的操作数量, 代码如下:
- public class SemaphoreLock {
- public static void main(String[] args) {
- //1, 信号量为 1 时 相当于普通的锁 信号量大于 1 时 共享锁
- Output o = new Output();
- for (int i = 0; i <5; i++) {
- new Thread(() -> o.output()).start();
- }
- }
- }
- class Output {
- Semaphore semaphore = new Semaphore(1);
- public void output() {
- try {
- semaphore.acquire();
- System.out.println(Thread.currentThread().getName() + "start at" + System.currentTimeMillis());
- Thread.sleep(1000);
- System.out.println(Thread.currentThread().getName() + "stop at" + System.currentTimeMillis());
- }catch(Exception e) {
- e.printStackTrace();
- }finally {
- semaphore.release();
- }
- }
- }
1.2 线程通信信号
线程间通信, 代码如下:
- public class SemaphoreCommunication {
- public static void main(String[] args) {
- //2, 线程间进行通信
- Semaphore semaphore = new Semaphore(1);
- new SendingThread(semaphore,"SendingThread");
- new ReceivingThread(semaphore,"ReceivingThread");
- }
- }
- class SendingThread extends Thread {
- Semaphore semaphore;
- String name;
- public SendingThread(Semaphore semaphore,String name) {
- this.semaphore = semaphore;
- this.name = name;
- new Thread(this).start();
- }
- public void run() {
- try {
- semaphore.acquire();
- for (int i = 0; i <5; i++) {
- System.out.println(name + ":" + i);
- Thread.sleep(1000);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- semaphore.release();
- }
- }
- class ReceivingThread extends Thread {
- Semaphore semaphore;
- String name;
- public ReceivingThread(Semaphore semaphore,String name) {
- this.semaphore = semaphore;
- this.name = name;
- new Thread(this).start();
- }
- public void run() {
- try {
- semaphore.acquire();
- for (int i = 0; i < 5; i++) {
- System.out.println(name + ":" + i);
- Thread.sleep(1000);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- semaphore.release();
- }
- }
1.3 资源池限制
对资源池进行资源限制, 代码如下:
- public class SemaphoreConnect {
- public static void main(String[] args) throws Exception {
- //3, 模拟连接池数量限制
- ExecutorService executorService = Executors.newCachedThreadPool();
- for (int i = 0; i < 200; i++) {
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- Connection.getInstance().connect();
- }
- });
- }
- executorService.shutdown();
- executorService.awaitTermination(1, TimeUnit.DAYS);
- }
- }
- class Connection {
- private static Connection instance = new Connection();
- private Semaphore semaphores = new Semaphore(10,true);
- private int connections = 0;
- private Connection() {
- }
- public static Connection getInstance() {
- return instance;
- }
- public void connect() {
- try {
- semaphores.acquire();
- doConnect();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }finally {
- semaphores.release();
- }
- }
- private void doConnect() {
- synchronized (this) {
- connections ++;
- System.out.println("current get connections is :" + connections);
- }
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- synchronized (this) {
- connections --;
- System.out.println("after release current connections is :" + connections);
- }
- }
- }
1.4 容器边界限制
对容器进行边界限制, 代码如下:
- public class SemaphoreBoundedList {
- public static void main(String[] args) {
- //4, 容器边界限制
- final BoundedList ba = new BoundedList(5);
- Runnable runnable1 = new Runnable() {
- public void run() {
- try {
- ba.add("John");
- ba.add("Martin");
- ba.add("Adam");
- ba.add("Prince");
- ba.add("Tod");
- System.out.println("Available Permits :" + ba.getSemaphore().availablePermits());
- ba.add("Tony");
- System.out.println("Final list:" + ba.getArrayList());
- }catch (InterruptedException IE) {
- Thread.interrupted();
- }
- }
- };
- Runnable runnable2 = new Runnable() {
- public void run() {
- try {
- System.out.println("Before removing elements:"+ ba.getArrayList());
- Thread.sleep(5000);
- ba.remove("Martin");
- ba.remove("Adam");
- }catch (InterruptedException IE) {
- Thread.interrupted();
- }
- }
- };
- Thread thread1 = new Thread(runnable1);
- Thread thread2 = new Thread(runnable2);
- thread1.start();
- thread2.start();
- }
- }
- class BoundedList<T> {
- private final Semaphore semaphore;
- private List arrayList;
- BoundedList(int limit) {
- this.arrayList = Collections.synchronizedList(new ArrayList());
- this.semaphore = new Semaphore(limit);
- }
- public boolean add(T t) throws InterruptedException {
- boolean added = false;
- semaphore.acquire();
- try {
- added = arrayList.add(t);
- return added;
- } finally {
- if (!added)
- semaphore.release();
- }
- }
- public boolean remove(T t) {
- boolean wasRemoved = arrayList.remove(t);
- if (wasRemoved)
- semaphore.release();
- return wasRemoved;
- }
- public void remove(int index) {
- arrayList.remove(index);
- semaphore.release();
- }
- public List getArrayList() {
- return arrayList;
- }
- public Semaphore getSemaphore() {
- return semaphore;
- }
- }
2, 源码原理解析
2.1 获取信号
获取信号的方法如下:
- public void acquire() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);// 共享式获取 AQS 的同步状态
- }
调用的是 AQS 的 acquireSharedInterruptibly 方法:
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())// 线程中断 说明信号量对线程中断敏感
- throw new InterruptedException();
- if (tryAcquireShared(arg) <0) // 获取信号量失败 线程进入同步队列自旋等待
- doAcquireSharedInterruptibly(arg);
- }
其中 tryAcquireShared 依赖的是 Sync 的实现, Sync 提供了公平和非公平式的方式, 先看非公平式.
- protected int tryAcquireShared(int acquires) {
- return nonfairTryAcquireShared(acquires);
- }
- final int nonfairTryAcquireShared(int acquires) {
- for (;;) {
- int available = getState();// 同步状态 当前的信号量许可数
- int remaining = available - acquires;// 减去释放的信号量 剩余信号量许可数
- if (remaining < 0 ||// 剩余信号量小于 0 直接返回 remaining 不做 CAS
- compareAndSetState(available, remaining))//CAS 更新
- return remaining;
- }
- }
再看下公平式的.
- protected int tryAcquireShared(int acquires) {
- for (;;) {
- if (hasQueuedPredecessors())// 判断同步队列如果存在前置节点 获取信号量失败 其他和非公平式是一致的
- return -1;
- int available = getState();
- int remaining = available - acquires;
- if (remaining < 0 ||
- compareAndSetState(available, remaining))
- return remaining;
- }
- }
最后来看下, 如果未获取到信号量的处理方法 doAcquireSharedInterruptibly.
- private void doAcquireSharedInterruptibly(int arg)
- throws InterruptedException {
- final Node node = addWaiter(Node.SHARED);// 线程进入同步队列
- boolean failed = true;
- try {
- for (;;) {// 自旋
- final Node p = node.predecessor();
- if (p == head) {// 当前节点的前置节点是 AQS 的头节点 即自己是 AQS 同步队列的第一个节点
- int r = tryAcquireShared(arg); // 再去获取信号量
- if (r>= 0) {// 获取成功
- setHeadAndPropagate(node, r);// 退出自旋
- p.next = null; // help GC
- failed = false;
- return;
- }
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- throw new InterruptedException();
- }
- } finally {
- if (failed)
- cancelAcquire(node); // 获取失败 就取消获取
- }
- }
2.2 释放信号
释放信号的方法如下:
- public void release() {
- sync.releaseShared(1);
- }
调用的是 AQS 的 releaseShared 方法:
- public final boolean releaseShared(int arg) {
- if (tryReleaseShared(arg)) {// 释放信号量
- doReleaseShared();// 唤醒后续的线程节点
- return true;
- }
- return false;
- }
tryReleaseShared 交由子类 Sync 实现, 代码如下:
- protected final boolean tryReleaseShared(int releases) {
- for (;;) {
- int current = getState();// 当前信号量许可数
- int next = current + releases; // 当前信号量许可数 + 释放的信号量许可数
- if (next < current) // overflow 这个分支我看着永远走不进来呢
- throw new Error("Maximum permit count exceeded");
- if (compareAndSetState(current, next))//CAS 更新当前信号量许可数
- return true;
- }
- }
释放成功后, 则继续调用 doReleaseShared, 唤醒后续线程节点可以来争取信号量了.
- private void doReleaseShared() {
- for (;;) {
- Node h = head; // 头节点
- if (h != null && h != tail) {// 同步队列中存在线程等待
- int ws = h.waitStatus; // 头节点线程状态
- if (ws == Node.SIGNAL) {// 头节点线程状态为 SIGNAL 唤醒后续线程节点
- if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
- continue; // loop to recheck cases
- unparkSuccessor(h); // 唤醒下个节点
- }
- else if (ws == 0 &&
- !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
- continue; // loop on failed CAS
- }
- if (h == head) // loop if head changed
- break;
- }
- }
总结: Semaphore 使用 AQS 同步状态来保存信号量的当前计数. 它里面定义的 acquireSharedInterruptibly 方法会减少计数, 当计数为非正值时阻塞线程, releaseShared 方法会增加计数, 在计数不超过信号量限制时要解除线程的阻塞.
来源: https://www.cnblogs.com/iou123lg/p/9689491.html