RabbitMQ 是使用 Erlang 语言开发的消息中间件, 其遵循了高级消息队列协议(Advanced Message Queuing Protocol, AMQP).
与 Kafka 等消息队列相比, RabbitMQ 最大的优势在于其较高的可靠性:
提供确认 (ACK) 和重传机制保证消息完成消费, 消费者异常不会导致消息丢失
提供消息持久化机制, broker 崩溃不会导致消息丢失
集群模式下工作, 保证高可用
因为具有较高可靠性和一致性, RabbitMQ 可以胜任订单处理, 秒杀等一致性要求较高的业务场景.
RabbitMQ 概念与机制
RabbitMQ 中的概念模型:
Broker: 消息中间件实例, 可能是单个节点也可能是运行在多节点集群上的逻辑实体
消息(Message): 消息由消息头和消息体两部分组成. 消息头中包括 routing-key,priority 等标准消息头以及其它自定义消息头, 用于定义 RabbitMQ 对消息行为. 消息体是字节流, 包含消息内容.
连接(Connection): 客户端与 Broker 之间的 TCP 连接
信道 (Channel): Channel 是建立在 TCP 连接上的逻辑(虚拟) 连接. 多个 Channel 复用同一个 TCP 连接, 以避免建立 TCP 连接的巨大开销. RabbitMQ 官方要求每个线程使用独立的 Channel, 禁止多个线程共用 Channel.
生产者(Publisher): 发送消息的客户端线程
消费者(Consumer): 处理消息的客户端线程
交换机(Exchange): 交换机负责将消息投递到相应的队列
队列(Queue): 接收并保存交换机投递的消息, 直至被消费者成功消费. 逻辑结构遵循先进先出 FIFO.
绑定 (Binding): 将队列(Queue) 注册到交换机 (Exchange) 的路由表
虚拟主机(Vhost): 每个 Broker 下可建立多个 vhost, 每个 vhost 可建立独立的 Exchange,Queue, 绑定及权限系统. 同一个 Broker 下的 vhost 共享 Connection,Channel 和 用户系统, 就是说可以使用同一个用户身份使用同一个 Channel 访问不同 vhost.
交换机(Exchange)
生产者发送的消息会首先送到交换机(Exchange), 交换机根据自身类型和消息的 routing-key 等信息将消息投递到绑定的消息队列中.
RabbitMQ 中的四种标准交换机:
direct: 如果消息的 routing-key 与队列的 binding-key 完全相同, direct 类型的交换机则会将消息投递到该队列中.
多个队列可以使用相同的 binding-key 绑定到同一个 direct 交换机, direct 交换机会把消息投递到所有 binding-key 与消息 routing-key 相同的队列
topic: 允许队列的 binding-key 中包含通配符 * 和 #, topic 交换机会将消息投递到 binding-key 与 routing-key 匹配的队列中.
通配符按照关键字进行匹配, 如 news.cn.a 中的关键字是 news,cn 和 a, 即关键字按照. 分割
# 通配符匹配 0 个或多个关键字, news.#.a 可以匹配 news.a, news.cn.a 和 news.asia.cn.a 等
* 通配符匹配一个关键字, news.*.a 匹配 news.cn.a 不匹配 news.a,news.asia.cn.a
fanout: fanout 交换机不进行任何匹配, 将消息投递到所有绑定的队列
header: header 交换机根据消息头进行投递, 现在已较少使用
我们可以使用 RabbitMQ 的插件机制使用第三方交换机或自行开发交换机. 如实现延时投递的.
消息头中的 delivery-mode 可以设置为 persistent(持久化) 或者 transient(易失). Exchange 和 Queue 在处理持久化的消息时都会先将消息写入磁盘中再进行下一步处理, 即使 RabbitMQ 崩溃也不会丢失.
消费者客户端通常使用的 channel.basicConsume 使用推 (push) 模式投递消息, 即当有新消息时 Broker 通过 channel 主动向客户端发送消息. 客户端也可以使用 channel.basicGet 从 Broker 拉取消息.
ACK 机制
RabbitMQ 提供了确认送达 (acknowledge) 机制保证消息被正确处理不会丢失.
确认送达的回执有三种:
ACK: 消息已被成功处理
NACK: 消息处理异常, 需要重新投递
REJECT: 消息非法, 丢弃消息
RabbitMQ 的 Queue 可以设置 no_ack=true, 则消息被投递后即删除不等待回执.
channel.basicConsume 可以指定 auto_ack 模式, 若 auto_ack=true 当客户端收到完整消息后即会自动发出 ACK 回执, 否则必须显式的发出回执.
Java 代码示例
首先安装并启动 RabbitMQ 实例, Mac 用户可以使用 Homebrew 进行安装:
brew install rabbitmq
启动服务:
brew services start rabbitmq
或者使用官方 docker 镜像:
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3-management
RabbitMQ 官网 http://www.rabbitmq.com/install-windows.html 提供了 Ubuntu,RPM 以及 Windows 等多种平台安装方式.
RabbitMQ 默认 TCP 端口为 5672, web 控制台默认端口 15672.
在 Maven 中添加依赖:
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.5.1</version>
- </dependency>
编写生产者:
- package rabbit;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- /**
- * @author finley
- */
- public class RabbitProducer {
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setUsername("guest");
- factory.setPassword("guest");
- factory.setHost("localhost");
- try (Connection conn = factory.newConnection();
- Channel channel = conn.createChannel()) {
- String exchangeName = "test-exchange";
- channel.exchangeDeclare(exchangeName, "direct", true);
- String routingKey = "hello";
- byte[] msg = "hello world".getBytes();
- AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder();
- propsBuilder.deliveryMode(2); // persistent
- propsBuilder.priority(0); // normal
- propsBuilder.contentType("text/plain");
- channel.basicPublish(exchangeName, routingKey, propsBuilder.build(), msg);
- }
- }
- }
编写消费者:
- package rabbit;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- import com.rabbitmq.client.*;
- /**
- * @author finley
- */
- public class RabbitConsumer {
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setUsername("guest");
- factory.setPassword("guest");
- factory.setHost("localhost");
- try (Connection conn = factory.newConnection();
- Channel channel = conn.createChannel()) {
- String exchangeName = "test-exchange";
- channel.exchangeDeclare(exchangeName, "direct", true);
- String queueName = channel.queueDeclare().getQueue();
- String bindingKey = "hello";
- channel.queueBind(queueName, exchangeName, bindingKey);
- while(true) {
- channel.basicConsume(queueName, false, "", new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String routingKey = envelope.getRoutingKey();
- String contentType = properties.getContentType();
- String bodyStr = new String(body, "UTF-8");
- System.out.println("routingKey:" + routingKey + ", contentType:" + contentType + ", body:" + bodyStr);
- long deliveryTag = envelope.getDeliveryTag();
- channel.basicAck(deliveryTag, false);
- }
- });
- }
- }
- }
- }
RabbitMQ 的消息为字节, 可以将 Java 对象序列化后作为消息体发送.
来源: https://www.cnblogs.com/Finley/p/10126315.html