本文所有内容均个人从 RabbitMQ 官网教程 中翻译,若图片文字的引用有任何侵权的地方,联系我,我会立马删除.This article was translated from RabbitMQ Official Tutorials by myself,and if this article and the images in this article have any infringement,please contact to me, and i will delete them.
工作队列
(使用 php-amqplib )
producer->queue->consumers
在第一个教程中我们写了程序去通过一个已经被 named(被命名)的 Queue(队列)发送和接收消息.在这个教程中,我们将会创建一个 Work Queue(工作队列) 用于在多个处理程序中分配耗时任务.
Work Queues(工作队列)(又名:Task Queues(任务队列))的主要思想是避免立即处理资源密集型(resource-intensive)任务,并且必须等待它完成.相反地,我们计划让任务稍后完成.我们把一个任务封装成消息,并且发送它到 Queue(队列).一个在后台运行的工作线程将会把任务出队并最终执行它.当你运行非常多的处理程序,这些(多个)任务将会被均分到它们之间.
这个概念对于不可能在短暂的 HTTP 请求中处理一个复杂运算的网站应用来说十分有用.
准备
在此教程的上一部分我们发送了一个包含 "Hello World!" 的消息.现在我们将会发送字符串来代替(模拟)复杂的任务.我们并没有一个真实的类似于缩放图片或者渲染 pdf 的(复杂)任务,所以让我们通过使用 sleep() 函数假装我们很忙来伪造此类复杂任务.我们以字符串中的点(.)来描述它的复杂度;每一个点(.)将会占用" 工作 " 的一秒中.例如,一个通过 Hello... 来描述的伪装任务将会花费三秒钟来处理.
为了允许发送来自命令行的任意消息我们将会稍微修改上一个例子的 send.php 代码.这个程序将会安排任务到我们的工作 Queue(队列)中,我们把他命名为 new_task.php:
$data = implode(' ',array_slice($argv,1));
if(empty($data)) $data = 'Hello World!';
$msg = new AMQPMessage($data);
$channel->basic_publish($msg,'','hello');
echo " [x] Sent ", $data, "\n";
我们旧的 receive.php 脚本同样需要一点修改:它需要去将消息中的每一个点 (.) 伪装成任务中花费一秒的时间.它将会把消息从 Queue(队列)中出队并且执行它,所以我们将它命名为 worker.php :
$callback = function($msg){
echo "[x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body,'.'));
echo " [x] Done","\n";
}
$channel->basic_consume('hello','',false,true,false,false,$callback);
注意我们伪装的任务将会模拟执行时间.
想在第一个教程中那样运行他们
# shell 1
php worker.php
# shell 2
php new_task.php "A very hard task which takes two seconds.."
循环调度
使用任务队列 (Task Queue) 的一个优点就是它可以轻松地并行化任务.如果我们积压了大量的任务,我们只需要添加更多的工作程序,这可以很轻松的进行扩展规模.
首先,让我们尝试同时运行两个 worker.php 脚本.他们都将会从 Queue(队列)中获得消息,但确切的情况又是如何呢?让我们来看看:
你需要打开三个控制台.其中两个运行 worker.php 脚本.这些控制台将会成为我们的两个 Consumer(消费者)--C1 和 C2.
# shell 1
php worker.php
# => [x] Waiting for messages.To exit press CTRL+C
# shell 2
php worker.php
# => [x] Waiting for messages.To exit press CTRL+C
在第三个控制台我们将会发送一些新的任务.一旦你运行了 Consumers(消费者)你就可以推送一些消息:
# shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....
让我们看一下任务交付情况:
# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默认情况下,RabbitMQ 将会按顺序地发送每一个消息到下一个 Consumer(消费者).平均每个消费者都会得到相同数量的消息.这个分发消息的方式叫做轮询调度 (round-robin).使用三个或更多的处理程序来尝试它吧.
消息确认
执行一个任务要花费一些时间.你可能会想如果其中一个 Consumer(消费者)开始了一个长任务并且只完成部分任务就挂掉了会发生什么.目前我们的代码,一旦 RabbitMQ 发送了一个消息去客户端它会立马标记他们为删除.在这一情况下,如果你杀掉其中一个工作程序我们将会失去那些只是在处理中(但尚未完成)的消息.我们同样失去了所有发送了给这个程序但还未处理的消息.
但是我们不想丢失任何任务.如果一个工作程序挂掉,我们想把这个任务发送到其他的工作程序.
为了确保消息永不消失,RabbitMQ 支持 消息确认 .Consumer 将会反回一个应答去告诉 RabbitMQ 一个特定的消息已经被接收,处理,这时 RabbitMQ 就可以自由地删除这一消息了.
如果一个 Consumer(消费者)在反回应答 (ack) 信号前挂掉了(例如它的 channel(频道) 关闭了,连接关闭了,或者 TCP 连接丢失了),RabbitMQ 将会知道该消息并没有被完全处理并且会重新把它插入 Queue(队列).如果这时有其他的 Consumers(消费者)在线,它会立马重新发送这条消息去其他的 Consumer(消费者).这一种方式即使处理程序意外挂掉你也能确保没有消息丢失.
这里没有任何消息会因为超时而丢失;当 Consumer(消费者) 挂掉 RabbitMQ 将会重新发送消息,即使处理一个消息要很长很长的时间.(一直等到能发送到下一个消费者?)
消息确认默认是关闭的.当设置 basic_consume 函数的第四个参数为 false(true 代表没有消息确认)并且当工作程序在完成一个任务的时候返回一个适当的应答信号将会开启消息确认.
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
使用这段代码我们将能确保没有任何消失会丢失,即使你用 ctrl+c 杀掉了一个正在处理消息的工作程序.很快在该工作程序挂掉之后,所有未被应答的消息将会被重新发送.
忘了确认
错过 ack 是一个很常见的错误.它是一个简单的错误,但是后果都会很严重.当你的客户端退出时候消息将会被重新发送(这可能会像是随机地重新发送给),但是如果不能够释放一些未被应答的消息, RabbitMQ 将会占用越来越多的内存.
为了调试这种错误你可以使用 rabbitmqctl 把
messages_unacknowledged
(未被确认的消息)都打印出来:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在 Windows 平台,省略掉 sudo 即可:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
消息持久化
我们已经学习了怎样做才能确保即使 Consumer(消费者)挂掉,任务也不会丢失.但是如果是 RabbitMQ 服务停止了,我们的任务也同样会丢失.
当 RabbitMQ 退出或者崩溃,它会把队列与消息统统忘掉,除非你告诉它不要这样做.为了确保消息不会丢失(即使 RabbitMQ 退出或崩溃),我们需要做两件事情:我们需要将 Queue(队列)和消息都标记为持久化的.
首先我们需要确保 RabbitMQ 将永不会丢失我们的 Queue(队列).为了实现这个,我们需要声明它为 durable(持久的).所以我们把(声明队列时候的)第三个参数 queue_declare 置为 true:
$channel->queue_declare('hello', false, true, false, false);
尽管这条命令它本身是正确无误的,但是就目前我们(RabbitMQ)的配置来看,这并不会真正生效.这是因为我们已经定义了一个并不持久化(not durable)的叫做 hello 的 Queue(队列).RabbitMQ 将不会允许你对已经存在的 Queue(队列)重新定义,并且当任何程序尝试这样做的时候会返回一个 error 错误.但这有一个快速的解决方法--让我们重新定义一个不同名字的 Queue(队列),例如 task_queus:
$channel->queue_declare('task_queue', false, true, false, false);
Producer(生产者)和 Consumer(消费者)的代码中这个标记都要被设为 true.
这时候我们就能确保即使 RabbitMQ 重启, task_queue 也不会丢失.现在我们需要通过设置 AMQPMessage 参数数组中的 delivery_mode = 2 这一消息属性来确保我们的消息是持久化的.
$msg = new AMQPMessage($data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
消息持久化需要注意的地方
标记消息为持久花的并不能完全保重一条消息都不会丢失.尽管它高数了 RabbitMQ 把消息保存到磁盘,在 RabbitMQ 接收了一条消息但并未将它保存(到磁盘)也会由一段很短的时间(可能发生错误).同样 RabbitMQ 不会对所有消息都执行 fsync(2) --它可能只是保存在缓存中而不是真正地写入到磁盘中.这种持久化的保证不是很强大,但对于我们的 Simple Task Queue (简单任务队列) 来说也已经完全足够了.如果你需要一个更加强大持久化,你可以是 Publisher Confirms (发送者确认)
公平调度 (Fair dispatch)
你可能已经注意到目前的调度并不能完全按照我们所希望的进行.例如在有两个处理程序( Consumer(消费者) )的时候,所有奇数的消息都十分繁杂但其他都都很轻松,这样,其中一个处理程序就会一直繁忙,然而另外一个几乎什么都不用做.然而,RabbitMQ 并不知道这种情况,它将会继续均匀地分发所有消息.
导致这种情况的原因是 RabbitMQ 在消息进入 Queue(队列)的时候仅仅是分发这些消息.它(RabbitMQ)并没有留意一个 Consumer(消费者)的未被应答消息数量.它只是盲目地将第 n 条消息发送到第 n 个 Consumer(消费者).
Producer->Queue->Consumers
为了解决这种问题,我们使用 basic_qos 函数并设置该函数的 (第二个参数)prefetch_count = 1.这会告诉 RabbitMQ 不要在同一时间分发不止一条消息到一个处理程序上.换句话说就是,不要在处理程序处理并应答了上一条消息前再分发一个新的消息给它.相反地,它会把这条新的消息分发到下一个并不繁忙的处理程序.
$channel->basic_qos(null, 1, null);
关于 Queue(队列) 的大小
如果所有的处理程序都在繁忙,你的队列将会被填满.你将会想关注这一点,添加更多的处理程序,或者更换一种策略.
让他们一起运行
最后我们的 new_task.php 文件是这样的:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
array(
'delivery_mode' =>AMQPMessage::DELIVERY_MODE_PERSISTENT
)
);
$channel->basic_publish($msg, '', 'task_queue');
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
?>
(new_task.php 源码)
以及我们的 worker.php:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
(worker.php 源码)
在这章教程中我们学习到使用消息确认以及 prefetch 你可以设立一个工作队列.持久化配置让让任务在即使 RabbitMQ 重启的情况下也能留存.
来源: http://www.jianshu.com/p/f0cf85b68d3c