先决条件
本教程假定 RabbitMQ 已经安装, 并运行在 localhost 标准端口(5672). 如果你使用不同的主机, 端口或证书, 则需要调整连接设置.
从哪里获得帮助
如果您在阅读本教程时遇到困难, 可以通过邮件列表联系我们 https://groups.google.com/forum/#!forum/rabbitmq-users .
1. 工作队列
(使用. NET 客户端)
在第一篇教程中, 我们编写了两个程序, 用于从一个指定的队列发送和接收消息. 在本文中, 我们将创建一个工作队列, 用于在多个工作线程间分发耗时的任务.
工作队列 (又名: 任务队列) 背后的主要想法是避免立即执行资源密集型, 且必须等待其完成的任务. 相反的, 我们把这些任务安排在稍后完成. 我们可以将任务封装为消息并把它发送到队列中, 在后台运行的工作进程将从队列中取出任务并最终执行. 当您运行多个工作线程, 这些任务将在这些工作线程之间共享.
这个概念在 web 应用程序中特别有用, 因为在一个 HTTP 请求窗口中无法处理复杂的任务.
2. 准备
我们将略微修改上一个示例中的 Send 程序, 以其可以在命令行发送任意消息.
这个程序将调度任务到我们的工作队列中, 所以让我们把它命名为 NewTask:
像教程[1], 我们需要生成两个项目:
- dotnet new console --name NewTask
- mv NewTask/Program.cs NewTask/NewTask.cs
- dotnet new console --name Worker
- mv Worker/Program.cs Worker/Worker.cs
- cd NewTask
- dotnet add package RabbitMQ.Client
- dotnet restore
- cd ../Worker
- dotnet add package RabbitMQ.Client
- dotnet restore
- var message = GetMessage(args);
- var body = Encoding.UTF8.GetBytes(message);
- var properties = channel.CreateBasicProperties();
- properties.Persistent = true;
- channel.BasicPublish(exchange: "",
- routingKey: "task_queue",
- basicProperties: properties,
- body: body);
从命令行参数获取消息的帮助方法:
- private static string GetMessage(string[] args)
- {
- return ((args.Length> 0) ? string.Join("", args) :"Hello World!");
- }
我们旧的 Receive.cs 脚本也需要进行一些更改: 它需要为消息体中的每个点模拟一秒种的时间消耗. 它将处理由 RabbitMQ 发布的消息, 并执行任务, 因此我们把它复制到 Worker 项目并修改:
- // 构建消费者实例.
- var consumer = new EventingBasicConsumer(channel);
- // 绑定消息接收事件.
- consumer.Received += (model, ea) =>
- {
- var body = ea.Body;
- var message = Encoding.UTF8.GetString(body);
- Console.WriteLine("[x] Received {0}", message);
- // 模拟耗时操作.
- int dots = message.Split('.').Length - 1;
- Thread.Sleep(dots * 1000);
- Console.WriteLine("[x] Done");
- };
- channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);
模拟虚拟任务的执行时间:
- int dots = message.Split('.').Length - 1;
- Thread.Sleep(dots * 1000);
3. 循环调度
使用任务队列的优点之一是能够轻松地并行工作. 如果我们正在积累积压的工作, 我们仅要增加更多的工作者, 并以此方式可以轻松扩展.
首先, 我们尝试同时运行两个 Worker 实例. 他们都会从队列中获取消息, 但究竟如何? 让我们来看看.
您需要打开三个控制台, 两个运行 Worker 程序, 这些控制台作为我们的两个消费者 - C1 和 C2.
- # shell 1
- cd Worker
- dotnet run
- # => [*] Waiting for messages. To exit press CTRL+C
- # shell 2
- cd Worker
- dotnet run
- # => [*] Waiting for messages. To exit press CTRL+C
在第三个控制台中, 我们将发布一些新的任务. 一旦你已经运行了消费者, 你可以尝试发布几条消息:
- # shell 3
- cd NewTask
- dotnet run "First message."
- dotnet run "Second message.."
- dotnet run "Third message..."
- dotnet run "Fourth message...."
- dotnet run "Fifth message....."
让我们看看有什么发送到了我们的 Worker 程序:
- # shell 1
- # => [*] Waiting for messages. To exit press CTRL+C
- # => [x] Received 'First message.'
- # => [x] Received 'Third message...'
- # => [x] Received 'Fifth message.....'
- # shell 2
- # => [*] Waiting for messages. To exit press CTRL+C
- # => [x] Received 'Second message..'
- # => [x] Received 'Fourth message....'
默认情况下, RabbitMQ 会按顺序将每条消息发送给下一个消费者. 消费者数量平均的情况下, 每个消费者将会获得相同数量的消息. 这种分配消息的方式称为循环(Round-Robin). 请尝试开启三个或更多的 Worker 程序来验证.
4. 消息确认
处理一项任务可能会需要几秒钟的时间. 如果其中一个消费者开启了一项长期的任务并且只完成了部分就挂掉了, 您可能想知道会发生什么? 在我们当前的代码中, 一旦 RabbitMQ 把消息分发给了消费者, 它会立即将这条消息标记为删除. 在这种情况下, 如果您停掉某一个 Worker, 我们将会丢失这条正在处理的消息, 也将丢失所有分发到该 Worker 但尚未处理的消息.
但是我们不想丢失任何一个任务. 如果一个 Worker 挂掉了, 我们希望这个任务能被重新分发给其他 Worker.
为了确保消息永远不会丢失, RabbitMQ 支持消息确认 http://www.rabbitmq.com/confirms.html 机制. 消费者回发一个确认信号 Ack(nowledgement)给 RabbitMQ, 告诉它某个消息已经被接收, 处理并且可以自由删除它.
如果一个消费者在还没有回发确认信号之前就挂了(其通道关闭, 连接关闭或者 TCP 连接丢失),RabbitMQ 会认为该消息未被完全处理, 并将其重新排队. 如果有其他消费者同时在线, 该消息将会被会迅速重新分发给其他消费者. 这样, 即便 Worker 意外挂掉, 也可以确保消息不会丢失.
没有任何消息会超时; 当消费者死亡时, RabbitMQ 将会重新分发消息. 即使处理消息需要非常非常长的时间也没关系.
默认情况下, 手动消息确认 http://www.rabbitmq.com/confirms.html 模式是开启的. 在前面的例子中, 我们通过将 autoAck("自动确认模式")参数设置为 true 来明确地关闭手动消息确认模式. 一旦完成任务, 是时候删除这个标志并且从 Worker 手动发送一个恰当的确认信号给 RabbitMQ.
- // 构建消费者实例.
- var consumer = new EventingBasicConsumer(channel);
- // 绑定消息接收事件.
- consumer.Received += (model, ea) =>
- {
- var body = ea.Body;
- var message = Encoding.UTF8.GetString(body);
- Console.WriteLine("[x] Received {0}", message);
- // 模拟耗时操作.
- int dots = message.Split('.').Length - 1;
- Thread.Sleep(dots * 1000);
- Console.WriteLine("[x] Done");
- // 手动发送消息确认信号.
- channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
- };
- // autoAck:false - 关闭自动消息确认, 调用 `BasicAck` 方法进行手动消息确认.
- // autoAck:true - 开启自动消息确认, 当消费者接收到消息后就自动发送 ack 信号, 无论消息是否正确处理完毕.
- channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
使用上面这段代码, 我们可以确定的是, 即使一个 Worker 在处理消息时, 我们通过使用 CTRL + C 来终止它, 也不会丢失任何消息. Worker 挂掉不久, 所有未确认的消息将会被重新分发.
忘记确认
遗漏 BasicAck 是一个常见的错误. 这是一个很简单的错误, 但导致的后果却是严重的. 当客户端退出时(看起来像是随机分发的), 消息将会被重新分发, 但是 RabbitMQ 会吃掉越来越多的内存, 因为它不能释放未确认的消息.
为了调试这种错误, 您可以使用 rabbitmqctl 来打印
messages_unacknowledged
字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在 Windows 上, 删除 sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
5. 消息持久化
我们已经学习了如何确保即使消费者挂掉, 任务也不会丢失. 但是如果 RabbitMQ 服务器停止, 我们的任务还是会丢失.
当 RabbitMQ 退出或崩溃时, 它会忘记已存在的队列和消息, 除非告诉它不要这样做. 为了确保消息不会丢失, 有两件事是必须的: 我们需要将队列和消息标记为持久.
首先, 我们需要确保 RabbitMQ 永远不会丢失我们的队列. 为了做到这一点, 我们需要把队列声明是持久的(Durable):
- // 声明队列, 通过指定 durable 参数为 `true`, 对消息进行持久化处理.
- channel.QueueDeclare(queue: "hello",
- durable: true,
- exclusive: false,
- autoDelete: false,
- arguments: null);
虽然这个命令本身是正确的, 但是它在当前设置中不会起作用. 那是因为我们已经定义过一个名为 hello 的队列, 并且这个队列不是持久化的. RabbitMQ 不允许使用不同的参数重新定义已经存在的队列, 并会向尝试执行该操作的程序返回一个错误. 但有一个快速的解决办法 - 让我们用不同的名称声明一个队列, 例如 task_queue:
- channel.QueueDeclare(queue: "task_queue",
- durable: true,
- exclusive: false,
- autoDelete: false,
- arguments: null);
注意, 该声明队列 QueueDeclare 方法的更改需要同时应用于生产者和消费者代码.
此时, 我们可以确定的是, 即使 RabbitMQ 重新启动, task_queue 队列也不会丢失. 现在我们需要将我们的消息标记为持久的(Persistent) - 通过将
IBasicProperties.Persistent
设置为 true.
- // 将消息标记为持久性.
- var properties = channel.CreateBasicProperties();
- properties.Persistent = true;
关于消息持久性的说明
将消息标记为 Persistent 并不能完全保证消息不会丢失. 尽管它告诉 RabbitMQ 将消息保存到磁盘, 但当 RabbitMQ 接收到消息并且尚未保存消息时仍有一段时间间隔. 此外, RabbitMQ 不会为每条消息执行 fsync(2) - 它可能只是保存到缓存中, 并没有真正写入磁盘. 消息的持久化保证并不健壮, 但对于简单的任务队列来说已经足够了. 如果您需要一个更加健壮的保证, 可以使用发布者确认 https://www.rabbitmq.com/confirms.html .
6. 公平调度
您可能已经注意到调度仍然无法完全按照我们期望的方式工作. 例如, 在有两个 Worker 的情况下, 假设所有奇数消息都很庞大, 偶数消息都很轻量, 那么一个 Worker 将会一直忙碌, 而另一个 Worker 几乎不做任何工作. 是的, RabbitMQ 并不知道存在这种情况, 它仍然会平均地分发消息.
发生这种情况是因为 RabbitMQ 只是在消息进入队列后就将其分发. 它不会去检查每个消费者所拥有的未确认消息的数量. 它只是盲目地将第 n 条消息分发给第 n 位消费者.
为了改变上述这种行为, 我们可以使用参数设置 prefetchCount = 1 的 basicQos 方法.
这就告诉 RabbitMQ 同一时间不要给一个 Worker 发送多条消息. 或者换句话说, 不要向一个 Worker 发送新的消息, 直到它处理并确认了前一个消息.
相反, 它会这个消息调度给下一个不忙碌的 Worker.
channel.BasicQos(0, 1, false);
关于队列大小的说明
如果所有的 Worker 都很忙, 您的队列可能会被填满. 请留意这一点, 可以尝试添加更多的 Worker, 或者使用其他策略.
7. 组合在一起
我们 NewTask.cs 类的最终代码:
- using System;
- using RabbitMQ.Client;
- using System.Text;
- class NewTask
- {
- public static void Main(string[] args)
- {
- // 实例化连接工厂.
- var factory = new ConnectionFactory() { HostName = "localhost" };
- // 创建连接, 信道.
- using(var connection = factory.CreateConnection())
- using(var channel = connection.CreateModel())
- {
- // 声明队列, 标记为持久性.
- channel.QueueDeclare(queue: "task_queue",
- durable: true,
- exclusive: false,
- autoDelete: false,
- arguments: null);
- // 获取发送消息.
- var message = GetMessage(args);
- var body = Encoding.UTF8.GetBytes(message);
- // 将消息标记为持久性.
- var properties = channel.CreateBasicProperties();
- properties.Persistent = true;
- // 发送数据包.
- channel.BasicPublish(exchange: "",
- routingKey: "task_queue",
- basicProperties: properties,
- body: body);
- Console.WriteLine("[x] Sent {0}", message);
- }
- Console.WriteLine("Press [enter] to exit.");
- Console.ReadLine();
- }
- private static string GetMessage(string[] args)
- {
- return ((args.Length> 0) ? string.Join("", args) :"Hello World!");
- }
- }
- (NewTask.cs 源码) https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/dotnet/NewTask/NewTask.cs
还有我们的 Worker.cs:
- using System;
- using RabbitMQ.Client;
- using RabbitMQ.Client.Events;
- using System.Text;
- using System.Threading;
- class Worker
- {
- public static void Main()
- {
- // 实例化连接工厂.
- var factory = new ConnectionFactory() { HostName = "localhost" };
- // 创建连接, 信道.
- using(var connection = factory.CreateConnection())
- using(var channel = connection.CreateModel())
- {
- // 声明队列, 标记为持久性.
- channel.QueueDeclare(queue: "task_queue",
- durable: true,
- exclusive: false,
- autoDelete: false,
- arguments: null);
- // 告知 RabbitMQ, 在未收到当前 Worker 的消息确认信号时, 不再分发给消息, 确保公平调度.
- channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
- Console.WriteLine("[*] Waiting for messages.");
- // 构建消费者实例.
- var consumer = new EventingBasicConsumer(channel);
- // 绑定消息接收事件.
- consumer.Received += (model, ea) =>
- {
- var body = ea.Body;
- var message = Encoding.UTF8.GetString(body);
- Console.WriteLine("[x] Received {0}", message);
- // 模拟耗时操作.
- int dots = message.Split('.').Length - 1;
- Thread.Sleep(dots * 1000);
- Console.WriteLine("[x] Done");
- // 手动发送消息确认信号.
- channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
- };
- channel.BasicConsume(queue: "task_queue",
- autoAck: false,
- consumer: consumer);
- Console.WriteLine("Press [enter] to exit.");
- Console.ReadLine();
- }
- }
- }
- (Worker.cs 源码) https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/dotnet/Worker/Worker.cs
使用消息确认机制和 BasicQ 您可以创建一个工作队列. 即使 RabbitMQ 重新启动, 通过持久性选项也可让任务继续存在.
有关 IModel 方法和 IBasicProperties 的更多信息, 您可以在线浏览 RabbitMQ .NET 客户端 API 参考 http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v3.6.10/rabbitmq-dotnet-client-3.6.10-client-htmldoc/html/index.html .
现在, 我们可以继续阅读教程[3], 学习如何向多个消费者发送相同的消息.
8. 写在最后
本文翻译自 RabbitMQ 官方教程 C# 版本. 本文介绍如与官方有所出入, 请以官方最新内容为准.
水平有限, 翻译的不好请见谅, 如有翻译错误还请指正.
来源: https://www.cnblogs.com/esofar/p/rabbitmq-work-queues.html