前面几节都是讲解 pcntl 扩展实现的多进程程序. 本节给大家介绍 swoole 扩展的 swoole_process 模块.
swoole 多进程
swoole_process 是 swoole 提供的进程管理模块, 用来替代 PHP 的 pcntl 扩展.
首先, 确保安装的 swoole 版本大于 1.7.2:
- $ php --ri swoole
- swoole
- swoole support => enabled
- Version => 1.10.1
注意: swoole_process 在最新的 1.8.0 版本已经禁止在 web 环境中使用了, 所以也只能支持命令行.
swoole 提供的多进程扩展基本功能和 pcntl 提供的一样, 但 swoole 更易简单上手, 并且提供了:
默认基于 unixsock 的进程间通信;
支持消息队列作为进程间通信;
基于 signalfd 和 eventloop 处理信号, 几乎没有任何额外消耗;
高精度微秒定时器;
配合 swoole_event 模块, 创建的 PHP 子进程可以异步的事件驱动模式
swoole_process 模块提供的方法 (Method) 主要分为四部分:
基础方法
- swoole_process::__construct
- swoole_process->start
- swoole_process->name
- swoole_process->exec
- swoole_process->close
- swoole_process->exit
- swoole_process::kill
- swoole_process::wait
- swoole_process::daemon
- swoole_process::setAffinity
管道通信
- swoole_process->write
- swoole_process->read
- swoole_process->setTimeout
- swoole_process->setBlocking
消息队列通信
- swoole_process->useQueue
- swoole_process->statQueue
- swoole_process->freeQueue
- swoole_process->push
- swoole_process->pop
信号与定时器
- swoole_process::signal
- swoole_process::alarm
基础应用
本例实现的是 tcp server, 特性:
多进程处理客户端连接
子进程退出, Master 进程会重新创建一个
支持事件回调
主进程退出, 子进程在干完手头活后退出
- <?php
- class TcpServer{
- const MAX_PROCESS = 3;// 最大进程数
- private $pids = []; // 存储子进程 pid
- private $socket;
- private $mpid;
- public function run(){
- $process = new swoole_process(function(){
- $this->mpid = $id = getmypid();
- echo time()."Master process, pid {$id}\n";
- // 创建 tcp server
- $this->socket = stream_socket_server("tcp://0.0.0.0:9201", $errno, $errstr);
- if(!$this->socket) exit("start server err: $errstr --- $errno");
- for($i=0; $i<self::MAX_PROCESS;$i++){
- $this->start_worker_process();
- }
- echo "waiting client...\n";
- //Master 进程等待子进程退出, 必须是死循环
- while(1){
- foreach($this->pids as $k=>$pid){
- if($pid){
- $res = swoole_process::wait();
- if ( $res ){
- echo time()."Worker process $pid exit, will start new... \n";
- $this->start_worker_process();
- unset($this->pids[$k]);
- }
- }
- }
- sleep(1);// 让出 1s 时间给 CPU
- }
- }, false, false); // 不启用管道通信
- swoole_process::daemon(); // 守护进程
- $process->start();// 注意: start 之后的变量子进程里面是获取不到的
- }
- /**
- * 创建 worker 进程, 接受客户端连接
- */
- private function start_worker_process(){
- $process = new swoole_process(function(swoole_process $worker){
- $this->acceptClient($worker);
- }, false, false);
- $pid = $process->start();
- $this->pids[] = $pid;
- }
- private function acceptClient(&$worker)
- {
- // 子进程一直等待客户端连接, 不能退出
- while(1){
- $conn = stream_socket_accept($this->socket, -1);
- if($this->onConnect) call_user_func($this->onConnect, $conn); // 回调连接事件
- // 开始循环读取消息
- $recv = ''; // 实际收到消息
- $buffer = ''; // 缓冲消息
- while(1){
- $this->checkMpid($worker);
- $buffer = fread($conn, 20);
- // 没有收到正常消息
- if($buffer === false || $buffer === ''){
- if($this->onClose) call_user_func($this->onClose, $conn); // 回调断开连接事件
- break;// 结束读取消息, 等待下一个客户端连接
- }
- $pos = strpos($buffer, "\n"); // 消息结束符
- if($pos === false){
- $recv .= $buffer;
- }else{
- $recv .= trim(substr($buffer, 0, $pos+1));
- if($this->onMessage) call_user_func($this->onMessage, $conn, $recv); // 回调收到消息事件
- // 客户端强制关闭连接
- if($recv == "quit"){
- echo "client close conn\n";
- fclose($conn);
- break;
- }
- $recv = ''; // 清空消息, 准备下一次接收
- }
- }
- }
- }
- // 检查主进程是否存在, 若不存在子进程在干完手头活后退出
- public function checkMpid(&$worker){
- if(!swoole_process::kill($this->mpid,0)){
- $worker->exit();
- // 这句提示, 实际是看不到的. 需要写到日志中
- echo "Master process exited, I [{$worker['pid']}] also quit\n";
- }
- }
- function __destruct() {
- @fclose($this->socket);
- }
- }
- $server = new TcpServer();
- $server->onConnect = function($conn){
- echo "onConnect -- accepted" . stream_socket_get_name($conn,true) . "\n";
- fwrite($conn,"conn success\n");
- };
- $server->onMessage = function($conn,$msg){
- echo "onMessage --" . $msg . "\n";
- fwrite($conn,"received".$msg."\n");
- };
- $server->onClose = function($conn){
- echo "onClose --" . stream_socket_get_name($conn,true) . "\n";
- fwrite($conn,"onClose"."\n");
- };
- $server->run();
运行后可以使用 telnet 连接:
telnet 127.0.0.1 9201
由于设置了最大三个子进程, 最多只能接受 3 个客户端连接.
进程间通信
前面讲解的例子里, 主进程和子进程直接是没有直接的数据交互的. 如果主进程需要得到的来自子进程的反馈, 或者子进程接受来自主进程的数据, 那么就需要进程间通信了.
swoole 内置了管道通信和消息队列通信.
管道通信
管道通信主要是数据传输: 一个进程需要将数据发送给另外一个进程.
这个 swoole 封装后, 使用非常简单:
- <?php
- $workers = [];
- for ($i=0; $i<3; $i++) {
- $process = new swoole_process(function(swoole_process $worker){
- // 子进程逻辑
- $cmd = $worker->read();
- ob_start();
- passthru($cmd);// 执行外部程序并且显示未经处理的, 原始输出, 会直接打印输出.
- $return = ob_get_clean() ? : ' ';
- $return = trim($return).". worker pid:".$worker->pid."\n";
- // $worker->write($return);// 写入数据到管道
- echo $return;// 写入数据到管道. 注意: 子进程里 echo 也是写入到管道
- }, true); // 第二个参数为 true, 启用管道通信
- $pid = $process->start();
- $workers[$pid] = $process;
- }
- foreach($workers as $pid=>$worker){
- $worker->write('whoami'); // 通过管道发数据到子进程. 管道是单向的: 发出的数据必须由另一端读取. 不能读取自己发出去的
- $recv = $worker->read();// 同步阻塞读取管道数据
- echo "recv result: $recv";
- }
- // 回收子进程
- while(count($workers)){
- foreach($workers as $pid=>$worker){
- $ret = swoole_process::wait();
- if($ret){
- echo "worker exit: $pid\n";
- unset($workers[$pid]);
- }
- }
- }
运行:
- $ php swoole_process_pipe.php
- recv result: Linux
recv result: 2018 年 06 月 24 日 星期日 16:18:01 CST
- recv result: yjc
- worker exit: 14519
- worker exit: 14522
- worker exit: 14525
注意点:
1, 管道数据读取是同步阻塞的; 上面的例子里如果子进程里再加一句 $worker->read(), 会一直阻塞. 可以使用 swoole_event_add 将管道加入到事件循环中, 变为异步模式.
2, 子进程里的输出 (例如 echo) 与 write 效果相同.
3, 通过管道发数据到子进程. 管道是单向的: 发出的数据必须由另一端读取. 不能读取自己发出去的.
防盗版声明: 本文系原创文章, 发布于公众号飞鸿影的博客 (fhyblog) 及博客园, 转载需作者同意.
消息队列通信
消息队列与管道有些不一样: 消息队列是全局的, 所有进程都可以发送, 读取. 你可以把它看做 redis list 结构.
消息队列更常见的用途是主进程分配任务, 子进程消费执行.
- <?php
- $workers = [];
- for ($i=0; $i<3; $i++) {
- $process = new swoole_process(function(swoole_process $worker){
- // 子进程逻辑
- sleep(1); // 防止父进程还未往消息队列中加入内容直接退出
- while($cmd = $worker->pop()){
- // echo "recv from master: $cmd\n";
- ob_start();
- passthru($cmd);// 执行外部程序并且显示未经处理的, 原始输出, 会直接打印输出.
- $return = ob_get_clean() ? : ' ';
- $return = "res:".trim($return).". worker pid:".$worker->pid."\n";
- echo $return;
- // sleep(1);
- }
- $worker->exit(0);
- }, false, false); // 不创建管道
- $process->useQueue(1, 2 | swoole_process::IPC_NOWAIT); // 使用消息队列
- $pid = $process->start();
- $workers[$pid] = $process;
- }
- // 由于所有进程是共享使用一个消息队列, 所以只需向一个子进程发送消息即可
- $worker = current($workers);
- for ($i=0; $i<3; $i++) {
- $worker->push('whoami'); // 发送消息
- }
- // 回收子进程
- while(count($workers)){
- foreach($workers as $pid=>$worker){
- $ret = swoole_process::wait();
- if($ret){
- echo "worker exit: $pid\n";
- unset($workers[$pid]);
- }
- }
- }
运行结果:
- $ php swoole_process_quene.php
- res: yjc. worker pid: 15885
- res: yjc. worker pid: 15886
- res: yjc. worker pid: 15887
- worker exit: 15885
- worker exit: 15886
- worker exit: 15887
注意点:
1, 所有进程共享使用一个消息队列;
2, 消息队列的读取操作是阻塞的, 可以在 useQueue 的时候第 2 个参数 mode 改为
2 | swoole_process::IPC_NOWAIT
, 则是异步的. mode 仅仅设置为 2 是阻塞的, 示例里去掉
swoole_process::IPC_NOWAIT
后读取消息的 while 会死循环.
3, 子进程前面加了个 sleep(1);, 这是为了防止父进程还未往消息队列中加入内容直接退出.
4, 子进程末尾也加了 sleep, 这是为了防止一个进程把所有消息都消费完了, 实际应用需要去掉.
信号与定时器
swoole_process::alarm
支持微秒定时器:
- <?php
- function ev_timer(){
- static $i = 0;
- echo "#{$i}\talarm\n";
- $i++;
- if ($i> 5) {
- // 清除定时器
- swoole_process::alarm(-1);
- // 退出进程
- swoole_process::kill(getmypid());
- }
- }
- // 安装信号
- swoole_process::signal(SIGALRM, 'ev_timer');
- // 触发定时器信号: 单位为微秒. 如果为负数表示清除定时器
- swoole_process::alarm(100 * 1000);//100ms
- echo getmypid()."\n"; // 该句会顺序执行, 后续无需使用 while 循环防止进程直接退出
运行:
- $ php swoole_process_alarm.php
- 13660
- #0 alarm
- #1 alarm
- #2 alarm
- #3 alarm
- #4 alarm
- #5 alarm
注: alarm 不能和 Swoole\Timer 同时使用.
参考
1,Process-Swoole-Swoole 文档中心
https://wiki.swoole.com/wiki/page/p-process.html
推广时间
最近在 GitChat 开了一期 Chat , 主题是PHP Socket 编程进阶指南, 主要和大家讲解 PHP Socket 编程相关知识. 通过参与本场 Chat, 您将学到如下内容:
熟悉 Socket 系列函数.
熟悉 stream_socket 系列函数.
如何使用 Socket 系列函数实现 TCP 服务端和客户端?
如何使用 socket_select 实现 I/O 多路复用?
如何使用 stream_socket 系列函数实现 TCP 服务端和客户端?
如何使用 stream_select 实现 I/O 多路复用?
来源: https://www.cnblogs.com/52fhy/p/9227023.html