邮件发送是很常见的需求, 由于发送邮件的操作一般是比较耗时的, 所以我们一般采用异步处理来提升用户体验, 而异步通常我们使用消息队列来实现.
传统 MVC 框架由于缺少多进程开发能力, 通常是采用同一个脚本执行多次, 产生多个进程的方式, mixphp 封装了 TaskExecutor 专用于多进程开发, 用户能非常简单的开发出功能完善的高可用多进程应用.
下面演示一个异步邮件发送系统的开发过程, 涉及知识点:
异步
消息队列
多进程
守护进程
如何使用消息队列实现异步
PHP 使用消息队列通常是使用中间件来实现, 常用的消息中间件有:
- redis
- rabbitmq
- kafka
本次我们选用 redis 来实现异步邮件发送, redis 的数据类型中有一个 list 类型, 可实现消息队列, 使用以下命令:
- // 入列
- $redis->lpush($key, $data);
- // 出列
- $data = $redis->rpop($key);
- // 阻塞出列
- $data = $redis->brpop($key, 10);
架构设计
本实例由传统 MVC 框架投递邮件发送需求, MixPHP 多进程执行发送任务.
邮件发送库选型
以往我们通常使用框架提供的邮件发送库, 或者网上下载别的用户分享的库, https://www.phpcomposer.com/ 出现后, https://packagist.org/ 上有大量优质的库, 我们只需选择一个最好的即可, 本例选择 swiftmailer.
由于发送任务是由 MixPHP 执行, 所以 swiftmailer 是安装在 MixPHP 项目中, 在项目根目录中执行以下命令安装:
composer require swiftmailer/swiftmailer
生产者开发
在邮件发送这个需求中生产者是指投递发送任务的一方, 这一方通常是一个接口或网页, 这个部分并不一定需 mixphp 开发, TP,CI,YII 这些都可以, 只需在接口或网页中把任务信息投递到消息队列中即可.
在传统 MVC 框架的控制器中增加如下代码:
通常框架中使用 redis 会安装一个类库来使用, 本例使用原生代码, 便于理解.
- // 连接
- $redis = new \Redis();
- if (!$redis->connect('127.0.0.1', 6379)) {
- throw new \Exception('Redis Connect Failure');
- }
- $redis->auth('');
- $redis->select(0);
- // 投递任务
- $data = [
- 'to' => ['***@qq.com' => 'A name'],
- 'body' => 'Here is the message itself',
- 'subject' => 'The title content',
- ];
- $redis->lpush('queue:email', serialize($data));
通常异步开发中, 投递完成后就会立即响应一个消息给用户, 当然此时该任务并没有执行.
消费者开发
本例我们使用 MixPHP 的多进程开发工具 TaskExecutor 来完成这个需求, 通常使用常驻进程来处理队列的消费, 所以我们使用 TaskExecutor 的 TYPE_DAEMON 类型, MODE_PUSH 模式.
TaskExecutor 的 MODE_PUSH 模式有二种进程:
左进程: 负责从消息队列取出任务数据, 投放给中进程.
中进程: 负责执行邮件发送任务.
PushCommand.php 代码如下:
- <?php
- namespace apps\daemon\commands;
- use mix\console\ExitCode;
- use mix\facades\Input;
- use mix\facades\Redis;
- use mix\task\CenterProcess;
- use mix\task\LeftProcess;
- use mix\task\TaskExecutor;
- /**
- * 推送模式范例
- * @author 刘健 <coder.liu@qq.com>
- */
- class PushCommand extends BaseCommand
- {
- // 配置信息
- const HOST = 'smtpdm.aliyun.com';
- const PORT = 465;
- const SECURITY = 'ssl';
- const USERNAME = '****@email.***.com';
- const PASSWORD = '****';
- // 初始化事件
- public function onInitialize()
- {
- parent::onInitialize(); // TODO: Change the autogenerated stub
- // 获取程序名称
- $this->programName = Input::getCommandName();
- // 设置 pidfile
- $this->pidFile = "/var/run/{$this->programName}.pid";
- }
- /**
- * 获取服务
- * @return TaskExecutor
- */
- public function getTaskService()
- {
- return create_object(
- [
- // 类路径
- 'class' => 'mix\task\TaskExecutor',
- // 服务名称
- 'name' => "mix-daemon: {$this->programName}",
- // 执行类型
- 'type' => \mix\task\TaskExecutor::TYPE_DAEMON,
- // 执行模式
- 'mode' => \mix\task\TaskExecutor::MODE_PUSH,
- // 左进程数
- 'leftProcess' => 1,
- // 中进程数
- 'centerProcess' => 5,
- // 任务超时时间 (秒)
- 'timeout' => 5,
- ]
- );
- }
- // 启动
- public function actionStart()
- {
- // 预处理
- if (!parent::actionStart()) {
- return ExitCode::UNSPECIFIED_ERROR;
- }
- // 启动服务
- $service = $this->getTaskService();
- $service->on('LeftStart', [$this, 'onLeftStart']);
- $service->on('CenterStart', [$this, 'onCenterStart']);
- $service->start();
- // 返回退出码
- return ExitCode::OK;
- }
- // 左进程启动事件回调函数
- public function onLeftStart(LeftProcess $worker)
- {
- try {
- // 模型内使用长连接版本的数据库组件, 这样组件会自动帮你维护连接不断线
- $queueModel = Redis::getInstance();
- // 保持任务执行状态, 循环结束后当前进程会退出, 主进程会重启一个新进程继续执行任务, 这样做是为了避免长时间执行内存溢出
- for ($j = 0; $j <16000; $j++) {
- // 从消息队列中间件阻塞获取一条消息
- $data = $queueModel->brpop('queue:email', 10);
- if (empty($data)) {
- continue;
- }
- list(, $data) = $data;
- // 将消息推送给中进程去处理, push 有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
- $worker->push($data, false);
- }
- } catch (\Exception $e) {
- // 休息一会, 避免 CPU 出现 100%
- sleep(1);
- // 抛出错误
- throw $e;
- }
- }
- // 中进程启动事件回调函数
- public function onCenterStart(CenterProcess $worker)
- {
- // 保持任务执行状态, 循环结束后当前进程会退出, 主进程会重启一个新进程继续执行任务, 这样做是为了避免长时间执行内存溢出
- for ($j = 0; $j <16000; $j++) {
- // 从进程消息队列中抢占一条消息
- $data = $worker->pop();
- if (empty($data)) {
- continue;
- }
- // 处理消息
- try {
- // 处理消息, 比如: 发送短信, 发送邮件, 微信推送
- var_dump($data);
- $ret = self::sendEmail($data);
- var_dump($ret);
- } catch (\Exception $e) {
- // 回退数据到消息队列
- $worker->rollback($data);
- // 休息一会, 避免 CPU 出现 100%
- sleep(1);
- // 抛出错误
- throw $e;
- }
- }
- }
- // 发送邮件
- public static function sendEmail($data)
- {
- // Create the Transport
- $transport = (new \Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY))
- ->setUsername(self::USERNAME)
- ->setPassword(self::PASSWORD);
- // Create the Mailer using your created Transport
- $mailer = new \Swift_Mailer($transport);
- // Create a message
- $message = (new \Swift_Message($data['subject']))
- ->setFrom([self::USERNAME => '** 网'])
- ->setTo($data['to'])
- ->setBody($data['body']);
- // Send the message
- $result = $mailer->send($message);
- return $result;
- }
- }
测试
在 shell 中启动 push 常驻程序.
- [root@localhost bin]# ./mix-daemon push start
- mix-daemon 'push' start successed.
调用接口往消息队列投放任务.
此时 shell 终端将打印:
成功收到测试邮件:
- MixPHP
- GitHub: https://github.com/mixstart/mixphp
官网: http://www.mixphp.cn/
来源: https://juejin.im/entry/5b33362fe51d45588b1dc16b