RabbitMQ 概述
RabbitMQ 是遵从 AMQP 协议的 通信协议都设计到报文交互, 换句话说 RabbitMQ 就是 AMQP 协议的 Erlang 的实现.
AMQP 说到底还是一个通信协议从 low-level 层面举例来说, AMQP 本身是应用层的协议, 其填充于 TCP 协议的数据部分.
从 high-level 层面来说, AMQP 是通过协议命令进行交互的. 命令类似 HTTP 中的方法(GET PUT POST DELETE 等).
信道 (Channel) 在 AMQP 是一个很重要的概念, 大多数操作都是在信道这个层面展开的
我们完全可以用 Connection 就能完成信道的工作, 为什么还要引入信道?
试想: 一个程序中有很多个线程需要从 RabbitMQ 中消费消息, 或者生产消息, 那么必然需要建立很多个 Connection, 也就是多个 TCP 连接.
建立和销毁 TCP 连接开销很昂贵. 所以 RabbitMQ 采用类似 NIO 的做法, 选择 TCP 连接复用. 不仅可以减少性能开销, 同时也便于管理.
发布订阅模式
广播模式 topic
所谓广播指的是一条消息将被所有的消费者进行处理.
直连模式 director
直连模式的特点主要就是 routingkey 的使用, 如果现在该消息就要求指定一个具备有指定 Routingkey 的操作者进行处理, 那么只需要两个的 Routingkey 匹配即可.
可以将 Routingkey 比喻一个唯一标记, 这样就可以将消息准确的推送到消费者手中了.
主题模式 fanout
主题模式类似于广播模式与直连模式的整合操作, 所有的消费者都可以接收到主题信息, 但是如果要想进行正确的处理, 则一定需要有一个合适的 Routingkey 完成操作.
交换器相当于投递包裹的邮箱, Routingkey 相当于包裹的地址, BindingKey 相当于包裹的目的地.
当填写在包裹上的地址和要投递的地址相匹配时, 那么这个包裹就会正确投递到目的地, 最后这个目的地的主人 (队列) 可以保留这个包裹.
如果填写地址出错, 邮递员不能正确的投递到目的地, 包裹可能被退回给寄件人, 也有可能被丢弃.
RabbitMQ 官方文档和 API 都把 Routingkey 和 BingdingKey 都看做 Routingkey 下面代码中红色部分 就都当 Routingkey 使用
消息生产者
- public class MessageProducer {
- private static final String EXCHANGE_NAME ="com.sunkun.topic";// 消息队列名称
- private static final String HOST="192.168.1.105";
- private static final int PORT=5672;
- public static void main(String[] args) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();// 建立一个连接工厂
- factory.setHost(HOST);
- factory.setPort(PORT);
- factory.setUsername("sunkun");
- factory.setPassword("123456");
- //factory.setVirtualHost(virtualHost) 使用虚拟主机的最大好处 可以区分不同用户的操作空间 每一个虚拟主机有一个自己的空间管理
- Connection conn = factory.newConnection();// 定义一个新的 RabbitMQ 的连接
- Channel channel = conn.createChannel();// 创建一个通讯的通道
- // 定义该通道要使用的队列名称 此时队列已经创建过了
- // 第一个参数 队列名称(这个队列可能存在也可能不存在)
- // 第二个参数 是否持久保存
- // 第三个参数 此队列是否为专用的队列信息
- // 第四个参数 是否允许自动删除
- //channel.queueDeclare(QUENE_NAME, true, false, true,null);
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
- long start = System.currentTimeMillis();
- System.out.println("消息开始"+start);
- for(int i=0;i<1000;i++){
- String message = "sk -"+i;
- if(i%2==0){
- //MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化
- channel.basicPublish(EXCHANGE_NAME, "sk1", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());// 进行消息发送
- }else{
- channel.basicPublish(EXCHANGE_NAME, "sk2", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());// 进行消息发送
- }
- }
- long end = System.currentTimeMillis();
- System.out.println("消息花费时间"+(end-start));
- channel.close();
- }
- }
消息消费者
- public class MessageConsumer {
- private static final String EXCHANGE_NAME ="com.sunkun.topic";// 消息队列名称
- private static final String HOST="192.168.1.105";
- private static final int PORT=15672;
- public static void main(String[] args) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();// 建立一个连接工厂
- factory.setHost(HOST);
- factory.setPort(PORT);
- factory.setUsername("sunkun");
- factory.setPassword("123456");
- Connection conn = factory.newConnection();// 定义一个新的 RabbitMQ 的连接
- Channel channel = conn.createChannel();// 创建一个通讯的通道
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
- String queueName = channel.queueDeclare().getQueue();// 通过通道获取一个队列名称
- channel.queueBind(queueName, EXCHANGE_NAME, "sk2");// 进行绑定处理
- // 在 RabbitMQ 里面, 所有的消费者信息是通过一个回调方法完成的
- Consumer consumer = new DefaultConsumer(channel){// 需要复写指定的方法实现消息处理
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body);
- System.out.println("消费者 sk2:"+message);// 可以启动多个消费者
- super.handleDelivery(consumerTag, envelope, properties, body);
- }
- };
- channel.basicConsume(queueName,consumer);
- }
- }
RabbitMQ 如何保证消息的可靠性
1)持久化
持久化可以提高 RabbitMQ 的可靠性, 防止在异常情况 (重启, 关闭, 宕机) 下的数据丢失.
持久化可分为三个部分: 交换器的持久化, 队列的持久化和消息的持久化.
交换器的持久化: 是通过声明交换器时将 druable 参数设置为 true 来实现的. 如果交换器不设置持久化, 那么在 RabbitMQ 重启之后相关的交换器元数据会丢失, 不过消息不会丢失, 只是不能将消息发送到这个交换器中了.
对于一个长期使用的交换器来说, 建议其置为持久化.(消息不直接往队列发, 往 exchange 发送 可以实现广播模式)
队列的持久化: 是通过声明队列时将 durable 参数置为 true 实现的. 如果队列不设置持久化, 那么在 RabbitMQ 服务重启之后, 相关队列的元数据会丢失, 此时数据也会丢失.
消息的持久化: 因为队列的持久化能保证其本身的元数据不会因为异常情况而丢失, 但是不能保证内部存储的消息不会丢失. 要确保消息不会丢失, 需求将其设置为持久化.
通过将消息的投递模式 (MessageProperties.PERSISTENT_TEXT_PLAIN) 即可实现消息的持久化
2)集群
RabbitMQ 的集群本身不带有所谓的 HA 机制以及负载均衡机制 http://blog.51cto.com/server110/1920371
本文主要讲镜像队列
在持久化的消息正确存入到 RabbitMQ 之后 还需要一段时间 (虽然时间很短, 但不可忽视) 才能存入磁盘中, 如果这段时间发生了宕机, 消息保存还没来得及落盘, 那么这些消息将会丢失.
这里可以引入 RabbitMQ 的镜像队列机制, 相当于配置了副本, 如果主节点 (master) 在此特殊时间内挂掉, 可以自动切换到从节点(Slave), 在实际生产环境中的关键业务队列都会设置镜像队列.
提醒: 所谓的镜像队列只是进行数据的副本而已, 在所谓的 RabbitMQ 集群里面并不支持 HA 机制以及所谓的负载均衡, 如果说现在一台主机挂掉了, 那么其他主机肯定无法进行合理读取的.
如果想要安全的使用 RabbitMQ 就要继续追加负载均衡组件, 列如 HAProxy LVS 等等, 如果要保证负载均衡组件的高可用, 还应该继续追加 KeepAlive 组件(就像 tomcat 实现负载均衡 需要 nginx 一样).
3)生产者确认
除上面两个问题外 我们还遇到一个新问题: 当消息的生产者将消息发送出去之后, 消息到底有没有正确的到达服务器呢?
如果消息到达服务器之前就丢失, 那么持久化也解决不了问题, 因为消息就没有到达服务器, 何谈持久化呢.
通常会有两种方法解决此问题一时事物机制, 只有消息被成功接收, 事物才能提交成功, 否则便可在捕获异常之后进行事物回滚, 于此同时可以进行消息重发.
但使用事物机制会大大降低 RabbitMQ 的性能, 我们一般采取发送方确认机制.
发送方确认机制: 生产者将信道设置成 confirm 模式, 一旦信道进入 confirm 模式, 所有在该信道上面发布的消息都会被指派一个唯一的 ID(从 1 开始),
一旦消息被投递到所有的匹配队列之后, RabbitMQ 就会发送一个确认给生产者(包含消息的唯一 ID), 这就使得生产者知晓消息已经到达目的地了.
如果消息和队列是持久化的, 那么消息确认会在消息写入磁盘后发出.
来源: https://www.cnblogs.com/ssskkk/p/9653330.html