并发工具类
本系列文章主要讲解 Java 并发相关的内容, 包括同步, 锁, 信号量, 阻塞队列, 线程池等, 整体思维导图如下:
系列文章列表:
Java 并发基础 - 并发模型, 基础接口以及 Thread
Java 并发基础 - 同步和锁
本文主要以实例讲解 CountDownLatch,Semaphore, 阻塞队列和线程池等内容.
CountDownLatch
基本概念和用途
CountDownLatch 主要是在其他线程执行操作前, 允许一个或者多个线程一直等待. 其源码实现主要采用 AQS, 具体可参考 Java 并发基础 - 同步和锁. 构造方法 CountDownLatch(int count)用来初始化计数器, 实际就是设置 count(最终是设置 state 值)的初始值. 该值设置后, 不能重置, 所有当线程必须用这种方法反复倒计数时, 可改为使用 CyclicBarrier. countDown()方法用来减少计数器的值, 每次减 1. getCount()方法用来返回当前计数器的值. 流程图简化如下
运行示例
在 SynchronizedDemo 代码中, 我们使用 Thread 的 join 方法等待一个 Spender 和 Earner 线程运行完成后, 再去获取账户余额 balance 的值, 这里我们利用 CountDownLatch 计数器, 先阻塞主线程, 待一组 Spender 和一组 Earner 线程完成后, 再让主线程获取账户余额的值, 代码如下:
- package com.molyeo.java.concurrent;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- /**
- * Created by zhangkh on 2018/7/17.
- */
- public class CountDownLatchDemo {
- public static void main(String[] args) throws InterruptedException {
- System.out.println("main thread start");
- Account account = new Account();
- account.setBalance(100000);
- CountDownLatch latch = new CountDownLatch(20);
- System.out.println("main latch init="+latch.getCount());
- ExecutorService executorService = Executors.newFixedThreadPool(10);
- for (int i = 0; i < 10; i++) {
- SpenderWithCountDownLatch spender = new SpenderWithCountDownLatch(account, latch);
- executorService.submit(spender);
- }
- for (int i = 0; i < 10; i++) {
- EarnerWithCountDownLatch earner = new EarnerWithCountDownLatch(account, latch);
- executorService.submit(earner);
- }
- System.out.println("main thread block");
- latch.await();
- System.out.println("main latch="+latch.getCount());
- System.out.println("main thread continue to do");
- System.out.println("balance="+account.getBalance());
- executorService.shutdown();
- }
- }
先初始化 CountDownLatch 的值为 20, 然后分布创建一组 Spender 线程 (每组 10 个) 和一组 Earner 线程(每组 10 个), 并将 account 和 latch 传递给这些线程.
SpenderWithCountDownLatch 代码如下, 运行时主要是 30 次减少账户余额, 每次减少 1000. 运行完成后调用 latch.countDown(), 减少计数器的值.
- public class SpenderWithCountDownLatch implements Runnable {
- private final Account account;
- private final CountDownLatch latch;
- public SpenderWithCountDownLatch(Account account, CountDownLatch latch) {
- this.account = account;
- this.latch = latch;
- }
- @Override
- public void run() {
- for (int i = 0; i < 30; i++) {
- account.subtractAmount(1000);
- }
- latch.countDown();
- System.out.println("Spender run ......");
- }
- }
EarnerWithCountDownLatch 代码如下, 运行时主要是 30 次增加账户余额, 每次增加 1000. 运行完成后调用 latch.countDown(), 减少计数器的值.
- public class EarnerWithCountDownLatch implements Runnable {
- private final Account account;
- private final CountDownLatch latch;
- public EarnerWithCountDownLatch(Account account, CountDownLatch latch) {
- this.account = account;
- this.latch = latch;
- }
- @Override
- public void run() {
- for (int i = 0; i < 30; i++) {
- account.addAmount(1000);
- }
- latch.countDown();
- System.out.println("Earner run ...." );
- }
- }
Account 代码如下, 需要利用同步块或者 Lock 保证 addAmount 和 subtractAmount 方法线程安全
- public class Account {
- private double balance;
- public double getBalance() {
- return balance;
- }
- public void setBalance(double balance) {
- this.balance = balance;
- }
- public void addAmount(double amount) {
- synchronized (Account.class) {
- balance = balance + amount;
- }
- }
- public void subtractAmount(double amount) {
- synchronized (Account.class) {
- balance = balance - amount;
- }
- }
- }
整个程序输出如下:
- main thread start
- main latch init=20
- main thread block
- Spender run ......
- Spender run ......
- Spender run ......
- Earner run ....
- Earner run ....
- Earner run ....
- Earner run ....
- Earner run ....
- Earner run ....
- Spender run ......
- Earner run ....
- Earner run ....
- Spender run ......
- Earner run ....
- Earner run ....
- Spender run ......
- Spender run ......
- Spender run ......
- Spender run ......
- Spender run ......
- main latch=0
- main thread continue to do
- balance=100000.0
满足预期结果为 100000, 同时主线程一直阻塞直到 latch 的值为 0.
CyclicBarrier
基本概念和主要方法
CyclicBarrier 允许一组线程互相等待, 直到所有的线程到到达公共屏障点(common barrier point). 和 CountDownLatch 不同的是, CyclicBarrier 可以在释放等待线程后重置然后重用.
构造方法
public CyclicBarrier(int parties, Runnable barrierAction)
其中 parties, 表示线程数量, 即参与者数量 barrierAction 表示启动 barrier 时执行指定的操作, 该操作由最后一个进入 barrier 的线程执行 await() 参与者阻塞等待, 直到所有的参与者都到达 barrier 流程图简化如下
使用示例
我们看下面一个示例, 其功能是主线程, Spender 线程和 Earner 线程共用一个 barrier, 其中 barrier 初始值为 3, 三个线程都到达 barrier 后执行 BarrierAction 定义的动作.
- package com.molyeo.java.concurrent;
- import java.util.concurrent.BrokenBarrierException;
- import java.util.concurrent.CyclicBarrier;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- /**
- * Created by zhangkh on 2018/9/5.
- */
- public class CyclicBarrierDemo {
- public static void main(String[] args) {
- Account account=new Account();
- account.setBalance(100000);
- CyclicBarrier barrier = new CyclicBarrier(3, new BarrierAction());
- ExecutorService executorService = Executors.newFixedThreadPool(10);
- SpenderWithCyclicBarrier spender = new SpenderWithCyclicBarrier(account, barrier);
- executorService.submit(spender);
- EarnerWithCyclicBarrier earner = new EarnerWithCyclicBarrier(account, barrier);
- executorService.submit(earner);
- try{
- System.out.println(String.format(" s waiting at barrier",Thread.currentThread().getName()));
- barrier.await();
- }catch (InterruptedException e){
- e.printStackTrace();
- }catch (BrokenBarrierException e){
- e.printStackTrace();
- }
- System.out.println("balance="+account.getBalance());
- System.out.println(String.format(" s done",Thread.currentThread().getName()));
- }
- }
- class BarrierAction implements Runnable{
- public void run() {
- System.out.println(String.format(" s executed",Thread.currentThread().getName()));
- }
- }
- class SpenderWithCyclicBarrier implements Runnable {
- private final Account account;
- private final CyclicBarrier barrier;
- public SpenderWithCyclicBarrier(Account account, CyclicBarrier barrier) {
- this.account = account;
- this.barrier = barrier;
- }
- @Override
- public void run() {
- for (int i = 0; i < 30; i++) {
- account.subtractAmount(1000);
- }
- try{
- System.out.println(String.format(" s waiting at barrier",Thread.currentThread().getName()));
- barrier.await();
- }catch (InterruptedException e){
- e.printStackTrace();
- }catch (BrokenBarrierException e){
- e.printStackTrace();
- }
- System.out.println(String.format(" s done",Thread.currentThread().getName()));
- }
- }
- class EarnerWithCyclicBarrier implements Runnable {
- private final Account account;
- private final CyclicBarrier barrier;
- public EarnerWithCyclicBarrier(Account account, CyclicBarrier barrier) {
- this.account = account;
- this.barrier = barrier;
- }
- @Override
- public void run(){
- for (int i = 0; i < 30; i++) {
- account.addAmount(1000);
- }
- try{
- System.out.println(String.format(" s waiting at barrier",Thread.currentThread().getName()));
- barrier.await();
- }catch (InterruptedException e){
- e.printStackTrace();
- }catch (BrokenBarrierException e){
- e.printStackTrace();
- }
- System.out.println(String.format(" s done",Thread.currentThread().getName()));
- }
- }
- class Account {
- private double balance;
- public double getBalance() {
- return balance;
- }
- public void setBalance(double balance) {
- this.balance = balance;
- }
- public void addAmount(double amount) {
- synchronized (Account.class) {
- balance = balance + amount;
- }
- }
- public void subtractAmount(double amount) {
- synchronized (Account.class) {
- balance = balance - amount;
- }
- }
- }
程序输出如下:
- pool-1-thread-2 waiting at barrier
- main waiting at barrier
- pool-1-thread-1 waiting at barrier
- pool-1-thread-1 executed
- pool-1-thread-1 done
- pool-1-thread-2 done
- balance=100000.0
- main done
我们可以看到 pool-1-thread-2 和 main 线程执行完对账户余额的操作后, 先到达 barrier 阻塞等待, pool-1-thread-1 线程最后到达, 然后由 pool-1-thread-1 线程执行预定义的动作, 即输出 executed 后, 这三个线程再继续执行其他信息的输出. 这里要注意到时, 各位输出的内容可能是上面的不太一致, 不过第三行和第四行的线程名要么是 main, 要么是 pool-1-thread-1, 或者是 pool-1-thread-2. 不会存在两个线程名不一样的情况. 这里说明了到达 barrier 后预定义的动作是由最后到达的线程去执行的.
本文参考
- Java 7 Concurrency Cookbook http://it-ebooks.info/book/3916/
- http://ifeve.com/concurrency-modle-seven-week-1/
- http://tutorials.jenkov.com/java-concurrency/concurrency-models.html
- http://tutorials.jenkov.com/java-util-concurrent/lock.html
- java se 8 apidoc https://docs.oracle.com/javase/8/docs/api/
关于作者 爱编程, 爱钻研, 爱分享, 爱生活 关注分布式, 高并发, 数据挖掘 如需捐赠, 请扫码
来源: https://www.cnblogs.com/aidodoo/p/9601499.html