前言
在大家的项目中, 想必都有那种, 启动时候要去其他服务拉一些数据的情况, 如果我们启动时, 其他服务没启动, 按岂不是就起不来了吗, 如果这段拉数据的代码, 并不是核心业务, 那你这就有点说不过去了: 不能因为对方没启动, 我们也不能启动吧?
经过一些思考后, 我觉得可以这样, 启动的时候:
启动一个定时的线程池, 让它去执行拉数据的任务, 如果任务执行失败, 会过一段时间后再次执行
我们希望, 一旦某一次执行任务, 成功后, 就不要再去拉数据了, 浪费网络流量和 CPU
我这边可以大概就大家演示下.
示例代码
服务端
随便写了个 spring boot 服务端, 监听本机 8082 端口. 模拟第三方服务
- @RestController
- @Slf4j
- public class BusinessController {
- @GetMapping("/")
- public String test() {
- return "success";
- }
- }
- @SpringBootApplication
- @Slf4j
- public class WebDemoApplicationServer {
- public static void main(String[] args) {
- ConfigurableApplicationContext context = SpringApplication.run(WebDemoApplicationServer.class, args);
- }
- }
客户端
客户端程序, 依赖第三方服务, 启动时, 要去上面的服务端拉数据.
代码和上面差不多, 唯一是在启动时, 会执行以下逻辑:
- @Component
- public class InitRunner implements CommandLineRunner{
- private static final Logger log = LoggerFactory.getLogger(InitRunner.class);
- @Autowired
- private RestTemplate restTemplate;
- @Override
- public void run(String... args) throws Exception {
- ResponseEntity<String> entity = restTemplate.getForEntity("http://localhost:8082", String.class);
- String s = entity.toString();
- log.info("get data:{}",s);
- }
- }
在上面的服务没启动的时候, 这个客户端是起不来的.
怎么解决呢, 很简单.
方案 1
- public class InitRunnerV2 implements CommandLineRunner {
- @Autowired
- private RestTemplate restTemplate;
- // 1
- ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
- new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("init-data-from-third-sys"));
- @Override
- public void run(String... args) {
- //2
- TestTask task = new TestTask(restTemplate);
- //3
- ScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(task,
- 0, 10, TimeUnit.SECONDS);
- // 4
- task.setScheduledFuture(scheduledFuture);
- }
- }
1 处, new 了一个线程池, ScheduledThreadPoolExecutor 类型, 可周期执行某个任务
2 处, new 了一个任务, 这个任务会执行我们的拉数据逻辑.
这个任务的代码如下:
- @Slf4j
- public class TestTask implements Runnable{
- private RestTemplate restTemplate;
- private volatile ScheduledFuture<?> scheduledFuture;
- public TestTask(RestTemplate restTemplate) {
- this.restTemplate = restTemplate;
- }
- ...
- public void setScheduledFuture(ScheduledFuture<?> scheduledFuture) {
- this.scheduledFuture = scheduledFuture;
- }
- }
其实很简单, 就是定义了 2 个字段, 一个是 RestTemplate, 请求数据时要用; 另一个是 ScheduledFuture<?> 类型, 这个字段在上面的 InitRunnerV2 代码的第三处被赋值.
3 处, 让这个任务循环执行, 每 10s 一次.
4 处, 给 task 的 ScheduledFuture 赋值, 注意的是, 在 task 中, 这个字段我们定义为 volatile, 保证线程可见.
下面是任务代码的剖析:
- @Override
- public void run() {
- try {
- ResponseEntity<String> entity = restTemplate.getForEntity("http://localhost:8082", String.class);
- String s = entity.toString();
- log.info("get data:{}",s);
- } catch (Exception e) {
- // log.error("e:{}",e);
- log.error("error");
- return;
- }
- /**
- * 1 有可能任务执行太快, future 还没被赋值
- */
- if (scheduledFuture != null) {
- scheduledFuture.cancel(true);
- }
- }
唯一有什么要说的, 就是 1 处, 如果成功了, 我们就会调用 scheduledFuture.cancel(true);, 这样, 这个 scheduled 任务就不会继续执行了, 也就达到了我们的目的, 经济实惠.
到此, 代码基本就这样了, 详细代码见:
不成熟方案 2
因为上面的方案挺简单实用, 但感觉没啥干货, 于是我想着是否可以自己来实现一个定制的线程池, 把这些事情给自动化了.
希望实现的最终效果如下, 给 future 增加一个回调, 需要在任务执行成功时, 该回调自动被调用:
- public class InitRunnerV3 implements CommandLineRunner {
- @Autowired
- private RestTemplate restTemplate;
- CustomScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
- new CustomScheduledThreadPoolExecutor(1, new NamedThreadFactory("init-data-from-third-sys"));
- @Override
- public void run(String... args) {
- // 1
- TestTaskV3 task = new TestTaskV3(restTemplate);
- // 2
- CustomScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(task,
- 0, 10, TimeUnit.SECONDS);
- // 3
- scheduledFuture.setCustomFutureCallBack(new CustomFutureCallBack() {
- @Override
- public void onSuccess(CustomScheduledFuture customScheduledFuture) {
- log.info("onSuccess");
- // 4
- customScheduledFuture.cancel(true);
- }
- @Override
- public void onException(Throwable throwable) {
- log.error("e:{}",throwable);
- }
- });
- }
1 处, 执行任务, 任务内部如下, 去除了设置 future 的逻辑, 和取消的逻辑
- @Slf4j
- public class TestTaskV3 implements Runnable{
- private RestTemplate restTemplate;
- public TestTaskV3(RestTemplate restTemplate) {
- this.restTemplate = restTemplate;
- }
- @Override
- public void run() {
- try {
- ResponseEntity<String> entity = restTemplate.getForEntity("http://localhost:8082", String.class);
- String s = entity.toString();
- log.info("get data:{}",s);
- } catch (Exception e) {
- // log.error("e:{}",e);
- log.error("error");
- throw e;
- }
- }
- }
2 处, 循环执行任务, 这里的 scheduled 线程池, 是我们自定义的, 回头再说; 获取其返回的 future
3 处, 给 future 增加回调, 在回调中, 如果成功, 则取消该任务.
- @Override
- public void onSuccess(CustomScheduledFuture customScheduledFuture) {
- log.info("onSuccess");
- // 4
- customScheduledFuture.cancel(true);
- }
寻找扩展点
这里, afterExecute 是个空实现, 就是留给子线程池扩展用的:
protected void afterExecute(Runnable r, Throwable t) { }
那我们可以考虑下, 要怎么才能实现我们的目标呢, 我们要在这个方法内, 通过传进来的 Runnable r, 获取到下面这个 future 才能实现目的:
- CustomScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(task,
- 0, 10, TimeUnit.SECONDS);
获取到 future, 就能拿到在 future 上设置的 callback 对象, 就能调用 callback, 所以, 现在问题是, 要在传进来的 Runnable 中, 获取到 scheduledFuture.
所以, 我们就得包装一下, 传进来的 runnable, 我们定义了如下的 Runnable:
- @Data
- public class CustomDecoratedRunnable implements Runnable {
- Runnable runnable;
- CustomScheduledFuture customScheduledFuture;
- public CustomDecoratedRunnable(Runnable runnable,CustomScheduledFuture customScheduledFuture) {
- this.runnable = runnable;
- this.customScheduledFuture = customScheduledFuture;
- }
- @Override
- public void run() {
- this.runnable.run();
- }
- }
定制线程池
我们具体看看, 我们定制的线程池对象, 我们的线程池, 直接继承了 ScheduledThreadPoolExecutor:
- public class CustomScheduledThreadPoolExecutor<V> extends ScheduledThreadPoolExecutor {
- public CustomScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
- super(corePoolSize, threadFactory);
- }
- ...
- }
其 scheduleAtFixedRate 方法, 我们进行了重写:
- @Override
- public CustomScheduledFuture<V> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
- /**
- * 1
- */
- CustomScheduledFuture customScheduledFuture = new CustomScheduledFuture();
- // 2 将 future 设置到 task 中
- CustomDecoratedRunnable customDecoratedRunnable = new CustomDecoratedRunnable(command,customScheduledFuture);
- // 3
- ScheduledFuture<?> scheduledFuture = super.scheduleAtFixedRate(customDecoratedRunnable,
- initialDelay, period, unit);
- /**
- * 4 将返回的 future, 设置到我们包装过的 future
- */
- customScheduledFuture.setScheduledFuture((RunnableScheduledFuture) scheduledFuture);
- return customScheduledFuture;
- }
1 处, 新建一个自定义的 future
2 处, 将自定义的 future, 设置到上面说的 task 中
3 处, 把包装过的 task, 丢给线程池
4 处, 返回一个定制的 future, 这个 future, 包装了原有的 future, 同时, 支持设置 callback
- public class CustomScheduledFuture<V> implements RunnableScheduledFuture<V> {
- /**
- * 其实是下面这种类型:
- * {@link java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask
- *
- */
- RunnableScheduledFuture<V> scheduledFuture;
- // 设置 callback 时, 赋值
- CustomFutureCallBack customFutureCallBack;
- Runnable runnable;
- }
丢给定制线程池的 task
本来, 我以为, 丢给线程池什么 Runnable 对象, 在 afterExecute 就能拿到什么样的 Runnable 对象, 结果:
发现, 传进来的, 已经被包装过了, 应该是为了支持周期执行.
所以, 没办法, 看起来路被堵死了, 通过这个传进来的 Runnable, 也拿不到我们原始的 Runnable.
后边找了半天, 找到下面这个点:
- #java.util.concurrent.ScheduledThreadPoolExecutor#scheduleAtFixedRate
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
- long initialDelay,
- long period,
- TimeUnit unit) {
- if (command == null || unit == null)
- throw new NullPointerException();
- if (period <= 0)
- throw new IllegalArgumentException();
- ScheduledFutureTask<Void> sft =
- new ScheduledFutureTask<Void>(command,
- null,
- triggerTime(initialDelay, unit),
- unit.toNanos(period));
- // 1
- RunnableScheduledFuture<Void> t = decorateTask(command, sft);
- sft.outerTask = t;
- delayedExecute(t);
- return t;
- }
1 处, 会调用 decorateTask 来包装 task, 默认实现, 就是如下:
- protected <V> RunnableScheduledFuture<V> decorateTask(
- Runnable runnable, RunnableScheduledFuture<V> task) {
- return task;
- }
这里的 task, 就是前面那个代码里的 ScheduledFutureTask<Void> sft:
- ScheduledFutureTask<Void> sft =
- new ScheduledFutureTask<Void>(command,
- null,
- triggerTime(initialDelay, unit),
- unit.toNanos(period));
- // 1
- RunnableScheduledFuture<Void> t = decorateTask(command, sft);
所以, 我们得想办法重载这个方法:
- @Override
- protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
- CustomScheduledFuture<V> future = new CustomScheduledFuture<>();
- future.setRunnable(runnable);
- future.setScheduledFuture(task);
- return future;
- }
这里, 利用 CustomScheduledFuture, 封装了 task 和 runnable 两个对象.
同时, 我们自定义的这个 CustomScheduledFuture, 也是实现了这个方法的返回值, 指定的接口:
- @Data
- public class CustomScheduledFuture<V> implements RunnableScheduledFuture<V>
目前为止, 经过包装后, 在 afterExecute 处, 拿到的 Runnable 如下:
afterExecute 的逻辑, 调用回调
- @Override
- protected void afterExecute(Runnable r, Throwable t) {
- super.afterExecute(r, t);
- CustomScheduledFuture future;
- CustomDecoratedRunnable runnable = null;
- if (r instanceof CustomScheduledFuture) {
- future = (CustomScheduledFuture) r;
- // 1
- runnable = (CustomDecoratedRunnable) future.getRunnable();
- }
- // 2
- CustomScheduledFuture customScheduledFuture = runnable.getCustomScheduledFuture();
- // 3
- CustomFutureCallBack customFutureCallBack = customScheduledFuture.getCustomFutureCallBack();
- if (customFutureCallBack != null) {
- if (t != null) {
- customFutureCallBack.onException(t);
- } else {
- // 4
- customFutureCallBack.onSuccess(customScheduledFuture);
- }
- }
- }
1 处, 获取 runnable
2 处, 根据 runnable, 获取我们的 future
3 处, 通过 future, 获取回调
4 处, 调用回调
效果展示
- 2020-04-10 09:45:28.068 INFO 14456 --- [ main] No active profile set, falling back to default profiles: default
- 2020-04-10 09:45:28.822 INFO 14456 --- [ main] Started WebDemoApplication in 1.153 seconds (JVM running for 1.805)
- 2020-04-10 09:45:36.933 ERROR 14456 --- [init-data-from-third-sys-1-thread-1] error
- 2020-04-10 09:48:48.975 INFO 14456 --- [init-data-from-third-sys-1-thread-1] onSuccess
可以看到, 任务执行失败了, 但为啥会调用 onSuccess 呢; 另外, 大家可以看到, 都是在线程池的线程中执行的.
为啥会 error 了, 还执行 success 呢, 我发现, 即使我在 task 中抛出了异常, 但是上层没捕获.
我猜测, 是因为:
- public interface Runnable {
- /**
- * When an object implementing interface <code>Runnable</code> is used
- * to create a thread, starting the thread causes the object's
- * <code>run</code> method to be called in that separately executing
- * thread.
- * <p>
- * The general contract of the method <code>run</code> is that it may
- * take any action whatsoever.
- *
- * @see java.lang.Thread#run()
- */
- public abstract void run();
- }
这里没有抛出异常, 所以, 即使实现的 runnable 中抛了, 上层也不管.
具体还要验证.
注意点
另一个点是, 执行失败了, 等了 10s, 并没有再次执行, 猜测是我的定制 task, 导致了周期执行的问题. 这个待验证和解决.
但, 一个简单的回调, 我们已经实现了.
总结
大家使用方案 1 就可以了; 后面的方案, 是折腾着玩的. 希望对大家有帮助.
全部代码都在:
来源: https://www.cnblogs.com/grey-wolf/p/12671471.html