本集概要:
怎样构建一个线程安全而又高效可伸缩的缓存?
怎样利用设计模式, 把缓存做成通用的工具?
除了 synchronize 和 volatile, 我们还能使用哪些工具来开发线程安全的代码?
前情回顾: Volatile 趣谈我是怎么把贝克汉姆的进球弄丢的
大雄开发的门线传感器在曼联和阿森纳的比赛中一鸣惊人, 越来越多的客户向公司订购这款软件......
糙版缓存
一天, 哆啦对大雄说, 大雄, 你看我们后台这个统计用户消费信息的报表, 每次统计都要去查数据库, 既消耗资源, 查询起来也慢, 你看看能不能给做一个缓存?
缓存? 这个简单呀, 缓存的原理其实就是一个键值对, 也就是 Map, 只要把用户的 id 作为 key, 把对应的统计结果作为 value, 放到 Map 里头缓存起来, 下次再来查询时, 先到 Map 去搜, 搜到了就直接返回, 搜不到, 再去查数据库计算, 这样就 ok 了
可以呀, 思路很清晰嘛, 一下子就讲出了一份设计文档
那是, 那我开始写代码啦!, 大雄泡了杯茶, 关掉 QQ 和 Outlook, 开始写起了代码
很快, 第一版代码出炉了, UserCostStatComputer(本文的示例代码, 可到 Github 下载):
- public class UserCostStatComputer {
- private final Map < String,
- BigInteger > cache = new HashMap();
- public synchronized BigInteger compute(String userId) throws InterruptedException {
- BigInteger result = cache.get(userId);
- if (result == null) {
- result = doCompute(userId);
- cache.put(userId, result);
- }
- return result;
- }
- private BigInteger doCompute(String userId) throws InterruptedException {
- // assume it cost a long time...
- TimeUnit.SECONDS.sleep(3);
- return new BigInteger(userId);
- }
- }
为了线程安全, 我给整个 compute 方法加上了 synchronized 修饰符, 保证每次只有一个线程可以进入 compute 方法, 大雄向哆啦介绍他的代码
哆啦看了看, 说, 大雄, 你这代码写的快是快, 但是有点糙啊
这
咱先不说性能, 就说你这设计, 你有没有考虑过, 要是还有其他地方也需要缓存, 你难道也把这份缓存的逻辑拷贝过去?
啊, 对呀, 总不能每次要用到缓存, 就在原先代码里加个 Map, 然后再写上 if 判断吧......
"小伙子觉悟不错, 好好想想, 怎样给原先没有缓存的统计函数, 加上缓存的功能?"
给原先没有某某功能的函数, 加上某某功能, 这话听着很熟悉啊...
通用缓存
大雄在大脑中检索着, 对了! 装饰模式! 哎, 之前学过的, 怎么到实际运用中就忘了呢!
利用设计模式, 大雄很快把代码进行了重构
首先, 定义一个 Computable 接口:
- public interface Computable<A, V> {
- V compute(A arg) throws Exception;
- }
接着, 每个要使用缓存功能的计算器, 都要去实现这个接口, 比如 UserCostStatComputer:
- public class UserCostStatComputer implements Computable<String, BigInteger> {
- @Override
- public BigInteger compute(String userId) throws Exception {
- // assume there is a long time compute...
- TimeUnit.SECONDS.sleep(3);
- return new BigInteger(userId);
- }
- }
现在这个 UserCostStatComputer 是没有缓存功能的, 没关系, 来一个装饰器就 OK 了, Memoizer1:
- public class Memoizer1<A, V> implements Computable<A, V> {
- private final Map<A, V> cache = new HashMap<A, V>();
- private final Computable<A, V> c;
- public Memoizer1(Computable<A, V> c) {
- this.c = c;
- }
- public synchronized V compute(A arg) throws Exception {
- V result = cache.get(arg);
- if (result == null) {
- result = c.compute(arg);
- cache.put(arg, result);
- }
- return result;
- }
- }
Memoizer1 即实现了 Computable 接口, 又持有一个 Computable 对象, 同时在自己的 compute 方法中, 调用了 Computable 对的 compute 方法, 在这个 compute 方法的外圈, 可以看到就是之前的缓存逻辑
那么要怎样使用 Memoizer1 给 UserCostStatComputer 装饰上缓存功能呢? 很简单:
- Computable computer = new Memoizer1(new UserCostStatComputer());
- computer.compute("1");
利用 Memoizer1 这个装饰器, 我们成功的给原先不具有缓存功能的 UserCostStatComputer 加上了缓存
后面要是有其他的计算器, 比如用户地区分布统计计算器用户年龄分布统计计算器, 我们都可以像 UserCostStatComputer 一样, 实现 Computable, 就可以使用装饰器, 装饰上缓存功能了, 大雄得意洋洋地说
不错, 小伙子悟性很强啊, 哆啦拍拍大雄的肩膀, 好了, 现在通过设计模式解决了代码通用性问题, 是时候看看你这个性能问题了
雇个保姆
性能问题? 我这个代码性能有问题么, 大雄翻着白眼
你看看, 你把 synchronize 加在了什么地方? 你加在了函数签名的位置, 意味着一次只能有一个线程能够进入 compute 方法, 这就导致了一个用户的 id 在计算的时候, 其他用户无法进行计算, 只能等到正在计算的用户计算完了, 才能进入 compute 方法
啊, 这对性能影响挺大的
没错, 换句话说, 你这个加锁的粒度太大了, 我们先把 synchronize 去掉, 看看有什么线程安全问题, 再来逐一解决
去掉 synchronize 的 compute 方法:
- public V compute(A arg) throws Exception {
- V result = cache.get(arg);
- if (result == null) {
- result = c.compute(arg);
- cache.put(arg, result);
- }
- return result;
- }
很明显, 由于你用的是线程不安全的 HashMap,cache.put(arg, result)这一行代码就有线程安全问题, 哆啦说
那给这行代码加锁? 像这样, 大雄说着在纸上写出草稿:
- synchronized(this) {
- cache.put(arg, result);
- }
哈哈, 你怎么那么喜欢用 synchronized, 这样可以是可以, 但是粒度还是太大了, Java 早就给你提供了一个叫 ConcurrentHashMap 的保姆, 我们只要把线程安全委托给它去处理就好了
啊, 我怎么又忘了...
说完, 大雄修改了代码:
- private final Map<A, V> cache = new ConcurrentHashMap<A, V>();
- // private final Map<A, V> cache = new HashMap<A, V>();
未来任务
"现在再来看其他地方, 由于把 synchronize 去掉了, 现在不同的用户 id 可以同时计算, 但是也导致了相同用户 id 也会同时被计算, 也就是说同样的数据会被计算多次, 这样缓存就没意义了"
啊, 假设用户 id 为 1 的数据正在计算, 这个时候又来了一个线程, 也是计算用户 id 为 1 的, 如果这个线程可以知道, 当前有没有线程正在计算 id 为 1 的数据, 如果有, 就等待这个线程返回计算结果, 如果没有, 就自己计算, 那问题不就解决了? 可是这个逻辑挺复杂的, 好难实现啊
哈哈, 很简单, Java 早就提供了 Future 和 FutureTask, 来实现你说的这种功能, 来, 把键盘给我, 好久没写代码了, 哆啦说完, 马上敲起了键盘
终极版 Memoizer:
- public class Memoizer < A,
- V > implements Computable < A,
- V > {
- private final ConcurrentMap < A,
- Future < V >> cache = new ConcurrentHashMap < A,
- Future < V >> ();
- private final Computable < A,
- V > c;
- public Memoizer(Computable < A, V > c) {
- this.c = c;
- }
- public V compute(final A arg) throws Exception {
- // use loop for retry when CancellationException
- while (true) {
- Future < V > f = cache.get(arg);
- if (f == null) {
- Callable < V > callable = new Callable < V > () {
- public V call() throws Exception {
- return c.compute(arg);
- }
- };
- FutureTask < V > ft = new FutureTask < V > (callable);
- // use putIfAbsent to avoid multi thread compute the same value
- f = cache.putIfAbsent(arg, ft);
- if (f == null) {
- f = ft;
- ft.run();
- }
- }
- try {
- return f.get();
- } catch(CancellationException e) {
- // remove cache and go into next loop to retry
- cache.remove(arg, f);
- } catch(ExecutionException e) {
- // throw it and then end
- e.printStackTrace();
- throw e;
- }
- }
- }
- }
简单解释一下代码逻辑, 线程进了 compute 方法后:
首先去存放 future 的 Map 里, 搜索有没有已经其他线程已经创建好的 future
如果有(不等于 null), 那就调用 future 的 get 方法
如果已经计算完成, get 方法会立刻返回计算结果
否则, get 方法会阻塞, 直到结果计算出来再将其返回
如果没有已经创建的 future, 那么就自己创建 future, 进行计算
有几个小点要单独解释的:
1 为什么要使用 putIfAbsent
这里之所以用 putIfAbsent 而不用 put, 原因很简单
如果有两个都是计算 userID=1 的线程, 同时调用 put 方法, 那么返回的结果都不会为 null, 后面还是会创建两个任务去计算相同的值
而 putIfAbsent, 当 map 里面已经有对应的值了, 则会返回已有的值, 否则, 会返回 null, 这样就可以解决相同的值计算两次的问题
2 为什么要 while (true)
这是因为 future 的 get 方法会由于线程被中断而抛出 CancellationException
我们对于 CancellationException 的处理是 cache.remove(arg, f);, 也就是把缓存清理掉, 然后进入下一个循环, 重新计算, 直到计算成功, 或者抛出 ExecutionException
总结
这篇文章中, 我们先是用装饰模式, 改良了代码的设计, 接着通过使用并发容器 ConcurrentHashMap 以及同步工具 Future, 取代了原先的 synchronize, 实现了线程安全性的委托
Java 并发编程实践中, 将委托称为实现线程安全的一个最有效策略委托其实就是把原来通过使用 synchronize 和 volatile 来实现的线程安全, 委托给 Java 提供的一些基础构建模块 (当然你也可以写自己的基础构建模块) 去实现, 这些基础构建模块包括:
同步容器: 比如 Vector, 同步容器的原理大多就是 synchronize, 所以用的不多;
并发容器: 比如本文用到的 ConcurrentHashMap, 当然还有 CopyonWriteArrayListLinkedBlockingQueue 等, 根据你的需求使用不同数据结构的并发容器;
同步工具类: 比如本文用到的 Future 和 FutureTask, 还有闭锁 (Latches) 信号量 (Semaphores) 栅栏 (Barriers) 等
这些基础构建模块, 在加上之前所讲的 synchronize 和 volatile, 就形成了 Java 并发的基础知识:
如何写出线程不安全的代码
如何用一句话介绍 synchronize 的内涵
Volatile 趣谈我是怎么把贝克汉姆的进球弄丢的
其实这也就是 Java 并发编程实践第一部分的迷你版, 总结成一句话就是:
要想写出线程安全的代码, 我们需要用到使用 synchronizevolatile, 或者将线程安全委托给基础构建模块
当然, 在实际应用中, 只有这些基础知识是不够的, 最简单的例子, 你不可能每次请求过来都创建线程, 这会吃光内存的所以, 你需要一个线程池, 来限制线程的数量, 这也就引出了 Java 并发编程实践的第二部分, 结构化并发应用程序(Structuring Concurrent Applications), 听名字确实不知道想讲什么, 所以我也将把它啃下来, 然后像第一部分一样, 用尽可能通俗易懂的语言和大家分享学习心得
参考
Java 并发编程实践
Future (Java Platform SE 7 )
来源: http://www.jianshu.com/p/0f13302ec311