基于 AQS 的前世今生, 来学习并发工具类 CountDownLatch. 本文将从 CountDownLatch 的应用场景, 源码原理解析来学习这个并发工具类.
1, 应用场景
CountDownLatch 是并发包中用来控制一个或者多个线程等待其他线程完成操作的并发工具类. 现以工作中的一个场景来描述下 CountDownLatch 的应用, 代码如下:
- /*
- 模拟工作中的一个需求场景:
- 用户会选择多个算法来计算费用, 最后会将所有算法计算出的费用做一个加权求平均数, 这个平均数是最终的费用.
- 每个算法的复杂度都不一样, 打算每个线程负责一个算法的实现, 所有的线程执行完成, 最后再求平均数.
- 1, 为每个算法创建一个线程, 每个线程负责一个算法的实现
- 2, 通过 CountDownLatch 来控制所有算法线程的同步
- 3, 全部计算完成后再求平均数
- */
- public class CountDownLatchTask {
- public static void main(String[] args) {
- CountDownLatchTask countDownLatchTask = new CountDownLatchTask();
- countDownLatchTask.startThreads(5);
- }
- // 根据线程数和选择的算法 调度算法对应的实现
- private void startThreads(int threadNumber) {
- CountDownLatch countDownLatch = new CountDownLatch(threadNumber);
- for (int i = 0; i < threadNumber; i++) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- System.out.println("线程算法实现:" + Thread.currentThread().getName());
- countDownLatch.countDown();
- }
- }).start();
- }
- try {
- countDownLatch.await();
- System.out.println("加权求平均数");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
在分析原理实现前, 总结下 CountDownLatch 的作用就是阻塞其他线程直到条件允许后才释放该阻塞, 除了上述这个小案例, 实际工作中还有很多可以使用 CountDownLatch 的场景, 比如解析 Excel 文件时可以同时解析多个 Sheet 页, 所有的 Sheet 解析完成才算完成了 Excel 文件的解析. 从这个代码中也可以看到 CountDownLatch 的主要方法就是 await 和 countDown, 下面将以这两个方法来分析下 CountDownLatch 的原理实现.
2, 源码原理解析
2.1 await 方法
调用 await 方法会阻塞当前线程直到计数器的数值为 0, 方法如下:
- public void await() throws InterruptedException {
- sync.acquireSharedInterruptibly(1); // 共享式获取 AQS 的同步状态
- }
调用的是 AQS 的 acquireSharedInterruptibly 方法:
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())// 线程中断 说明闭锁对线程中断敏感
- throw new InterruptedException();
- if (tryAcquireShared(arg) < 0) // 闭锁未使用完成 线程进入同步队列自旋等待
- doAcquireSharedInterruptibly(arg);
- }
其中 tryAcquireShared 依赖的是 Sync 的实现, 和之前的 ReentrantLock,ReentrantReadWriteLock 及 Semaphore 相比, CountDownLatch 的 Sync 只提供了一种方式, 代码如下:
- protected int tryAcquireShared(int acquires) {
- return (getState() == 0) ? 1 : -1; //AQS 的同步状态为 0 则闭锁结束 可以进行下一步操作
- }
doAcquireSharedInterruptibly 方法就不再赘述, 和之前 Semaphore 的实现是一致的, 本质上仍然是 AQS 同步队列的入队自旋等待.
2.2 countDown 方法
调用 countDown 方法会将计数器的数值减 1 直到计数器为 0, 方法如下:
- public void countDown() {
- sync.releaseShared(1);
- }
和 Semaphore 一样, 调用的是 AQS 的 releaseShared 方法:
- public final boolean releaseShared(int arg) {
- if (tryReleaseShared(arg)) {// 减少闭锁的计数器
- doReleaseShared();// 唤醒后续线程节点
- return true;
- }
- return false;
- }
其中 tryReleaseShared 依赖的是 Sync 的实现, 和之前的 ReentrantLock,ReentrantReadWriteLock 及 Semaphore 相比, CountDownLatch 的 Sync 只提供了一种方式, 代码如下:
- protected boolean tryReleaseShared(int releases) {
- // Decrement count; signal when transition to zero
- for (;;) {
- int c = getState();
- if (c == 0)
- return false; // 计数器已经是 0 了
- int nextc = c-1; // 计数器减 1
- if (compareAndSetState(c, nextc)) //CAS 更新同步状态
- return nextc == 0;
- }
- }
唤醒后续线程节点的 doReleaseShared 也不再赘述, 和之前 Semaphore 的实现是一致的.
总结: CountDownLatch 类使用 AQS 同步状态来表示计数. 在 await 时, 所有的线程进入同步队列自旋等待, 在 countDown 时, 获取闭锁成功的线程会减少闭锁的计数器, 同时唤醒后续线程取获取闭锁, 直到 await 中的计数器为 0, 获取到闭锁的线程才可以通过, 执行下一步操作.
参考资料:
来源: https://www.cnblogs.com/iou123lg/p/9739697.html