今天给大家介绍一款新武器. 我自研的一个 java 组件 easyTask-L. 这个是做啥的呢? 我之前研发了一款单机版本的 easyTask, 这次是要介绍另外一款 easyTask-L. 区别就是后者支持分布式环境, 任务数据支持多个备份, 具备了真正意义上的高可用. 同时它又是轻量级的分布式应用, 原因是因为它还不是一个独立的中间件, 它需要一个宿主程序才能使用. 做成独立的中间件是我后面要继续做的一个版本.
组件开源地址: https://github.com/liuche51/easyTask-L
废话不多说, 先来介绍下 easyTask-L 组件的特性.
高可用: 因为我们是分布式 leader-follow 集群, 每个任务多有多个备份数据, 所以可靠性非常高
秒级触发: 我们是采用时钟秒级分片的数据结构, 支持秒级触发任务. 不早也不迟
分布式: 组件支持分布式
高并发: 支持多线程同时提交任务, 支持多线程同时执行任务
数据一致性: 使用 TCC 事务机制, 保障数据在集群中的强一致性
海量任务: 节点可以存储非常多的任务, 只要内存和磁盘足够. 触发效率也是极高. 需要配置好分派任务线程池和执行任务线程池大小即可
开源: 组件完全在 GitHub 上开源. 任何人都可以随意使用, 在不侵犯著作权情况下
易使用: 无需独立部署集群, 嵌入式开发. 不过多的依赖于第三方中间件, 除了 zookeeper.
easyTask-L 组件的整体架构如下:
整体采用分布式设计, leader-follow 风格. 集群中每一个节点都是 leader, 同时也可能是其他某个节点的 follow. 每个 leader 都有若干个 follow.leader 上提交的新任务都会强制同步到 follow 中, 删除任务同时也会强制删除 follow 中的备份任务. 集群中所有节点都会在 zookeeper 中注册并维持心跳.
easyTask-L 组件的核心 "环形队列" 的设计架构如下:
环形队列在之前单机版的 easyTask 中也讲过, 原理都是类似的. 客户端提交任务, 服务端先将任务进行持久化, 再添加上环形队列这个数据结构中去, 等待时间片轮询的到来. 不同的是这里的持久化机制, 改成了分布式存储了. 不仅 leader 自己存储起来, 还要同步存储到其 follow 中去. 删除一个任务也是类似的过程.
任务添加时会计算其触发所属的时间分片槽, 等环形队列的始终秒针到达时会判断任务是否可以被执行了. 如果可以执行了, 则分派任务线程池将其丢入执行任务线程池等待执行. 只要执行任务线程池线程数足够, 任务将立即得到执行.
大概的原理清晰了, 接下来就是写个 HelloWorld 程序了!
easyTask-L 不是一个中间件, 所以需要一个宿主程式. 建议在微服务框架如: dubbo,spring-cloud 中使用此组件, 并建立一个独立的专门用于处理延时任务的服务模块. 这样可以使服务尽可能少的频繁更新重启. 保持集群的稳定性. 下面我将以一个 springboot 应用为例来给大家演示如何使用 easyTask-L 组件
第一步: 引入 jar 包
如果你是 Maven 项目, 可以使用如下方式配置引入 jar 包. 这可以让项目自动引入 easyTask-L 中依赖的其他第三方 jar 包. 最新版本请在 maven 中央仓库中查询. 请在 pom.xml 中加入以下引用
- <dependency>
- <groupId>com.GitHub.liuche51</groupId>
- <artifactId>easyTask-L</artifactId>
- <version>1.0.1</version>
- </dependency>
第二步: 配置启动环形队列
这里以 springboot 应用为例, 在 application.YAML 中做如下配置
- server:
- port: 8081
- spring:
- application:
- name: easyTask-L
- easyTaskL:
- zkAddress: 127.0.0.1:2181
- taskStorePath: C:/db/node1
- serverPort: 2021
- sQLlitePoolSize: 5
- backupCount: 2
- dispatchPool:
- corePoolSize: 5
- maximumPoolSize: 50
- workPool:
- corePoolSize: 5
- maximumPoolSize: 50
新建一个启动配置类 EasyTaskLConf.java
- package com.GitHub.liuche51.easyTaskL.config;
- import com.GitHub.liuche51.easyTask.core.AnnularQueue;
- import com.GitHub.liuche51.easyTask.core.EasyTaskConfig;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- @Configuration
- public class EasyTaskLConf {
- private static Logger log = LoggerFactory.getLogger(EasyTaskLConf.class);
- @Value("${easyTaskL.zkAddress}")
- private String zkAddress;
- @Value("${easyTaskL.taskStorePath}")
- private String taskStorePath;
- @Value("${easyTaskL.serverPort}")
- private int serverPort;
- @Value("${easyTaskL.sQLlitePoolSize}")
- private int sQLlitePoolSize;
- @Value("${easyTaskL.backupCount}")
- private int backupCount;
- @Value("${easyTaskL.dispatchPool.corePoolSize}")
- private int dispatchCorePoolSize;
- @Value("${easyTaskL.dispatchPool.maximumPoolSize}")
- private int dispatchMaximumPoolSize;
- @Value("${easyTaskL.workPool.corePoolSize}")
- private int workPoolCorePoolSize;
- @Value("${easyTaskL.workPool.maximumPoolSize}")
- private int workPoolMaximumPoolSize;
- @Bean
- public AnnularQueue initAnnularQueue(){
- try {
- EasyTaskConfig config =new EasyTaskConfig();
- config.setTaskStorePath(taskStorePath);
- config.setServerPort(serverPort);
- config.setSQLlitePoolSize(sQLlitePoolSize);
- //config.setBackupCount(backupCount);
- config.setZkAddress(zkAddress);
- AnnularQueue annularQueue = AnnularQueue.getInstance();
- config.setDispatchs(new ThreadPoolExecutor(dispatchCorePoolSize, dispatchMaximumPoolSize, 1000, java.util.concurrent.TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>()));
- config.setWorkers(new ThreadPoolExecutor(workPoolCorePoolSize, workPoolMaximumPoolSize, 1000, java.util.concurrent.TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>()));
- annularQueue.start(config);
- return annularQueue;
- }catch (Exception e){
- log.error("",e);
- return null;
- }
- }
- }
EasyTaskLConf.java
第三步: 建立延时任务处理类
- package com.GitHub.liuche51.easyTaskL.task;
- import com.GitHub.liuche51.easyTask.dto.Task;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.time.ZonedDateTime;
- import java.time.format.DateTimeFormatter;
- import java.util.*;
- import java.util.concurrent.ConcurrentLinkedQueue;
- public class CusTask1 extends Task implements Runnable {
- private static Logger log = LoggerFactory.getLogger(CusTask1.class);
- @Override
- public void run() {
- Map<String, String> param = getParam();
- if (param != null && param.size()> 0) {
- log.info("任务 1 已执行! 姓名:{} 生日:{} 年龄:{} 线程 ID:{}", param.get("name"), param.get("birthday"), param.get("age"), param.get("threadid"));
- }
- }
- }
第四步: 向环形队列中添加任务
新建一个 Controller, 增加以下 Action 方法.
- @RequestMapping("/once")
- @ResponseBody
- public String once(@RequestParam("name") String name, @RequestParam("time") int time) {
- CusTask1 task1 = new CusTask1();
- task1.setEndTimestamp(ZonedDateTime.now().plusSeconds(time).toInstant().toEpochMilli());
- Map<String, String> param = new HashMap<String, String>() {
- {
- put("name", name);
- put("birthday", "1996-1-1");
- put("age", "28");
- put("threadid", String.valueOf(Thread.currentThread().getId()));
- }
- };
- task1.setParam(param);
- return AnnularQueue.getInstance().submitAllowWait(task1);
- }
完整的 demo 可以使用 Git 克隆我的一个开源项目: https://gitee.com/liuche/DubboServer.git 找到子项目 easyTask-L-demo 即可
来源: https://www.cnblogs.com/liuche/p/13360396.html