在 JDK 的并发包里提供了几个非常有用的并发工具类 CountDownLatchCyclicBarrier 和 Semaphore 工具类提供了一种并发流程控制的手段, Exchanger 工具类则提供了在线程间交换数据的一种手段本章会配合一些应用场景来介绍如何使用这些工具类
CountDownLatch
CountDownLatch 允许一个或多个线程等待其他线程完成操作
假如有这样一个需求: 我们需要解析一个 Excel 里多个 sheet 的数据, 此时可以考虑使用多线程, 每个线程解析一个 sheet 里的数据, 等到所有的 sheet 都解析完之后, 程序需要提示解析完成 (或者汇总结果) 在这个需求中, 要实现主线程等待所有线程完成 sheet 的解析操作, 最简单的做法是使用 join()方法
- import java.util.Random;
- import java.util.concurrent.atomic.AtomicInteger;
- public class JoinCountDownLatchTest {
- private static Random sr = new Random(47);
- private static AtomicInteger result = new AtomicInteger(0);
- private static int threadCount = 10;
- private static class Parser implements Runnable {
- String name;
- public Parser(String name) {
- this.name = name;
- }@Override public void run() {
- int sum = 0;
- int seed = Math.abs(sr.nextInt());
- Random r = new Random(47);
- for (int i = 0; i < 100; i++) {
- sum += r.nextInt(seed);
- }
- result.addAndGet(sum);
- System.out.println(name + "线程的解析结果:" + sum);
- }
- }
- public static void main(String[] args) throws InterruptedException {
- Thread[] threads = new Thread[threadCount];
- for (int i = 0; i < threadCount; i++) {
- threads[i] = new Thread(new Parser("Parser-" + i));
- }
- for (int i = 0; i < threadCount; i++) {
- threads[i].start();
- }
- for (int i = 0; i < threadCount; i++) {
- threads[i].join();
- }
- System.out.println("所有线程解析结束!");
- System.out.println("所有线程的解析结果:" + result);
- }
- }
输出:
Parser-1 线程的解析结果:-2013585201
Parser-0 线程的解析结果: 1336321192
Parser-2 线程的解析结果: 908136818
Parser-5 线程的解析结果:-1675827227
Parser-3 线程的解析结果: 1638121055
Parser-4 线程的解析结果: 1513365118
Parser-6 线程的解析结果: 489607354
Parser-8 线程的解析结果: 1513365118
Parser-7 线程的解析结果:-1191966831
Parser-9 线程的解析结果:-912399159
所有线程解析结束!
所有线程的解析结果: 1605138237
join 用于让当前执行线程等待 join 线程执行结束其实现原理是不停检查 join 线程是否存活, 如果 join 线程存活则让当前线程永远等待
在 JDK 1.5 之后的并发包中提供的 CountDownLatch 也可以实现 join 的功能, 并且比 join 的功能更多
- import java.util.Random;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.atomic.AtomicInteger;
- public class CountDownLatchTest {
- private static Random sr = new Random(47);
- private static AtomicInteger result = new AtomicInteger(0);
- private static int threadCount = 10; // 线程数量
- private static CountDownLatch countDown = new CountDownLatch(threadCount); //CountDownLatch
- private static class Parser implements Runnable {
- String name;
- public Parser(String name) {
- this.name = name;
- }@Override public void run() {
- int sum = 0;
- int seed = Math.abs(sr.nextInt());
- Random r = new Random(47);
- for (int i = 0; i < 100; i++) {
- sum += r.nextInt(seed);
- }
- result.addAndGet(sum);
- System.out.println(name + "线程的解析结果:" + sum);
- countDown.countDown(); // 注意这里
- }
- }
- public static void main(String[] args) throws InterruptedException {
- Thread[] threads = new Thread[threadCount];
- for (int i = 0; i < threadCount; i++) {
- threads[i] = new Thread(new Parser("Parser-" + i));
- }
- for (int i = 0; i < threadCount; i++) {
- threads[i].start();
- }
- /*
- for(int i=0;i<threadCount;i++){
- threads[i].join();
- }*/
- countDown.await(); // 将 join 改为使用 CountDownLatch
- System.out.println("所有线程解析结束!");
- System.out.println("所有线程的解析结果:" + result);
- }
- }
输出:
Parser-0 线程的解析结果: 1336321192
Parser-1 线程的解析结果:-2013585201
Parser-2 线程的解析结果:-1675827227
Parser-4 线程的解析结果: 1638121055
Parser-3 线程的解析结果: 908136818
Parser-5 线程的解析结果: 1513365118
Parser-7 线程的解析结果: 489607354
Parser-6 线程的解析结果: 1513365118
Parser-8 线程的解析结果:-1191966831
Parser-9 线程的解析结果:-912399159
所有线程解析结束!
所有线程的解析结果: 1605138237
CountDownLatch 的构造函数接收一个 int 类型的参数作为计数器, 如果你想等待 N 个点完成, 这里就传入 N
当我们调用 CountDownLatch 的 countDown 方法时, N 就会减 1,CountDownLatch 的 await 方法会阻塞当前线程, 直到 N 变成零由于 countDown 方法可以用在任何地方, 所以这里说的 N 个点, 可以是 N 个线程, 也可以是 1 个线程里的 N 个执行步骤用在多个线程时, 只需要把这个 CountDownLatch 的引用传递到线程里即可
如果有某个解析 sheet 的线程处理得比较慢, 我们不可能让主线程一直等待, 所以可以使用另外一个带指定时间的 await 方法 await(long time,TimeUnit unit), 这个方法等待特定时间后, 就会不再阻塞当前线程 join 也有类似的方法
注意: 计数器必须大于等于 0, 只是等于 0 时候, 计数器就是零, 调用 await 方法时不会阻塞当前线程 CountDownLatch 不可能重新初始化或者修改 CountDownLatch 对象的内部计数器的值一个线程调用 countDown 方法 happen-before, 另外一个线程调用 await 方法
CyclicBarrier
CyclicBarrier 的字面意思是可循环使用 (Cyclic) 的屏障 (Barrier) 它要做的事情是, 让一组线程到达一个屏障 (也可以叫同步点) 时被阻塞, 直到最后一个线程到达屏障时, 屏障才会
开门, 所有被屏障拦截的线程才会继续运行当所有等待线程都被释放以后, CyclicBarrier 可以被重用 CyclicBarrier 类位于 java.util.concurrent 包下, CyclicBarrier 提供 2 个构造器:
- public CyclicBarrier(int parties, Runnable barrierAction) {
- }
- public CyclicBarrier(int parties) {
- }
参数 parties 指让多少个线程或者任务等待至 barrier 状态; 参数 barrierAction 为当这些线程都达到 barrier 状态时会执行的内容
- public class Test {
- public static void main(String[] args) {
- int N = 4;
- CyclicBarrier barrier = new CyclicBarrier(N);
- for(int i=0;i<N;i++)
- new Writer(barrier).start();
- }
- static class Writer extends Thread{
- private CyclicBarrier cyclicBarrier;
- public Writer(CyclicBarrier cyclicBarrier) {
- this.cyclicBarrier = cyclicBarrier;
- }
- @Override
- public void run() {
- System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
- try {
- Thread.sleep(5000); // 以睡眠来模拟写入数据操作
- System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕, 等待其他线程写入完毕");
- cyclicBarrier.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }catch(BrokenBarrierException e){
- e.printStackTrace();
- }
- System.out.println("所有线程写入完毕, 继续处理其他任务...");
- }
- }
- }
执行结果:
线程 Thread-0 正在写入数据...
线程 Thread-3 正在写入数据...
线程 Thread-2 正在写入数据...
线程 Thread-1 正在写入数据...
线程 Thread-2 写入数据完毕, 等待其他线程写入完毕
线程 Thread-0 写入数据完毕, 等待其他线程写入完毕
线程 Thread-3 写入数据完毕, 等待其他线程写入完毕
线程 Thread-1 写入数据完毕, 等待其他线程写入完毕
所有线程写入完毕, 继续处理其他任务...
所有线程写入完毕, 继续处理其他任务...
所有线程写入完毕, 继续处理其他任务...
所有线程写入完毕, 继续处理其他任务...
CyclicBarrier 和 CountDownLatch 的区别
CountDownLatch 的计数器只能使用一次, 而 CyclicBarrier 的计数器可以使用 reset()方法重置所以 CyclicBarrier 能处理更为复杂的业务场景例如, 如果计算发生错误, 可以重置计数器, 并让线程重新执行一次
CyclicBarrier 还提供其他有用的方法, 比如 getNumberWaiting 方法可以获得 Cyclic-Barrier 阻塞的线程数量 isBroken()方法用来了解阻塞的线程是否被中断
Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量, 它通过协调各个线程, 以保证合理的使用公共资源 Semaphore 可以控同时访问的线程个数, 通过 acquire()获取一个许可, 如果没有就等待, 而 release() 释放一个许可
假若一个工厂有 5 台机器, 但是有 8 个工人, 一台机器同时只能被一个工人使用, 只有使用完了, 其他工人才能继续使用那么我们就可以通过 Semaphore 来实现:
- public class Test {
- public static void main(String[] args) {
- int N = 8; // 工人数
- Semaphore semaphore = new Semaphore(5); // 机器数目
- for(int i=0;i<N;i++)
- new Worker(i,semaphore).start();
- }
- static class Worker extends Thread{
- private int num;
- private Semaphore semaphore;
- public Worker(int num,Semaphore semaphore){
- this.num = num;
- this.semaphore = semaphore;
- }
- @Override
- public void run() {
- try {
- semaphore.acquire();
- System.out.println("工人"+this.num+"占用一个机器在生产...");
- Thread.sleep(2000);
- System.out.println("工人"+this.num+"释放出机器");
- semaphore.release();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
执行结果:
工人 0 占用一个机器在生产...
工人 1 占用一个机器在生产...
工人 2 占用一个机器在生产...
工人 4 占用一个机器在生产...
工人 5 占用一个机器在生产...
工人 0 释放出机器
工人 2 释放出机器
工人 3 占用一个机器在生产...
工人 7 占用一个机器在生产...
工人 4 释放出机器
工人 5 释放出机器
工人 1 释放出机器
工人 6 占用一个机器在生产...
工人 3 释放出机器
工人 7 释放出机器
工人 6 释放出机器
Exchanger
Exchanger(交换者)是一个用于线程间协作的工具类 Exchanger 用于进行线程间的数据交换它提供一个同步点, 在这个同步点, 两个线程可以交换彼此的数据这两个线程通过 exchange 方法交换数据, 如果第一个线程先执行 exchange()方法, 它会一直等待第二个线程也执行 exchange 方法, 当两个线程都到达同步点时, 这两个线程就可以交换数据, 将本线程生产出来的数据传递给对方
下面来看一下 Exchanger 的应用场景
1Exchanger 可以用于遗传算法, 遗传算法里需要选出两个人作为交配对象, 这时候会交换两人的数据, 并使用交叉规则得出 2 个交配结果
2Exchanger 也可以用于校对工作, 比如我们需要将纸制银行流水通过人工的方式录入成电子银行流水, 为了避免错误, 采用 AB 岗两人进行录入, 录入到 Excel 之后, 系统需要加载这两个 Excel, 并对两个 Excel 数据进行校对, 看看是否录入一致.
- import java.util.concurrent.Exchanger;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- public class ExchangerTest {
- private static final Exchanger<String> exgr = new Exchanger<String>();
- private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
- public static void main(String[] args) {
- threadPool.execute(new Runnable() {
- @Override
- public void run() {
- try {
- String A = "银行流水 100";// A 录入银行流水数据
- String B=exgr.exchange(A);
- System.out.println("A 的视角: A 和 B 数据是否一致:" + A.equals(B) +
- ",A 录入的是:" + A + ",B 录入是:" + B);
- } catch (InterruptedException e) {
- }
- }
- });
- threadPool.execute(new Runnable() {
- @Override
- public void run() {
- try {
- String B = "银行流水 200";// B 录入银行流水数据
- String A = exgr.exchange(B);
- System.out.println("B 的视角: A 和 B 数据是否一致:" + A.equals(B) +
- ",A 录入的是:" + A + ",B 录入是:" + B);
- } catch (InterruptedException e) {
- }
- }
- });
- threadPool.shutdown();
- }
- }
输出:
B 的视角: A 和 B 数据是否一致: false,A 录入的是: 银行流水 100,B 录入是: 银行流水 200
A 的视角: A 和 B 数据是否一致: false,A 录入的是: 银行流水 100,B 录入是: 银行流水 200
如果两个线程有一个没有执行 exchange()方法, 则会一直等待, 如果担心有特殊情况发生, 避免一直等待, 可以使用 exchange(V x,longtimeout,TimeUnit unit)设置最大等待时长
参考:
Java 并发工具类详解
Java 并发编程 - 原子类及并发工具类
来源: http://www.bubuko.com/infodetail-2499049.html