1. 前言
随着数据量和调用量的增长, 用户对应用的性能要求越来越高. 另外, 在实际的服务中, 还存在着这样的场景: 系统在组装数据的时候, 对于数据的各个部分的获取实际上是没有前后依赖关系的. 这些问题都很容易让我们想到将这些同步调用全都改造为异步调用. 不过自己实现起来比较麻烦, 还容易出错. 好在 Spring 已经提供了该问题的解决方案, 而且使用起来十分方便.
2.Spring 异步执行框架的使用方法
2.1 maven 依赖
Spring 异步执行框架的相关 bean 包含在 spring-context 和 spring-aop 模块中, 所以只要引入上述的模块即可.
2.2 开启异步任务支持
Spring 提供了 @EnableAsync 的注解来标注是否启用异步任务支持. 使用方式如下:
- @Configuration
- @EnableAsync
- public class AppConfig {
- }
Note: @EnableAsync 必须要配合 @Configuration 使用, 否则会不生效
2.3 方法标记为异步调用
将同步方法的调用改为异步调用也很简单. 对于返回值为 void 的方法, 直接加上 @Async 注解即可. 对于有返回值的方法, 除了加上上述的注解外, 还需要将方法的返回值修改为 Future 类型和将返回值用 AsyncResult 包装起来. 如下所示:
- // 无返回值的方法直接加上注解即可.
- @Async
- public void method1() {
- ...
- }
- // 有返回值的方法需要修改返回值.
- @Async
- public Future<Object> method2() {
- ...
- return new AsyncResult<>(Object);
- }
2.4 方法调用
对于 void 的方法, 和普通的调用没有任何区别. 对于非 void 的方法, 由于返回值是 Future 类型, 所以需要用 get() 方法来获取返回值. 如下所示:
- public static void main(String[] args) {
- service.method1();
- Future<Object> futureResult = service.method2();
- Object result;
- try {
- result = futureResult.get();
- } catch (InterruptedException | ExecutionException e) {
- ...
- }
- }
3. 原理简介
这块的源码的逻辑还是比较简单的, 主要是 Spring 帮我们生成并管理了一个线程池, 然后方法调用的时候使用动态代理将方法的执行包装为 Callable 类型并提交到线程池中执行. 核心的实现逻辑在 AsyncExecutionInterceptor 类的 invoke() 方法中. 如下所示:
- @Override
- public Object invoke(final MethodInvocation invocation) throws Throwable {
- Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
- Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
- final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
- AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
- if (executor == null) {
- throw new IllegalStateException(
- "No executor specified and no default executor set on AsyncExecutionInterceptor either");
- }
- Callable<Object> task = new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- try {
- Object result = invocation.proceed();
- if (result instanceof Future) {
- return ((Future<?>) result).get();
- }
- }
- catch (ExecutionException ex) {
- handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
- }
- catch (Throwable ex) {
- handleError(ex, userDeclaredMethod, invocation.getArguments());
- }
- return null;
- }
- };
- return doSubmit(task, executor, invocation.getMethod().getReturnType());
- }
4. 自定义 taskExecutor 及异常处理
4.1 自定义 taskExecutor
Spring 查找 TaskExecutor 逻辑是:
1. 如果 Spring context 中存在唯一的 TaskExecutor bean, 那么就使用这个 bean.
2. 如果 1 中的 bean 不存在, 那么就会查找是否存在一个 beanName 为 taskExecutor 且是 java.util.concurrent.Executor 实例的 bean, 有则使用这个 bean.
3. 如果 1,2 中的都不存在, 那么 Spring 就会直接使用默认的 Executor, 即 SimpleAsyncTaskExecutor.
在第 2 节的实例中, 我们直接使用的是 Spring 默认的 TaskExecutor. 但是对于每一个新的任务, SimpleAysncTaskExecutor 都是直接创建新的线程来执行, 所以无法重用线程. 具体的执行的代码如下:
- @Override
- public void execute(Runnable task, long startTimeout) {
- Assert.notNull(task, "Runnable must not be null");
- Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
- if (isThrottleActive() && startTimeout> TIMEOUT_IMMEDIATE) {
- this.concurrencyThrottle.beforeAccess();
- doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
- }
- else {
- doExecute(taskToUse);
- }
- }
- protected void doExecute(Runnable task) {
- Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
- thread.start();
- }
所以我们在使用的时候, 最好是使用自定义的 TaskExecutor. 结合上面描述的 Spring 查找 TaskExecutor 的逻辑, 最简单的自定义的方法是使用 @Bean 注解. 示例如下:
- // ThreadPoolTaskExecutor 的配置基本等同于线程池
- @Bean("taskExecutor")
- public Executor getAsyncExecutor() {
- ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
- taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
- taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
- taskExecutor.setQueueCapacity(CORE_POOL_SIZE * 10);
- taskExecutor.setThreadNamePrefix("wssys-async-task-thread-pool");
- taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
- taskExecutor.setAwaitTerminationSeconds(60 * 10);
- taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
- return taskExecutor;
- }
另外, Spring 还提供了一个 AsyncConfigurer 接口, 通过实现该接口, 除了可以实现自定义 Executor 以外, 还可以自定义异常的处理. 代码如下:
- @Configuration
- @Slf4j
- public class AsyncConfig implements AsyncConfigurer {
- private static final int MAX_POOL_SIZE = 50;
- private static final int CORE_POOL_SIZE = 20;
- @Override
- @Bean("taskExecutor")
- public Executor getAsyncExecutor() {
- ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
- taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
- taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
- taskExecutor.setQueueCapacity(CORE_POOL_SIZE * 10);
- taskExecutor.setThreadNamePrefix("async-task-thread-pool");
- taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
- taskExecutor.setAwaitTerminationSeconds(60 * 10);
- taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
- return taskExecutor;
- }
- @Override
- public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
- return (ex, method, params) -> log.error("invoke async method occurs error. method: {}, params: {}",
- method.getName(), JSON.toJSONString(params), ex);
- }
- }
Note:
Spring 还提供了一个 AsyncConfigurerSupport 类, 该类也实现了 AsyncConfigurer 接口, 且方法的返回值都是 null, 旨在提供一个方便的实现.
当 getAsyncExecutor() 方法返回 null 的时候, Spring 会使用默认的处理器 (强烈不推荐).
当 getAsyncUncaughtExceptionHandler() 返回 null 的时候, Spring 会使用 SimpleAsyncUncaughtExceptionHandler 来处理异常, 该类会打印出异常的信息.
所以对该类的使用, 最佳的实践是继承该类, 并且覆盖实现 getAsyncExecutor() 方法.
4.2 异常处理
Spring 异步框架对异常的处理如下所示:
- // 所在类: AsyncExecutionAspectSupport
- protected void handleError(Throwable ex, Method method, Object... params) throws Exception {
- if (Future.class.isAssignableFrom(method.getReturnType())) {
- ReflectionUtils.rethrowException(ex);
- }
- else {
- // Could not transmit the exception to the caller with default executor
- try {
- this.exceptionHandler.handleUncaughtException(ex, method, params);
- }
- catch (Throwable ex2) {
- logger.error("Exception handler for async method'" + method.toGenericString() +
- "'threw unexpected exception itself", ex2);
- }
- }
- }
从代码来看, 如果返回值是 Future 类型, 那么直接将异常抛出. 如果返回值不是 Future 类型 (基本上包含的是所有返回值 void 类型的方法, 因为如果方法有返回值, 必须要用 Future 包装起来), 那么会调用 handleUncaughtException 方法来处理异常.
注意: 在 handleUncaughtException() 方法中抛出的任何异常, 都会被 Spring Catch 住, 所以没有办法将 void 的方法再次抛出并传播到上层调用方的!!!
关于 Spring 这个设计的缘由我的理解是: 既然方法的返回值是 void, 就说明调用方不关心方法执行是否成功, 所以也就没有必要去处理方法抛出的异常. 如果需要关心异步方法是否成功, 那么返回值改为 boolean 就可以了.
4.4 最佳实践的建议
@Async 可以指定方法执行的 Executor, 用法:@Async("MyTaskExecutor"). 推荐指定 Executor, 这样可以避免因为 Executor 配置没有生效而 Spring 使用默认的 Executor 的问题.
实现接口 AsyncConfigurer 的时候, 方法 getAsyncExecutor() 必须要使用 @Bean, 并指定 Bean 的 name. 如果不使用 @Bean, 那么该方法返回的 Executor 并不会被 Spring 管理. 用 java doc API 的原话是:
- is not a fully managed Spring bean.
- (具体含义没有太理解, 不过亲测不加这个注解无法正常使用)
由于其本质上还是基于代理实现的, 所以如果一个类中有 A,B 两个异步方法, 而 A 中存在对 B 的调用, 那么调用 A 方法的时候, B 方法不会去异步执行的.
在异步方法上标注 @Transactional 是无效的.
future.get() 的时候, 最好使用 get(long timeout, TimeUnit unit) 方法, 避免长时间阻塞.
ListenableFuture 和 CompletableFuture 也是推荐使用的, 他们相比 Future, 提供了对异步调用的各个阶段或过程进行介入的能力.
参考:
- Spring Boot EnableAsync API doc
- Spring Task Execution and Scheduling
- https://www.atatech.org/articles/117225
来源: https://juejin.im/post/5c3fe71ff265da61223a966c