一, 前言
最近做了电子发票的需求, 分省开票接口和发票下载接口都有一定的延迟. 为了完成开票后自动将发票插入用户微信卡包, 目前的解决方案是利用线程池, 将开票后插入卡包的任务 (轮询分省发票接口, 直到获取到发票相关信息或者轮询次数用完, 如果获取到发票信息, 执行发票插入微信卡包, 结束任务) 放入线程池异步执行. 仔细想一想, 这种实现方案存在一个问题, 线程池没有充分的利用. 为什么没有充分的利用? 下面详细的分析.
二, 异步线程池和异步任务包装
AsyncConfigurerSupport 可以帮我们指定异步任务 (注有 @Async 注解) 对应的线程池.
- @Configuration
- public class MyAsyncConfigurer extends AsyncConfigurerSupport {
- private static Logger LOGGER = LoggerFactory.getLogger(MyAsyncConfigurer.class);
- @Override
- public Executor getAsyncExecutor() {
- ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
- taskExecutor.setCorePoolSize(2);
- taskExecutor.setMaxPoolSize(4);
- taskExecutor.setQueueCapacity(10);
- taskExecutor.setRejectedExecutionHandler((runnable, executor) -> LOGGER.error("异步线程池拒绝任务..." + runnable));
- taskExecutor.setThreadFactory(new MyAsyncThreadFactory());
- taskExecutor.initialize();
- return taskExecutor;
- }
- static class MyAsyncThreadFactory implements ThreadFactory {
- private static final AtomicInteger poolNumber = new AtomicInteger(1);
- private final ThreadGroup group;
- private final AtomicInteger threadNumber = new AtomicInteger(1);
- private final String namePrefix;
- MyAsyncThreadFactory() {
- SecurityManager s = System.getSecurityManager();
- group = (s != null) ? s.getThreadGroup() :
- Thread.currentThread().getThreadGroup();
- namePrefix = "myasync-pool-" +
- poolNumber.getAndIncrement() +
- "-thread-";
- }
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(group, r,
- namePrefix + threadNumber.getAndIncrement(),
- 0);
- if (t.isDaemon())
- t.setDaemon(false);
- if (t.getPriority() != Thread.NORM_PRIORITY)
- t.setPriority(Thread.NORM_PRIORITY);
- return t;
- }
- }
- }
异步任务包装, 除了异步, 还加入了 retry 功能, 实现指定次数的接口轮询.
- @Component
- public class AsyncWrapped {
- protected static Logger LOGGER = LoggerFactory.getLogger(AsyncWrapped.class);
- @Async
- public void asyncProcess(Runnable runnable, Callback callback, Retry retry) {
- try {
- if (retry == null) {
- retry = new Retry(1);
- }
- retry.execute(ctx -> {
- runnable.run();
- return null;
- }, ctx -> {
- if (callback != null) {
- callback.call();
- }
- return null;
- });
- } catch (Exception e) {
- LOGGER.error("异步调用异常...", e);
- }
- }
- }
业务代码大致逻辑如下.
- asyncWrapped.asyncProcess(() -> {
- // 调用分省接口获取发票信息
- // 如果发票信息异常, 抛出异常(进入下次重试)
- // 否则, 插入用户微信卡包
- }, () -> {
- // 轮询次数用尽, 用户插入卡包失败
- }
- , new Retry(2, 1000)
- );
这里说一下为什么线程池没有充分的利用. 异步任务中包含轮询操作, 轮询有一定的时间间隔, 导致在这段时间间隔内, 线程一直处于被闲置的状态. 所以为了能更好的利用线程池资源, 我们得想办法解决时间间隔的问题. 假如有个延迟队列, 队列里放着我们的异步任务 (不包含重试机制), 然后延迟(轮询的时间间隔) 一定时间之后, 将任务放入线程池中执行, 任务执行完毕之后根据是否需要再次执行决定是否再次放入到延迟队列去, 这样每个线程池中的线程都不会闲着, 达到了充分利用的目的.
三, 定时任务线程池和实现轮询机制
@EnableScheduling 帮助开启 @Scheduled 注解解析. 注册一个名字是 ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME 的定时任务线程池.
- @Configuration
- @EnableScheduling
- @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
- public class TaskConfiguration {
- @Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)
- @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
- public ScheduledExecutorService scheduledAnnotationProcessor() {
- return Executors.newScheduledThreadPool(5, new DefaultThreadFactory());
- }
- private static class DefaultThreadFactory implements ThreadFactory {
- private static final AtomicInteger poolNumber = new AtomicInteger(1);
- private final ThreadGroup group;
- private final AtomicInteger threadNumber = new AtomicInteger(1);
- private final String namePrefix;
- DefaultThreadFactory() {
- SecurityManager s = System.getSecurityManager();
- group = (s != null) ? s.getThreadGroup() :
- Thread.currentThread().getThreadGroup();
- namePrefix = "pool-" +
- poolNumber.getAndIncrement() +
- "-schedule-";
- }
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(group, r,
- namePrefix + threadNumber.getAndIncrement(),
- 0);
- if (t.isDaemon()) {
- t.setDaemon(false);
- }
- if (t.getPriority() != Thread.NORM_PRIORITY) {
- t.setPriority(Thread.NORM_PRIORITY);
- }
- return t;
- }
- }
- }
实现轮询任务, 实现接口 SchedulingConfigurer, 获取 ScheduledTaskRegistrar 并指定定时任务线程池.
- @Override
- public void configureTasks(ScheduledTaskRegistrar registrar) {
- this.registrar = registrar;
- this.registrar.setScheduler(this.applicationContext.getBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, ScheduledExecutorService.class));
- scheduledTaskRegistrarHelper = new ScheduledTaskRegistrarHelper();
- }
scheduledFutures 提交定时任务时返回结果集, periodTasks 定时任务结果集.
- private static final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
- private static final ConcurrentHashMap<String, TimingTask> periodTasks = new ConcurrentHashMap<>();
定时任务包装类, 包含任务的执行次数(重试次数), 重试间隔, 具体任务, 重试次数用尽之后的回调等, 以及自动结束定时任务, 重试计数重置功能.
- private static class TimingTask {
- // 重试次数
- private Integer retry;
- // 任务标识
- private String taskId;
- // 重试间隔
- private Long period;
- // 具体任务
- private ScheduledRunnable task;
- // 结束回调
- private ScheduledCallback callback;
- // 重试计数
- private AtomicInteger count = new AtomicInteger(0);
- // 父线程 MDC
- private Map<String, String> curContext;
- public TimingTask(Integer retry, String taskId, Long period, ScheduledRunnable task, ScheduledCallback callback) {
- this.retry = retry;
- this.taskId = taskId;
- this.period = period;
- this.task = task;
- this.callback = callback;
- this.curContext = MDC.getCopyOfContextMap();
- }
- public Long getPeriod() {
- return period;
- }
- public void setPeriod(Long period) {
- this.period = period;
- }
- public String getTaskId() {
- return taskId;
- }
- public void setTaskId(String taskId) {
- this.taskId = taskId;
- }
- public Integer getRetry() {
- return retry;
- }
- public void setRetry(Integer retry) {
- this.retry = retry;
- }
- public AtomicInteger getCount() {
- return count;
- }
- public boolean reset() {
- for (int cnt = this.count.intValue(); cnt <this.retry; cnt = this.count.intValue()) {
- if (this.count.compareAndSet(cnt, 0)) {
- return true;
- }
- }
- return false;
- }
- public void process() {
- Map<String, String> preContext = MDC.getCopyOfContextMap();
- try {
- if (this.curContext == null) {
- MDC.clear();
- } else {
- // 将父线程的 MDC 内容传给子线程
- MDC.setContextMap(this.curContext);
- }
- this.task.run();
- exitTask(false);
- } catch (Exception e) {
- LOGGER.error("定时任务异常..." + this, e);
- if (count.incrementAndGet()>= this.retry) {
- exitTask(true);
- }
- } finally {
- if (preContext == null) {
- MDC.clear();
- } else {
- MDC.setContextMap(preContext);
- }
- }
- }
- // 定时任务退出
- private void exitTask(boolean execCallback) {
- scheduledFutures.get(this.taskId).cancel(false);
- scheduledFutures.remove(this.getTaskId());
- periodTasks.remove(this.getTaskId());
- LOGGER.info("结束定时任务:" + this);
- if (execCallback && callback != null) {
- callback.call();
- }
- }
- @Override
- public String toString() {
- return ReflectionToStringBuilder.toString(this
- , ToStringStyle.JSON_STYLE
- , false
- , false
- , TimingTask.class);
- }
- }
注意上面定时任务是如何退出的, 是在某一次任务执行成功之后 (没有异常抛出) 或者定时任务执行次数用尽才退出的. 直接调用 ScheduledFuture 的 cancel 方法可以退出定时任务. 还有就是定时任务中的日志需要父线程中的日志变量, 所以需要对 MDC 进行一下处理.
- @Scope("prototype")
- @Bean
- public AspectTimingTask aspectTimingTask() {
- return new AspectTimingTask();
- }
- @Aspect
- @Component
- public static class ScheduledAspect {
- @Around("target(AspectTimingTask)")
- public Object executeScheduledWrapped(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
- if (proceedingJoinPoint instanceof MethodInvocationProceedingJoinPoint) {
- MethodInvocationProceedingJoinPoint methodJoinPoint = (MethodInvocationProceedingJoinPoint) proceedingJoinPoint;
- Method method = ((MethodSignature) methodJoinPoint.getSignature()).getMethod();
- if (AnnotatedElementUtils.isAnnotated(method, ScheduledTask.class)) {
- LOGGER.info("电子发票定时任务日志同步...");
- // 其他处理
- }
- }
- return proceedingJoinPoint.proceed();
- }
- }
- public static class AspectTimingTask implements Runnable {
- private TimingTask timingTask;
- @Override
- @ScheduledTask
- public void run() {
- timingTask.process();
- }
- public void setTimingTask(TimingTask timingTask) {
- this.timingTask = timingTask;
- }
- }
AspectTimingTask 是对 TimingTask 的包装类, 实现了 Runnable 接口. 主要是为了对 run 接口做一层切面, 获取 ProceedingJoinPoint 实例(公司中的日志调用链系统需要这个参数).AspectTimingTask 的 bean 实例的 scope 是 prototype, 这个注意下.
- public static void register(Integer retry
- , Long period
- , String taskId
- , ScheduledRunnable task
- , ScheduledCallback callback) {
- scheduledTaskRegistrarHelper.register(retry, taskId, period, task, callback);
- }
- private class ScheduledTaskRegistrarHelper {
- public void register(Integer retry
- , String taskId
- , Long period
- , ScheduledRunnable task
- , ScheduledCallback callback) {
- // 是否可以重置定时任务
- TimingTask preTask = periodTasks.get(taskId);
- if (null != preTask
- && preTask.reset()
- && existTask(taskId)) {
- return;
- }
- TimingTask curTask = new TimingTask(retry, taskId, period, task, callback);
- AspectTimingTask aspectTimingTask = applicationContext.getBean(AspectTimingTask.class);
- aspectTimingTask.setTimingTask(curTask);
- ScheduledFuture<?> scheduledFuture = registrar.getScheduler().scheduleAtFixedRate(aspectTimingTask, period);
- scheduledFutures.put(taskId, scheduledFuture);
- periodTasks.put(taskId, curTask);
- LOGGER.info("注册定时任务:" + curTask);
- }
- private boolean existTask(String taskId) {
- return scheduledFutures.containsKey(taskId) && periodTasks.containsKey(taskId);
- }
- }
如果 taskId 的定时任务已经存在则重置定时任务, 否则注册新的定时任务. AspectTimingTask 实例通过 ApplicationContext 获取, 每次获取都是一个新的实例.
由 异步轮询任务 优化成 定时任务, 充分利用了线程池. 修改之后的业务代码如下.
- ScheduledTaskRegistrarHelper.register(10
- , 5*1000L
- , "taskId"
- , () -> {
- // 调用分省接口获取发票信息
- // 如果发票信息异常, 抛出异常(进入下次重试)
- // 否则, 插入用户微信卡包
- }
- () -> {
- // 轮询次数用尽, 用户插入卡包失败
- }
- );
针对电子发票插入微信卡包定时任务, 重试执行次数 10 次, 每隔 5 秒执行一次. 任务完成之后结束定时任务, 执行次数用尽之后触发插入卡包失败动作.
四, 参考
Spring 异步调用原理及 SpringAop 拦截器链原理
Springboot 定时任务原理及如何动态创建定时任务
来源: https://www.cnblogs.com/hujunzheng/p/10660479.html