导读
集成 spring-kafka, 生产者生产邮件 message, 消费者负责发送
引入线程池, 多线程发送消息
多邮件服务器配置
定时任务生产消息; 计划邮件发送
实现过程
导入依赖
- <properties>
- <java.version>1.8</java.version>
- <MySQL.version>5.1.38</MySQL.version>
- <mapper.version>2.1.5</mapper.version>
- <mybatis.version>1.3.2</mybatis.version>
- <gson.version>2.8.2</gson.version>
- <lang3.version>3.4</lang3.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
- <!-- Spring Test -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-jdbc</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mybatis.spring.boot</groupId>
- <artifactId>mybatis-spring-boot-starter</artifactId>
- <version>${mybatis.version}</version>
- </dependency>
- <!-- 数据库驱动 -->
- <dependency>
- <groupId>MySQL</groupId>
- <artifactId>MySQL-connector-java</artifactId>
- <version>${MySQL.version}</version>
- </dependency>
- <!-- 通用 Mapper 启动器 -->
- <dependency>
- <groupId>tk.mybatis</groupId>
- <artifactId>mapper-spring-boot-starter</artifactId>
- <version>${mapper.version}</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- <!-- 自定义配置文件需要 -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-configuration-processor</artifactId>
- <optional>true</optional>
- </dependency>
- <!-- 使用 SLF4J + Logback 作为日志框架 -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-logging</artifactId>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-mail-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-mail</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- <version>${gson.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <version>${lang3.version}</version>
- </dependency>
- </dependencies>
2.application.YAML 配置 kafka, 邮箱, 数据库参数
本文采用的 YAML 配置方式, 若使用的 properties 的可使用 https://www.toyaml.com/index.html 这个在线工具转换;
邮箱配置, 若使用的 163, 或者 qq 等, 自己百度怎么申请授权码 (一大堆教程);
数据库采用的 Hikari 连接池, 号称 java 平台最快的;
- # 配置 Kafka 集群 IP 地址, 多个 IP 以逗号隔开:
- spring:
- kafka:
Bootstrap-servers: 你的 kafkaIP: 端口号
- producer:
- retries: 2 #发送失败后的重复发送次数
- key-serializer: org.apache.kafka.common.serialization.StringSerializer #key 序列化方式
- value-serializer: org.apache.kafka.common.serialization.StringSerializer #value 序列化方式
- compression-type: gzip #压缩格式
- batch-size: 16384 #批量发送的消息数量
- buffer-memory: 33554432 #32M 的批处理缓冲区
- consumer:
- auto-offset-reset: earliest #最早未被消费的 offset
- enable-auto-commit: false #是否开启自动提交
- #auto-commit-interval: 1000 #自动提交的时间间隔
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #key 解码方式
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #value 解码方式
- group-id: kafka.consumer.group.id.1
- max-poll-records: 50
- properties:
- session-timeout-ms: 20000 #连接超时时间
- max-poll-interval-ms: 15000 #手动提交设置与 poll 的心跳数, 如果消息队列中没有消息, 等待毫秒后, 调用 poll() 方法. 如果队列中有消息, 立即消费消息, 每次消费的消息的多少可以通过 max.poll.records 配置.
- max-partition-fetch-bytes: 15728640 #设置拉取数据的大小 15M
- client-id: kafkacli
- listener:
- ack-mode: manual_immediate
- datasource:
- type: com.zaxxer.hikari.HikariDataSource
- driver-class-name: com.MySQL.jdbc.Driver
- url: * #自己的
- username: * #账号
- password: * #密码
- hikari:
- minimum-idle: 5
- # 空闲连接存活最大时间, 默认 600000(10 分钟)
- idle-timeout: 180000
- # 连接池最大连接数, 默认是 10
- maximum-pool-size: 10
- # 此属性控制从池返回的连接的默认自动提交行为, 默认值: true
- auto-commit: true
- # 连接池名称
- pool-name: MyHikariCP
- # 此属性控制池中连接的最长生命周期, 值 0 表示无限生命周期, 默认 1800000 即 30 分钟
- max-lifetime: 1800000
- # 数据库连接超时时间, 默认 30 秒, 即 30000
- connection-timeout: 30000
- connection-test-query: SELECT 1
- # 邮箱服务器配置, 以 163 邮箱为例
- mail:
- host: smtp.163.com #邮箱服务器地址
- port: 25 #端口
- username: * #用户名
- password: * #授权密码
- default-encoding: UTF-8
- properties:
- from: * #用户名
- mail:
- smtp:
- connectiontimeout: 5000
- timeout: 3000
- writetimeout: 5000
- # 邮件模板
- thymeleaf:
- cache: false
- prefix: classpath:/views/
- # 邮件附件
- servlet:
- multipart:
- max-file-size: 10MB #限制单个文件大小
- max-request-size: 50MB #限制请求总量
- logging:
- level:
- com.example: debug
- pattern:
- # console: %d{yyyy/MM/dd-HH:mm:ss} [%thread] %-5level %logger- %msg%n
- # file: %d{yyyy/MM/dd-HH:mm} [%thread] %-5level %logger- %msg%n
- path: C:\log
- # 邮件失败重试次数
- com:
- example:
- mail:
- sendNumber: 3 #邮件发送失败重试次数
- threadKillTime: 60 #线程超时杀死
- mybatis:
- type-aliases-package: com.example.mail.entity
- configuration:
- map-Underscore-to-camel-case: true
- mapper-locations: mappers/*Mapper.xml
- # 异步线程配置, 配置核心线程数
- async:
- executor:
- thread:
- core_pool_size: 15 #核心线程数量, 线程池创建时候初始化的线程数
- max_pool_size: 15 #最大线程数, 只有在缓冲队列满了之后才会申请超过核心线程数的线程
- queue_capacity: 99999 #缓冲队列, 用来缓冲执行任务的队列
- keep_alive_seconds: 60 #当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
- await_termination_seconds: 30 #设置线程池中任务的等待时间, 如果超过这个时候还没有销毁就强制销毁, 以确保应用最后能够被关闭, 而不是阻塞住.
- name:
- prefix: async-service-
- prefixson: async-service-son
3. 邮件消息格式
- {
- "mailUid": "邮件唯一标识",
- "fromName": "发件人别名",
- "fromMail": "发件人地址",
- "toMail": "收件人地址 (多个邮箱则用逗号","隔开)",
- "ccMail": "抄送人地址 (多个邮箱则用逗号","隔开)",
- "bccMail": "密送人地址 (多个邮箱则用逗号","隔开)",
- "planSendTime": "计划邮件时间",
- "mailSubject": "邮件主题",
- "mailContent": "邮件正文",
"sendNum": 发送次数,
- "serverFlag": "邮件服务器标识 (多邮件服务器用)"
- }
4.kafka 生产者, 消费者, mail 发送类等主要方法代码
- // 生产者
- public void sendToKafkaStandardMessageAsync(MailDTO mailDTO) {
- producer = new KafkaProducer<String, Object>(kafkaConfig.producerConfigs());
- producer.send(new ProducerRecord<String, Object>(topicName, gson.toJson(mailDTO)), new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (metadata != null) {
- log.info("生产消息成功 {}, 发送次数 {},checksum:{},offset:{},partition:{},topic:{}", mailDTO.getMailUid(),mailDTO.getSendNum(),metadata.checksum(), metadata.offset(), metadata.partition(), metadata.topic());
- }
- if (exception != null) {
- log.info("生产消息失败 {}", exception.getMessage());
- }
- }
- });
- producer.close();
- }
- // 消费者
- /**
- * 监听一个 Kafka 主题
- **/
- @KafkaListener(topics = MQConstants.Topic.ITEM_EXCHANGE_NAME)
- public void receiveMessageFromKafka(ConsumerRecord<?, ?> record, Acknowledgment ack) {
- log.info("监听消息, MailUid:{}", gson.fromJson(String.valueOf(record.value()), MailDTO.class).getMailUid());
- Optional<?> kafkaMessage = Optional.ofNullable(record.value());
- if (kafkaMessage.isPresent()) {
- sendMessageService.sendMessages(gson.fromJson(String.valueOf(record.value()), MailDTO.class));
- }
- ack.acknowledge();// 手动提交偏移量
- }
- // 构建复杂邮件信息类
- public void sendMimeMail(MailVo mailVo) {
- try {
- MimeMessageHelper messageHelper = new MimeMessageHelper(mailSender.createMimeMessage(), true);//true 表示支持复杂类型
- mailVo.setFrom("这里读取配置文件中配的 from 地址");// 邮件发信人从配置项读取
- messageHelper.setFrom(mailVo.getFrom());// 邮件发信人
- messageHelper.setSentDate(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2019-07-18 12:45:48"));
- messageHelper.setTo(mailVo.getTo().split(","));// 邮件收信人
- messageHelper.setSubject(mailVo.getSubject());// 邮件主题
- messageHelper.setText(mailVo.getText());// 邮件内容
- if (!StringUtils.isEmpty(mailVo.getCc())) {// 抄送
- messageHelper.setCc(mailVo.getCc().split(","));
- }
- if (!StringUtils.isEmpty(mailVo.getBcc())) {// 密送
- messageHelper.setCc(mailVo.getBcc().split(","));
- }
- if (mailVo.getMultipartFiles() != null) {// 添加邮件附件
- for (MultipartFile multipartFile : mailVo.getMultipartFiles()) {
- messageHelper.addAttachment(multipartFile.getOriginalFilename(), multipartFile);
- }
- }
- if (StringUtils.isEmpty((CharSequence) mailVo.getSentDate())) {// 发送时间
- mailVo.setSentDate(new Date());
- messageHelper.setSentDate(mailVo.getSentDate());
- }
- mailSender.send(messageHelper.getMimeMessage());// 正式发送邮件
- mailVo.setStatus("ok");
- log.info("发送邮件成功:{}->{}", mailVo.getFrom(), mailVo.getTo());
- } catch (Exception e) {
- throw new RuntimeException(e);// 发送失败
- }
- }
5. 思路解析, 画图吧, 口述太费劲
完整代码: https://github.com/wwt729/mail.git
来源: https://www.cnblogs.com/729log/p/11283096.html