我们在 67 节和 68 节实现了线程的一些基本协作机制,那是利用基本的 wait/notify 实现的,我们提到,Java 并发包中有一些专门的同步工具类,本节,我们就来探讨它们。
我们要探讨的工具类包括:
与 71 节介绍的显示锁和 72 节介绍的显示条件类似,它们也都是基于 AQS 实现的,AQS 可参看 71 节。在一些特定的同步协作场景中,相比使用最基本的 wait/notify,显示锁 / 条件,它们更为方便,效率更高。下面,我们就来探讨它们的基本概念、用法、用途和基本原理。
读写锁 ReentrantReadWriteLock
之前章节我们介绍了两种锁,66 节介绍了 synchronized,71 节介绍了显示锁 ReentrantLock。对于同一受保护对象的访问,无论是读还是写,它们都要求获得相同的锁。在一些场景中,这是没有必要的,多个线程的读操作完全可以并行,在读多写少的场景中,让读操作并行可以明显提高性能。
怎么让读操作能够并行,又不影响一致性呢?答案是使用读写锁。在 Java 并发包中,接口 ReadWriteLock 表示读写锁,主要实现类是可重入读写锁 ReentrantReadWriteLock。
ReadWriteLock 的定义为:
- public interface ReadWriteLock {
- Lock readLock();
- Lock writeLock();
- }
通过一个 ReadWriteLock 产生两个锁,一个读锁,一个写锁。读操作使用读锁,写操作使用写锁。
需要注意的是,只有 "读 - 读" 操作是可以并行的,"读 - 写" 和 "写 - 写" 都不可以。只有一个线程可以进行写操作,在获取写锁时,只有没有任何线程持有任何锁才可以获取到,在持有写锁时,其他任何线程都获取不到任何锁。在没有其他线程持有写锁的情况下,多个线程可以获取和持有读锁。
ReentrantReadWriteLock 是可重入的读写锁,它有两个构造方法,如下所示:
- public ReentrantLock()
- publicReentrantLock(booleanfair)
fire 表示是否公平,不传递的话是 false,含义与显式锁一节介绍的类似,就不赘述了。
我们看个简单的例子,使用 ReentrantReadWriteLock 实现一个缓存类 MyCache,代码如下:
- public class MyCache {
- privateMap map = newHashMap<>();
- privateReentrantReadWriteLock readWriteLock =new ReentrantReadWriteLock();
- privateLock readLock = readWriteLock.readLock();
- privateLock writeLock = readWriteLock.writeLock();
- public Object get(String key) {
- readLock.lock();
- try {
- return map.get(key);
- } finally {
- readLock.unlock();
- }
- }
- public Object put(String key, Object value) {
- writeLock.lock();
- try {
- return map.put(key, value);
- } finally {
- writeLock.unlock();
- }
- }
- public void clear() {
- writeLock.lock();
- try {
- map.clear();
- } finally {
- writeLock.unlock();
- }
- }
- }
代码比较简单,就不赘述了。
读写锁是怎么实现的呢?读锁和写锁看上去是两个锁,它们是怎么协调的?具体实现比较复杂,我们简述下其思路。
内部,它们使用同一个整数变量表示锁的状态,16 位给读锁用,16 位给写锁用,使用一个变量便于进行 CAS 操作,锁的等待队列其实也只有一个。
写锁的获取,就是确保当前没有其他线程持有任何锁,否则就等待。写锁释放后,也就是将等待队列中的第一个线程唤醒,唤醒的可能是等待读锁的,也可能是等待写锁的。
读锁的获取不太一样,首先,只要写锁没有被持有,就可以获取到读锁,此外,在获取到读锁后,它会检查等待队列,逐个唤醒最前面的等待读锁的线程,直到第一个等待写锁的线程。如果有其他线程持有写锁,获取读锁会等待。读锁释放后,检查读锁和写锁数是否都变为了 0,如果是,唤醒等待队列中的下一个线程。
信号量 Semaphore
之前介绍的锁都是限制只有一个线程可以同时访问一个资源。现实中,资源往往有多个,但每个同时只能被一个线程访问,比如,饭店的饭桌、火车上的卫生间。有的单个资源即使可以被并发访问,但并发访问数多了可能影响性能,所以希望限制并发访问的线程数。还有的情况,与软件的授权和计费有关,对不同等级的账户,限制不同的最大并发访问数。
信号量类 Semaphore 就是用来解决这类问题的,它可以限制对资源的并发访问数,它有两个构造方法:
- publicSemaphore(int permits)
- publicSemaphore(intpermits,booleanfair)
fire 表示公平,含义与之前介绍的是类似的,permits 表示许可数量。
Semaphore 的方法与锁是类似的,主要的方法有两类,获取许可和释放许可,主要方法有:
- //阻塞获取许可
- public voidacquire()throws InterruptedException
- //阻塞获取许可,不响应中断
- public void acquireUninterruptibly()
- //批量获取多个许可
- public voidacquire(intpermits)throws InterruptedException
- public voidacquireUninterruptibly(int permits)
- //尝试获取
- public boolean tryAcquire()
- //限定等待时间获取
- public booleantryAcquire(intpermits,longtimeout, TimeUnit unit)throws InterruptedException
- //释放许可
- public voidrelease()
我们看个简单的示例,限制并发访问的用户数不超过 100,代码如下:
- public class AccessControlService {
- public static classConcurrentLimitExceptionextends RuntimeException {
- private static final longserialVersionUID = 1L;
- }
- private static final intMAX_PERMITS = 100;
- privateSemaphore permits =newSemaphore(MAX_PERMITS,true);
- public boolean login(String name, String password) {
- if(!permits.tryAcquire()) {
- // 同时登录用户数超过限制
- throw new ConcurrentLimitException();
- }
- // ..其他验证
- return true;
- }
- public void logout(String name) {
- permits.release();
- }
- }
代码比较简单,就不赘述了。
需要说明的是,如果我们将 permits 的值设为 1,你可能会认为它就变成了一般的锁,不过,它与一般的锁是不同的。一般锁只能由持有锁的线程释放,而 Semaphore 表示的只是一个许可数,任意线程都可以调用其 release 方法。主要的锁实现类 ReentrantLock 是可重入的,而 Semaphore 不是,每一次的 acquire 调用都会消耗一个许可,比如,看下面代码段:
- Semaphore permits =newSemaphore(1);
- permits.acquire();
- permits.acquire();
- System.out.println("acquired");
程序会阻塞在第二个 acquire 调用,永远都不会输出 "acquired"。
信号量的基本原理比较简单,也是基于 AQS 实现的,permits 表示共享的锁个数,acquire 方法就是检查锁个数是否大于 0,大于则减一,获取成功,否则就等待,release 就是将锁个数加一,唤醒第一个等待的线程。
倒计时门栓 CountDownLatch
我们在 68 节使用 wait/notify 实现了一个简单的门栓 MyLatch,我们提到,Java 并发包中已经提供了类似工具,就是 CountDownLatch。它的大概含义是指,它相当于是一个门栓,一开始是关闭的,所有希望通过该门的线程都需要等待,然后开始倒计时,倒计时变为 0 后,门栓打开,等待的所有线程都可以通过,它是一次性的,打开后就不能再关上了。
CountDownLatch 里有一个计数,这个计数通过构造方法进行传递:
- publicCountDownLatch(intcount)
多个线程可以基于这个计数进行协作,它的主要方法有:
- public voidawait()throws InterruptedException
- public booleanawait(longtimeout, TimeUnit unit)throws InterruptedException
- public voidcountDown()
await() 检查计数是否为 0,如果大于 0,就等待,await() 可以被中断,也可以设置最长等待时间。countDown 检查计数,如果已经为 0,直接返回,否则减少计数,如果新的计数变为 0,则唤醒所有等待的线程。
在 68 节,我们介绍了门栓的两种应用场景,一种是同时开始,另一种是主从协作。它们都有两类线程,互相需要同步,我们使用 CountDownLatch 重新演示下。
在同时开始场景中,运行员线程等待主裁判线程发出开始指令的信号,一旦发出后,所有运动员线程同时开始,计数初始为 1,运动员线程调用 await,主线程调用 countDown,示例代码如下:
- public class RacerWithCountDownLatch {
- static classRacerextends Thread {
- CountDownLatch latch;
- public Racer(CountDownLatch latch) {
- this.latch = latch;
- }
- @Override
- public void run() {
- try {
- this.latch.await();
- System.out.println(getName()
- + " start run "+System.currentTimeMillis());
- } catch (InterruptedException e) {
- }
- }
- }
- public static voidmain(String[] args)throws InterruptedException {
- intnum = 10;
- CountDownLatch latch =newCountDownLatch(1);
- Thread[] racers =new Thread[num];
- for(inti = 0; i < num; i++) {
- racers[i] =new Racer(latch);
- racers[i].start();
- }
- Thread.sleep(1000);
- latch.countDown();
- }
- }
代码比较简单,就不赘述了。在主从协作模式中,主线程依赖工作线程的结果,需要等待工作线程结束,这时,计数初始值为工作线程的个数,工作线程结束后调用 countDown,主线程调用 await 进行等待,示例代码如下:
- public class MasterWorkerDemo {
- static classWorkerextends Thread {
- CountDownLatch latch;
- public Worker(CountDownLatch latch) {
- this.latch = latch;
- }
- @Override
- public void run() {
- try {
- // simulate working on taskThread.sleep((int) (Math.random() * 1000));
- // simulate exception
- if(Math.random() < 0.02) {
- throw newRuntimeException("bad luck");
- }
- } catch (InterruptedException e) {
- } finally {
- this.latch.countDown();
- }
- }
- }
- public static voidmain(String[] args)throws InterruptedException {
- intworkerNum = 100;
- CountDownLatch latch =new CountDownLatch(workerNum);
- Worker[] workers =new Worker[workerNum];
- for(inti = 0; i < workerNum; i++) {
- workers[i] =new Worker(latch);
- workers[i].start();
- }
- latch.await();
- System.out.println("collect worker results");
- }
- }
需要强调的是,在这里,countDown 的调用应该放到 finally 语句中,确保在工作线程发生异常的情况下也会被调用,使主线程能够从 await 调用中返回。
循环栅栏 CyclicBarrier
我们在 68 节使用 wait/notify 实现了一个简单的集合点 AssemblePoint,我们提到,Java 并发包中已经提供了类似工具,就是 CyclicBarrier。它的大概含义是指,它相当于是一个栅栏,所有线程在到达该栅栏后都需要等待其他线程,等所有线程都到达后再一起通过,它是循环的,可以用作重复的同步。
CyclicBarrier 特别适用于并行迭代计算,每个线程负责一部分计算,然后在栅栏处等待其他线程完成,所有线程到齐后,交换数据和计算结果,再进行下一次迭代。
与 CountDownLatch 类似,它也有一个数字,但表示的是参与的线程个数,这个数字通过构造方法进行传递:
- publicCyclicBarrier(intparties)
它还有一个构造方法,接受一个 Runnable 参数,如下所示:
- publicCyclicBarrier(intparties, Runnable barrierAction)
这个参数表示栅栏动作,当所有线程到达栅栏后,在所有线程执行下一步动作前,运行参数中的动作,这个动作由最后一个到达栅栏的线程执行。
CyclicBarrier 的主要方法就是 await:
- public int await() throws InterruptedException,
- BrokenBarrierException public int await(long timeout, TimeUnit unit) throws InterruptedException,
- BrokenBarrierException,
- TimeoutException
await 在等待其他线程到达栅栏,调用 await 后,表示自己已经到达,如果自己是最后一个到达的,就执行可选的命令,执行后,唤醒所有等待的线程,然后重置内部的同步计数,以循环使用。
await 可以被中断,可以限定最长等待时间,中断或超时后会抛出异常。需要说明的是异常 BrokenBarrierException,它表示栅栏被破坏了,什么意思呢?在 CyclicBarrier 中,参与的线程是互相影响的,只要其中一个线程在调用 await 时被中断了,或者超时了,栅栏就会被破坏,此外,如果栅栏动作抛出了异常,栅栏也会被破坏,被破坏后,所有在调用 await 的线程就会退出,抛出 BrokenBarrierException。
我们看一个简单的例子,多个游客线程分别在集合点 A 和 B 同步:
- public class CyclicBarrierDemo {
- static classTouristextends Thread {
- CyclicBarrier barrier;
- public Tourist(CyclicBarrier barrier) {
- this.barrier = barrier;
- }
- @Override
- public void run() {
- try {
- // 模拟先各自独立运行Thread.sleep((int) (Math.random() * 1000));
- // 集合点A
- barrier.await();
- System.out.println(this.getName() + " arrived A "
- + System.currentTimeMillis());
- // 集合后模拟再各自独立运行Thread.sleep((int) (Math.random() * 1000));
- // 集合点B
- barrier.await();
- System.out.println(this.getName() + " arrived B "
- + System.currentTimeMillis());
- } catch (InterruptedException e) {
- } catch (BrokenBarrierException e) {
- }
- }
- }
- public static void main(String[] args) {
- intnum = 3;
- Tourist[] threads =new Tourist[num];
- CyclicBarrier barrier =newCyclicBarrier(num,new Runnable() {
- @Override
- public void run() {
- System.out.println("all arrived " + System.currentTimeMillis()
- + " executed by " + Thread.currentThread().getName());
- }
- });
- for(inti = 0; i < num; i++) {
- threads[i] =new Tourist(barrier);
- threads[i].start();
- }
- }
- }
在我的电脑上的一次输出为:
- all arrived 1490053578552 executed by Thread-1
- Thread-1 arrived A 1490053578555
- Thread-2 arrived A 1490053578555
- Thread-0 arrived A 1490053578555
- all arrived 1490053578889 executed by Thread-0
- Thread-0 arrived B 1490053578890
- Thread-2 arrived B 1490053578890
- Thread-1 arrived B 1490053578890
多个线程到达 A 和 B 的时间是一样的,使用 CyclicBarrier,达到了重复同步的目的。
CyclicBarrier 与 CountDownLatch 可能容易混淆,我们强调下其区别:
小结
本节介绍了 Java 并发包中的一些同步协作工具:
实际中,应该优先使用这些工具,而不是手工用 wait/notify 或者显示锁 / 条件同步。
下一节,我们来探讨一个特殊的概念,线程局部变量 ThreadLocal,它是什么呢?
(与其他章节一样,本节所有代码位于 https://github.com/swiftma/program-logic)
----------------
未完待续,查看最新文章,敬请关注微信公众号 "老马说编程"(扫描下方二维码),从入门到高级,深入浅出,老马和你一起探索 Java 编程及计算机技术的本质。用心原创,保留所有版权。
来源: http://www.cnblogs.com/swiftma/p/6725095.html