前言
1965 年, 荷兰计算机科学家 Dijkstra 提出的信号量机制成为一种高效的进程同步机制. 这之后的 15 年, 信号量一直都是并发编程领域的终结者. 1980 年, 管程被提出, 成为继信号量之后的在并发编程领域的第二个选择. 目前几乎所有的语言都支持信号量机制, Java 也不例外. Java 中提供了 Semaphore 并发工具类来支持信号量机制. 下面我们就来了解 Java 实现的信号量机制.
首先介绍信号量模型, 然后介绍如何使用, 最后使用信号量来实现一个限流器.
信号量模型
信号量模型图(图来自参考[1]):
信号量模型总结为: 一个计数器, 一个等待队列和三个对外调用的方法.
计数器和等待队列时对外透明的, 所有我们只能通过三个对外方法来访问计数器和等待队列.
init(): 设置计数器的初始值.
down(): 计数器的值减一. 如果此时计数器的值小于 0, 则当前线程插入等待队列并阻塞, 否则当前线程可以继续执行.
up(): 计数器的值加一. 如果此时计数器的值小于或者等于 0, 则唤醒等待队列中的一个线程, 并将其从等待队列中移除.
这三个方法都是原子性的, 由实现信号量模型的方法保证. 在 Java SDK 中, 信号量模型是由 java.util.concurrent.Semaphore 实现.
信号量模型代码化大致类似如下:
- class Semaphore{
- int count; // 计数器
- Queue queue; // 等待队列
- // 初始化操作
- Semaphore(int c){
- this.count=c;
- }
- void down(){
- this.count--; // 计数器值减一
- if(this.count <0){
- // 将当前线程插入等待队列
- // 阻塞当前线程
- }
- }
- void up(){
- this.count++; // 计数器值加一
- if(this.count <= 0) {
- // 移除等待队列中的某个线程 T
- // 唤醒线程 T
- }
- }
- }
在信号量模型中, down()和 up()这两个操作也被成为 P 操作 (荷兰语 proberen, 测试) 和 V 操作 (荷荷兰语 verhogen, 增加). 在我学的操作系统教材中(C 语言实现),P 操作对应 wait(),V 操作对应 singal(). 虽然叫法不同, 但是语义都是相同的. 在 Java SDK 并发包中, down() 和 up()分别对应于 Semaphore 中的 acquire()和 release().
如何使用信号量
信号量有时也被称为红绿灯, 我们想想红绿灯时怎么控制交通的, 就知道该如何使用信号量. 车辆路过十字路时, 需要先检查是否为绿灯, 如果是则通行, 否则就等待. 想想和加锁机制有点相似, 都是一样的操作, 先检查是否符合条件 ("尝试获取"), 符合("获取到") 则线程继续运行, 否则阻塞线程.
下面使用累加器的例子来说明如何使用信号量.
count+=1 操作是个临界区, 只允许一个线程执行, 即要保证互斥. 于是我们在进入临界区之前, 使用 down()即 Java 中的 acquire(), 在退出之后使用 up()即 Java 中的 release().
- static int count;
- // 初始化信号量
- static final Semaphore s = new Semaphore(1); // 构造函数参数为 1, 表示只允许一个线程进行临界区. 可实现一个互斥锁的功能.
- // 用信号量保证互斥
- static void addOne() {
- s.acquire(); // 获取一个许可(可看作加锁机制中加锁)
- try {
- count+=1;
- } finally {
- s.release(); // 归还许可(可看做加锁机制中解锁)
- }
- }
完整代码如下:
- package com.sakura.concrrent;
- import java.util.concurrent.Semaphore;
- public class SemaphoreTest {
- static int count;
- static final Semaphore s = new Semaphore(1);
- static void addOne() throws InterruptedException {
- // 只会有一个线程将信号量中的计数器减为 1, 而另外一个线程只能将信号量中计数器减为 - 1, 导致被阻塞
- s.acquire();
- try {
- count +=1;
- System.out.println("Now thread is" + Thread.currentThread() + "and count is" + count);
- }finally {
- // 进入临界区的线程在执行完临界区代码后将信号量中计数器的值加 1 然后, 此时信号量中计数器的值为 0, 则从阻塞队列中唤醒被阻塞的进程
- s.release();
- }
- }
- public static void main(String[] args) {
- // 创建两个线程运行
- MyThread thread1 = new MyThread();
- MyThread thread2 = new MyThread();
- thread1.start();
- thread2.start();
- System.out.println("main thread");
- }
- }
- class MyThread extends Thread{
- @Override
- public void run() {
- super.run();
- for(int i=0; i<10; i++) {
- try {
- SemaphoreTest.addOne();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
运行结果:
如果 Semaphore 的构造函数参数 (许可数量, 内置计数器的值) 修改一下:
static final Semaphore s = new Semaphore(2);
则计数器值的为 2, 那么就允许有两个线程进入临界区, 我们的 count 值就会出现问题
快速实现一个限流器
当设置信号量的计数器为 1 时, 可实现一个简单的互斥锁功能. 但是, 我们前面刚介绍过 Java SDK 中的 Lock,Semaphore 的用途显然不会与 Lock 一致, 不然就重复造轮子了. Semaphore 最重要的一个功能便是: 可以允许多个线程访问一个临界区.(上述例子我们就设置了计数器的值为 2, 可发现 thread1 和 thread2 都可进入临界区.)
我们会在什么地方遇见这种需求呢?
各种池化资源, 例如连接池, 对象池, 线程池等等. 例如, 数据库连接池, 在同一时刻, 一定是允许多个线程同时使用连接池, 当然, 每个连接在被释放之前, 是不允许其他线程使用的.
我们设计如下可以允许 N 个线程使用的对象池, 我们将信号量的计数器值设为 N, 就可以让 N 个线程同时进行临界区, 多余的就会被阻塞.(代码来自参考[1])
- class ObjPool<T, R> {
- final List<T> pool; // 使用 List 保存实例对象
- // 用信号量实现限流器
- final Semaphore sem;
- // 构造函数
- ObjPool(int size, T t){
- pool = new Vector<T>(){};
- for(int i=0; i<size; i++){
- pool.add(t);
- }
- sem = new Semaphore(size);
- }
- // 获取对象池的对象, 调用 func
- R exec(Function<T,R> func) {
- T t = null;
- sem.acquire(); // 允许 N 个进程同时进入临界区
- try {
- // 我们需要注意, 因为多个进行可以进入临界区, 所以 Vector 的 remove 方法是线程安全的
- t = pool.remove(0);
- return func.apply(t); // 获取对象池汇中的一个对象后, 调用 func 函数
- } finally {
- pool.add(t); // 离开临界区之前, 将之前获取的对象放回到池中
- sem.release(); // 使得计数器加 1, 如果信号量中计数器小于等于 0, 那么说明有线程在等待, 此时就会自动唤醒等待线程
- }
- }
- }
- // 创建对象池
- ObjPool<Long, String> pool = new ObjPool<Long, String>(10, 2);
- // 通过对象池获取 t, 之后执行
- pool.exec(t -> {
- System.out.println(t);
- return t.toString();
- });
小结
记得学习操作系统时, 信号量类型分为了好几种整型信号量, 记录型信号量, AND 信号量以及 "信号量集"(具体了解可戳参考[2]). 我认为 Java SDK 中 Semaphore 应该是记录型信号量的实现. 不由想起, 编程语言是对 OS 层面操作的一种抽象描述. 这句话需要品需要细细品.
参考:
[1] 极客时间专栏王宝令《Java 并发编程实战》
[2] 静水深流. 操作系统之信号量机制总结. https://www.cnblogs.com/IamJiangXiaoKun/p/9464336.html
来源: https://www.cnblogs.com/myworld7/p/12315393.html