Java 8 并行流 (parallel stream) 采用共享线程池, 对性能造成了严重影响. 可以包装流来调用自己的线程池解决性能问题.
问题
Java 8 的并行流可以让我们相对轻松地执行并行任务.
myList.parallelStream.map(obj -> longRunningOperation())
但是这样存在一个严重的问题: 在 JVM 的后台, 使用通用的 fork/join
池来完成上述功能, 该池是所有并行流共享的. 默认情况, fork/join
池会为每个处理器分配一个线程. 假设你有一台 16 核的机器, 这样你就只能创建 16 个线程. 对 CPU
密集型的任务来说, 这样是有意义的, 因为你的机器确实只能执行 16 个线程. 但是真实情况下, 不是所有的任务都是 CPU 密集型的. 例如:
- myList.parallelStream
- .map(this::retrieveFromA)
- .map(this::processUsingB)
- .forEach(this::saveToC)
- myList.parallelStream
- .map(this::retrieveFromD)
- .map(this::processUsingE)
- .forEach(this::saveToD)
这两个流很大程度上是受限于 IO 操作, 所以会等待其他系统. 但这两个流使用相同的 (小) 线程池, 因此会相互等待而被阻塞. 这个非常不好, 可以改进. 我们以一个流为例:
- final List<Integer> firstRange = buildIntRange();
- firstRange.parallelStream().forEach((number) -> {
- try {
- // do something slow
- Thread.sleep(5);
- } catch (InterruptedException e) { }
- });
完整的代码可以在 gist 上查看.
在执行期间, 我获取了一份线程 dump 的文件. 这是相关的线程(在我的 Macbook 上):
- ForkJoinPool.commonPool-worker-1
- ForkJoinPool.commonPool-worker-2
- ForkJoinPool.commonPool-worker-3
- ForkJoinPool.commonPool-worker-4
现在, 我要并行的执行这两个并行流
- Runnable firstTask = () - >{
- firstRange.parallelStream().forEach((number) - >{
- try {
- // do something slow
- Thread.sleep(5);
- } catch(InterruptedException e) {}
- });
- };
- Runnable secondTask = () - >{
- secondRange.parallelStream().forEach((number) - >{
- try {
- // do something slow
- Thread.sleep(5);
- } catch(InterruptedException e) {}
- });
- };
- // run threads
完整的代码可以在 gist 上查看.
这次我们再看一下线程 dump 文件:
- ForkJoinPool.commonPool-worker-1
- ForkJoinPool.commonPool-worker-2
- ForkJoinPool.commonPool-worker-3
- ForkJoinPool.commonPool-worker-4
正如你所见, 结果是一样的. 我们只使用了 4 个线程.
一种变通方案
正如我所提到的, JVM 后台使用 fork/join 池, 在 ForkJoinTask 的文档中, 我们可以看到:
如果合适, 安排一个异步执行的任务到当前正在运行的池中. 如果任务不在 inForkJoinPool()中, 也可以调用 ForkJoinPool.commonPool()获取新的池来执行.
让我试一试......
- ForkJoinPool forkJoinPool = new ForkJoinPool(3);
- forkJoinPool.submit(() -> {
- firstRange.parallelStream().forEach((number) -> {
- try {
- Thread.sleep(5);
- } catch (InterruptedException e) { }
- });
- });
- ForkJoinPool forkJoinPool2 = new ForkJoinPool(3);
- forkJoinPool2.submit(() -> {
- secondRange.parallelStream().forEach((number) -> {
- try {
- Thread.sleep(5);
- } catch (InterruptedException e) {
- }
- });
- });
完整的代码可以在 gist 上查看.
现在, 我们再次查看线程池:
- ForkJoinPool-1-worker-1
- ForkJoinPool-1-worker-2
- ForkJoinPool-1-worker-3
- ForkJoinPool-1-worker-4
- ForkJoinPool-2-worker-1
- ForkJoinPool-2-worker-2
- ForkJoinPool-2-worker-3
- ForkJoinPool-1-worker-4
因为我们创建自己的线程池, 所以可以避免共享线程池, 如果有需要, 甚至可以分配比处理机数量更多的线程.
ForkJoinPool forkJoinPool = new ForkJoinPool(<numThreads>);
来源: https://juejin.im/post/5abc9d5ff265da23994e9c83