Preface
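I recently took on a task to rework MTE (mysqlToEs), an existing small tool that imports data from MySQL into Elasticsearch. The old version imported on a single thread, so loading data on the order of a hundred billion records took about two weeks, which was far too slow. I spent three days rewriting MTE on top of a FixedThreadPool from Java's Executors framework (Executors.newFixedThreadPool). On a single server, import throughput rose more than tenfold, and a well-chosen thread count raises it further.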
Key technology stack
- Elasticsearch
- JDBC
- ExecutorService / Thread
- SQL
Tool overview
Maven dependencies
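```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>${lombok.version}</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>${fastjson.version}</version>
</dependency>
```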
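Java thread pool setup
Each pool has 21 threads by default, which can be adjusted. POR is the pool that processes handled workflow records, and ROR is the pool that processes read workflow records.
```java
import java.util.concurrent.*;
public class MteExecutor {
    private static final int THREADS = 21; // default pool size, adjustable
    public static ExecutorService POR = Executors.newFixedThreadPool(THREADS); // handled records
    public static ExecutorService ROR = Executors.newFixedThreadPool(THREADS); // read records
}
```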
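A note on the pool configuration: `Executors.newFixedThreadPool` queues submitted tasks in an unbounded `LinkedBlockingQueue`, so if the producers read from MySQL faster than the consumers write to Elasticsearch, pending batches accumulate in memory. The `Thread.sleep(2000)` in the producer appears to serve as a brake for that. One alternative, sketched here on the assumption that a plain JDK `ThreadPoolExecutor` is acceptable, is a fixed-size pool with a bounded queue and `CallerRunsPolicy`: when the queue is full, the submitting thread runs the batch itself, which throttles the producer automatically. `BoundedPools`, `newBoundedFixedPool` and `runDemo` are hypothetical names, and the queue capacity of `2 * THREADS` is an arbitrary example value.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedPools {
    static final int THREADS = 21; // same default as the article; tune per server

    // Hypothetical helper: a fixed-size pool whose queue holds at most
    // queueCapacity pending batches. When the queue is full, CallerRunsPolicy
    // runs the batch on the submitting (producer) thread, slowing it down.
    static ExecutorService newBoundedFixedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Submits `tasks` trivial batches, waits for the pool to drain,
    // and returns how many batches actually ran.
    static int runDemo(int tasks) {
        ExecutorService pool = newBoundedFixedPool(THREADS, 2 * THREADS);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                done.incrementAndGet(); // stand-in for indexing one batch
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }
}
```

No batch is dropped: a rejected batch is not lost but executed by the caller, so `runDemo(n)` runs all `n` batches.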
Producer threads for handled / read records: ZlPendProducer / ZlReadProducer
```java
public class ZlPendProducer implements Runnable {
    ...
    @Override
    public void run() {
        System.out.println(threadName + ":: started...");
        for (int j = 0; j < Const.TBL.TBL_PEND_COUNT; j++) {
            try {
                ....
                int size = 1000;
                for (int i = 0; i < count; i += size) {
                    if (i + size > count) {
                        // Last batch: fewer than `size` rows remain, so fetch only what is left
                        size = count - i;
                    }
                    // The spaces around "from" and "limit" are required for valid SQL
                    String sql = "select * from " + tableName + " limit " + i + "," + size;
                    System.out.println(tableName + "::sql::" + sql);
                    rs = statement.executeQuery(sql);
                    List<HistPendingEntity> lst = new ArrayList<>();
                    while (rs.next()) {
                        HistPendingEntity p = PendUtils.getHistPendingEntity(rs);
                        lst.add(p);
                    }
                    // Hand the batch to the handled-records pool
                    MteExecutor.POR.submit(new ZlPendConsumer(lst));
                    // Pause 2 s after each batch before reading the next one
                    Thread.sleep(2000);
                }
                ....
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

public class ZlReadProducer implements Runnable {
    // ... same logic as ZlPendProducer, applied to the read-record tables
}
```
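The batching arithmetic in the producer can be isolated and checked on its own. The sketch below (hypothetical class `BatchPlanner`) computes the `(offset, size)` pairs that the `limit i,size` loop walks through, without mutating `size`, and builds the paging query with the spaces that `from` and `limit` need. `tableName` is assumed to come from the tool's own configuration, since it is concatenated into the SQL.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchPlanner {
    // Returns {offset, size} pairs covering rows [0, count) in batches of at
    // most batchSize rows; the last batch holds whatever is left over.
    static List<int[]> plan(int count, int batchSize) {
        List<int[]> batches = new ArrayList<>();
        for (int offset = 0; offset < count; offset += batchSize) {
            batches.add(new int[] {offset, Math.min(batchSize, count - offset)});
        }
        return batches;
    }

    // Builds the paging query. tableName is assumed to be trusted
    // configuration (never user input), because it is concatenated directly.
    static String pageSql(String tableName, int offset, int size) {
        return "select * from " + tableName + " limit " + offset + "," + size;
    }
}
```

For example, `plan(2500, 1000)` yields the batches (0, 1000), (1000, 1000) and (2000, 500). Keep in mind that MySQL still reads and discards the first `offset` rows of a `LIMIT offset,size` query, so later pages of a large table get progressively slower; if a table has an indexed, increasing key (a column such as `id`, assumed here), keyset paging (`where id > ? order by id limit ?`) is a common alternative.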
Consumer threads for handled / read records: ZlPendConsumer / ZlReadConsumer
```java
public class ZlPendConsumer implements Runnable {
    private String threadName;
    private final List<HistPendingEntity> lst;

    public ZlPendConsumer(List<HistPendingEntity> lst) {
        this.lst = lst;
    }

    @Override
    public void run() {
        ...
        lst.forEach(v -> {
            try {
                // Serialize the entity to JSON with Gson (com.google.gson.Gson)
                String json = new Gson().toJson(v);
                // Write it to Elasticsearch through the tool's EsClient helper, passing the pending id
                EsClient.addDataInJSON(json, Const.ES.HistPendDB_Index, Const.ES.HistPendDB_type, v.getPendingId(), null);
                // Count the record as indexed (read by the Monitor thread)
                Const.COUNTER.LD_P.incrementAndGet();
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("err::PendingId::" + v.getPendingId());
            }
        });
        ...
    }
}

public class ZlReadConsumer implements Runnable {
    // Same logic as ZlPendConsumer, applied to read records
}
```
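The consumer's per-record error handling can be sketched independently of Elasticsearch. Here `DocumentSink` is a hypothetical interface standing in for `EsClient.addDataInJSON(...)`, and `BatchConsumer` is a hypothetical generic version of `ZlPendConsumer`: each record is written inside its own `try` block so that one failure does not abort the batch, and success and failure counts go to shared `AtomicLong` counters, much as `Const.COUNTER.LD_P` feeds the monitor.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical stand-in for EsClient.addDataInJSON(...): writes one JSON
// document under the given id and throws if the write fails.
interface DocumentSink {
    void index(String id, String json) throws Exception;
}

// Hypothetical generic version of ZlPendConsumer.
final class BatchConsumer<T> implements Runnable {
    private final List<T> batch;
    private final Function<T, String> idOf;   // e.g. v -> String.valueOf(v.getPendingId())
    private final Function<T, String> toJson; // e.g. v -> new Gson().toJson(v)
    private final DocumentSink sink;
    private final AtomicLong indexed;         // shared with the monitor, like Const.COUNTER.LD_P
    private final AtomicLong failed;

    BatchConsumer(List<T> batch, Function<T, String> idOf, Function<T, String> toJson,
                  DocumentSink sink, AtomicLong indexed, AtomicLong failed) {
        this.batch = batch;
        this.idOf = idOf;
        this.toJson = toJson;
        this.sink = sink;
        this.indexed = indexed;
        this.failed = failed;
    }

    @Override
    public void run() {
        for (T item : batch) {
            String id = idOf.apply(item);
            try {
                sink.index(id, toJson.apply(item));
                indexed.incrementAndGet();
            } catch (Exception e) {
                // Log and move on: one bad record must not abort the rest of the batch.
                failed.incrementAndGet();
                System.err.println("err::id::" + id + "::" + e.getMessage());
            }
        }
    }
}
```

Passing the counters in, rather than reaching for static globals, makes the consumer easy to test with a fake sink. For higher throughput, the Elasticsearch transport client also offers bulk requests (`prepareBulk()`), which send a whole batch in one round trip instead of one request per record.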
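Monitoring thread for the Elasticsearch import: Monitor
The Monitor thread calculates how many records are written to Elasticsearch each minute. Watching this figure lets you tune the size of the thread pools so that the multi-threaded import runs as fast as possible.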
```java
public void monitorToES() {
    // ldPrevPendCount, ldPrevReadCount and start are fields of Monitor (declarations not shown)
    new Thread(() -> {
        while (true) {
            StringBuilder sb = new StringBuilder();
            sb.append("handled tables::").append(Const.TBL.TBL_PEND_COUNT)
              .append("::handled total::").append(Const.COUNTER.LD_P_TOTAL)
              .append("::handled indexed::").append(Const.COUNTER.LD_P);
            sb.append(" ~~~~ read tables::").append(Const.TBL.TBL_READ_COUNT);
            sb.append("::read total::").append(Const.COUNTER.LD_R_TOTAL)
              .append("::read indexed::").append(Const.COUNTER.LD_R);
            if (ldPrevPendCount == 0 && ldPrevReadCount == 0) {
                // No baseline yet: remember the current counters and the start time
                ldPrevPendCount = Const.COUNTER.LD_P.get();
                ldPrevReadCount = Const.COUNTER.LD_R.get();
                start = System.currentTimeMillis();
            } else {
                long end = System.currentTimeMillis();
                // Once at least 60 s have passed, report how many records were indexed in that window
                if ((end - start) / 1000 >= 60) {
                    start = end;
                    sb.append("\n#########################################\n");
                    sb.append("handled per minute::" + (Const.COUNTER.LD_P.get() - ldPrevPendCount) + " records");
                    sb.append("::read per minute::" + (Const.COUNTER.LD_R.get() - ldPrevReadCount) + " records");
                    ldPrevPendCount = Const.COUNTER.LD_P.get();
                    ldPrevReadCount = Const.COUNTER.LD_R.get();
                }
            }
            System.out.println(sb.toString());
            try {
                // Wait 3 s before printing the next status line
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).start();
}
```
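The monitor's per-minute figure is the counter delta over the elapsed time. The sketch below (hypothetical class `ThroughputMeter`) normalizes that delta to records per minute for any sampling interval, and schedules sampling with a `ScheduledExecutorService` on a daemon thread instead of `while (true)` plus `Thread.sleep`, so the monitor alone does not keep the JVM running after the import finishes. The counter is read through a `LongSupplier`, so it works whether the real counters are `AtomicInteger` or `AtomicLong`; the thread name `mte-monitor` is an arbitrary choice.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical helper: turns successive counter readings into records per minute.
final class ThroughputMeter {
    private long prevCount = -1; // -1 means "no baseline yet"
    private long prevMillis;

    // Returns records per minute since the previous call, or -1 on the first
    // call (or if no time has elapsed). The current reading becomes the new baseline.
    synchronized long sample(long count, long nowMillis) {
        long rate = -1;
        if (prevCount >= 0 && nowMillis > prevMillis) {
            rate = (count - prevCount) * 60_000L / (nowMillis - prevMillis);
        }
        prevCount = count;
        prevMillis = nowMillis;
        return rate;
    }

    // Samples `counter` every `periodSeconds` seconds on a daemon thread and prints the rate.
    static ScheduledExecutorService start(LongSupplier counter, long periodSeconds) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "mte-monitor");
            t.setDaemon(true); // does not keep the JVM alive on its own
            return t;
        });
        ThroughputMeter meter = new ThroughputMeter();
        ses.scheduleAtFixedRate(() -> {
            long rate = meter.sample(counter.getAsLong(), System.currentTimeMillis());
            if (rate >= 0) {
                System.out.println("records per minute::" + rate);
            }
        }, 0, periodSeconds, TimeUnit.SECONDS);
        return ses;
    }
}
```

Under these assumptions, `ThroughputMeter.start(Const.COUNTER.LD_P::get, 60)` would print the handled-records rate once a minute.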
Initializing Elasticsearch: EsClient
```java
import java.net.InetAddress;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

String cName = meta.get("cName");     // ES cluster name
String esNodes = meta.get("esNodes"); // ES node addresses, "host:port,host:port,..."
Settings esSetting = Settings.builder()
        .put("cluster.name", cName)
        .put("client.transport.sniff", true) // enable sniffing to discover the rest of the cluster
        .put("thread_pool.search.size", 5)   // search thread pool size, set to 5 for now
        .build();
String[] nodes = esNodes.split(",");
client = new PreBuiltTransportClient(esSetting);
for (String node : nodes) {
    if (node.length() > 0) {
        String[] hostPort = node.split(":");
        client.addTransportAddress(new TransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
    }
}
```
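Splitting `esNodes` with `split(",")` and then `split(":")` fails with an unhelpful `ArrayIndexOutOfBoundsException` when an entry has no port. A slightly more defensive parser is sketched below; `NodeListParser` is a hypothetical name. It trims whitespace, skips empty entries such as a trailing comma, rejects entries that are not shaped like `host:port`, and returns unresolved `InetSocketAddress` objects so that no DNS lookup happens while parsing.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

final class NodeListParser {
    // Parses "host1:9300,host2:9300" into unresolved socket addresses.
    static List<InetSocketAddress> parse(String esNodes) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String node : esNodes.split(",")) {
            String trimmed = node.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate ",," or a trailing comma
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got '" + trimmed + "'");
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }
}
```

Each result can then be turned into a transport address as in the code above, e.g. `new TransportAddress(InetAddress.getByName(a.getHostString()), a.getPort())`.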
Initializing the database connection
```java
import java.sql.DriverManager;
conn = DriverManager.getConnection(url, user, password);
```
Startup command
```shell
nohup java -jar mte.jar ES-Cluster2019 node1:9300,node2:9300,node3:9300 root 123456! jdbc:mysql://ip:3306/mte 130 130 >> ./mte.log 2>&1 &
```
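Parameters
ES-Cluster2019: name of the Elasticsearch cluster
node1:9300,node2:9300,node3:9300: Elasticsearch node addresses (host:port, comma-separated)
root 123456!: MySQL user name and password
jdbc:mysql://ip:3306/mte: JDBC URL of the MySQL database
130 130: number of sharded tables holding handled records and read records, respectively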
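The startup command passes seven positional arguments. Assuming the order shown in the command (cluster name, node list, MySQL user, MySQL password, JDBC URL, handled table count, read table count), they can be collected into one object that prints a usage message when the count is wrong. `MteArgs` and its field names are hypothetical; the article does not show how `MteMain` reads `args`.

```java
final class MteArgs {
    final String clusterName;  // e.g. ES-Cluster2019
    final String esNodes;      // e.g. node1:9300,node2:9300,node3:9300
    final String dbUser;       // e.g. root
    final String dbPassword;
    final String jdbcUrl;      // e.g. jdbc:mysql://ip:3306/mte
    final int pendTableCount;  // number of handled-record tables, e.g. 130
    final int readTableCount;  // number of read-record tables, e.g. 130

    private MteArgs(String[] a) {
        clusterName = a[0];
        esNodes = a[1];
        dbUser = a[2];
        dbPassword = a[3];
        jdbcUrl = a[4];
        pendTableCount = Integer.parseInt(a[5]); // NumberFormatException if not a number
        readTableCount = Integer.parseInt(a[6]);
    }

    static MteArgs parse(String[] args) {
        if (args.length != 7) {
            throw new IllegalArgumentException("usage: java -jar mte.jar <cluster> <host:port,...> "
                    + "<dbUser> <dbPassword> <jdbcUrl> <pendTables> <readTables>");
        }
        return new MteArgs(args);
    }
}
```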
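Program entry point: MteMain
```java
// Monitoring thread
Monitor monitorService = new Monitor();
monitorService.monitorToES();
// Both producers receive the same conn; java.sql.Connection is not guaranteed
// to be thread-safe, so a separate connection per producer is the safer choice.
// Producer thread for handled records
Thread pendProducerThread = new Thread(new ZlPendProducer(conn, "ZlPendProducer"));
pendProducerThread.start();
// Producer thread for read records
Thread readProducerThread = new Thread(new ZlReadProducer(conn, "ZlReadProducer"));
readProducerThread.start();
```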
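In the excerpt shown, the entry point starts the producers but never waits for them or shuts the pools down, and the worker threads of `Executors.newFixedThreadPool` are non-daemon, so the JVM keeps running after the import is done. A common shutdown order is: join the producer threads so that every batch has been submitted, call `shutdown()` so the pool accepts no new work but finishes what is queued, then `awaitTermination(...)`. The sketch below demonstrates that order with simulated producers; `OrderlyShutdownDemo`, the pool size of 4 and the 10-minute timeout are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class OrderlyShutdownDemo {
    // Producers submit batches to a shared pool; the caller waits for the
    // producers, then shuts the pool down and waits for queued batches.
    static long run(int producers, int batchesPerProducer) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicLong indexed = new AtomicLong();
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            Thread t = new Thread(() -> {
                for (int b = 0; b < batchesPerProducer; b++) {
                    pool.execute(indexed::incrementAndGet); // stand-in for one consumer batch
                }
            }, "producer-" + p);
            threads.add(t);
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();                 // 1. every batch has been submitted
            }
            pool.shutdown();              // 2. accept no new batches, finish queued ones
            if (!pool.awaitTermination(10, TimeUnit.MINUTES)) {
                pool.shutdownNow();       // 3. give up on stragglers after the timeout
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return indexed.get();
    }
}
```

Applied to MteMain, this would mean calling `join()` on `pendProducerThread` and `readProducerThread`, then `shutdown()` and `awaitTermination(...)` on both consumer pools.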
Source: http://news.51cto.com/art/201907/599607.htm