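Background
My understanding of multithreading was not very deep, and I rarely get to write multithreaded code at work. Recently I ran into a real use case, and implementing it gave me a much better grasp of how multithreading works and how to apply it. The scenario: we needed to send a product survey to users. A colleague from operations brought an Excel file and asked us to send a survey SMS to the roughly 60,000 phone numbers in it.
The simplest approach is a loop that sends sequentially on a single thread. The core problem is that the SMS provider's send interface is slow to respond. Assuming an average response time of 100 ms, single-threaded sending takes 60,000 × 0.1 s = 6,000 s, or about 100 minutes. That is clearly unacceptable. We cannot optimize the provider's interface, so the only way to finish sooner is to increase our own sending and processing capacity.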
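The arithmetic above can be turned into a small estimate of how the total time shrinks as threads are added. This is only a sketch under idealized assumptions: every call takes exactly the average latency, the provider accepts concurrent calls without slowing down, and thread start-up cost is ignored. The class and method names are hypothetical and not part of the article's code.

```java
final class SendTimeEstimate {

    /**
     * Estimated wall-clock seconds when `messages` sends are spread evenly
     * over `threads` workers and each send blocks for `latencyMillis`.
     */
    static double estimateSeconds(int messages, long latencyMillis, int threads) {
        // Each worker sends ceil(messages / threads) messages one after another.
        int perThread = (messages + threads - 1) / threads;
        return perThread * latencyMillis / 1000.0;
    }

    public static void main(String[] args) {
        System.out.println(estimateSeconds(60_000, 100, 1));  // prints 6000.0
        System.out.println(estimateSeconds(60_000, 100, 10)); // prints 600.0
    }
}
```

Under these assumptions the time scales inversely with the thread count; in practice the provider's rate limits and network conditions set the real ceiling.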
Sending SMS in bulk
Reading the data from Excel
Dependencies
The utility code needs the following two Maven dependencies:
- <dependency>
- <groupId>org.apache.poi</groupId>
- <artifactId>poi-ooxml</artifactId>
- <version>3.17</version>
- </dependency>
- <dependency>
- <groupId>org.apache.xmlbeans</groupId>
- <artifactId>xmlbeans</artifactId>
- <version>2.6.0</version>
- </dependency>
Utility code for reading Excel
- /**
- * Reads the phone numbers from the first column of the first sheet.
- *
- * @param fileName path to the .xlsx file
- */
- public static void readFromExcel(String fileName) {
- // try-with-resources closes the workbook and the stream even if reading fails
- try (InputStream is = new FileInputStream(fileName);
- XSSFWorkbook workbook = new XSSFWorkbook(is)) {
- XSSFSheet sheet = workbook.getSheetAt(0);
- int num = 0;
- // Loop over every row
- for (int rowNum = 0, lastNum = sheet.getLastRowNum(); rowNum <= lastNum; rowNum++) {
- XSSFRow row = sheet.getRow(rowNum);
- // getRow returns null for rows that were never created
- if (row == null) {
- continue;
- }
- String phoneNumber = getStringValueFromCell(row.getCell(0)).trim();
- phoneList.add(phoneNumber);
- num++;
- }
- // Print how many phone numbers were read
- System.out.println(num);
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- /**
- * Reads the content of an Excel cell as a string.
- *
- * @param cell the cell to read; may be null
- * @return the cell content, or an empty string for null, blank, or error cells
- */
- private static String getStringValueFromCell(XSSFCell cell) {
- // Date format for date cells
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
- // Number format for numeric cells
- DecimalFormat decimalFormat = new DecimalFormat("#.#####");
- // Default to an empty string
- String cellValue = "";
- if (cell == null) {
- return cellValue;
- }
- // Read according to the cell type
- if (cell.getCellType() == XSSFCell.CELL_TYPE_STRING) {
- cellValue = cell.getStringCellValue();
- } else if (cell.getCellType() == XSSFCell.CELL_TYPE_NUMERIC) {
- // Format dates as yyyy-MM-dd
- if (DateUtil.isCellDateFormatted(cell)) {
- double d = cell.getNumericCellValue();
- Date date = DateUtil.getJavaDate(d);
- cellValue = dateFormat.format(date);
- } else {
- // Format everything else as a number
- cellValue = decimalFormat.format((cell.getNumericCellValue()));
- }
- } else if (cell.getCellType() == XSSFCell.CELL_TYPE_BLANK) {
- cellValue = "";
- } else if (cell.getCellType() == XSSFCell.CELL_TYPE_BOOLEAN) {
- cellValue = String.valueOf(cell.getBooleanCellValue());
- } else if (cell.getCellType() == XSSFCell.CELL_TYPE_ERROR) {
- cellValue = "";
- } else if (cell.getCellType() == XSSFCell.CELL_TYPE_FORMULA) {
- // getCellFormula already returns a String
- cellValue = cell.getCellFormula();
- }
- return cellValue;
- }
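A method that simulates the provider's SMS interface
- /**
- * Simulates the slow external interface; multithreading is used to work around its latency.
- *
- * @param userPhone phone number to send to
- */
- public void sendMsgToPhone(String userPhone) {
- try {
- // Stand-in for the provider's response time
- Thread.sleep(SEND_COST_TIME);
- } catch (InterruptedException e) {
- // Restore the interrupt flag so callers can still see the interruption
- Thread.currentThread().interrupt();
- e.printStackTrace();
- }
- System.out.println("send message to :" + userPhone);
- }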
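Sending with multiple threads
Simple single-threaded sending
- /**
- * Sends everything on one worker thread.
- *
- * @param phoneList phone numbers to send to
- * @return elapsed time in milliseconds
- */
- private long singleThread(List<String> phoneList) {
- long start = System.currentTimeMillis();
- /*// Run directly on the main thread
- for (String phoneNumber : phoneList) {
- threadOperation.sendMsgToPhone(phoneNumber);
- }*/
- SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(phoneList);
- smet.start();
- // Wait for the worker to finish; without join() only the thread start-up is timed
- try {
- smet.join();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- long totalTime = System.currentTimeMillis() - start;
- System.out.println("Total single-thread send time (ms): " + totalTime);
- return totalTime;
- }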
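In a bulk-sending scenario, sending all 1,000 numbers on a single thread takes about 103,132 ms, which is slow and inefficient.
The key point in multithreaded sending is to split the full list of phone numbers into groups and assign each group to its own thread.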
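The grouping step can be sketched as a small helper that splits any list into at most a given number of consecutive groups. This is a minimal sketch rather than the article's own method: the class and method names are hypothetical, and it relies only on `List.subList` from the JDK. Clamping the end index with `Math.min` keeps the last group from running past the end of the list when the size is not an exact multiple of the group count.

```java
import java.util.ArrayList;
import java.util.List;

final class PhoneListPartition {

    /** Splits `source` into at most `groups` consecutive groups of near-equal size. */
    static <T> List<List<T>> partition(List<T> source, int groups) {
        if (groups <= 0) {
            throw new IllegalArgumentException("groups must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        int size = source.size();
        // Ceiling division: every group except possibly the last has this many items.
        int chunk = (size + groups - 1) / groups;
        for (int from = 0; from < size; from += chunk) {
            int to = Math.min(from + chunk, size); // never run past the end
            result.add(source.subList(from, to));
        }
        return result;
    }
}
```

Note that `subList` returns views of the original list, not copies, so the source list should not be structurally modified while the worker threads are reading their groups.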
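Example with two threads
- /**
- * Sends with two threads, each handling half of the list.
- *
- * @param phoneList phone numbers to send to
- * @return elapsed time in milliseconds
- */
- private long twoThreads(List<String> phoneList) {
- long start = System.currentTimeMillis();
- List<String> list1 = phoneList.subList(0, phoneList.size() / 2);
- List<String> list2 = phoneList.subList(phoneList.size() / 2, phoneList.size());
- SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(list1);
- smet.start();
- SendMsgExtendThread smet1 = threadOperation.new SendMsgExtendThread(list2);
- smet1.start();
- // Wait for both halves to finish before reading the clock
- try {
- smet.join();
- smet1.join();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- return System.currentTimeMillis() - start;
- }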
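When work runs on other threads, the elapsed time is only meaningful if the clock is read after those threads have finished; `Thread.start()` returns immediately. The sketch below shows that pattern in isolation with `Thread.join()`. The class and method names are hypothetical, and the jobs are arbitrary `Runnable`s rather than the article's sender.

```java
final class JoinTimingSketch {

    /** Runs each job on its own thread and returns the wall-clock milliseconds until all finish. */
    static long runAndTime(Runnable... jobs) {
        long start = System.nanoTime();
        Thread[] threads = new Thread[jobs.length];
        for (int i = 0; i < jobs.length; i++) {
            threads[i] = new Thread(jobs[i]);
            threads[i].start(); // returns immediately; the job runs in the background
        }
        for (Thread thread : threads) {
            try {
                thread.join(); // block until this worker is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return (System.nanoTime() - start) / 1_000_000;
    }
}
```

Calling `runAndTime(jobA, jobB)` with two blocking jobs returns roughly the duration of the slower one, not the sum of both.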
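Another way to group the data
- /**
- * Another way to distribute the numbers: fixed-size groups, one thread per group.
- * Covers at most 10 groups of 10 numbers.
- *
- * @param phoneList phone numbers to send to
- */
- private void otherThread(List<String> phoneList) {
- int numbersPerThread = 10;
- for (int threadNo = 0; threadNo < 10; threadNo++) {
- int from = threadNo * numbersPerThread;
- // Stop once every number has been handed out
- if (from >= phoneList.size()) {
- break;
- }
- // Clamp the end index so a short last group does not run past the list
- int to = Math.min(from + numbersPerThread, phoneList.size());
- List<String> list = phoneList.subList(from, to);
- SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(list);
- smet.start();
- }
- }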
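Sending with a thread pool
- /**
- * Sends through the thread pool.
- * Covers at most THREAD_POOL_SIZE groups of 10 numbers.
- *
- * @param phoneList phone numbers to send to
- */
- private void threadPool(List<String> phoneList) {
- int numbersPerThread = 10;
- for (int threadNo = 0; threadNo < THREAD_POOL_SIZE; threadNo++) {
- int from = threadNo * numbersPerThread;
- if (from >= phoneList.size()) {
- break;
- }
- // Clamp the end index so a short last group does not run past the list
- int to = Math.min(from + numbersPerThread, phoneList.size());
- List<String> list = phoneList.subList(from, to);
- // The pool only calls run(), so the Thread object acts as a plain Runnable here
- threadOperation.executorService.execute(threadOperation.new SendMsgExtendThread(list));
- }
- // Accept no new tasks; already submitted tasks still run to completion
- threadOperation.executorService.shutdown();
- }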
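Sending with Callable
- /**
- * Multithreaded sending with Callable tasks, printing each task's elapsed time.
- * Covers at most THREAD_POOL_SIZE groups of 100 numbers.
- *
- * @param phoneList phone numbers to send to
- */
- private void multiThreadSend(List<String> phoneList) {
- List<Future<Long>> futures = new ArrayList<>();
- int numbersPerThread = 100;
- for (int threadNo = 0; threadNo < THREAD_POOL_SIZE; threadNo++) {
- int from = threadNo * numbersPerThread;
- if (from >= phoneList.size()) {
- break;
- }
- // Clamp the end index so a short last group does not run past the list
- int to = Math.min(from + numbersPerThread, phoneList.size());
- List<String> list = phoneList.subList(from, to);
- Future<Long> future = threadOperation.executorService.submit(threadOperation.new SendMsgImplCallable(list, String.valueOf(threadNo)));
- futures.add(future);
- }
- for (Future<Long> future : futures) {
- try {
- // get() blocks until the task finishes, then yields its elapsed milliseconds
- System.out.println(future.get());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
- }
- threadOperation.executorService.shutdown();
- }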
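With multithreaded sending, the work is split up and distributed across the threads, and the run finishes in 10,266 ms. Compared with the 103,132 ms of the single-threaded run, that is roughly a tenfold reduction.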
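The split-and-distribute flow can also be written compactly with `ExecutorService.invokeAll`, which submits every task and returns only after all of them have completed. The following is a self-contained sketch, not the article's implementation: the class name, the `fakeSend` stand-in, and the parameter names are assumptions made for illustration, and the real provider call would replace the `Thread.sleep`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PooledSendSketch {

    /** Hypothetical stand-in for the provider call: just blocks for the given latency. */
    static void fakeSend(String phone, long latencyMillis) throws InterruptedException {
        Thread.sleep(latencyMillis);
    }

    /** Splits `phones` across `threads` workers and returns how many sends completed. */
    static int sendAll(List<String> phones, int threads, long latencyMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Ceiling division, at least 1 so the loop below always advances.
            int chunk = Math.max(1, (phones.size() + threads - 1) / threads);
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int from = 0; from < phones.size(); from += chunk) {
                List<String> slice = phones.subList(from, Math.min(from + chunk, phones.size()));
                tasks.add(() -> {
                    for (String phone : slice) {
                        fakeSend(phone, latencyMillis);
                    }
                    return slice.size();
                });
            }
            int total = 0;
            // invokeAll blocks until every task has finished.
            for (Future<Integer> done : pool.invokeAll(tasks)) {
                total += done.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while sending", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a send task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the chunk size is derived from the list size, this version covers the whole list regardless of its length, rather than a fixed number of entries per thread.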
Complete code
- package com.lingyejun.tick.authenticator;
- import org.apache.poi.ss.usermodel.DateUtil;
- import org.apache.poi.xssf.usermodel.XSSFCell;
- import org.apache.poi.xssf.usermodel.XSSFRow;
- import org.apache.poi.xssf.usermodel.XSSFSheet;
- import org.apache.poi.xssf.usermodel.XSSFWorkbook;
- import java.io.FileInputStream;
- import java.io.FileNotFoundException;
- import java.io.IOException;
- import java.io.InputStream;
- import java.text.DecimalFormat;
- import java.text.SimpleDateFormat;
- import java.util.*;
- import java.util.concurrent.*;
- public class ThreadOperation {
- // Simulated synchronous wait per SMS, in milliseconds
- private static final long SEND_COST_TIME = 100L;
- // Excel file holding the phone numbers
- private static final String FILE_NAME = "/Users/lingye/Downloads/phone_number.xlsx";
- // List of phone numbers
- private static List<String> phoneList = new ArrayList<>();
- // Singleton instance
- private static volatile ThreadOperation threadOperation;
- // Number of threads
- private static final int THREAD_POOL_SIZE = 10;
- // Initialize the thread pool
- private ExecutorService executorService = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
- public ThreadOperation() {
- // Read the phone numbers from the local file
- readFromExcel(FILE_NAME);
- }
- public static void main(String[] args) {
- ThreadOperation threadOperation = getInstance();
- //threadOperation.singleThread(phoneList);
- threadOperation.multiThreadSend(phoneList);
- }
- /**
- * Returns the singleton instance (double-checked locking).
- *
- * @return the shared ThreadOperation instance
- */
- public static ThreadOperation getInstance() {
- if (threadOperation == null) {
- synchronized (ThreadOperation.class) {
- if (threadOperation == null) {
- threadOperation = new ThreadOperation();
- }
- }
- }
- return threadOperation;
- }
- /**
- * Reads the phone numbers from the first column of the first sheet.
- *
- * @param fileName path to the .xlsx file
- */
- public static void readFromExcel(String fileName) {
- // try-with-resources closes the workbook and the stream even if reading fails
- try (InputStream is = new FileInputStream(fileName);
- XSSFWorkbook workbook = new XSSFWorkbook(is)) {
- XSSFSheet sheet = workbook.getSheetAt(0);
- int num = 0;
- // Loop over every row
- for (int rowNum = 0, lastNum = sheet.getLastRowNum(); rowNum <= lastNum; rowNum++) {
- XSSFRow row = sheet.getRow(rowNum);
- // getRow returns null for rows that were never created
- if (row == null) {
- continue;
- }
- String phoneNumber = getStringValueFromCell(row.getCell(0)).trim();
- phoneList.add(phoneNumber);
- num++;
- }
- // Print how many phone numbers were read
- System.out.println(num);
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- /**
- * Reads the content of an Excel cell as a string.
- *
- * @param cell the cell to read; may be null
- * @return the cell content, or an empty string for null, blank, or error cells
- */
- private static String getStringValueFromCell(XSSFCell cell) {
- // Date format for date cells
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
- // Number format for numeric cells
- DecimalFormat decimalFormat = new DecimalFormat("#.#####");
- // Default to an empty string
- String cellValue = "";
- if (cell == null) {
- return cellValue;
- }
- // Read according to the cell type
- if (cell.getCellType() == XSSFCell.CELL_TYPE_STRING) {
- cellValue = cell.getStringCellValue();
- } else if (cell.getCellType() == XSSFCell.CELL_TYPE_NUMERIC) {
- // Format dates as yyyy-MM-dd
- if (DateUtil.isCellDateFormatted(cell)) {
- double d = cell.getNumericCellValue();
- Date date = DateUtil.getJavaDate(d);
- cellValue = dateFormat.format(date);
- } else {
- // Format everything else as a number
- cellValue = decimalFormat.format((cell.getNumericCellValue()));
- }
- } else if (cell.getCellType() == XSSFCell.CELL_TYPE_BLANK) {
- cellValue = "";
- } else if (cell.getCellType() == XSSFCell.CELL_TYPE_BOOLEAN) {
- cellValue = String.valueOf(cell.getBooleanCellValue());
- } else if (cell.getCellType() == XSSFCell.CELL_TYPE_ERROR) {
- cellValue = "";
- } else if (cell.getCellType() == XSSFCell.CELL_TYPE_FORMULA) {
- // getCellFormula already returns a String
- cellValue = cell.getCellFormula();
- }
- return cellValue;
- }
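- /**
- * Simulates the slow external interface; multithreading is used to work around its latency.
- *
- * @param userPhone phone number to send to
- */
- public void sendMsgToPhone(String userPhone) {
- try {
- // Stand-in for the provider's response time
- Thread.sleep(SEND_COST_TIME);
- } catch (InterruptedException e) {
- // Restore the interrupt flag so callers can still see the interruption
- Thread.currentThread().interrupt();
- e.printStackTrace();
- }
- System.out.println("send message to :" + userPhone);
- }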
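- /**
- * Sends everything on one worker thread.
- *
- * @param phoneList phone numbers to send to
- * @return elapsed time in milliseconds
- */
- private long singleThread(List<String> phoneList) {
- long start = System.currentTimeMillis();
- /*// Run directly on the main thread
- for (String phoneNumber : phoneList) {
- threadOperation.sendMsgToPhone(phoneNumber);
- }*/
- SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(phoneList);
- smet.start();
- // Wait for the worker to finish; without join() only the thread start-up is timed
- try {
- smet.join();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- long totalTime = System.currentTimeMillis() - start;
- System.out.println("Total single-thread send time (ms): " + totalTime);
- return totalTime;
- }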
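- /**
- * Another way to distribute the numbers: fixed-size groups, one thread per group.
- * Covers at most 10 groups of 10 numbers.
- *
- * @param phoneList phone numbers to send to
- */
- private void otherThread(List<String> phoneList) {
- int numbersPerThread = 10;
- for (int threadNo = 0; threadNo < 10; threadNo++) {
- int from = threadNo * numbersPerThread;
- // Stop once every number has been handed out
- if (from >= phoneList.size()) {
- break;
- }
- // Clamp the end index so a short last group does not run past the list
- int to = Math.min(from + numbersPerThread, phoneList.size());
- List<String> list = phoneList.subList(from, to);
- SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(list);
- smet.start();
- }
- }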
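- /**
- * Sends with two threads, each handling half of the list.
- *
- * @param phoneList phone numbers to send to
- * @return elapsed time in milliseconds
- */
- private long twoThreads(List<String> phoneList) {
- long start = System.currentTimeMillis();
- List<String> list1 = phoneList.subList(0, phoneList.size() / 2);
- List<String> list2 = phoneList.subList(phoneList.size() / 2, phoneList.size());
- SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(list1);
- smet.start();
- SendMsgExtendThread smet1 = threadOperation.new SendMsgExtendThread(list2);
- smet1.start();
- // Wait for both halves to finish before reading the clock
- try {
- smet.join();
- smet1.join();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- return System.currentTimeMillis() - start;
- }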
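- /**
- * Sends through the thread pool.
- * Covers at most THREAD_POOL_SIZE groups of 10 numbers.
- *
- * @param phoneList phone numbers to send to
- */
- private void threadPool(List<String> phoneList) {
- int numbersPerThread = 10;
- for (int threadNo = 0; threadNo < THREAD_POOL_SIZE; threadNo++) {
- int from = threadNo * numbersPerThread;
- if (from >= phoneList.size()) {
- break;
- }
- // Clamp the end index so a short last group does not run past the list
- int to = Math.min(from + numbersPerThread, phoneList.size());
- List<String> list = phoneList.subList(from, to);
- // The pool only calls run(), so the Thread object acts as a plain Runnable here
- threadOperation.executorService.execute(threadOperation.new SendMsgExtendThread(list));
- }
- // Accept no new tasks; already submitted tasks still run to completion
- threadOperation.executorService.shutdown();
- }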
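- /**
- * Multithreaded sending with Callable tasks, printing each task's elapsed time.
- * Covers at most THREAD_POOL_SIZE groups of 100 numbers.
- *
- * @param phoneList phone numbers to send to
- */
- private void multiThreadSend(List<String> phoneList) {
- List<Future<Long>> futures = new ArrayList<>();
- int numbersPerThread = 100;
- for (int threadNo = 0; threadNo < THREAD_POOL_SIZE; threadNo++) {
- int from = threadNo * numbersPerThread;
- if (from >= phoneList.size()) {
- break;
- }
- // Clamp the end index so a short last group does not run past the list
- int to = Math.min(from + numbersPerThread, phoneList.size());
- List<String> list = phoneList.subList(from, to);
- Future<Long> future = threadOperation.executorService.submit(threadOperation.new SendMsgImplCallable(list, String.valueOf(threadNo)));
- futures.add(future);
- }
- for (Future<Long> future : futures) {
- try {
- // get() blocks until the task finishes, then yields its elapsed milliseconds
- System.out.println(future.get());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
- }
- threadOperation.executorService.shutdown();
- }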
- public class SendMsgExtendThread extends Thread {
- private List<String> numberListByThread;
- public SendMsgExtendThread(List<String> numberList) {
- numberListByThread = numberList;
- }
- @Override
- public void run() {
- long startTime = System.currentTimeMillis();
- for (int i = 0; i <numberListByThread.size(); i++) {
- System.out.print("no." + (i + 1));
- sendMsgToPhone(numberListByThread.get(i));
- }
- System.out.println("== single thread send " + numberListByThread.size() + " execute time: " + (System.currentTimeMillis() - startTime) + " ms");
- }
- }
- public class SendMsgImplCallable implements Callable<Long> {
- private List<String> numberListByThread;
- private String threadName;
- public SendMsgImplCallable(List<String> numberList, String threadName) {
- numberListByThread = numberList;
- this.threadName = threadName;
- }
- @Override
- public Long call() throws Exception {
- Long startMills = System.currentTimeMillis();
- for (String number : numberListByThread) {
- sendMsgToPhone(number);
- }
- Long endMills = System.currentTimeMillis();
- return endMills - startMills;
- }
- }
- }
Source: https://juejin.im/post/5c596c4b51882562ea72160e