Redis 发布 / 订阅命令
Redis 通过 PUBLISH , SUBSCRIBE 等命令实现了发布订阅模式. 该功能提供两种信息机制, 分别是 "发布订阅到频道" 和 "发布订阅到模式".
PUBLISH 命令和 SUBSCRIBE 命令
PUBLISH channel message
Redis 的 PUBLISH 命令可以让客户端把指定的消息发送到指定的频道中.
SUBSCRIBE channel [channel ...]
Redis 的 SUBSCRIBE 命令可以让客户端订阅任意数量的频道, 每当有新信息发送到被订阅的频道时, 信息就会被发送给所有订阅指定频道的客户端.
下面我们就演示一下 PUBLISH 命令和 SUBSCRIBE 命令的用法:
首先是订阅单个频道:
然后是订阅多个频道:
PSUBSCRIBE 模式订阅命令
Redis 的发布与订阅实现支持模式匹配(pattern matching).
客户端可以订阅一个带 * 号的模式, 如果某个 / 某些频道的名字和这个模式匹配, 那么当有信息发送给这个 / 这些频道的时候, 客户端也会收到这个 / 这些频道的信息.
客户端订阅的模式里面可以包含多个 glob 风格的通配符, 比如 * , ? 和 [...] 等.
比如执行命令:
PSUBSCRIBE t.*
客户端将收到来自 t.java, t.db 等频道的信息.
Redis 发布 / 订阅的存储结构
每个 Redis 服务器进程都维持着一个表示服务器状态的 Redis.h/redisServer 结构, 结构的 pubsub_channels 属性是一个字典, 这个字典就用于保存订阅频道的信息:
- struct redisServer {
- // ...
- dict *pubsub_channels;
- // ...
- }
其中, 字典的键为正在被订阅的频道, 而字典的值则是一个链表, 链表中保存了所有订阅这个频道的客户端.
当调用 PUBLISH channel message 命令的时候, 程序首先根据 channel 定位到字典的键, 然后将信息发送给字典值链表中的所有客户端.
Redis 发布 / 订阅存储结构如下图所示:
Spring Data Redis 实现发布 / 订阅模式
下面带你一步步通过 Spring Data Redis 来实现发布与订阅.
由于篇幅原因下面就不再演示项目搭建和集成 Redis 的过程了
MessagePublisher
首先定义一个发布者接口, 接口只有一个 void publish(String message)方法, 用于发布消息.
- public interface MessagePublisher {
- /**
- * publish message
- * @param message
- */
- void publish(String message);
- }
然后提供一个基于 Redis 的 MessagePublisher 实现.
其中最核心的是这个方法: redisTemplate.convertAndSend(topic.getTopic(), message), 用于把消息发送到指定 topic 的 channel 之中.
- import lombok.Setter;
- import org.springframework.data.Redis.core.RedisTemplate;
- import org.springframework.data.Redis.listener.ChannelTopic;
- /**
- * Redis message publisher
- *
- * @author ijiangtao
- * @create 2019-05-01 19:36
- **/
- @Setter
- public class RedisMessagePublisher implements MessagePublisher {
- private RedisTemplate<String, String> redisTemplate;
- private ChannelTopic topic;
- private RedisMessagePublisher() { }
- public RedisMessagePublisher(RedisTemplate<String, String> redisTemplate, ChannelTopic topic) {
- this.redisTemplate = redisTemplate;
- this.topic = topic;
- }
- public void publish(String message) {
- redisTemplate.convertAndSend(topic.getTopic(), message);
- }
- }
- MessageListener
RedisMessageSubscriber 是一个订阅者, 它实现了 MessageListener 接口, 并通过一个 messageList 来存 / 取监听到的消息.
- import lombok.AccessLevel;
- import lombok.AllArgsConstructor;
- import lombok.Data;
- import lombok.NoArgsConstructor;
- import org.springframework.data.Redis.connection.Message;
- import org.springframework.data.Redis.connection.MessageListener;
- import org.springframework.stereotype.Component;
- import java.util.List;
- /**
- * Redis Message Subscriber
- * <p>
- * RedisMessageSubscriber implements the Spring Data Redis-provided MessageListener interface
- *
- * @author ijiangtao
- * @create 2019-05-01 19:39
- **/
- @AllArgsConstructor
- @NoArgsConstructor(access = AccessLevel.PRIVATE)
- @Data
- @Component
- public class RedisMessageSubscriber implements MessageListener {
- private List<String> messageList;
- public void onMessage(Message message, byte[] pattern) {
- messageList.add("[pattern:" + new String(pattern) + ",message:" + message.toString() + "]");
- }
- }
- RedisPubSubConfig
下面定义了两个 "topic", 并且通过两个 "publisher` 将"message"发布到"channel"指定的"topic" 上.
然后我们定义了两个 "subscriber","subscriber1" 订阅了 "topic1" 和 "topic2","subscriber2" 只订阅了 "topic2".
最后我们将这些发布者和订阅者都注册到了 Spring Data Redis 提供的容器 (RedisMessageListenerContainer) 中.
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.ComponentScan;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.PropertySource;
- import org.springframework.data.Redis.core.RedisTemplate;
- import org.springframework.data.Redis.listener.ChannelTopic;
- import org.springframework.data.Redis.listener.RedisMessageListenerContainer;
- import org.springframework.data.Redis.listener.adapter.MessageListenerAdapter;
- import org.springframework.data.Redis.repository.configuration.EnableRedisRepositories;
- import java.util.ArrayList;
- /**
- * config
- *
- * @author ijiangtao
- * @create 2019-05-01 19:57
- **/
- @Configuration
- @ComponentScan("net.ijiangtao.tech.framework.spring.ispringboot.redis")
- @EnableRedisRepositories(basePackages = "net.ijiangtao.tech.framework.spring.ispringboot")
- @PropertySource("classpath:application.properties")
- public class RedisPubSubConfig {
- @Autowired
- private RedisTemplate<String, String> redisTemplate;
- @Bean
- RedisMessageListenerContainer redisContainer() {
- RedisMessageListenerContainer container = new RedisMessageListenerContainer();
- container.setConnectionFactory(redisTemplate.getConnectionFactory());
- container.addMessageListener(messageListenerAdapter1() , topic1());
- container.addMessageListener(messageListenerAdapter1() , topic2());
- container.addMessageListener(messageListenerAdapter2(), topic2());
- return container;
- }
- @Bean
- MessageListenerAdapter messageListenerAdapter1() {
- return new MessageListenerAdapter(messageListener1());
- }
- @Bean
- public RedisMessageSubscriber messageListener1() {
- return new RedisMessageSubscriber(new ArrayList<>());
- }
- @Bean
- MessageListenerAdapter messageListenerAdapter2() {
- return new MessageListenerAdapter(messageListener2());
- }
- @Bean
- public RedisMessageSubscriber messageListener2() {
- return new RedisMessageSubscriber(new ArrayList<>());
- }
- @Bean
- MessagePublisher redisPublisherForTopic1() {
- return new RedisMessagePublisher(redisTemplate, topic1());
- }
- @Bean
- MessagePublisher redisPublisherForTopic2() {
- return new RedisMessagePublisher(redisTemplate, topic2());
- }
- @Bean
- ChannelTopic topic1() {
- return new ChannelTopic("topic1");
- }
- @Bean
- ChannelTopic topic2() {
- return new ChannelTopic("topic2");
- }
- }
- Unit Test
下面我们通过单元测试, 往 "topic1" 和 "topic2" 分别发布了十条消息, 然后遍历 "subscriber1" 和 "subscriber2" 监听到的消息内容.
- import lombok.extern.slf4j.Slf4j;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.test.context.junit4.SpringRunner;
- import java.util.List;
- import java.util.UUID;
- /**
- * Redis Pub/Sub tests
- *
- * @author ijiangtao
- * @create 2019-05-01 19:12
- **/
- @RunWith(SpringRunner.class)
- @SpringBootTest
- @Slf4j
- public class RedisPubSub {
- @Autowired
- @Qualifier("redisPublisherForTopic1")
- private MessagePublisher redisPublisher1;
- @Autowired
- @Qualifier("redisPublisherForTopic2")
- private MessagePublisher redisPublisher2;
- @Autowired
- @Qualifier("messageListener1")
- private RedisMessageSubscriber subscriber1;
- @Autowired
- @Qualifier("messageListener2")
- private RedisMessageSubscriber subscriber2;
- @Test
- public void test1() {
- // 循环发布 10 次消息, 主要方法 redisTemplate.convertAndSend
- for (int i = 0; i <10; i++) {
- String message = "Topic1 Message :" + UUID.randomUUID();
- redisPublisher1.publish(message);
- }
- // 循环发布 10 次消息, 主要方法 redisTemplate.convertAndSend
- for (int i = 0; i < 10; i++) {
- String message = "Topic2 Message :" + UUID.randomUUID();
- redisPublisher2.publish(message);
- }
- // 获取存储的订阅消息
- List<String> messageList1 = subscriber1.getMessageList();
- for (int i = 0; i <messageList1.size(); i++) {
- log.info(messageList1.get(i));
- }
- // 获取存储的订阅消息
- List<String> messageList2 = subscriber2.getMessageList();
- for (int i = 0; i < messageList2.size(); i++) {
- log.info(messageList2.get(i));
- }
- }
- }
"subscriber1" 监听到了 "redisPublisher1" 和 "redisPublisher2" 发布的共 20 条消息:
- [pattern:topic1,message:Topic1 Message : 2239af04-8e91-4adf-8e1e-98261a44ff77]
- [pattern:topic1,message:Topic1 Message : 85107f06-2cae-4d6c-8123-9e8dc6e7a608]
- [pattern:topic1,message:Topic1 Message : 0b80b9b8-8eee-476e-8462-bb6cbbbcf863]
- [pattern:topic1,message:Topic1 Message : 0983f28d-d220-4538-b15e-dc66c0d3e491]
- [pattern:topic1,message:Topic1 Message : 0f2d863c-00b9-4406-8e49-020c78a3632d]
- [pattern:topic1,message:Topic1 Message : b8a0bb35-6cc2-4393-9136-2390de80f709]
- [pattern:topic1,message:Topic1 Message : 027f1ca5-39cc-42c6-a4d8-87dc138260b1]
- [pattern:topic1,message:Topic1 Message : ff85595e-2864-4dec-96c1-9dd29c69f670]
- [pattern:topic1,message:Topic1 Message : 77471855-f04b-437d-bd1b-afb801a33cf9]
- [pattern:topic1,message:Topic1 Message : feba4b0f-70c1-4c14-8ecb-bf4c6956f374]
- [pattern:topic2,message:Topic2 Message : dd5e97a7-5ed3-4d0c-be04-0ec2076b14fc]
- [pattern:topic2,message:Topic2 Message : c415c846-8597-49e5-a5e4-04c738dcdb77]
- [pattern:topic2,message:Topic2 Message : 07981d33-c894-43e7-9305-702f967c0a12]
- [pattern:topic2,message:Topic2 Message : 31a34aab-a363-4f5c-9409-3af12b0c3e61]
- [pattern:topic2,message:Topic2 Message : 73a4e995-399a-4291-a777-67fc2e4eb3ad]
- [pattern:topic2,message:Topic2 Message : 09ce7d6b-3257-49ca-a7b4-8ef900944596]
- [pattern:topic2,message:Topic2 Message : 42018099-0d56-4137-98c4-57716d2fbdcd]
- [pattern:topic2,message:Topic2 Message : 0b362883-ab76-413e-9c31-60650f3ed223]
- [pattern:topic2,message:Topic2 Message : 7097d949-8184-40b4-b9af-582ab44e342e]
- [pattern:topic2,message:Topic2 Message : 6967291d-aad4-4aa3-b44a-b988a9589700]
"subscriber2" 监听到了 "redisPublisher2" 发布的共 10 条消息:
- [pattern:topic2,message:Topic2 Message : dd5e97a7-5ed3-4d0c-be04-0ec2076b14fc]
- [pattern:topic2,message:Topic2 Message : c415c846-8597-49e5-a5e4-04c738dcdb77]
- [pattern:topic2,message:Topic2 Message : 07981d33-c894-43e7-9305-702f967c0a12]
- [pattern:topic2,message:Topic2 Message : 31a34aab-a363-4f5c-9409-3af12b0c3e61]
- [pattern:topic2,message:Topic2 Message : 73a4e995-399a-4291-a777-67fc2e4eb3ad]
- [pattern:topic2,message:Topic2 Message : 09ce7d6b-3257-49ca-a7b4-8ef900944596]
- [pattern:topic2,message:Topic2 Message : 42018099-0d56-4137-98c4-57716d2fbdcd]
- [pattern:topic2,message:Topic2 Message : 0b362883-ab76-413e-9c31-60650f3ed223]
- [pattern:topic2,message:Topic2 Message : 7097d949-8184-40b4-b9af-582ab44e342e]
- [pattern:topic2,message:Topic2 Message : 6967291d-aad4-4aa3-b44a-b988a9589700]
总结
本文从 Redis 发布和订阅相关的命令开始, 逐步讲解了 Redis 发布订阅的存储结构, 以及如何通过 Spring Data Redis 实现发布订阅模式.
来源: http://www.jianshu.com/p/c1a907651fa8