人生终将是场单人旅途, 孤独之前是迷茫, 孤独过后是成长.
楔子
先给大家说声抱歉, 最近一周都没有发文, 有一些比较要紧重要的事需要处理.
今天正好得空, 本来说准备写 SpringIOC 相关的东西, 但是发现想要梳理一遍还是需要很多时间, 所以我打算慢慢写, 先把 MQ 给写了, 再慢慢写其他相关的, 毕竟偏理论的东西一遍要比较难写, 像 MQ 这种偏实战的大家可以 clone 代码去玩一玩, 还是比较方便的.
同时 MQ 也是 Java 进阶不必可少的技术栈之一, 所以 Java 开发从业者对它是必须要了解的.
现在市面上有三种消息队列比较火分别是: RabbitMQ,RocketMQ 和 Kafka.
今天要讲的消息队列中我会以 RabbitMQ 作为案例来入门, 因为 SpringBoot 的 amqp 中默认只集成了 RabbitMQ, 用它来讲会方便许多, 且 RabbitMQ 的性能和稳定性都很不错, 是一款经过时间考验的开源组件.
祝有好收获.
本文代码: 码云地址GitHub 地址
1. 消息队列?
消息队列 (MQ) 全称为 Message Queue, 是一种应用程序对应用程序的通信方法.
翻译一下就是: 在应用之间放一个消息组件, 然后应用双方通过这个消息组件进行通信.
好端端的为啥要在中间放个组件呢?
小系统其实是用不到消息队列的, 一般分布式系统才会引入消息队列, 因为分布式系统需要抗住高并发, 需要多系统解耦, 更需要对用户比较友好的响应速度, 而消息队列的特性可以天然解耦, 方便异步更能起到一个顶住高并发的削峰作用, 完美解决上面的三个问题.
然万物抱阳负阴, 系统之间突然加了个中间件, 提高系统复杂度的同时也增加了很多问题:
消息丢失怎么办?
消息重复消费怎么办?
某些任务需要消息的顺序消息, 顺序消费怎么保证?
消息队列组件的可用性如何保证?
这些都是使用消息队列过程中需要思考需要考虑的地方, 消息队列能给你带来很大的便利, 也能给你带来一些对应的麻烦.
上面说了消息队列带来的好处以及问题, 而这些不在我们今天这篇的讨论范围之内, 我打算之后再写这些, 我们今天要做的是搭建出一个消息队列环境, 让大家感受一下基础的发消息与消费消息, 更高级的问题会放在以后讨论.
2. RabbitMQ 一览
RabbitMQ 是一个消息组件, 是一个 erlang 开发的 AMQP(Advanced Message Queue)的开源实现.
AMQP, 即 Advanced Message Queuing Protocol, 一个提供统一消息服务的应用层标准高级消息队列协议, 是应用层协议的一个开放标准, 为面向消息的中间件设计.
RabbitMQ 采用了 AMQP 协议, 至于这协议怎么怎么样, 我们关心的是 RabbitMQ 结构如何且怎么用.
还是那句话, 学东西需要先观其大貌, 我们要用 RabbitMQ 首先要知道它整体是怎么样, 这样才有利于我们接下来的学习.
我们先来看看我刚画的架构图, 因为 RabbitMQ 实现了 AMQP 协议, 所以这些概念也是 AMQP 中共有的.
Broker: 中间件本身. 接收和分发消息的应用, 这里指的就是 RabbitMQ Server.
Virtual host: 虚拟主机. 出于多租户和安全因素设计的, 把 AMQP 的基本组件划分到一个虚拟的分组中, 类似于网络中的 namespace 概念. 当多个不同的用户使用同一个 RabbitMQ server 提供的服务时, 可以划分出多个 vhost, 每个用户在自己的 vhost 创建 exchange/queue 等.
Connection: 连接. publisher/consumer 和 broker 之间的 TCP 连接. 断开连接的操作只会在 client 端进行, Broker 不会断开连接, 除非出现网络故障或 broker 服务出现问题.
Channel: 渠道. 如果每一次访问 RabbitMQ 都建立一个 Connection, 在消息量大的时候建立 TCP Connection 的开销会比较大且效率也较低. Channel 是在 connection 内部建立的逻辑连接, 如果应用程序支持多线程, 通常每个 thread 创建单独的 channel 进行通讯, AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel, 所以 channel 之间是完全隔离的. Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销.
Exchange: 路由. 根据分发规则, 匹配查询表中的 routing key, 分发消息到 queue 中去.
Queue: 消息的队列. 消息最终被送到这里等待消费, 一个 message 可以被同时拷贝到多个 queue 中.
Binding: 绑定. exchange 和 queue 之间的虚拟连接, binding 中可以包含 routing key.Binding 信息被保存到 exchange 中的查询表中, 用于 message 的分发依据.
看完了这些概念, 我再给大家梳理一遍其流程:
当我们的生产者端往 Broker(RabbitMQ)中发送了一条消息, Broker 会根据其消息的标识送往不同的 Virtual host, 然后 Exchange 会根据消息的路由 key 和交换器类型将消息分发到自己所属的 Queue 中去.
然后消费者端会通过 Connection 中的 Channel 获取刚刚推送的消息, 拉取消息进行消费.
Tip: 某个 Exchange 有哪些属于自己的 Queue, 是由 Binding 绑定关系决定的.
3. RabbitMQ 环境
上面讲了 RabbitMQ 大概的结构图和一个消息的运行流程, 讲完了理论, 这里我们就准备实操一下吧, 先进行 RabbitMQ 安装.
官网下载地址: http://www.rabbitmq.com/download.html
由于我还没有属于自己 Mac 电脑, 所以这里的演示就按照 Windows 的来了, 不过大家都是程序员, 安装个东西总归是难不倒大家的吧
Windows 下载地址: https://www.rabbitmq.com/install-windows.html
进去之后可以直接找到 Direct Downloads, 下载相关 EXE 程序进行安装就可以了.
由于 RabbitMQ 是由 erlang 语言编写的, 所以安装之前我们还需要安装 erlang 环境, 你下载 RabbitMQ 之后直接点击安装, 如果没有相关环境, 安装程序会提示你, 然后会让你的浏览器打开 erlang 的下载页面 https://www.erlang.org/downloads , 在这个页面上根据自己的系统类型点击下载安装即可, 安装完毕后再去安装 RabbitMQ.
这两者的安装都只需要一直 NEXT 下一步就可以了.
安装完成之后可以按一下 Windows 键看到效果如下:
Tip: 其中 Rabbit-Command 后面会用到, 是 RabbitMQ 的命令行操作台.
安装完 RabbitMQ 我们需要对我们的开发环境也导入 RabbitMQ 相关的 JAR 包.
为了方便起见, 我们可以直接使用 Spring-boot-start 的方式导入, 这里面也会包含所有我们需要用到的 RabbitMQ 相关的 JAR 包.
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- </dependencies>
直接引入 spring-boot-starter-amqp 即可.
4. Hello World
搭建好环境之后, 我们就可以上手了.
考虑到这是一个入门文章, 读者很多可能没有接触过 RabbitMQ, 直接使用自动配置的方式可能会令大家很迷惑, 因为自动配置会屏蔽很多细节, 导致大家只看到了被封装后的样子, 不利于大家理解.
所以在本节 Hello World 这里, 我会直接使用最原始的连接方式就行演示, 让大家看到最原始的连接的样子.
Tip: 这种方式演示的代码我都在放在 prototype 包下面.
4.1 生产者
先来看看生产者代码, 也就是我们 push 消息的代码:
- public static final String QUEUE_NAME = "erduo";
- // 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 连接到本地 server
- connectionFactory.setHost("127.0.0.1");
- // 通过连接工厂创建连接
- Connection connection = connectionFactory.newConnection();
- // 通过连接创建通道
- Channel channel = connection.createChannel();
- // 创建一个名为耳朵的队列, 该队列非持久(RabbitMQ 重启后会消失), 非独占(非仅用于此链接), 非自动删除(服务器将不再使用的队列删除)
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- String msg = "hello, 我是耳朵." + LocalDateTime.now().toString();
- // 发布消息
- // 四个参数为: 指定路由器, 指定 key, 指定参数, 和二进制数据内容
- channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
- System.out.println("生产者发送消息结束, 发送内容为:" + msg);
- channel.close();
- connection.close();
代码我都给了注释, 但是我还是要给大家讲解一遍, 梳理一下.
先通过 RabbitMQ 中的 ConnectionFactory 配置一下将要连接的 server-host, 然后创建一个新连接, 再通过此连接创建通道(Channel), 通过这个通道创建队列和发送消息.
这里看上去还是很好理解的, 我需要把创建队列和发送消息这里再拎出来说一下.
创建队列
AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
创建队列的方法里面有五个参数, 第一个是参数是队列的名称, 往后的三个参数代表不同的配置, 最后一个参数是额外参数.
durable: 代表是否将此队列持久化.
exclusive: 代表是否独占, 如果设置为独占队列则此队列仅对首次声明它的连接可见, 并在连接断开时自动删除.
autoDelete: 代表断开连接后是否自动删除此队列.
arguments: 代表其他额外参数.
这些参数中 durable 经常会用到, 它代表了我们可以对队列做持久化, 以保证 RabbitMQ 宕机恢复后此队列也可以自行恢复.
发送消息
void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException;
发送消息的方法里是四个参数, 第一个是必须的指定 exchange, 上面的示例代码中我们传入了一个空字符串, 这代表我们交由默认的匿名 exchange 去帮我们路由消息.
第二个参数是路由 key,exchange 会根据此 key 对消息进行路由转发, 第三个参数是额外参数, 讲消息持久化时会用到一下, 最后一个参数就是我们要发送的数据了, 需要将我们的数据转成字节数组的方式传入.
测试
讲完了这些 API 之后, 我们可以测试一下我们的代码了, run 一下之后, 会在控制台打出如下:
这样之后我们就把消息发送到了 RabbitMQ 中去, 此时可以打开 RabbitMQ 控制台 (前文安装时提到过) 去使用命令 rabbitmqctl.bat list_queues 去查看消息队列现在的情况:
可以看到有一条 message 在里面, 这就代表我们的消息已经发送成功了, 接下来我们可以编写一个消费者对里面的 message 进行消费了.
4.2 消费者
消费者代码和生产者的差不多, 都需要建立连接建立通道:
- // 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 连接到本地 server
- connectionFactory.setHost("127.0.0.1");
- // 通过连接工厂创建连接
- Connection connection = connectionFactory.newConnection();
- // 通过连接创建通道
- Channel channel = connection.createChannel();
- // 创建消费者, 阻塞接收消息
- com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("-------------------------------------------");
- System.out.println("consumerTag :" + consumerTag);
- System.out.println("exchangeName :" + envelope.getExchange());
- System.out.println("routingKey :" + envelope.getRoutingKey());
- String msg = new String(body, StandardCharsets.UTF_8);
- System.out.println("消息内容 :" + msg);
- }
- };
- // 启动消费者消费指定队列
- channel.basicConsume(Producer.QUEUE_NAME, consumer);
- // channel.close();
- // connection.close();
建立完通道之后, 我们需要创建一个消费者对象, 然后用这个消费者对象去消费指定队列中的消息.
这个示例中我们就是新建了一个 consumer, 然后用它去消费队列 - erduo 中的消息.
最后两句代码我给注释掉了, 因为一旦把连接也关闭了, 那我们的消费者就不能保持消费状态了, 所以要开着连接, 监听此队列.
ok, 运行这段程序, 然后我们的消费者会去队列 - erduo 拿到里面的消息, 效果如下:
consumerTag: 是这个消息的标识.
exchangeName: 是这个消息所发送 exchange 的名字, 我们先前传入的是空字符串, 所以这里也是空字符串.
exchangeName: 是这个消息所发送路由 key.
这样我们的程序就处在一个监听的状态下, 你再次调用生产者发送消息消费者就会实时的在控制上打印消息内容.
5. 消息接收确认(ACK)
上面我们演示了生产者和消费者, 我们生产者发送一条消息, 消费者消费一条信息, 这个时候我们的 RabbitMQ 应该有多少消息?
理论上来说发送一条, 消费一条, 现在里面应该是 0 才对, 但是现在的情况并不是:
消息队列里面还是有 1 条信息, 我们重启一下消费者, 又打印了一遍我们消费过的那条消息, 通过消息上面的时间我们可以看出来还是当时我们发送的那条信息, 也就是说我们消费者消费过了之后这条信息并没有被删除.
这种状况出现的原因是因为 RabbitMQ 消息接收确认机制, 也就是说一条信息被消费者接收到了之后, 需要进行一次确认操作, 这条消息才会被删除.
RabbitMQ 中默认消费确认是手动的, 也可以将其设置为自动删除, 自动删除模式消费者接收到消息之后就会自动删除这条消息, 如果消息处理过程中发生了异常, 这条消息就等于没被处理完但是也被删除掉了, 所以这里我们会一直使用手动确认模式.
消息接受确认 (ACK) 的代码很简单, 只要在原来消费者的代码里加上一句就可以了:
- com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("-------------------------------------------");
- System.out.println("consumerTag :" + consumerTag);
- System.out.println("exchangeName :" + envelope.getExchange());
- System.out.println("routingKey :" + envelope.getRoutingKey());
- String msg = new String(body, StandardCharsets.UTF_8);
- System.out.println("消息内容 :" + msg);
- // 消息确认
- channel.basicAck(envelope.getDeliveryTag(), false);
- System.out.println("消息已确认");
- }
- };
我们将代码改成如此之后, 可以再 run 一次消费者, 可以看看效果:
再来看看 RabbitMQ 中的队列情况:
从图中我们可以看出消息消费后已经成功被删除了, 其实大胆猜一猜, 自动删除应该是在我们的代码还没执行之前就帮我们返回了确认, 所以这就导致了消息丢失的可能性.
我们采用手动确认的方式之后, 可以先将逻辑处理完毕之后(可能出现异常的地方可以 try-catch 起来), 把手动确认的代码放到最后一行, 这样如果出现异常情况导致这条消息没有被确认, 那么这条消息会在之后被重新消费一遍.
后记
今天的内容就到这里, 下一篇将会我们将会撇弃传统的手动建立连接的方式进行发消息收消息, 而转用 Spring 帮我们定义好的注解和 Spring 提供的 RabbitTemplate, 更方便的收发消息.
消息队列呢, 其实用法都是一样的, 只是各个开源消息队列的侧重点稍有不同, 我们应该根据我们自己的项目需求来决定我们应该选取什么样的消息队列来为我们的项目服务, 这个项目选型的工作一般都是开发组长帮你们做了, 一般是轮不到我们来做的, 但是面试的时候可能会考察相关知识, 所以这几种消息队列我们都应该有所涉猎.
来源: https://segmentfault.com/a/1190000023473302