1 消息确认
在一些场合, 如转账, 付费时每一条消息都必须保证成功的被处理. AMQP 是金融级的消息队列协议, 有很高的可靠性, 这里介绍在使用 RabbitMQ 时怎么保证消息被成功处理的. 消息确认可以分为两种: 一种是生产者发送消息到 Broke 时, Broker 给生产者发送确认回执, 用于告诉生产者消息已被成功发送到 Broker; 一种是消费者接收到 Broker 发送的消息时, 消费者给 Broker 发送确认回执, 用于告诉消费者消息已成功被消费者接收.
下边分别介绍生产者端和消费者端的消息确认方法. 准备条件: 使用 Web 管理工具添加 exchange,queue 并绑定, bindingKey 为 "mykey", 如下所示:
1 生产者端消息确认(tx 机制和 Confirm 模式)
生产者端的消息确认: 当生产者将消息发送给 Broker,Broker 接收到消息给生产者发送确认回执. 生产者端的消息确认有两种方式: tx 机制和 Confirm 模式.
1.tx 机制
tx 机制可以叫做事务机制, RabbitMQ 中有三个与 tx 机制的方法: txSelect(), txCommit()和 txRollback(). channel.txSelect() 用于将当前 channel 设置成 transaction 模式, channel.txCommit() 提交事务, channel.txRollback() 回滚事务. 使用 tx 机制, 我们首先要通过 txSelect 方法开启事务, 然后发布消息给 broker 服务器了, 如果 txCommit 提交成功了, 则说明消息成功被 broker 接收了; 如果在 txCommit 执行之前 broker 异常崩溃或者由于其他原因抛出异常, 这个时候我们可以捕获异常, 通过 txRollback 回滚事务. 看一个 tx 机制的简单实现:
- var factory = new ConnectionFactory()
- {
- //rabbitmq-server 所在设备 ip, 这里就是本机
- HostName = "127.0.0.1",
- UserName = "wyy",// 用户名
- Password = "123321"// 密码
- };
- // 创建连接 connection
- using (var connection = factory.CreateConnection())
- {
- // 创建通道 channel
- using (var channel = connection.CreateModel())
- {
- Console.WriteLine("生产者准备就绪....");
- string message = "";
- // 发送消息
- // 在控制台输入消息, 按 enter 键发送消息
- while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
- {
- message = Console.ReadLine();
- var body = Encoding.UTF8.GetBytes(message);
- try
- {
- // 开启事务机制
- channel.TxSelect();
- // 发送消息
- channel.BasicPublish(exchange: "myexchange",
- routingKey: "mykey",
- basicProperties: null,
- body: body);
- // 事务提交
- channel.TxCommit();
- Console.WriteLine($"[{message}] 发送到 Broke 成功!");
- }
- catch (Exception)
- {
- Console.WriteLine($"[{message}] 发送到 Broker 失败!");
- channel.TxRollback();
- }
- }
- }
- }
- Console.ReadKey();
- }
程序运行结果如下:
2 Confirm 模式
- C# 的 RabbitMQ API 中, 有三个与 Confirm 相关的方法: ConfirmSelect(),WaitForConfirms()和 WaitForConfirmOrDie. channel.ConfirmSelect() 表示开启 Confirm 模式; channel.WaitForConfirms() 等待所有消息确认, 如果所有的消息都被服务端成功接收返回 true, 只要有一条没有被成功接收就返回 false. channel.WaitForConfirmsOrDie() 和 WaitForConfirms 作用类型, 也是等待所有消息确认, 区别在于该方法没有返回值(Void), 如果有任意一条消息没有被成功接收, 该方法会立即抛出一个 OperationInterrupedException 类型异常. 看一个 Confirm 模式的简单实现:
- static void Main(string[] args)
- {
- var factory = new ConnectionFactory()
- {
- //rabbitmq-server 所在设备 ip, 这里就是本机
- HostName = "127.0.0.1",
- UserName = "wyy",// 用户名
- Password = "123321"// 密码
- };
- // 创建连接 connection
- using (var connection = factory.CreateConnection())
- {
- // 创建通道 channel
- using (var channel = connection.CreateModel())
- {
- Console.WriteLine("生产者准备就绪....");
- string message = "";
- // 在控制台输入消息, 按 enter 键发送消息
- while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
- {
- message = Console.ReadLine();
- var body = Encoding.UTF8.GetBytes(message);
- // 开启 Confirm 模式
- channel.ConfirmSelect();
- // 发送消息
- channel.BasicPublish(exchange: "myexchange",
- routingKey: "mykey",
- basicProperties: null,
- body: body);
- //WaitForConfirms 确认消息 (可以同时确认多条消息) 是否发送成功, 如果返回 false 表示发送失败, 会自动重新发送
- if (channel.WaitForConfirms())
- {
- Console.WriteLine($"[{message}] 发送到 Broke 成功!");
- }
- }
- }
- }
- Console.ReadKey();
- }
程序运行结果:
2 消费者端消息确认(自动确认和显示确认)
从 Broke 发送到消费者时, RabbitMQ 提供了两种消息确认的方式: 自动确认和显示确认.
1 自动确认
自动确认: 当 RabbbitMQ 将消息发送给消费者后, 消费者端接收到消息后, 不等待消息处理结束, 立即自动回送一个确认回执. 自动确认的用法十分简单, 设置消费方法的参数 autoAck 为 true 即可, 我们前边的例子都是使用的自动确认, 这里不再详细演示, 如下:
channel.BasicConsume(queue: "myqueue",autoAck: true, consumer: consumer);
注意: Broker 会在接收到确认回执时删除消息, 如果消费者接收到消息并返回了确认回执, 然后这个消费者在处理消息时挂了, 那么这条消息就再也找不回来了.
2 显示确认
我们知道自动确认可能会出现消息丢失的问题, 我们不免会想到: Broker 收到回执后才删除消息, 如果可以让消费者在接收消息时不立即返回确认回执, 等到消息处理完成后 (或者完成一部分的逻辑) 再返回确认回执, 这样就保证消费端不会丢失消息了! 这正是显式确认的思路. 使用显示确认也比较简单, 首先将 Resume 方法的参数 autoAck 设置为 false, 然后在消费端使用代码 channel.BasicAck()/BasicReject()等方法 来确认和拒绝消息. 看一个栗子:
生产者代码如下:
- static void Main(string[] args)
- {
- var factory = new ConnectionFactory()
- {
- //rabbitmq-server 所在设备 ip, 这里就是本机
- HostName = "127.0.0.1",
- UserName = "wyy",// 用户名
- Password = "123321"// 密码
- };
- // 创建连接 connection
- using (var connection = factory.CreateConnection())
- {
- // 创建通道 channel
- using (var channel = connection.CreateModel())
- {
- Console.WriteLine("生产者准备就绪....");
- string message = "";
- // 发送消息
- // 在控制台输入消息, 按 enter 键发送消息
- while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
- {
- message = Console.ReadLine();
- var body = Encoding.UTF8.GetBytes(message);
- // 基本发布
- channel.BasicPublish(exchange: "myexchange",
- routingKey: "mykey",
- basicProperties: null,
- body: body);
- Console.WriteLine($"消息[{message}] 已发送到队列");
- }
- }
- }
- Console.ReadKey();
- }
消费者代码如下:
- static void Main(string[] args)
- {
- var factory = new ConnectionFactory()
- {
- //rabbitmq-server 所在设备 ip, 这里就是本机
- HostName = "127.0.0.1",
- UserName = "wyy",// 用户名
- Password = "123321"// 密码
- };
- using (var connection = factory.CreateConnection())
- {
- using (var channel = connection.CreateModel())
- {
- // 定义消费者
- var consumer = new EventingBasicConsumer(channel);
- consumer.Received += (model, ea) =>
- {
- string message = Encoding.UTF8.GetString(ea.Body);
- Console.WriteLine($"接受到消息[{message}]");
- // 以 news 开头表示是新闻类型, 处理完成后确认消息
- if (message.StartsWith("news"))
- {
- // 这里处理消息 balabala
- Console.WriteLine($"[{message}] 是新闻消息, 处理消息并确认");
- channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
- }
- // 不以 news 开头表示不是新闻类型, 不进行处理, 把消息退回到 queue 中
- else
- {
- Console.WriteLine($"[{message}] 不是新闻类型, 拒绝处理");
- channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
- }
- };
- Console.WriteLine("消费者准备就绪....");
- // 第五步: 处理消息
- channel.BasicConsume(queue: "myqueue",
- autoAck: false,
- consumer: consumer);
- Console.ReadKey();
- }
- }
- }
介绍一下代码中标红的两个方法: channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 方法用于确认消息, deliveryTag 参数是分发的标记, multiple 表示是否确认多条. channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false); 方法用于拒绝消息, deliveryTag 也是指分发的标记, requeue 表示消息被拒绝后是否重新放回 queue 中, true 表示放回 queue 中, false 表示直接丢弃.
运行这两个应用程序, 通过生产者发送两条消息, 效果如下:
一些意外的情况: 使用显式确认时, 如果消费者处理完消息不发送确认回执, 那么消息不会被删除, 消息的状态一直是 Unacked, 这条消息也不会再发送给其他消费者. 如果一个消费者在处理消息时尚未发送确认回执的情况下挂掉了, 那么消息会被重新放入队列(状态从 Unacked 变成 Ready), 有其他消费者存时, 消息会发送给其他消费者.
2 消息持久化 / 优先级
1 消息持久化(Persistent)
在前边已经介绍了 exchange 和 queue 的持久化, 把 exchange 和 queue 的 durable 属性设置为 true, 重启 rabbitmq 服务时( 重启命令: rabbitmqctl stop_app ;rabbitmqctl start_app ),exchange 和 queue 也会恢复. 我们需要注意的是: 如果 queue 设置 durable=true,rabbitmq 服务重启后队列虽然会存在, 但是队列内的消息会丢全部丢失. 那么怎么实现消息的持久化呢? 实现的方法很简单: 将 exchange 和 queue 都设置 durable=true, 然后在消息发布的时候设置 persistent=true 即可. 看一个栗子:
- static void Main(string[] args)
- {
- var factory = new ConnectionFactory()
- {
- //rabbitmq-server 所在设备 ip, 这里就是本机
- HostName = "127.0.0.1",
- UserName = "wyy",// 用户名
- Password = "123321"// 密码
- };
- // 创建连接 connection
- using (var connection = factory.CreateConnection())
- {
- // 创建通道 channel
- using (var channel = connection.CreateModel())
- {
- Console.WriteLine("生产者准备就绪....");
- string message = "";
- // 在控制台输入消息, 按 enter 键发送消息
- while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
- {
- message = Console.ReadLine();
- var body = Encoding.UTF8.GetBytes(message);
- // 设置消息持久化
- var props = channel.CreateBasicProperties();
- props.Persistent = true;
- channel.BasicPublish(exchange: "myexchange",
- routingKey: "mykey",
- basicProperties: props,
- body: body);
- //WaitForConfirms 确认消息 (可以同时确认多条消息) 是否发送成功, 如果返回 false 表示发送失败, 会自动重新发送
- Console.WriteLine($"[{message}] 发送到 Broke 成功!");
- }
- }
- }
- Console.ReadKey();
- }
声明 exchange 和 queue 时设置 durable=true, 然后执行上边的代码, 传入一条消息. 重启 rabbitmq 后, exchange,queue 和消息都会恢复. 我们也可以在 Web 管理界面设置消息持久化, 如下:
2 消息优先级(Priority)
我们知道 queue 是先进先出的, 即先发送的消息, 先被消费. 但是在具体业务中可能会遇到要提前处理某些消息的需求, 如一个常见的需求: 普通客户的消息按先进先出的顺序处理, Vip 客户的消息要提前处理. 消息实现优先级控制的实现方式是: 首先在声明 queue 是设置队列的 x-max-priority 属性, 然后在 publish 消息时, 设置消息的优先级等级即可. 为了演示方便, 约定所有 vip 客户的信息都以 vip 开头, 看一下代码实现:
生产者代码:
- static void Main(string[] args)
- {
- var factory = new ConnectionFactory()
- {
- //rabbitmq-server 所在设备 ip, 这里就是本机
- HostName = "127.0.0.1",
- UserName = "wyy",// 用户名
- Password = "123321"// 密码
- };
- // 创建连接 connection
- using (var connection = factory.CreateConnection())
- {
- // 创建通道 channel
- using (var channel = connection.CreateModel())
- {
- // 声明交换机 exchang
- channel.ExchangeDeclare(exchange: "myexchange",
- type: ExchangeType.Direct,
- durable: true,
- autoDelete: false,
- arguments: null);
- // 声明队列 queue
- channel.QueueDeclare(queue: "myqueue",
- durable: true,
- exclusive: false,
- autoDelete: false,
- arguments: new Dictionary<string, object>() {
- // 队列优先级最高为 10, 不加 x-max-priority 的话, 计算发布时设置了消息的优先级也不会生效
- {"x-max-priority",10 }
- });
- // 绑定 exchange 和 queue
- channel.QueueBind(queue: "myqueue", exchange: "myexchange", routingKey: "mykey");
- Console.WriteLine("生产者准备就绪....");
- // 一些待发送的消息
- string[] msgs = { "vip1", "hello2", "world3","common4", "vip5" };
- // 设置消息优先级
- var props = channel.CreateBasicProperties();
- foreach (string msg in msgs)
- {
- //vip 开头的消息, 优先级设置为 9
- if (msg.StartsWith("vip"))
- {
- props.Priority = 9;
- channel.BasicPublish(exchange: "myexchange",
- routingKey: "mykey",
- basicProperties: props,
- body: Encoding.UTF8.GetBytes(msg));
- }
- // 其他消息的优先级为 1
- else
- {
- props.Priority = 1;
- channel.BasicPublish(exchange: "myexchange",
- routingKey: "mykey",
- basicProperties: props,
- body: Encoding.UTF8.GetBytes(msg));
- }
- }
- }
- }
- Console.ReadKey();
- }
消费者, 不需要对消费者做额外的配置, 代码如下:
- static void Main(string[] args)
- {
- var factory = new ConnectionFactory()
- {
- //rabbitmq-server 所在设备 ip, 这里就是本机
- HostName = "127.0.0.1",
- UserName = "wyy",// 用户名
- Password = "123321"// 密码
- };
- using (var connection = factory.CreateConnection())
- {
- using (var channel = connection.CreateModel())
- {
- #region EventingBasicConsumer
- // 定义消费者
- var consumer = new EventingBasicConsumer(channel);
- consumer.Received += (model, ea) =>
- {
- Console.WriteLine(Encoding.UTF8.GetString(ea.Body));
- };
- Console.WriteLine("消费者准备就绪....");
- // 处理消息
- channel.BasicConsume(queue: "myqueue",
- autoAck: true,
- consumer: consumer);
- Console.ReadKey();
- #endregion
- }
- }
- }
运行程序, 结果如下, 我们看到 vip 开头的消息被率先处理了, 证明优先级是生效的
3 小结
本节简单介绍了 Rabbitmq 中的消息确认, 消息持久化, 消息优先级的实现方式, 这几个功能在开发中会经常用到, RabbitMQ 还有一些其他有用的功能, 如 Lazy queue 模式, dead letter 处理, queue 的消息条数, 字节数限制等, 这里没有具体演示, 有兴趣的园友可以自己研究一下.
来源: https://www.cnblogs.com/wyy1234/p/10868416.html