CountDownLatch
CountDownLatch 可以用于一个或多个线程等待其他线程完成操作.
示例代码
- private static CountDownLatch c = new CountDownLatch(1);
- public static void main(String[] args) {
- System.out.println("main start");
- try {
- new Thread(new Runnable(){
- @Override
- public void run() {
- try {
- System.out.println("thread1 start");
- Thread.sleep(3000);
- System.out.println("thread1 end");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- c.countDown();
- }
- }, "thread1").start();
- c.await();
- System.out.println("main end");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
代码中, 主线程启动一个 thread1 线程后, 调用 CountDownLatch 的 await() 方法被阻塞, 一直等到计数器减为 0.
CountDownLatch 的构造方法接收一个参数 n 作为计数器, 可以想象为一个门有 n 个门闩, 每次调用 countDown() 方法, 计数器就减 1, 即打开一个门闩. 当所有门闩都打开的时候, 调用 await() 方法的线程被唤醒.
await(long timeout, TimeUnit unit)
方法表示等待一定时间后, 当前不会再阻塞当前线程.
CyclicBarrier
CyclicBarrier 可以看作是可循环使用的屏障. 它的作用是, 让一组线程达到一个屏障时被阻塞, 直到最后一个线程到达屏障时, 屏障才会放行, 所有被阻塞的线程才可以继续运行.
示例代码
- private static CyclicBarrier cb = new CyclicBarrier(3);
- public static void main(String[] args) {
- new Thread(new Runnable(){
- @Override
- public void run() {
- try {
- System.out.println("thread1 start");
- Thread.sleep(2000);
- cb.await();
- System.out.println("thread1 end");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }).start();
- new Thread(new Runnable(){
- @Override
- public void run() {
- try {
- System.out.println("thread2 start");
- Thread.sleep(2000);
- cb.await();
- System.out.println("thread2 end");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }).start();
- try {
- cb.await();
- } catch (Exception e) {
- e.printStackTrace();
- }
- System.out.println("main");
- }
代码中, CyclicBarrier 的构造方法的参数表示屏障拦截的线程数, 每个线程通过 await() 通知 CyclicBarrier 已到达屏障, 然后此线程被阻塞.
另外, CyclicBarrier 还有一另种构造方法
CyclicBarrier(int parties, Runnable barrierAction)
, 表示在所有线程达到屏障时, 先执行 barrierAction, 方便处理更复杂的业务场景.
await(long timeout, TimeUnit unit)
方法表示等待一定时间后, 当前不会再阻塞当前线程.
与 CountDownLatch 的不同之处在于, CyclicBarrier 可以通过 reset() 方法重置.
Semaphore
Semaphore(信号量) 是用来控制同时访问资源的线程数.
Semaphore 可以用于做流量控制, 特别是公共资源有限的场景.
示例代码
- private static Semaphore sema = new Semaphore(5);
- public static void main(String[] args) {
- for(int i=0; i<30; i++) {
- new Thread(new Runnable(){
- @Override
- public void run() {
- try {
- sema.acquire();
- System.out.println(Thread.currentThread().getName());
- Thread.sleep(3000);
- sema.release();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).start();
- }
- }
代码中, 创建了 30 个线程, 但是同时只有 5 个线程可以并发执行.
线程先调用 acquire() 方法获取许可, 运行完成后再调用 release() 方法释放许可.
另外, Semaphore 还提供了 tryAcquire() 方法来试图获取许可, 获取成功则返回 true.
Exchanger
Exchanger 用于两个线程交换数据.
线程通过调用 exchange() 方法交换数据, 先调用 exchange() 方法的线程会一定阻塞, 等待第二个线程调用此方法; 当两个线都调用了 exchange() 方法时, 两个线程就可以实现交换数据了.
- private static Exchanger<String> ex = new Exchanger<String>();
- public static void main(String[] args) {
- new Thread(new Runnable(){
- @Override
- public void run() {
- try {
- String result = ex.exchange("I am thread1...");
- System.out.println("thread1:" + result);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }, "thread1").start();
- new Thread(new Runnable(){
- @Override
- public void run() {
- try {
- String result = ex.exchange("I am thread2...");
- System.out.println("thread2:" + result);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }, "thread2").start();
- }
来源: https://www.cnblogs.com/lzj0616/p/8922157.html