发布订阅简介
除了使用 List 实现简单的消息队列功能以外, Redis 还提供了发布订阅的消息机制. 在这种机制下, 消息发布者向指定频道 (channel) 发布消息, 消息订阅者可以收到指定频道的消息, 同一个频道可以有多个消息订阅者, 如下图:
Redis 也提供了一些命令支持这个机制, 接下来我们详细介绍一下这些命令.
发布订阅相关命令
在 Redis 中, 发布订阅相关命令有:
发布消息
订阅频道
取消订阅
按照模式订阅
按照模式取消订阅
查询订阅信息
发布消息
发布消息的命令是 publish, 语法是:
publish 频道名称 消息
比如, 要向 channel:one-more-study:demo 频道发布一条消息 "I am One More Study.", 命令如下:
- > publish channel:one-more-study:demo "I am One More Study."
- (integer) 0
返回的结果是订阅者的个数, 上例中没有订阅者, 所以返回结果为 0.
订阅消息
订阅消息的命令是 subscribe, 订阅者可以订阅一个或者多个频道, 语法是:
subscribe 频道名称 [频道名称 ...]
比如, 订阅一个 channel:one-more-study:demo 频道, 命令如下:
- > subscribe channel:one-more-study:demo
- Reading messages... (press Ctrl-C to quit)
- 1) "subscribe"
- 2) "channel:one-more-study:demo"
- 3) (integer) 1
返回结果中有 3 条, 分别表示: 返回值的类型(订阅成功), 订阅的频道名称, 目前已订阅的频道数量. 当订阅者接受到消息时, 就会显示:
- ) "message"
- ) "channel:one-more-study:demo"
- ) "I am One More Study."
同样也是 3 条结果, 分别表示: 返回值的类型(信息), 消息来源的频道名称, 消息内容.
新开启的订阅者, 是无法收到该频道之前的历史消息的, 因为 Redis 没有对发布的消息做持久化.
取消订阅
取消订阅的命令是 unsubscribe, 可以取消一个或者多个频道的订阅, 语法是:
unsubscribe [频道名称 [频道名称 ...]]
比如, 取消订阅 channel:one-more-study:demo 频道, 命令如下:
- > unsubscribe channel:one-more-study:demo
- 1) "unsubscribe"
- 2) "channel:one-more-study:demo"
- 3) (integer) 0
返回结果中有 3 条, 分别表示: 返回值的类型(取消订阅成功), 取消订阅的频道名称, 目前已订阅的频道数量.
- > psubscribe channel:*
- Reading messages... (press Ctrl-C to quit)
- 1) "psubscribe"
- 2) "channel*"
- 3) (integer) 1
- ) "pmessage"
- ) "channel*"
- ) "channel:one-more-study:demo"
- ) "I am One More Study."
- 1> punsubscribe channel:*
- 1) "punsubscribe"
- 2) "channel:*"
- 3) (integer) 0
- > pubsub channels
- 1) "channel:one-more-study:test"
- 2) "channel:one-more-study:demo"
- 3) "channel:demo"
- > pubsub channels *demo
- 1) "channel:one-more-study:demo"
- 2) "channel:demo"
- > pubsub channels *one-more-study*
- 1) "channel:one-more-study:test"
- 2) "channel:one-more-study:demo"
- > pubsub numsub channel:one-more-study:demo
- 1) "channel:one-more-study:demo"
- 2) (integer) 1
- > pubsub numpat
- (integer) 1
- package onemore.study;
- import Redis.clients.jedis.HostAndPort;
- import Redis.clients.jedis.JedisCluster;
- import Redis.clients.jedis.JedisPoolConfig;
- import java.util.HashSet;
- import java.util.Set;
- /**
- * Jedis 集群
- *
- * @author 万猫学社
- */
- public enum Cluster {
- INSTANCE;
- // 为了简单, 把 IP 和端口直接写在这里, 实际开发中写在配置文件会更好.
- private final String hostAndPorts = "192.168.0.60:6379;192.168.0.61:6379;192.168.0.62:6379";
- private JedisCluster jedisCluster;
- Cluster() {
- JedisPoolConfig poolConfig = new JedisPoolConfig();
- // 最大连接数
- poolConfig.setMaxTotal(20);
- // 最大空闲数
- poolConfig.setMaxIdle(10);
- // 最小空闲数
- poolConfig.setMinIdle(2);
- // 从 jedis 连接池获取连接时, 校验并返回可用的连接
- poolConfig.setTestOnBorrow(true);
- // 把连接放回 jedis 连接池时, 校验并返回可用的连接
- poolConfig.setTestOnReturn(true);
- Set<HostAndPort> nodes = new HashSet<>();
- String[] hosts = hostAndPorts.split(";");
- for (String hostport : hosts) {
- String[] ipport = hostport.split(":");
- String ip = ipport[0];
- int port = Integer.parseInt(ipport[1]);
- nodes.add(new HostAndPort(ip, port));
- }
- jedisCluster = new JedisCluster(nodes, 1000, poolConfig);
- }
- public JedisCluster getJedisCluster() {
- return jedisCluster;
- }
- }
- package onemore.study;
- import Redis.clients.jedis.JedisCluster;
- /**
- * 发布者
- *
- * @author 万猫学社
- */
- public class Publisher implements Runnable {
- private final String CHANNEL_NAME = "channel:one-more-study:demo";
- private final String QUIT_COMMAND = "quit";
- @Override
- public void run() {
- JedisCluster jedisCluster = Cluster.INSTANCE.getJedisCluster();
- for (int i = 1; i <= 3; i++) {
- String message = "第" + i + "消息";
- System.out.println(Thread.currentThread().getName() + "发布:" + message);
- jedisCluster.publish(CHANNEL_NAME, message);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("------------------");
- }
- jedisCluster.publish(CHANNEL_NAME, QUIT_COMMAND);
- }
- }
- package onemore.study;
- import Redis.clients.jedis.JedisCluster;
- import Redis.clients.jedis.JedisPubSub;
- /**
- * 订阅者
- *
- * @author 万猫学社
- */
- public class Subscriber implements Runnable {
- private final String CHANNEL_NAME = "channel:one-more-study:demo";
- private final String QUIT_COMMAND = "quit";
- private final JedisPubSub jedisPubSub = new JedisPubSub() {
- @Override
- public void onMessage(String channel, String message) {
- System.out.println(Thread.currentThread().getName() + "接收:" + message);
- if (QUIT_COMMAND.equals(message)) {
- unsubscribe(CHANNEL_NAME);
- }
- }
- };
- @Override
- public void run() {
- JedisCluster jedisCluster = Cluster.INSTANCE.getJedisCluster();
- jedisCluster.subscribe(jedisPubSub, CHANNEL_NAME);
- }
- }
- package onemore.study;
- public class App {
- public static void main(String[] args) throws InterruptedException {
- // 创建 3 个订阅者
- new Thread(new Subscriber()).start();
- new Thread(new Subscriber()).start();
- new Thread(new Subscriber()).start();
- Thread.sleep(1000);
- // 创建发布者
- new Thread(new Publisher()).start();
- }
- }
来源: https://www.cnblogs.com/heihaozi/p/12779069.html