环境: SpringBoot + jdk1.8
基础配置参考
https://blog.csdn.net/llll234/article/details/80966952
查看了基础配置那么会遇到一下几个问题:
1. 实际应用中可能会订阅多个通道, 而一下这种写法不太通用
container.addMessageListener(listenerAdapter(new RedisPmpSub()),new PatternTopic("pmp"));
2. 使用过程中使用 new RedisPmpSub() 配置消息接收对象会有问题.
如果 RedisPmpSub 既是消息接收类, 也是消息处理类. 那么如果此时需要注入 Bean, 会成功吗?
3. 考虑后期的扩展性是否能尽量不改变原有代码的基础上, 进行扩展
额外的配置文件
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- <version>RELEASE</version>
- </dependency>
- <dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- </dependency>
由于 GsonUtil 依赖的是某个 SDK,GsonUtil.toJson(this, BasePubMessage.class) 可替换为
new Gson().toJson(this, BasePubMessage.class);
lombok 需要下载插件
发布者
枚举定义
考虑到可维护性, 采用枚举的方式定义管道 RedisChannelEnums
- public enum RedisChannelEnums {
- /**Redis 频道 code 定义 需要与发布者一致 */
- LIVE_INFO_CHANGE("LIVE_INFO_CHANGE","直播信息改变"),
- ;
- /** 枚举定义 + 描述 */
- private String code;
- private String description;
- RedisChannelEnums(String code, String description) {
- this.code = code;
- this.description = description;
- }
- /** 根据 code 获取对应的枚举对象 */
- public static RedisChannelEnums getEnum(String code) {
- RedisChannelEnums[] values = RedisChannelEnums.values();
- if (null != code && values.length> 0) {
- for (RedisChannelEnums value : values) {
- if (value.code == code) {
- return value;
- }
- }
- }
- return null;
- }
- /** 该 code 在枚举列表 code 属性是否存在 */
- public static boolean containsCode(String code) {
- RedisChannelEnums anEnum = getEnum(code);
- return anEnum != null;
- }
- /** 判断 code 与枚举中的 code 是否相同 */
- public static boolean equals(String code, RedisChannelEnums calendarSourceEnum) {
- return calendarSourceEnum.code == code;
- }
- public String getCode() {
- return code;
- }
- public String getDescription() {
- return description;
- }
- }
消息模板
为了兼容不同的业务场景, 需要定义消息模板对象 BasePubMessage
其中 ToString 方法的作用是将对象转成 JSON 字符
- @Data
- public abstract class BasePubMessage {
- /** 发布订阅频道名称 */
- protected String channel;
- protected String extra;
- @Override
- public String toString() {
- return GsonUtil.toJson(this, BasePubMessage.class);
- }
- }
消息对象 LiveChangeMessage
其中 ToString 方法的作用是将对象转成 JSON 字符
- @Data
- public class LiveChangeMessage extends BasePubMessage {
- /** 直播 Ids*/
- private String liveIds;
- @Override
- public String toString() {
- return GsonUtil.toJson(this, LiveChangeMessage.class);
- }
- }
发布者服务
- public interface RedisPub {
- /**
- * 集成 Redis 实现消息发布订阅模式 - 双通道
- * @param redisChannelEnums 枚举定义
- * @param basePubMessage 消息
- */
- void sendMessage(RedisChannelEnums redisChannelEnums, BasePubMessage basePubMessage);
- }
- @Service
- public class RedisPubImpl implements RedisPub {
- @Resource
- private StringRedisTemplate stringRedisTemplate;
- @Override
- public void sendMessage(RedisChannelEnums redisChannelEnums, BasePubMessage basePubMessage) {
- if(redisChannelEnums ==null || basePubMessage ==null){
- return;
- }
- basePubMessage.setChannel(redisChannelEnums.getCode());
- stringRedisTemplate.convertAndSend(redisChannelEnums.getCode(), basePubMessage.toString());
- System.out.println("发布成功!");
- }
- }
订阅者
注解配置
RedisConfig 作为订阅者的配置类, 主要作用是: Redis 消息监听器容器, 配置消息接收处理类
同时新加入的功能解决了我们上面提出的几个问题
- @Service
- @Configuration
- @EnableCaching
- public class RedisConfig {
- /**
- * 存放策略实例
- * classInstanceMap : key-beanName value - 对应的策略实现
- */
- private ConcurrentHashMap<String, BaseSub> classInstanceMap = new ConcurrentHashMap<>(20);
- /**
- * 注入所有实现了 Strategy 接口的 Bean
- *
- * @param strategyMap
- * 策略集合
- */
- @Autowired
- public RedisConfig(Map<String, BaseSub> strategyMap) {
- this.classInstanceMap.clear();
- strategyMap.forEach((k, v) ->
- this.classInstanceMap.put(k.toLowerCase(), v)
- );
- }
- /**
- * Redis 消息监听器容器
- *
- * @param connectionFactory
- *
- * @return
- */
- @Bean
- RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
- RedisMessageListenerContainer container = new RedisMessageListenerContainer();
- container.setConnectionFactory(connectionFactory);
- RedisChannelEnums[] redisChannelEnums = RedisChannelEnums.values();
- if (redisChannelEnums.length> 0) {
- for (RedisChannelEnums redisChannelEnum : redisChannelEnums) {
- if (redisChannelEnum == null || StringUtils.isEmpty(redisChannelEnum.getCode()) || redisChannelEnum.getClassName()==null) {
- continue;
- }
- // 订阅了一个叫 pmp 和 channel 的通道, 多通道
- // 一个订阅者接收一个频道信息, 新增订阅者需要新增 RedisChannelEnums 定义 + BaseSub 的子类
- String toLowerCase = redisChannelEnum.getClassName().getSimpleName().toLowerCase();
- BaseSub baseSub = classInstanceMap.get(toLowerCase);
- container.addMessageListener(listenerAdapter(baseSub), new PatternTopic(redisChannelEnum.getCode()));
- }
- }
- return container;
- }
- /**
- * 配置消息接收处理类
- *
- * @param baseSub
- * 自定义消息接收类
- *
- * @return MessageListenerAdapter
- */
- @Bean()
- @Scope("prototype")
- MessageListenerAdapter listenerAdapter(BaseSub baseSub) {
- // 这个地方 是给 messageListenerAdapter 传入一个消息接受的处理器, 利用反射的方法调用 "receiveMessage"
- // 也有好几个重载方法, 这边默认调用处理器的方法 叫 handleMessage 可以自己到源码里面看
- // 注意 2 个通道调用的方法都要为 receiveMessage
- return new MessageListenerAdapter(baseSub, "receiveMessage");
- }
- }
- @Autowired
public RedisConfig(Map<String, BaseSub> strategyMap) 方法的作用是将所有的配置消息接收处理类注入进来, 那么消息接收处理类里面的注解对象也会注入进来.
解决了我们提出的第二个问题
而 String toLowerCase = redisChannelEnum.getClassName().getSimpleName().toLowerCase();
- BaseSub baseSub = classInstanceMap.get(toLowerCase);
- container.addMessageListener(listenerAdapter(baseSub), new PatternTopic(redisChannelEnum.getCode()));
是根据不同的管道对应不同的订阅者, 也就是一个订阅者对应一个管道. 方便根据不同的业务场景进行处理.
使用这种方式主需要配置 redisChannelEnum 枚举即可, 解决了我们提出的第一个问题.
这样一来, 订阅者就变得比较通用了
枚举
RedisChannelEnums 作用: 定义不同管道对应的订阅者, 后期增加一个管道类型只需要增加一个枚举即可
- public enum RedisChannelEnums {
- /**Redis 频道名称定义 需要与发布者一致 */
- LIVE_INFO_CHANGE("LIVE_INFO_CHANGE", LiveChangeSub.class, "直播信息改变"),
- ;
- /** 枚举定义 + 描述 */
- private String code;
- private Class<? extends BaseSub> className;
- private String description;
- RedisChannelEnums(String code, Class<? extends BaseSub> className, String description) {
- this.code = code;
- this.className=className;
- this.description = description;
- }
- /** 根据 code 获取对应的枚举对象 */
- public static RedisChannelEnums getEnum(String code) {
- RedisChannelEnums[] values = RedisChannelEnums.values();
- if (null != code && values.length> 0) {
- for (RedisChannelEnums value : values) {
- if (value.code == code) {
- return value;
- }
- }
- }
- return null;
- }
- /** 该 code 在枚举列表 code 属性是否存在 */
- public static boolean containsCode(String code) {
- RedisChannelEnums anEnum = getEnum(code);
- return anEnum != null;
- }
- /** 判断 code 与枚举中的 code 是否相同 */
- public static boolean equals(String code, RedisChannelEnums calendarSourceEnum) {
- return calendarSourceEnum.code == code;
- }
- public String getCode() {
- return code;
- }
- public String getDescription() {
- return description;
- }
- public Class<? extends BaseSub> getClassName() {
- return className;
- }
- }
消息模板
BaseSubMessage 定义通用的字段, 与 JSON 字符的通用转换
- @Data
- abstract class BaseSubMessage {
- /** 发布订阅频道名称 */
- private String channel;
- private String extra;
- private String JSON;
- BaseSubMessage(String JSON) {
- if(StringUtils.isEmpty(JSON)){
- return;
- }
- this.JSON = JSON;
- Map map = new Gson().fromJson(this.JSON, Map.class);
- BeanHelper.populate(this, map);
- }
- }
LiveChangeMessage 定义当前业务场景的字段
- @Data
- @ToString(callSuper = true)
- public class LiveChangeMessage extends BaseSubMessage {
- /** 直播 Ids */
- private String liveIds;
- public LiveChangeMessage(String JSON) {
- super(JSON);
- }
- }
订阅者服务
BaseSub 定义接收消息的通用方法
- public interface BaseSub {
- /**
- * 接收消息
- * @param jsonMessage JSON 字符
- */
- void receiveMessage(String jsonMessage);
- }
LiveChangeSub 具体消息接收对象
- @Component
- public class LiveChangeSub implements BaseSub {
- /** 只是定义的注解测试, 可以换成自己的 */
- @Autowired
- private CategoryMapper categoryMapper;
- @Override
- public void receiveMessage(String jsonMessage) {
- System.out.println("项目 aries-server.....................");
- // 注意通道调用的方法名要和 RedisConfig2 的 listenerAdapter 的 MessageListenerAdapter 参数 2 相同
- System.out.println("这是 LiveChangeSub" + "-----" + jsonMessage);
- LiveChangeMessage liveChangeMessage = new LiveChangeMessage(jsonMessage);
- System.out.println(liveChangeMessage);
- Category category = categoryMapper.get(1L);
- System.out.println("category:" + category);
- }
- }
总结
发布者配置场景: 独立的服务器, 独立的项目, A Redis 缓存服务器
订阅者配置场景: 不同于发布者的独立的服务器, 独立的项目, A Redis 缓存服务器
使用场景: 一个发布者, 一个或者多个订阅者. 发布者负责发布消息, 订阅者负责接收消息. 一旦发布者消息发布出来, 那么
订阅者可以通过管道进行监听. 同时可以根据不同的管道设置不同的消息接收者或者叫消息处理者.
优点: 容易配置, 好管理
缺点: 由于基于 Redis 去做, 不同的 Redis 服务就不适用了. 需要考虑消息丢失, 持久化的问题.
来源: https://www.cnblogs.com/IT-study/p/11352254.html