本文基于 Swoole-4.3.2 和 PHP-7.1.0 版本
Swoole 协程简介
Swoole4 为 PHP 语言提供了强大的 CSP 协程编程模式, 用户可以通过 go 函数创建一个协程, 以达到并发执行的效果, 如下面代码所示:
- <?PHP
- //Co::sleep()是 Swoole 提供的 API, 并不会阻塞当前进程, 只会阻塞协程触发协程切换.
- go(function (){
- Co::sleep(1);
- echo "a";
- });
- go(function (){
- Co::sleep(2);
- echo "b";
- });
- echo "c";
- // 输出结果: cab
- // 程序总执行时间 2 秒
其实在 Swoole4 之前就实现了多协程编程模式, 在协程创建, 切换以及结束的时候, 相应的操作 PHP 栈即可(创建, 切换以及回收 PHP 栈).
此时的协程实现无法完美的支持 PHP 语法, 其根本原因在于没有保存 c 栈信息.(vm 内部或者某些扩展提供的 API 是通过 c 函数实现的, 调用这些函数时如果发生协程切换, c 栈该如何处理?)
Swoole4 新增了 c 栈的管理, 在协程创建, 切换以及结束的同时会伴随着 c 栈的创建, 切换以及回收.
Swoole4 协程实现方案如下图所示:
其中:
API 层是提供给用户使用的协程相关函数, 比如 go()函数用于创建协程; Co::yield()使得当前协程让出 CPU;Co::resume()可恢复某个协程执行;
Swoole4 协程需要同时管理 c 栈与 PHP 栈, Coroutine 用于管理 c 栈, PHPCoroutine 用于管理 PHP 栈; 其中 Coroutine(),yield(),resume()实现了 c 栈的创建以及换入换出; create_func(),on_yield(),on_resume()实现了 PHP 栈的创建以及换入换出;
Swoole4 在管理 c 栈时, 用到了 boost.context 库, make_fcontext()和 jump_fcontext()函数均使用汇编语言编写, 实现了 c 栈上下文的创建以及切换;
Swoole4 对 boost.context 进行了简单封装, 即 Context 层, Context(),SwapIn()以及 SwapOut()
对应 c 栈的创建以及换入换出.
深入理解 C 栈
函数是对代码的封装, 对外暴露的只是一组指定的参数和一个可选的返回值; 假设函数 P 调用函数 Q,Q 执行后返回函数 P, 实现该函数调用需要考虑以下三点:
指令跳转: 进入函数 Q 的时候, 程序计数器必须被设置为 Q 的代码的起始地址; 在返回时, 程序计数器需要设置为 P 中调用 Q 后面那条指令的地址;
数据传递: P 能够向 Q 提供一个或多个参数, Q 能够向 P 返回一个值;
内存分配与释放: Q 开始执行时, 可能需要为局部变量分配内存空间, 而在返回前, 又需要释放这些内存空间;
大多数语言的函数调用都采用了栈结构实现, 函数的调用与返回即对应的是一系列的入栈与出栈操作, 我们通常称之为函数栈帧(stack frame). 示意图如下:
上面提到的程序计数器即寄存器 %rip, 另外还有两个寄存器需要重点关注:%rbp 指向栈帧底部,%rsp 指向栈帧顶部.
下面将通过具体的代码事例, 为读者讲解函数栈帧. c 代码与汇编代码如下:
- int add(int x, int y)
- {
- int a, b;
- a = 10;
- b = 5;
- return x+y;
- }
- int main()
- {
- int sum = add(1,2);
- }
- main:
- pushq %rbp
- movq %rsp, %rbp
- subq $16, %rsp
- movl $2, %esi
- movl $1, %edi
- call add
- movl %eax, -4(%rbp)
- leave
- ret
- add:
- pushq %rbp
- movq %rsp, %rbp
- movl %edi, -20(%rbp)
- movl %esi, -24(%rbp)
- movl $10, -4(%rbp)
- movl $5, -8(%rbp)
- movl -24(%rbp), %eax
- movl -20(%rbp), %edx
- addl %edx, %eax
- popq %rbp
- ret
分析汇编代码:
main 函数与 add 函数入口, 首先将寄存器 %rbp 压入栈中用于保存其值, 其次移动 %rbp 指向当前栈顶部(此时 %rbp,%rsp 都指向栈顶, 开始新的函数栈帧);
main 函数 "subq $16, %rsp", 是为 main 函数栈帧预留 16 个字节的内存空间;
调用 add 函数时, 第一个参数和第二个参数分别保存在寄存器 %edi 和 %esi, 返回值保存在寄存器 %eax;
call 指令用于函数调用, 实现了两个功能: 寄存器 %rip 压入栈中, 跳转到新的代码位置;
ret 指令用于函数返回, 弹出栈顶内容到寄存器 %rip, 依次实现代码跳转;
leave 指令等同于两条指令: movq %rsp,%rbp 和 popq %rbp, 用于释放 main 函数栈帧, 恢复前一个函数栈帧;
注意 add 函数栈帧, 并没有为其分配空间, 寄存器 %rsp 和 %rbp 都指向栈帧底部; 根本因为是 add 函数没有调用其他函数.
该程序的栈结构示意图如下:
问题: 观察上面的汇编代码, 输入参数分别使用的是寄存器 %edi 和 %esi, 返回值使用的是寄存器 %eax, 输入输出参数不应该保存在栈上吗? 寄存器比内存访问要快的多, 现代处理器寄存器数目也比较多, 因此倾向于将参数优先保存在寄存器. 比如 %rdi, %rsi, %rdx, %rcx, %r8d, %r9d 六个寄存器用于存储函数调用时的前 6 个参数, 那么当输入参数数目超过 6 个时, 如何处理? 这些输入参数只能存储在栈上了.
- (%rdi 等表示 64 位寄存器,%edi 等表示 32 位寄存器)
- //add 函数需要 9 个参数
- add(1,2,3,4,5,6,7,8,9);
- // 参数 7,8,9 存储在栈上
- movl $9, 16(%rsp)
- movl $8, 8(%rsp)
- movl $7, (%rsp)
- movl $6, %r9d
- movl $5, %r8d
- movl $4, %ecx
- movl $3, %edx
- movl $2, %esi
- movl $1, %edi
Swoole C 栈管理
通过学习 c 栈基本知识, 我们知道最主要有三个寄存器:%rip 程序计数器指向下一条需要执行的指令,%rbp 指向函数栈帧底部,%rsp 指向函数栈帧顶部. 这三个寄存器可以确定一个 c 栈执行上下文, c 栈的管理其实就是这些寄存器的管理.
第一节我们提到 Swoole 在管理 c 栈时, 用到了 boost.context 库, 其中 make_fcontext()和 jump_fcontext()函数均使用汇编语言编写, 实现了 c 栈执行上下文的创建以及切换; 函声明命如下:
- fcontext_t make_fcontext(void *sp, size_t size, void (*fn)(intptr_t));
- intptr_t jump_fcontext(fcontext_t *ofc, fcontext_t nfc, intptr_t vp, bool preserve_fpu = false);
make_fcontext 函数用于创建一个执行上下文, 其中参数 sp 指向内存最高地址处(在堆中分配一块内存作为该执行上下文的 c 栈), 参数 size 为栈大小, 参数 fn 是一个函数指针, 指向该执行上下文的入口函数; 代码主要逻辑如下:
- /*%rdi 表示第一个参数 sp, 指向栈顶 */
- movq %rdi, %rax
- // 保证 %rax 指向的地址按照 16 字节对齐
- andq $-16, %rax
- // 将 %rax 向低地址处偏移 0x48 字节
- leaq -0x48(%rax), %rax
- /* %rdx 表示第三个参数 fn, 保存在 %rax 偏移 0x38 位置处 */
- movq %rdx, 0x38(%rax)
- stmxcsr (%rax)
- fnstcw 0x4(%rax)
- leaq finish(%rip), %rcx
- movq %rcx, 0x40(%rax)
- // 返回值保存在 %rax 寄存器
- ret
make_fcontext 函数创建的执行上下文示意图如下(可以看到预留了若干字节用于保存上下文信息):
Swoole 协程实现的 Context 层封装了上下文的创建, 创建上下文函数实现如下:
- Context::Context(size_t stack_size, coroutine_func_t fn, void* private_data) :
- fn_(fn), stack_size_(stack_size), private_data_(private_data)
- {
- stack_ = (char*) sw_malloc(stack_size_);
- void* sp = (void*) ((char*) stack_ + stack_size_);
- ctx_ = make_fcontext(sp, stack_size_, (void (*)(intptr_t))&context_func);
- }
可以看到 c 栈执行上下文是通过 sw_malloc 函数在堆上分配的一块内存, 默认大小为 2M 字节; 参数 sp 指向的是内存最高地址处; 执行上下文的入口函数为 Context::context_func().
jump_fcontext 函数用于切换 c 栈上下文: 1)函数会将当前上下文 (寄存器) 保存在当前栈顶 (push), 同时将 %rsp 寄存器内容保存在 ofc 地址; 2) 函数从 nfc 地址处恢复 %rsp 寄存器内容, 同时从栈顶恢复上下文信息(pop). 代码主要逻辑如下:
- //------------------- 保存当前 c 栈上下文 -------------------
- pushq %rbp /* save RBP */
- pushq %rbx /* save RBX */
- pushq %r15 /* save R15 */
- pushq %r14 /* save R14 */
- pushq %r13 /* save R13 */
- pushq %r12 /* save R12 */
- leaq -0x8(%rsp), %rsp
- stmxcsr (%rsp)
- fnstcw 0x4(%rsp)
- //%rdi 表示第一个参数, 即 ofc, 保存 %rsp 到 ofc 地址处
- movq %rsp, (%rdi)
- //------------------- 从 nfc 中恢复上下文 -------------------
- //%rsi 表示第二个参数, 即 nfc, 从 nfc 地址处恢复 %rsp
- movq %rsi, %rsp
- ldmxcsr (%rsp)
- fldcw 0x4(%rsp)
- leaq 0x8(%rsp), %rsp
- popq %r12 /* restrore R12 */
- popq %r13 /* restrore R13 */
- popq %r14 /* restrore R14 */
- popq %r15 /* restrore R15 */
- popq %rbx /* restrore RBX */
- popq %rbp /* restrore RBP */
- // 这里弹出的其实是之前保存的 %rip
- popq %r8
- //%rdx 表示第三个参数,%rax 用于存储函数返回值;
- movq %rdx, %rax
- //%rdi 用于存储第一个参数
- movq %rdx, %rdi
- // 跳转到 %r8 指向的地址
- jmp *%r8
观察 jump_fcontext 函数的汇编代码, 可以看到保存上下文与恢复上下文的代码基本是对称的. 恢复上下文时 "popq %r8" 用于弹出上一次保存的程序计数器 %rip 的内容, 然而并没有看到保存寄存器 %rip 的代码. 这是因为调用 jump_fcontext 函数时, 底层 call 指令已经将 %rip 入栈了.
Swoole 协程实现的 Context 层封装了上下文的换入换出, 可以在上下文 swap_ctx_和 ctx_之间随时换入换出, 代码实现如下:
- bool Context::SwapIn()
- {
- jump_fcontext(&swap_ctx_, ctx_, (intptr_t) this, true);
- return true;
- }
- bool Context::SwapOut()
- {
- jump_fcontext(&ctx_, swap_ctx_, (intptr_t) this, true);
- return true;
- }
上下文示意图如下所示:
Swoole PHP 栈管理
PHP 代码在执行时, 同样存在函数栈帧的分配与回收. PHP 将此抽象为两个结构, PHP 栈 zend_vm_stack, 与执行数据(函数栈帧)zend_execute_data.
PHP 栈结构与 c 栈结构基本类似, 定义如下:
- struct _zend_vm_stack {
- zval *top;
- zval *end;
- zend_vm_stack prev;
- };
其中 top 字段指向栈顶位置, end 字段指向栈底位置; prev 指向上一个栈, 形成链表, 当栈空间不够时, 可以进行扩容. PHP 虚拟机申请栈空间时默认大小为 256K,Swoole 创建栈空间时默认大小为 8K.
执行数据结构体, 我们需要重点关注这几个字段: 当前函数编译后的指令集(opline 指向指令集数组中的某一个元素, 虚拟机只需要遍历该数组并执行所有指令即可), 函数返回值, 以及调用该函数的执行数据; 结构定义如下:
- struct _zend_execute_data {
- // 当前执行指令
- const zend_op *opline;
- zend_execute_data *call;
- // 函数返回值
- zval *return_value;
- zend_function *func;
- zval This; /* this + call_info + num_args */
- // 调用当前函数的栈帧
- zend_execute_data *prev_execute_data;
- // 符号表
- zend_array *symbol_table;
- #if ZEND_EX_USE_RUN_TIME_CACHE
- void **run_time_cache;
- #endif
- #if ZEND_EX_USE_LITERALS
- // 常量数组
- zval *literals;
- #endif
- };
PHP 栈初始化函数为 zend_vm_stack_init; 当执行用户函数调用时, 虚拟机通过函数 zend_vm_stack_push_call_frame 在 PHP 栈上分配新的执行数据, 并执行该函数代码; 函数执行完成后, 释放该执行数据. 代码逻辑如下:
- ZEND_API void zend_execute(zend_op_array *op_array, zval *return_value)
- {
- // 分配新的执行数据
- execute_data = zend_vm_stack_push_call_frame(ZEND_CALL_TOP_CODE | ZEND_CALL_HAS_SYMBOL_TABLE,
- (zend_function*)op_array, 0, zend_get_called_scope(EG(current_execute_data)), zend_get_this_object(EG(current_execute_data)));
- // 设置 prev
- execute_data->prev_execute_data = EG(current_execute_data);
- // 初始化当前执行数据, op_array 即为当前函数编译得到的指令集
- i_init_execute_data(execute_data, op_array, return_value);
- // 执行函数代码
- zend_execute_ex(execute_data);
- // 释放执行数据
- zend_vm_stack_free_call_frame(execute_data);
- }
PHP 栈帧结构示意图如下:
Swoole 协程实现, 需要自己管理 PHP 栈, 在发生协程创建以及切换时, 对应的创建新的 PHP 栈, 切换 PHP 栈, 同时保存和恢复 PHP 栈上下文信息. 这里涉及到一个很重要的结构体 php_coro_task:
- struct php_coro_task
- {
- zval *vm_stack_top;
- zval *vm_stack_end;
- zend_vm_stack vm_stack;
- zend_execute_data *execute_data;
- };
这里列出了 php_coro_task 结构体的若干关键字段, 这些字段用于保存和恢复 PHP 上下文信息.
协程创建时, 底层通过函数 PHPCoroutine::create_func 实现了 PHP 栈的创建:
- void PHPCoroutine::create_func(void *arg)
- {
- // 创建并初始化 PHP 栈
- vm_stack_init();
- call = (zend_execute_data *) (EG(vm_stack_top));
- // 为结构 php_coro_task 分配空间
- task = (php_coro_task *) EG(vm_stack_top);
- EG(vm_stack_top) = (zval *) ((char *) call + PHP_CORO_TASK_SLOT * sizeof(zval));
- // 创建新的执行数据结构
- call = zend_vm_stack_push_call_frame(
- ZEND_CALL_TOP_FUNCTION | ZEND_CALL_ALLOCATED,
- func, argc, fci_cache.called_scope, fci_cache.object
- );
- }
从代码中可以看到结构 php_coro_task 是直接存储在 PHP 栈的底部.
当通过 yield 函数让出 CPU 时, 底层会调用函数 PHPCoroutine::on_yield 切换 PHP 栈:
- void PHPCoroutine::on_yield(void *arg)
- {
- php_coro_task *task = (php_coro_task *) arg;
- php_coro_task *origin_task = get_origin_task(task);
- // 保存当前 PHP 栈上下文信息到 php_coro_task 结构
- save_task(task);
- // 从 php_coro_task 结构中恢复 PHP 栈上下文信息
- restore_task(origin_task);
- }
Swoole 协程实现
前面我们简单介绍了 Swoole 协程的实现方案, 以及 Swoole 对 c 栈与 PHP 栈的管理, 接下来将结合前面的知识, 系统性的介绍 Swoole 协程的实现原理.
swoole 协程数据模型
话不多说, 先看一张图:
每个协程都需要管理自己的 c 栈与 PHP 栈;
Context 封装了 c 栈的管理操作; ctx_字段保存的是寄存器 %rsp 的内容 (指向 c 栈栈顶位置);swap_ctx_字段保存的是将被换出的协程寄存器 %rsp 内容(即, 将被换出的协程的 c 栈栈顶位置);SwapIn() 对应协程换入操作; SwapOut()对应协程换出操作;
参考 jump_fcontext 实现, 协程在换出时, 会将寄存器 %rip,%rbp 等暂存在 c 栈栈顶; 协程在换入时, 相应的会从栈顶恢复这些寄存器的内容;
Coroutine 管理着协程所有内容; cid 字段表示当前协程的 ID;task 字段指向当前协程的 php_coro_task 结构, 该结构中保存的是当前协程的 PHP 栈信息 (vm_stack_top,execute_data 等);ctx 字段指向的是当前协程的 Context 对象; origin 字段指向的是另一个协程 Coroutine 对象; yield() 和 resume()对应的是协程的换出换入操作;
注意到 php_coro_task 结构的 co 字段指向其对应的协程对象 Coroutine;
Coroutine 还有一些静态属性, 静态属性的属于类属性, 所有协程共享的; last_cid 字段存储的是当前最大的协程 ID, 创建协程时可用于生成协程 ID;current 字段指向的是当前正在运行的协程 Coroutine 对象; on_yield 和 on_resume 是两个函数指针, 用于实现 PHP 栈的切换操作, 实际指向的是方法 PHPCoroutine::on_yield 和 PHPCoroutine::on_resume;
swoole 协程实现
协程创建
Swoole 创建协程可以使用 go()函数, 底层实现对应的是 PHP_FUNCTION(swoole_coroutine_create), 其函数实现如下:
- PHP_FUNCTION(swoole_coroutine_create)
- {
- ......
- long cid = PHPCoroutine::create(&fci_cache, fci.param_count, fci.params);
- }
- long PHPCoroutine::create(zend_fcall_info_cache *fci_cache, uint32_t argc, zval *argv)
- {
- ......
- save_task(get_task());
- return Coroutine::create(create_func, (void*) &php_coro_args);
- }
- class Coroutine
- {
- public:
- static inline long create(coroutine_func_t fn, void* args = nullptr)
- {
- return (new Coroutine(fn, args))->run();
- }
- }
注意 Coroutine::create 函数第一个参数伟 create_func, 该函数后续用于创建 PHP 栈, 并开始协程代码的执行;
可以看到 PHPCoroutine::create 在调用 Coroutine::create 创建创建协程之前, 保存了当前 PHP 栈信息到 php_coro_task 结构中.
注意主程序的 PHP 栈是虚拟机创建的, 结构与上面画的协程 PHP 栈不同, 主程序的 php_coro_task 结构并没有存储在 PHP 栈上, 而是一个静态变量 PHPCoroutine::main_task, 从 get_task 方法可以看出, 主程序中 get_current_task()返回的是 null, 因此最后获得的 php_coro_task 结构是 PHPCoroutine::main_task.
- class PHPCoroutine
- {
- public:
- static inline php_coro_task* get_task()
- {
- php_coro_task *task = (php_coro_task *) Coroutine::get_current_task();
- return task ? task : &main_task;
- }
- }
在 Coroutine 的构造函数中完成了协程对象 Coroutine 的创建与初始化, 以及 Context 对象的创建与初始化 (创建了 c 栈);run() 函数执行了协程的换入, 从而开始协程的运行;
- // 全局协程 map
- std::unordered_map<long, Coroutine*> Coroutine::coroutines;
- class Coroutine
- {
- protected:
- Coroutine(coroutine_func_t fn, void *private_data) :
- ctx(stack_size, fn, private_data)
- {
- cid = ++last_cid;
- coroutines[cid] = this;
- }
- inline long run()
- {
- long cid = this->cid;
- origin = current;
- current = this;
- ctx.SwapIn();
- if (ctx.end)
- {
- close();
- }
- return cid;
- }
- }
可以看到创建协程对象 Coroutine 时, 通过 last_cid 来计算当前协程的 ID, 同时将该协程对象加入到全局 map 中; 代码 ctx(stack_size, fn, private_data)创建并初始化了 Context 对象;
run()函数将该协程换入执行时, 赋值 origin 为当前协程 (主程序中 current 为 null), 同时设置 current 为当前协程对象 Coroutine; 调用 SwapIn() 函数完成协程的换入执行; 最后如果协程执行完毕, 则关闭并释放该协程对象 Coroutine;
初始化 Context 对象时, 可以看到其构造函数 Context::Context(size_t stack_size, coroutine_func_t fn, void* private_data), 其中参数 fn 为协程入口函数(PHPCoroutine::create_func), 可以看到其赋值给 ontext 对象的字段 fn_, 但是在创建 c 栈上下文时, 其传入的入口函数为 context_func;
- Context::Context(size_t stack_size, coroutine_func_t fn, void* private_data) :
- fn_(fn), stack_size_(stack_size), private_data_(private_data)
- {
- ......
- ctx_ = make_fcontext(sp, stack_size_, (void (*)(intptr_t))&context_func);
- }
函数 context_func 内部其实调用的就是方法 PHPCoroutine::create_func; 当协程执行结束时, 会标记 end 字段为 true, 同时将该协程换出;
- void Context::context_func(void *arg)
- {
- Context *_this = (Context *) arg;
- _this->fn_(_this->private_data_);
- _this->end = true;
- _this->SwapOut();
- }
问题: 参数 arg 为什么是 Context 对象呢, 是如何传递的呢? 这就涉及到 jump_fcontext 汇编实现, 以及 jump_fcontext 的调用了
- jump_fcontext(&swap_ctx_, ctx_, (intptr_t) this, true);
- jump_fcontext:
- movq %rdx, %rdi
调用 jump_fcontext 函数时, 第三个参数传递的是 this, 即当前 Context 对象; 而函数 jump_fcontext 汇编实现时, 将第三个参数的内容拷贝到 %rdi 寄存器中, 当协程换入执行函数 context_func 时, 寄存器 %rdi 存储的就是第一个参数, 即 Context 对象.
方法 PHPCoroutine::create_func 就是创建并初始化 PHP 栈, 执行协程代码; 这里不做过多介绍.
问题: Coroutine 的静态属性 on_yield 和 on_resume 时什么时候赋值的?
在 Swoole 模块初始化时, 会调用函数 swoole_coroutine_util_init(该函数同时声明了 "Co" 等短名称), 该函数进一步的调用 PHPCoroutine::init()方法, 该方法完成了静态属性的赋值操作.
- void PHPCoroutine::init()
- {
- Coroutine::set_on_yield(on_yield);
- Coroutine::set_on_resume(on_resume);
- Coroutine::set_on_close(on_close);
- }
协程切换
用户可以通过 Co::yield()和 Co::resume()实现协程的让出和恢复,
Co::yield()的底层实现函数为 PHP_METHOD(swoole_coroutine_util, yield),Co::resume()的底层实现函数为 PHP_METHOD(swoole_coroutine_util, resume). 本节将为读者讲述协程切换的实现原理.
- static unordered_map<int, Coroutine *> user_yield_coros;
- static PHP_METHOD(swoole_coroutine_util, yield)
- {
- Coroutine* co = Coroutine::get_current_safe();
- user_yield_coros[co->get_cid()] = co;
- co->yield();
- RETURN_TRUE;
- }
- static PHP_METHOD(swoole_coroutine_util, resume)
- {
- ......
- auto coroutine_iterator = user_yield_coros.find(cid);
- if (coroutine_iterator == user_yield_coros.end())
- {
- swoole_php_fatal_error(E_WARNING, "you can not resume the coroutine which is in IO operation");
- RETURN_FALSE;
- }
- user_yield_coros.erase(cid);
- co->resume();
- }
调用 Co::resume()恢复某个协程之前, 该协程必然已经调用 Co::yield()让出 CPU; 因此在 Co::yield()时, 会将该协程对象添加到全局 map 中; Co::resume()时做相应校验, 如果校验通过则恢复协程, 并从 map 种删除该协程对象;
co->yield()实现了协程的让出操作; 1)设置协程状态为 SW_CORO_WAITING;2)回调 on_yield 方法, 即 PHPCoroutine::on_yield, 保存当前协程 (task 代表协程) 的 PHP 栈上下文, 恢复另一个协程的 PHP 栈上下文 (origin 代表另一个协程对象);3) 设置当前协程对象为 origin;4)换出该协程;
- void Coroutine::yield()
- {
- state = SW_CORO_WAITING;
- if (on_yield)
- {
- on_yield(task);
- }
- current = origin;
- ctx.SwapOut();
- }
co->resume()实现了协程的恢复操作: 1)设置协程状态为 SW_CORO_RUNNING;2)回调 on_resume 方法, 即 PHPCoroutine::on_resume, 保存当前协程 (current 协程) 的 PHP 栈上下文, 恢复另一个协程 (task 代表协程) 的 PHP 栈上下文; 3)设置 origin 为当前协程对象, current 为即将要换入的协程对象; 4)换入协程;
- void Coroutine::resume()
- {
- state = SW_CORO_RUNNING;
- if (on_resume)
- {
- on_resume(task);
- }
- origin = current;
- current = this;
- ctx.SwapIn();
- if (ctx.end)
- {
- close();
- }
- }
Swoole 协程有四种状态: 初始化, 运行中, 等待运行, 运行结束; 定义如下:
- typedef enum
- {
- SW_CORO_INIT = 0,
- SW_CORO_WAITING,
- SW_CORO_RUNNING,
- SW_CORO_END,
- } sw_coro_state;
协程之间可以通过 Coroutine 对象的 origin 字段形成一个类似链表的结构; Co::yield()换出当前协程时, 会换入 origin 协程; 在 A 协程种调用 Co::resume()恢复 B 协程时, 会换出 A 协程, 换入 B 协程, 同时标记 A 协程为 B 的 origin 协程;
协程切换过程比较简单, 这里不做过多详述.
协程调度
当我们调用 Co::sleep()让协程休眠时, 会换出当前协程; 或者调用 CoroutineSocket->recv()从 socket 接收数据, 但 socket 数据还没有准备好时, 会阻塞当前协程, 从而使得协程换出. 那么问题来了, 什么时候再换入执行这个协程呢?
socket 读写实现
Swoole 的 socket 读写使用的成熟的 IO 多路复用模型: epoll/kqueue/select/poll 等, 并且将其封装在结构体_swReactor 中, 其定义如下:
- struct _swReactor
- {
- // 超时时间
- int32_t timeout_msec;
- //fd 的读写事件处理函数
- swReactor_handle handle[SW_MAX_FDTYPE];
- swReactor_handle write_handle[SW_MAX_FDTYPE];
- swReactor_handle error_handle[SW_MAX_FDTYPE];
- //fd 事件的注册修改删除以及 wait
- // 函数指针,(以 epoll 为例)指向的是 epoll_ctl,epoll_wait
- int (*add)(swReactor *, int fd, int fdtype);
- int (*set)(swReactor *, int fd, int fdtype);
- int (*del)(swReactor *, int fd);
- int (*wait)(swReactor *, struct timeval *);
- void (*free)(swReactor *);
- // 超时回调函数, 结束, 开始回调函数
- void (*onTimeout)(swReactor *);
- void (*onFinish)(swReactor *);
- void (*onBegin)(swReactor *);
- }
在调用函数 PHPCoroutine::create 创建协程时, 会校验是否已经初始化_swReactor 对象, 如果没有则会调用 php_swoole_reactor_init 函数创建并初始化 main_reactor 对象;
- void php_swoole_reactor_init()
- {
- if (SwooleG.main_reactor == NULL)
- {
- SwooleG.main_reactor = (swReactor *) sw_malloc(sizeof(swReactor));
- if (swReactor_create(SwooleG.main_reactor, SW_REACTOR_MAXEVENTS) <0)
- {
- }
- ......
- php_swoole_register_shutdown_function_prepend("swoole_event_wait");
- }
- }
我们以 epoll 为例, main_reactor 各回调函数如下:
- reactor->onFinish = swReactor_onFinish;
- reactor->onTimeout = swReactor_onTimeout;
- reactor->add = swReactorEpoll_add;
- reactor->set = swReactorEpoll_set;
- reactor->del = swReactorEpoll_del;
- reactor->wait = swReactorEpoll_wait;
- reactor->free = swReactorEpoll_free;
注意: 这里注册了一个函数 swoole_event_wait, 在生命周期 register_shutdown 阶段会执行该函数, 开始 Swoole 的事件循环, 阻挡了 PHP 生命周期的结束.
类 Socket 封装了 socket 读写相关的所有操作以及数据结构, 其定义如下:
- class Socket
- {
- public:
- swConnection *socket = nullptr;
- // 读写函数
- ssize_t recv(void *__buf, size_t __n);
- ssize_t send(const void *__buf, size_t __n);
- ......
- private:
- swReactor *reactor = nullptr;
- Coroutine *read_co = nullptr;
- Coroutine *write_co = nullptr;
- // 连接超时时间, 接收数据, 发送数据超时时间
- double connect_timeout = default_connect_timeout;
- double read_timeout = default_read_timeout;
- double write_timeout = default_write_timeout;
- }
socket 字段类型为 swConnection, 代表传输层连接;
reactor 字段指向结构体 swReactor 对象, 用于 fd 事件的注册, 修改, 删除以及 wait;
当调用 recv()函数接收数据, 阻塞了该协程时, read_co 字段指向该协程对象 Coroutine;
当调用 send()函数接收数据, 阻塞了该协程时, write_co 字段指向该协程对象 Coroutine;
类 Socket 初始化函数为 Socket::init_sock:
- void Socket::init_sock(int _fd)
- {
- reactor = SwooleG.main_reactor;
- // 设置协程类型 fd(SW_FD_CORO_SOCKET)的读写事件处理函数
- if (!swReactor_handle_isset(reactor, SW_FD_CORO_SOCKET))
- {
- reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_READ, readable_event_callback);
- reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_WRITE, writable_event_callback);
- reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_ERROR, error_event_callback);
- }
- }
当我们调用 CoroutineSocket->recv 接收数据时, 底层实现如下:
- Socket::timeout_setter ts(sock->socket, timeout, SW_TIMEOUT_READ);
- ssize_t bytes = all ? sock->socket->recv_all(ZSTR_VAL(buf), length) : sock->socket->recv(ZSTR_VAL(buf), length);
类 timeout_setter 会设置 socket 的接收数据超时时间 read_timeout 为 timeout.
函数 socket->recv_all 会循环读取数据, 直到读取到指定长度的数据, 或者底层返回等待标识阻塞当前协程:
- ssize_t Socket::recv_all(void *__buf, size_t __n)
- {
- timer_controller timer(&read_timer, read_timeout, this, timer_callback);
- while (true)
- {
- do {
- retval = swConnection_recv(socket, (char *) __buf + total_bytes, __n - total_bytes, 0);
- } while (retval <0 && swConnection_error(errno) == SW_WAIT && timer.start() && wait_event(SW_EVENT_READ));
- if (unlikely(retval <= 0))
- {
- break;
- }
- total_bytes += retval;
- if ((size_t) total_bytes == __n)
- {
- break;
- }
- }
- }
函数首先创建 timer_controller 对象, 设置其超时时间为 read_timeout, 以及超时回调函数为 timer_callback;
while (true)死循环读取 fd 数据, 当读取数据量等于__n 时, 读取操作结束, break 该循环; 如果读取操作 swConnection_recv 返回值小于 0, 并且错误标识为 SW_WAIT, 说明需要等待数据到来, 此时阻塞当前协程等待数据到来 (函数 wait_event 会换出当前协程), 阻塞超时时间为 read_timeout(函数 timer.start() 用于设置超时时间).
- class timer_controller
- {
- public:
- bool start()
- {
- if (timeout> 0)
- {
- *timer_pp = swTimer_add(&SwooleG.timer, (long) (timeout * 1000), 0, data, callback);
- }
- }
- }
函数 swTimer_add 用于添加一个定时器; Swoole 底层定时任务是通过最小堆实现的, 堆顶元素的超时时间最近; 结构体_swTimer 维护着 Swoole 内部所有的定时任务:
- struct _swTimer
- {
- swHeap *heap; // 最小堆
- swHashMap *map; //map, 定时器 ID 作为 key
- // 最早的定时任务触发时间
- long _next_msec;
- // 函数指针, 指向 swReactorTimer_set
- int (*set)(swTimer *timer, long exec_msec);
- // 函数指针, 指向 swReactorTimer_free
- void (*free)(swTimer *timer);
- };
当调用 swTimer_add 向_swTimer 结构中添加定时任务时, 需要更新_swTimer 中最早的定时任务触发时间_next_msec, 同时更新 main_reactor 对象的超时时间:
- if (timer->_next_msec <0 || timer->_next_msec> _msec)
- {
- timer->set(timer, _msec);
- timer->_next_msec = _msec;
- }
- static int swReactorTimer_set(swTimer *timer, long exec_msec)
- {
- SwooleG.main_reactor->timeout_msec = exec_msec;
- return SW_OK;
- }
函数 wait_event 负责将当前协程换出, 直到注册的事件发生
- bool Socket::wait_event(const enum swEvent_type event, const void **__buf, size_t __n)
- {
- if (unlikely(!add_event(event)))
- {
- return false;
- }
- if (likely(event == SW_EVENT_READ))
- {
- read_co = co;
- read_co->yield();
- read_co = nullptr;
- }
- else // if (event == SW_EVENT_WRITE)
- {
- write_co = co;
- write_co->yield();
- write_co = nullptr;
- }
- }
函数 add_event 用于添加事件, 底层调用 reactor->add 添加 fd 的监听事件;
read_co = co 或者 write_co = co, 用于记录当前哪个协程阻塞在该 socket 对象上, 当该 socket 对象的读写事件被触发时, 可以恢复该协程执行;
函数 yield()将该协程换出;
上面提到, 创建协程时, 注册了一个函数 swoole_event_wait, 在生命周期 register_shutdown 阶段会执行该函数, 开始 Swoole 的事件循环, 阻挡了 PHP 生命周期的结束. 函数 swoole_event_wait 底层就是调用 main_reactor->wait 等待 fd 读写事件的产生; 我们以 epoll 为例讲述事件循环的逻辑:
- static int swReactorEpoll_wait(swReactor *reactor, struct timeval *timeo)
- {
- while (reactor->running> 0)
- {
- n = epoll_wait(epoll_fd, events, max_event_num, swReactor_get_timeout_msec(reactor));
- if (n == 0)
- {
- if (reactor->onTimeout != NULL)
- {
- reactor->onTimeout(reactor);
- }
- SW_REACTOR_CONTINUE;
- }
- for (i = 0; i <n; i++)
- {
- if ((events[i].events & EPOLLIN) && !event.socket->removed)
- {
- handle = swReactor_getHandle(reactor, SW_EVENT_READ, event.type);
- ret = handle(reactor, &event);
- }
- if ((events[i].events & EPOLLOUT) && !event.socket->removed)
- {
- handle = swReactor_getHandle(reactor, SW_EVENT_WRITE, event.type);
- ret = handle(reactor, &event);
- }
- }
- }
- }
swReactorEpoll_wait 是对函数 epoll_wait 的封装; 当有读写事件发生时, 执行相应的 handle, 根据上面的讲解我们知道读写事件的 handle 分别为 readable_event_callback 和 writable_event_callback;
- int Socket::readable_event_callback(swReactor *reactor, swEvent *event)
- {
- Socket *socket = (Socket *) event->socket->object;
- socket->read_co->resume();
- }
可以看到函数 readable_event_callback 只是简单的恢复 read_co 协程即可;
当 epoll_wait 发生超时, 最终调用的是函数 swReactor_onTimeout, 该函数会从 Swoole 维护的一系列定时任务 swTimer 中查找已经超时的定时任务, 同时执行其 callback 回调;
- while ((tmp = swHeap_top(timer->heap)))
- {
- tnode = tmp->data;
- if (tnode->exec_msec> now_msec || tnode->round == timer->round)
- {
- break;
- }
- timer->_current_id = tnode->id;
- if (!tnode->remove)
- {
- tnode->callback(timer, tnode);
- }
- ......
- }
- // 该定时任务没有超时, 需要更新需要更新_swTimer 中最早的定时任务触发时间_next_msec
- long next_msec = tnode->exec_msec - now_msec;
- if (next_msec <= 0)
- {
- next_msec = 1;
- }
- // 同时更新 main_reactor 对象的超时时间, 实现函数为 swReactorTimer_set
- timer->set(timer, next_msec);
该 callback 回调函数即为上面设置的 timer_callback:
- void Socket::timer_callback(swTimer *timer, swTimer_node *tnode)
- {
- Socket *socket = (Socket *) tnode->data;
- socket->set_err(ETIMEDOUT);
- if (likely(tnode == socket->read_timer))
- {
- socket->read_timer = nullptr;
- socket->read_co->resume();
- }
- else if (tnode == socket->write_timer)
- {
- socket->write_timer = nullptr;
- socket->write_co->resume();
- }
- }
同样的, timer_callback 函数只是简单的恢复 read_co 或者 write_co 协程即可
sleep 实现
Co::sleep()的实现函数为 PHP_METHOD(swoole_coroutine_util, sleep), 该函数通过调用 Coroutine::sleep 实现了协程休眠的功能:
- int Coroutine::sleep(double sec)
- {
- Coroutine* co = Coroutine::get_current_safe();
- if (swTimer_add(&SwooleG.timer, (long) (sec * 1000), 0, co, sleep_timeout) == NULL)
- {
- return -1;
- }
- co->yield();
- return 0;
- }
可以看到, 与 socket 读写事件超时处理相同, sleep 内部实现时通过 swTimer_add 添加定时任务, 同时换出当前协程实现的. 该定时任务会导致 main_reactor 对象的超时时间的改变, 即修改了 epoll_wait 的超时时间.
sleep 的超时处理函数为 sleep_timeout, 只需要换入该阻塞协程对象即可, 实现如下:
- static void sleep_timeout(swTimer *timer, swTimer_node *tnode)
- {
- ((Coroutine *) tnode->data)->resume();
- }
来源: https://segmentfault.com/a/1190000019089997