项目中经常用到线程池, 1000 个人有 1000 钟创建线程池的方式, 先背书一段阿里 ajva 开发规范上的话:
[强制] 线程池不允许使用 Executors 去创建, 而是通过 ThreadPoolExecutor 的方式, 这样
的处理方式让写的同学更加明确线程池的运行规则, 规避资源耗尽的风险.
说明: Executors 返回的线程池对象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE, 可能会堆积大量的请求, 从而导致 OOM.
2)CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE, 可能会创建大量的线程, 从而导致 OOM.
今天来说说 springboot 中通过线程池的使用
springboot 中通过线程池的使用
先创建一个自定义线程池
先创建一个自定义线程池, 继承 spring 并发包里面提供 ThreadPoolTaskExecutor, 演示用, 子类就不加别的逻辑了
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
- import org.springframework.util.concurrent.ListenableFuture;
- import java.util.concurrent.Callable;
- import java.util.concurrent.Future;
- public class TestThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
- @Override
- public void execute(Runnable task) {
- super.execute(task);
- }
- @Override
- public void execute(Runnable task, long startTimeout) {
- super.execute(task, startTimeout);
- }
- @Override
- public Future<?> submit(Runnable task) {
- return super.submit(task);
- }
- @Override
- public <T> Future<T> submit(Callable<T> task) {
- return super.submit(task);
- }
- @Override
- public ListenableFuture<?> submitListenable(Runnable task) {
- return super.submitListenable(task);
- }
- @Override
- public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
- return super.submitListenable(task);
- }
- }
在创建一个线程 config 类, 注入到 spring 容器里面
创建一个 config 类, 在里面创建我们自定义的线程池, 在里面设置线程池各种参数, 具体参数就说了, 原来的文章里面
- @Configuration
- @EnableAsync
- @Slf4j
- public class ExecutorConfig {
- @Bean
- public Executor asyncServiceExecutor() {
- log.info("start asyncServiceExecutor");
- ThreadPoolTaskExecutor executor = new TestThreadPoolTaskExecutor();
- // 配置核心线程数
- executor.setCorePoolSize(5);
- // 配置最大线程数
- executor.setMaxPoolSize(200);
- // 配置队列大小
- executor.setQueueCapacity(10000);
- // 配置线程池中的线程的名称前缀
- executor.setThreadNamePrefix("test-thread-");
- // rejection-policy: 当 pool 已经达到 max size 的时候, 如何处理新任务
- // CALLER_RUNS: 不在新线程中执行任务, 而是有调用者所在的线程来执行
- executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
- // 执行初始化
- executor.initialize();
- return executor;
- }
- }
最后就是来通过注解使用线程池了
通过 @Async 注解, 放到方法上, 就可以把方法加入到线程池中去执行, 就和线程类的 run 方法一样了.
@Async("asyncServiceExecutor")
@Async("asyncServiceExecutor"), 这个里面的值, 就是我们前面注册到容器里面的线程池, 来看一眼
容器中的线程
看一眼 spring 容器视图, 这个线程池对象时我们注册线程 config 类时候注入到容器里面的, 让我们来测试两个方法吧, 一个没有返回的, 一个有返回值, 直接上代码
- @Component
- public class TestTask {
- @Async("asyncServiceExecutor")
- public void task1(){
- System.out.println("测试一个没有返回值的任务 1");
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("测试一个没有返回值的任务 2");
- }
- @Async("asyncServiceExecutor")
- public Future<String> task2(){
- System.out.println("测试一个有返回值的任务 1");
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("测试一个有返回值的任务 2");
- // 返回当前线程的线程名称
- return new AsyncResult<>(Thread.currentThread().getName());
- }
- }
这个任务类中两个任务, 一个没有返回值的任务 task1, 就相当于 Runable 线程, 一个有返回值的任务 task2, 相当于 callable 线程, 我这边用了 AsyncResult 做了返回值, 看一眼 AsyncResult, 也是 spring 的并发包里面的 AsyncResult
AsyncResult
他实现了 ListenableFuture 接口, 类关系如下:
AsyncResult 类关系
来看一下 task1 没有返回值任务的测试结果
测试类如下:
- @RunWith( SpringRunner.class)
- @SpringBootTest(classes = BootStartApplication.class)
- public class TestTaskCase {
- @Autowired
- private TestTask testTask;
- @Test
- public void testVoid(){
- // 测试没有返回值的任务
- testTask.task1();
- try {
- System.out.println("正在测试没有返回值的任务 task1");
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
运行结果如下:
运行结果
看到结果了, 就是把一个 runable 线程加入到了线程中执行了
在来看一下 task2 有返回值任务的测试结果
测试类如下:
- @RunWith( SpringRunner.class)
- @SpringBootTest(classes = BootStartApplication.class)
- public class TestTaskCase {
- @Autowired
- private TestTask testTask;
- @Test
- public void testVoid(){
- Future<String> future = testTask.task2();
- System.out.println("正在测试有返回值的任务 task2");
- while (true) {
- //while true 中判断任务是否完成
- if (future.isDone() ) {
- System.out.println("task2 任务已经完成");
- try {
- System.out.println("task2 任务已经完成, 返回值为:" + future.get());
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
- break;
- }
- System.out.println("task2 任务没有完成正在等待");
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
这里使用了 Future 的 isDone() 方法来判断任务是否已经完成, 看下运行结果:
task2 的运行结果
就是吧一个 callable 线程加入到了线程中执行
使用说完, 再来说说给线程池添加监控吧
线程池在用的时候, 有时候会因为线程数量的一些问题, 引发一些线上问题, 所以对线程池添加一些监控是很有必要的, 记得我们自定义的那个线程吧, 我那会说演示就不添加逻辑, 我们需要的监控就要加到那里面去了, 直接上代码吧
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
- import org.springframework.util.concurrent.ListenableFuture;
- import java.util.concurrent.Callable;
- import java.util.concurrent.Future;
- import java.util.concurrent.ThreadPoolExecutor;
- public class TestThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
- private void showThreadPoolInfo(String method) {
- ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
- if (null == threadPoolExecutor) {
- return;
- }
- // 这里可以用使用存储到在线库, 打日志, 存缓存等方式把监控数据记录下来
- // 用到线程池里面具体哪个方法执行的线程任务
- System.out.println("使用到的方法:" + method);
- // 是我们配置在 config 里面线程前缀的名字 (用于多个线程池中区分线程池)
- System.out.println("线程的前缀名字:" +
- this.getThreadNamePrefix());
- System.out.println("线程池中的任务数量:" +
- threadPoolExecutor.getTaskCount());
- System.out.println("线程池中已经完成的任务数:" +
- threadPoolExecutor.getCompletedTaskCount());
- System.out.println("线程池中的线程数:" +
- threadPoolExecutor.getActiveCount());
- System.out.println("线程池中队列的长度:" +
- threadPoolExecutor.getQueue().size());
- }
- @Override
- public void execute(Runnable task) {
- showThreadPoolInfo("executeRunnable");
- super.execute(task);
- }
- @Override
- public void execute(Runnable task, long startTimeout) {
- showThreadPoolInfo("executeRunnableStartTimeout");
- super.execute(task, startTimeout);
- }
- @Override
- public Future<?> submit(Runnable task) {
- showThreadPoolInfo("submitRunnable");
- return super.submit(task);
- }
- @Override
- public <T> Future<T> submit(Callable<T> task) {
- showThreadPoolInfo("submitCallable");
- return super.submit(task);
- }
- @Override
- public ListenableFuture<?> submitListenable(Runnable task) {
- showThreadPoolInfo("submitListenableRunnable");
- return super.submitListenable(task);
- }
- @Override
- public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
- showThreadPoolInfo("submitListenableCallable");
- return super.submitListenable(task);
- }
- }
这里主要用到了线程中的任务数, 线程数等属性, 这个属性可以自己按需要来添加, 添加监控的方式可以使用存储到数据库, 缓存, 打日志接 flume,kafka 等方式, 加上了监控日志, 我们再来看下测试类运行的结果:
监控的结果
这样每次把任务添加到线程池中都会把这些线程池属性进行实时监控, 可以实时关注应用的运行情况, 也可以出现线上问题时快速定位查询
springboot 通过注解使用线程池就为大家说到这里, 欢迎大家来交流, 指出文中一些说错的地方, 让我加深认识.
谢谢大家!
来源: http://www.jianshu.com/p/03e4dc249bfb