- 一般的队列系统,是指linux中的crontab定时启动脚本来处理任务:
- 首先下载一个rabbitmq的客户端,他相当于一个容器,装排队数据的容器
- http://www.rabbitmq.com/download.html
- 默认的端口是55672 访问地址http://127.0.0.1:55672/
- 默认帐号密码 guest guest
- 你可以看到rabbitmq 的管理界面
- mq的任务是一个不浪费资源,的一个队列系统!
- php使用需要下载一个amqp扩展
- 或者直接点击下面的地址找到适合自己的版本,下载
- http://pecl.php.net/package/amqp/1.2.0/windows
- rabbitmq.1.dll 放在C盘windows下
- php_amqp.dll 放入php扩展中
- 开启php_amqp.dll的引用
- 重启服务器
- 用phpinfo();
- 查看是否引用成功,如果出现以下的amqp扩展,那就说明成功了
- 首先是rabbitmq的生产者:
- 创建第一个index文件:然后去mq中查看,如果添加一个test001的队列名信息,就说明已经添加进去了,xx22的信息已经在mq中存储!
- 接下来就需要跑数据了。
- createQueue(array('xxx','2222'),'test001');
- echo "ok";
- function createQueue($message,$queueName,$exchangeName = '', $queueKey = '')
- {
- $queueName = self::getQueueName($queueName);
- $conn_args = array('host' =>'localhost', 'port'=> '5672',
- 'login' =>'guest', //mq帐号
- 'password'=> '', //mq密码
- 'vhost' => '/');
- $conn = new AMQPConnection($conn_args);
- $conn->connect();
- $channel = new AMQPChannel($conn);
- if (!$exchangeName) {
- $exchangeName = $queueName;
- }
- $queueName = $queueName;
- if (!$queueKey) {
- $queueKey = $queueName;
- }
- $ex = new AMQPExchange($channel);
- $ex->setName($exchangeName);
- $ex->setType(AMQP_EX_TYPE_TOPIC);
- $ex->setFlags(AMQP_DURABLE); //exchange持久化
- $ex->declareExchange();
- $q = new AMQPQueue($channel);
- $q->setName($queueName);
- $q->setFlags(AMQP_DURABLE); //queue持久化
- $q->declareQueue();
- $q->bind($exchangeName, $queueKey);
- $channel->startTransaction();
- /**
- * 消息持久化,delivery_mode:2持久化、delivery_mode:1非持久化,其中priority是设置消息的优先级,测试中发现并未起作用。
- * 消息还有其他属性,请参考http://www.php.net/manual/zh/amqpexchange.publish.php
- */
- $result = $ex->publish(json_encode($message), $queueKey, AMQP_NOPARAM, array('delivery_mode'=>2, 'priority'=> 9));
- $channel->commitTransaction();
- $conn->disconnect();
- }
- 有了生产者,那就有消费者。
- 脚本如果没有其他的修改或问题,基本上都是常年启动的:
- 消费者基类:
- class WorkerCommand{
- function qInit($q_name,$e_name='',$k_route=''){
- $q_name = Utils::getQueueName($q_name);
- $conn_args = array(
- 'host' => '127.0.0.1', //mq的配置
- 'port' => '5672',
- 'login' => 'guest',
- 'password' => 'huoxingxing',
- 'vhost' => '/'
- );
- //创建连接和channel
- $conn = new AMQPConnection($conn_args);
- if (!$conn->connect()) {
- die("Cannot connect to the broker!\\n");
- }
- $channel = new AMQPChannel($conn);
- //创建交换机
- $ex = new AMQPExchange($channel);
- if (!$e_name) {
- $e_name = $q_name;
- }
- $ex->setName($e_name);
- $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
- $ex->setFlags(AMQP_DURABLE); //持久化
- // echo "Exchange Status:" . $ex->declareExchange() . "\\n";
- //创建队列
- $q = new AMQPQueue($channel);
- $q->setName($q_name);
- $q->setFlags(AMQP_DURABLE); //持久化
- // echo "Message Total:" . $q->declareExchange() . "\\n";
- if (!$k_route) {
- $k_route = $q_name;
- }
- //绑定交换机与队列,并指定路由键
- // echo 'Queue Bind: ' . $q->declareQueue($e_name, $k_route) . "\\n";
- //阻塞模式接收消息
- echo "Message:\\n";
- while (True) {
- $q->consume(array($this,'processMessage'));
- //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
- }
- $conn->disconnect();
- }
- }
- 消费者:
- class WorkerWareSyncBackUpCommand extends WorkerCommand {
- function actionIndex()
- {
- $this->qInit('SyncWareBackup');
- }
- function processMessage($envelope, $queue)
- {
- $msg = json_decode($envelope->getBody());
- Utils::doBackUp('back',$msg,'');
- $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
- }
- }
- //该片段来自于http://www.codesnippet.cn/detail/0106201512720.html
来源: http://www.codesnippet.cn/detail/0106201512720.html