一, CyclicBarrier 使用
CyclicBarrier 从字面上可以直接理解为线程运行的屏障, 它可以让一组线程执行到一个共同的屏障点时被阻塞, 直到最后一个线程执行到指定位置, 你设置的执行线程就会触发运行; 同时 CyclicBarrier 相比与 CountDownLatch, 它是可以被重置的; 下面我们通过一个简单例子看下 CyclicBarrier 的使用;
实例化一个 CyclicBarrier 对象并传入你要控制的线程内部;
- public static void main(String[] args) {
- CyclicBarrier cb = new CyclicBarrier(3, new Runnable() {
- public void run() {
- System.out.println("所有线程集合");
- }
- });
- for (int i = 0; i <3; i++) {
- new CyclicBarrierThread(i + "", cb).start();
- }
- }
计数线程代码, 每当计数到偶数时调用 CyclicBarrier 的 await() 方法
- public class CyclicBarrierThread extends Thread{
- private CyclicBarrier barrier;
- private String name;
- private int count;
- public CyclicBarrierThread(String name,CyclicBarrier barrier) {
- this.name=name;
- this.barrier=barrier;
- this.count=0;
- }
- public void run() {
- try {
- for(int i=0;i<10;i++) {
- Thread.sleep(100);
- count++;
- System.out.println(name+"号线程 ---"+Thread.currentThread().getName()+"开始计数:"+count);
- if(count%2==0) {// 每计数到偶数次时集合一次
- barrier.await();
- System.out.println(name+"号线程 ---"+Thread.currentThread().getName()+"集合完毕, 继续计数");
- }
- }
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
查看代码输出
2 号线程 ---Thread-2 开始计数: 1
0 号线程 ---Thread-0 开始计数: 1
1 号线程 ---Thread-1 开始计数: 1
2 号线程 ---Thread-2 开始计数: 2
1 号线程 ---Thread-1 开始计数: 2
0 号线程 ---Thread-0 开始计数: 2
所有线程集合
2 号线程 ---Thread-2 集合完毕, 继续计数
1 号线程 ---Thread-1 集合完毕, 继续计数
0 号线程 ---Thread-0 集合完毕, 继续计数
2 号线程 ---Thread-2 开始计数: 3
1 号线程 ---Thread-1 开始计数: 3
0 号线程 ---Thread-0 开始计数: 3
2 号线程 ---Thread-2 开始计数: 4
0 号线程 ---Thread-0 开始计数: 4
1 号线程 ---Thread-1 开始计数: 4
所有线程集合
1 号线程 ---Thread-1 集合完毕, 继续计数
2 号线程 ---Thread-2 集合完毕, 继续计数
0 号线程 ---Thread-0 集合完毕, 继续计数
0 号线程 ---Thread-0 开始计数: 5
2 号线程 ---Thread-2 开始计数: 5
1 号线程 ---Thread-1 开始计数: 5
0 号线程 ---Thread-0 开始计数: 6
1 号线程 ---Thread-1 开始计数: 6
2 号线程 ---Thread-2 开始计数: 6
所有线程集合
2 号线程 ---Thread-2 集合完毕, 继续计数
0 号线程 ---Thread-0 集合完毕, 继续计数
1 号线程 ---Thread-1 集合完毕, 继续计数
0 号线程 ---Thread-0 开始计数: 7
1 号线程 ---Thread-1 开始计数: 7
2 号线程 ---Thread-2 开始计数: 7
1 号线程 ---Thread-1 开始计数: 8
0 号线程 ---Thread-0 开始计数: 8
2 号线程 ---Thread-2 开始计数: 8
所有线程集合
2 号线程 ---Thread-2 集合完毕, 继续计数
0 号线程 ---Thread-0 集合完毕, 继续计数
1 号线程 ---Thread-1 集合完毕, 继续计数
0 号线程 ---Thread-0 开始计数: 9
1 号线程 ---Thread-1 开始计数: 9
2 号线程 ---Thread-2 开始计数: 9
1 号线程 ---Thread-1 开始计数: 10
0 号线程 ---Thread-0 开始计数: 10
2 号线程 ---Thread-2 开始计数: 10
所有线程集合
1 号线程 ---Thread-1 集合完毕, 继续计数
2 号线程 ---Thread-2 集合完毕, 继续计数
0 号线程 ---Thread-0 集合完毕, 继续计数
通过输出结果可以看到, 计数线程每计数到偶数次时使用 CyclicBarrier 的 await() 方法, 线程都会进入阻塞等待的状态, 直到最后一个线程到达屏障点时, 触发你定义的执行线程, 而且 CyclicBarrier 的 await() 方法是可以重复使用的.
二, CyclicBarrier 源码分析
下面我们就对 CyclicBarrier 内部的源码实现进行一些分析与总结
1,CyclicBarrier 的构造
首先看下 CyclicBarrier 的构造函数
- public CyclicBarrier(int parties, Runnable barrierAction) {
- if (parties <= 0) throw new IllegalArgumentException();
- // 拦截的线程数量
- this.parties = parties;
- // 用于计数的 count 值, 每有一个线程执行到屏障点, 就会递减 1
- this.count = parties;
- // 定义的拦截线程
- this.barrierCommand = barrierAction;
- }
CyclicBarrier 的构造函数很简单就是接收你要拦截的线程数量与定义的执行线程.
2,await 方法
- public int await() throws InterruptedException, BrokenBarrierException {
- try {
- return dowait(false, 0L);
- } catch (TimeoutException toe) {
- throw new Error(toe); // cannot happen
- }
- }
我们看下具体实现 dowait 方法的实现
- private int dowait(boolean timed, long nanos)
- throws InterruptedException, BrokenBarrierException,
- TimeoutException {
- // 获取可重入锁
- final ReentrantLock lock = this.lock;
- // 加锁
- lock.lock();
- try {
- //CyclicBarrier 内部定义的一个 Generation 类
- final Generation g = generation;
- // 判断 Generation 的 broken 状态
- if (g.broken)
- throw new BrokenBarrierException();
- // 如果线程被中断
- if (Thread.interrupted()) {
- //Generation 的 broken 置为 true,count 值重置, 并唤醒所有线程
- breakBarrier();
- throw new InterruptedException();
- }
- //count 值减一
- int index = --count;
- if (index == 0) { // 如果 conunt 为 0, 说明最后一个线程到大屏障
- boolean ranAction = false;
- try {
- final Runnable command = barrierCommand;
- if (command != null)
- command.run();// 执行你传入的线程
- ranAction = true;
- nextGeneration();// 唤醒所有阻塞的线程, 同时重置 count 值与 Generation
- return 0;
- } finally {
- if (!ranAction)
- // 拦截线程没有正常执行, 唤醒所有线程, 同时重置 count 值, Generation 的 broken 置为 true
- breakBarrier();
- }
- }
- // loop until tripped, broken, interrupted, or timed out
- for (;;) {
- try {
- // 是否设置阻塞的超时时间
- if (!timed)
- // 释放当前锁
- trip.await();//false 表示不设置, 一直阻塞
- else if (nanos> 0L)
- nanos = trip.awaitNanos(nanos);//true 设置阻塞的超时时间
- } catch (InterruptedException IE) {
- if (g == generation && ! g.broken) {
- breakBarrier();
- throw IE;
- } else {
- // We're about to finish waiting even if we had not
- // been interrupted, so this interrupt is deemed to
- // "belong" to subsequent execution.
- Thread.currentThread().interrupt();
- }
- }
- if (g.broken)
- throw new BrokenBarrierException();
- if (g != generation)
- return index;
- if (timed && nanos <= 0L) {
- breakBarrier();
- throw new TimeoutException();
- }
- }
- } finally {
- // 释放锁
- lock.unlock();
- }
- }
dowait 方法的实现流程是很清晰的, 通过 ReentrantLock 的 Condition 接口与 count 值相互配合, 主要完成以下功能:
1, 当需要拦截的线程到达屏障点调用 await 方法后获取 ReentrantLock 锁, 保证线程安全;
2, 检查 count 值是否为 0, 判断是否是最后一个线程到达屏障, 如果是的话执行需要触发执行的线程, 调用 Condition 的 signalAll 方法唤醒所有阻塞的线程, 并重置 count 值与 Generation 类, 保障 CyclicBarrier 的重复可用;
3, 如果不是最后一个线程的话, 根据传入的参数调用 Condition 的 await 方法释放锁资源并进入阻塞等待, 直到被唤醒;
3,reset 方法
可以用来主动重置 CyclicBarrier 的状态
- public void reset() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- //generation.broken 设置为 true, 唤醒所有线程, count 值重置
- breakBarrier();
- nextGeneration();
- } finally {
- lock.unlock();
- }
- }
- private void nextGeneration() {
- // signal completion of last generation
- trip.signalAll();
- // set up next generation
- count = parties;
- generation = new Generation();
- }
- private void breakBarrier() {
- generation.broken = true;
- count = parties;
- trip.signalAll();
- }
breakBarrier() 与 nextGeneration(), 这两个方法的主要区别就在于前者会把 generation.broken 设置为 true, 也就是说如果调用 reset 方法主动重置 CyclicBarrier 类的状态, 当前正在使用 CyclicBarrier 类同步的线程都会被唤醒或抛出异常;
4,getNumberWaiting 方法
- public int getNumberWaiting() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- return parties - count;
- } finally {
- lock.unlock();
- }
- }
很明显 getNumberWaiting 方法使用来获取当前已经运行至屏蔽点并阻塞等待的线程数量的;
三, 总结
通过上面分析可以看到 CyclicBarrier 的实现原理相对还是比较简单与清晰的, 主要是基于 ReentrantLock 与计数器相结合来实现多个线程的同步控制的. 以上就是对 CyclicBarrier 类的使用与内部实现进行的分析, 其中如有不足与不正确的地方还望指出与海涵.
来源: https://www.cnblogs.com/dafanjoy/p/11110575.html