随着移动互联网的快速发展, 人工智能在移动端上的应用越来越广泛, 集团内端智能在图像识别, 视频检测, 数据计算等核心场景发挥着重要作用. 而在开发阶段, Python 毋庸置疑是算法进行研发的首选语言. 但在移动端上, 进行算法的部署, 调试, 验证, 仍处在 "刀耕火种" 的时代, 目前算法主要通过在代码中插入日志, 验证程序的运行逻辑和结果.
通过打日志当然也能验证结果和定位问题, 但一旦工程稍微复杂点, 生产效率会非常低. 因此, 在 MNN 工作台之中 (点击文末阅读原文, 前往 MNN 官网: www.mnn.zone 下载) 嵌入了端侧 Python 调试能力. 经常使用 Python 的同学一定熟悉 pdb 模块, 它是 Python 官方标准库提供的交互式代码调试器, 和任何一门语言提供的调试能力一样, pdb 提供了源代码行级别的设置断点, 单步执行等常规调试能力, 是 Python 开发的一个很重要的工具模块.
今天就让我们来重点分析下官方 pdb 模块源码, 看看其调试功能的底层技术原理.
原理
从 cpython 源码中可以看到, pdb 模块并非 c 实现的内置模块, 而是纯 Python 实现和封装的模块. 核心文件是 pdb.py, 它继承自 bdb 和 cmd 模块:
class Pdb(bdb.Bdb, cmd.Cmd): ...
基本原理: 利用 cmd 模块定义和实现一系列的调试命令的交互式输入, 基于 sys.settrace 插桩跟踪代码运行的栈帧, 针对不同的调试命令控制代码的运行和断点状态, 并向控制台输出对应的信息.
cmd 模块主要是提供一个控制台的命令交互能力, 通过 raw_input/readline 这些阻塞的方法实现输入等待, 然后将命令交给子类处理决定是否继续循环输入下去, 就和他主要的方法名 runloop 一样.
cmd 是一个常用的模块, 并非为 pdb 专门设计的, pdb 使用了 cmd 的框架从而实现了交互式自定义调试.
bdb 提供了调试的核心框架, 依赖 sys.settrace 进行代码的单步运行跟踪, 然后分发对应的事件 (call/line/return/exception) 交给子类 (pdb) 处理. bdb 的核心逻辑在对于调试命令的中断控制, 比如输入一个单步运行的 "s" 命令, 决定是否需要继续跟踪运行还是中断等待交互输入, 中断到哪一帧等.
基本流程
pdb 启动, 当前 frame 绑定跟踪函数 trace_dispatch
- def trace_dispatch(self, frame, event, arg):
- if self.quitting:
- return # None
- if event == 'line':
- return self.dispatch_line(frame)
- if event == 'call':
- return self.dispatch_call(frame, arg)
- if event == 'return':
- return self.dispatch_return(frame, arg)
- if event == 'exception':
- ...
每一帧的不同事件的处理都会经过中断控制逻辑, 主要是 stop_here(line 事件还会经过 break_here)函数, 处理后决定代码是否中断, 需要中断到哪一行
如需要中断, 触发子类方法 user_#event, 子类通过 interaction 实现栈帧信息更新, 并在控制台打印对应的信息, 然后执行 cmdloop 让控制台处于等待交互输入
- def interaction(self, frame, traceback):
- self.setup(frame, traceback) # 当前栈, frame,local vars
- self.print_stack_entry(self.stack[self.curindex])
- self.cmdloop()
- self.forget()
用户输入调试命令如 "next" 并回车, 首先会调用 set_# 命令, 对 stopframe,returnframe,stoplineno 进行设置, 它会影响中断控制 `stop_here 的逻辑, 从而决定运行到下一帧的中断结果.
- def _set_stopinfo(self, stopframe, returnframe, stoplineno=0):
- self.stopframe = stopframe
- self.returnframe = returnframe
- self.quitting = 0
- # stoplineno>= 0 means: stop at line>= the stoplineno
- # stoplineno -1 means: don't stop at all
- self.stoplineno = stoplineno
对于调试过程控制类的命令, 一般 do_# 命令都会返回 1, 这样本次 runloop 立马结束, 下次运行到某一帧触发中断会再次启动 runloop(见步骤 3); 对于信息获取类的命令, do_# 命令都没有返回值, 保持当前的中断状态.
代码运行到下一帧, 重复步骤 3
中断控制
中断控制也就是对于不同的调试命令输入后, 能让代码执行到正确的位置停止, 等待用户输入, 比如输入 "s" 控制台就应该在下一个运行 frame 的代码处停止, 而输出 "c" 就需要运行到下一个打断点的地方. 中断控制发生在 sys.settrace 的每一步跟踪的中, 是调试运行的核心逻辑.
pdb 中主要跟踪了 frame 的四个事件:
line: 同一个 frame 中的顺序执行事件
call: 发生函数调用, 跳到下一级的 frame 中, 在函数第一行产生 call 事件
return: 函数执行完最后一行(line), 发生结果返回, 即将跳出当前 frame 回到上一级 frame, 在函数最后一行产生 return 事件
exception: 函数执行中发生异常, 在异常行产生 exception 事件, 然后在该行返回(return 事件), 接下来一级一级向上在 frame 中产生 exception 和 return 事件, 直到回到底层 frame.
它们是代码跟踪时的不同节点类型, pdb 根据用户输入的调试命令, 在每一步 frame 跟踪时都会进行中断控制, 决定接下来是否中断, 中断到哪一行. 中断控制的主要方法是 stop_here:
- def stop_here(self, frame):
- # (CT) stopframe may now also be None, see dispatch_call.
- # (CT) the former test for None is therefore removed from here.
- if self.skip and \
- self.is_skipped_module(frame.f_globals.get('__name__')):
- return False
- # next
- if frame is self.stopframe:
- # stoplineno>= 0 means: stop at line>= the stoplineno
- # stoplineno -1 means: don't stop at all
- if self.stoplineno == -1:
- return False
- return frame.f_lineno>= self.stoplineno
- # step: 当前只要追溯到 botframe, 就等待执行.
- while frame is not None and frame is not self.stopframe:
- if frame is self.botframe:
- return True
- frame = frame.f_back
- return False
调试命令大体上分两类:
过程控制: 如 setp,next,continue 等这些执行后马上进入下阶段的代码执行
信息获取 / 设置: 如 args,p,list 等获取当前信息的, 也不会影响 cmd 状态
下面重点讲解几个最常见的, 用于过程控制的调试命令中断控制实现原理:
s(step)
1 命令定义
执行下一条命令, 如果本句是函数调用, 则 s 会执行到函数的第一句.
2 代码分析
pdb 中实现逻辑为顺序执行每一个帧 frame 并等待执行, 它的执行粒度和 settrace 一样.
- def stop_here(self, frame):
- ...
- # stopframe 为 None
- if frame is self.stopframe:
- ...
- # 当前 frame 一定会追溯到 botframe, 返回 true
- while frame is not None and frame is not self.stopframe:
- if frame is self.botframe:
- return True
- frame = frame.f_back
- return False
step 会将 stopframe 设置为 None, 因此只要当前 frame 能向后一直追溯到底层 frame(botframe), 就表示可以等待执行了, 也就是 pdb 处于交互等待状态.
因为 step 的执行粒度和 settrace 一样, 所以运行到每一帧都会等待执行.
n(next)
1 命令定义
执行下一条语句, 如果本句是函数调用, 则执行函数, 接着执行当前执行语句的下一条.
2 代码分析
pdb 中实现逻辑为, 运行至当前 frame 的下一次跟踪中断, 但进入到下一个 frame(函数调用)中不会中断.
- def stop_here(self, frame):
- ...
- # 如果 frame 还没跳出 stopframe, 永远返回 true
- if frame is self.stopframe:
- if self.stoplineno == -1:
- return False
- return frame.f_lineno>= self.stoplineno
- # 如果 frame 跳出了 stopframe, 进入下一个 frame, 则执行不会中断, 一直到跳出到 stopframe
- # 还有一种情况, 如果在 return 事件中断执行了 next, 下一次跟踪在上一级 frame 中, 此时上一级 frame 能跟踪到 botframe, 中断
- while frame is not None and frame is not self.stopframe:
- if frame is self.botframe:
- return True
- frame = frame.f_back
- return False
next 会设置 stopframe 为当前 frame, 也就是除非在当前 frame 内, 进入其他的 frame 都不会执行中断.
c
1 命令定义
继续执行, 直到遇到下一条断点
2 代码分析
stopframe 设置为 botframe,stoplineno 设置为 - 1.stop_here 总返回 false, 运行不会中断, 直到遇到断点(break_here 条件成立)
- def stop_here(self, frame): ... # 如果在 botframe 中, stoplineno 为 - 1 返回 false if frame is self.stopframe: if self.stoplineno == -1: return False return frame.f_lineno>= self.stoplineno # 如果在非 botframe 中, 会先追溯到 stopframe, 返回 false while frame is not None and frame is not self.stopframe: if frame is self.botframe: return True frame = frame.f_back return False
- r(return)
1 命令定义
执行当前运行函数到结束.
2 代码分析
return 命令仅在执行到 frame 结束 (函数调用) 时中断, 也就是遇到 return 事件时中断.\
pdb 会设置 stopframe 为上一帧 frame,returnframe 为当前 frame. 如果是非 return 事件, stop_here 永远返回 false, 不会中断;
- def stop_here(self, frame):
- ...
- # 如果当前帧代码顺序执行, 下一个 frame 的 lineno==stoplineno
- # 如果执行到 for 循环的最后一行, 下一个 frame(for 循环第一行)的 lineno<stoplineno, 不会中断. 直到 for 循环执行结束, 紧接着的下一行的 lineno==stoplineno, 执行中断
- if frame is self.stopframe:
- if self.stoplineno == -1:
- return False
- return frame.f_lineno>= self.stoplineno
- # 如果在非 botframe 中, 会先追溯到 stopframe, 返回 false, 同 next
- while frame is not None and frame is not self.stopframe:
- if frame is self.botframe:
- return True
- frame = frame.f_back
- return False
如果是 return 事件, stop_here 仍然返回 false, 但是 returnframe 为当前 frame 判断成立, 会执行中断.
- def dispatch_return(self, frame, arg):
- if self.stop_here(frame) or frame == self.returnframe:
- self.user_return(frame, arg)
- if self.quitting: raise BdbQuit
- return self.trace_dispatch
- unt(until)
1 命令定义
执行到下一行, 和 next 的区别就在于 for 循环只会跟踪一次
2 代码分析
设置 stopframe 和 returnframe 为当前 frame,stoplineno 为当前 lineno+1.
- def stop_here(self, frame):
- ...
- # 如果当前帧代码顺序执行, 下一个 frame 的 lineno==stoplineno
- # 如果执行到 for 循环的最后一行, 下一个 frame(for 循环第一行)的 lineno<stoplineno, 不会中断. 直到 for 循环执行结束, 紧接着的下一行的 lineno==stoplineno, 执行中断
- if frame is self.stopframe:
- if self.stoplineno == -1:
- return False
- return frame.f_lineno>= self.stoplineno
- # 如果在非 botframe 中, 会先追溯到 stopframe, 返回 false, 同 next
- while frame is not None and frame is not self.stopframe:
- if frame is self.botframe:
- return True
- frame = frame.f_back
- return False
如果在当前 frame 中有 for 循环, 只会从上向下执行一次. 如果是函数返回 return 事件, 下一个 frame 的 lineno 有可能小于 stoplineno, 所以把 returnframe 设置为当前 frame, 这样函数执行就和 next 表现一样了.
u(up)/ d(down)
1 命令定义
切换到上 / 下一个栈帧
2 代码分析
栈帧信息
栈帧包含代码调用路径上的每一级 frame 信息, 每次命令执行中断都会刷新, 可以通过 u/d 命令上下切换 frame.\
栈帧获取主要通过 get_stack 方法, 第一个参数是 frame, 第二个参数是 traceback object.traceback object 是在 exception 事件产生的, exception 事件会带一个 arg 参数:
- exc_type, exc_value, exc_traceback = arg
- (<type 'exceptions.IOError'>, (2, 'No such file or directory', 'wdwrg'), <traceback object at 0x10bd08a70>)
traceback object 有几个常用的属性:
tb_frame: 当前 exception 发生在的 frame
tb_lineno: 当前 exception 发生在的 frame 的行号, 即 frame.tb_lineno
tb_next: 指向堆栈下一级调用的 exc_traceback(traceback object), 如果是最顶层则为 None
栈帧信息由两部分组成, frame 的调用栈和异常栈(如有), 顺序为: botframe -> frame1 -> frame2 -> tb1 -> tb2(出错 tb)
- def get_stack(self, f, t):
- stack = []
- if t and t.tb_frame is f:
- t = t.tb_next
- # frame 调用栈, 从底到顶
- while f is not None:
- stack.append((f, f.f_lineno))
- if f is self.botframe:
- break
- f = f.f_back
- stack.reverse()
- i = max(0, len(stack) - 1)
- # 异常栈, 从底到顶(出错栈)
- while t is not None:
- stack.append((t.tb_frame, t.tb_lineno))
- t = t.tb_next
- if f is None:
- i = max(0, len(stack) - 1)
- return stack, i
pdb 每次执行中断都会更新调用的栈帧表, 以及当前的栈帧信息, 堆栈切换只要向上 / 下切换索引即可.
- def setup(self, f, t):
- self.forget()
- self.stack, self.curindex = self.get_stack(f, t)
- self.curframe_locals = self.curframe.f_locals
- ...
- ...
- def do_up(self, arg):
- if self.curindex == 0:
- print>>self.stdout, '*** Oldest frame'
- else:
- self.curindex = self.curindex - 1
- self.curframe = self.stack[self.curindex][0]
- self.curframe_locals = self.curframe.f_locals
- self.print_stack_entry(self.stack[self.curindex])
- self.lineno = None
- b(break)
区别于过程控制的调试命令, break 命令用来设置断点, 不会马上影响程序中断状态, 但可能会影响后续的中断. 在 line 事件发生的时候, 除了 stop_here 会增加 break_here 的条件判断, 设置断点的实现比较简单, 这里主要介绍对函数设置断点的时候, 是怎么让代码执行到函数第一行中断的.
设置断点时, 断点的 lineno 为了函数的第一行:
- # 函数断点示例: break func
- def do_break(self, arg, temporary = 0):
- ...
- if hasattr(func, 'im_func'):
- func = func.im_func
- funcname = code.co_name
- lineno = code.co_firstlineno
- filename = code.co_filename
当 line 事件执行到函数的第一行代码时, 这一行没有主动设置过断点, 但是函数第一行 co_firstlineno 命中断点, 所以会继续判断断点有效性.
- def break_here(self, frame):
- ...
- lineno = frame.f_lineno
- if not lineno in self.breaks[filename]:
- lineno = frame.f_code.co_firstlineno
- if not lineno in self.breaks[filename]:
- return False
- # flag says ok to delete temp. bp
- (bp, flag) = effective(filename, lineno, frame)
断点的有效性判断通过 effective 方法, 其中处理了 ignore,enabled 这些配置, 对函数断点的有效性判断通过 checkfuncname 方法:
- def checkfuncname(b, frame):
- """Check whether we should break here because of `b.funcname`."""
- ...
- # Breakpoint set via function name.
- ...
- # We are in the right frame.
- if not b.func_first_executable_line:
- # The function is entered for the 1st time.
- b.func_first_executable_line = frame.f_lineno
- if b.func_first_executable_line != frame.f_lineno:
- # But we are not at the first line number: don't break.
- return False
- return True
在 line 事件在函数第一行发生时, func_first_executable_line 还没有, 于是设置为当前行号, 并且断点生效, 因此函数执行到第一行中断. 接下来 line 到行数的后面行时, 因为 func_first_executable_line 已经有值, 并且肯定不等于当前行号, 所以 break_here 判断为无效, 不会中断.
实例分析
以下结合一个很简单的 Python 代码调试的例子, 复习下上述命令的实现原理:
在控制台中, 命令行执行快照:
命令行中执行 python test.py,Python 代码实际是从第一行开始执行的, 但因为 pdb.set_trace()是在__main__中调用的, 所以实际是从 set_trace 的下一行才挂载到 pdb 的跟踪函数, 开始 frame 的中断控制.
这段 Python 代码执行会经过经过 3 个 frame:
底层根 frame0, 即__main__所在的 frame0, 其中包含一断 for 循环代码, frame0 的 back frame 为 None
第二层 frame1, 进入 func 方法所在的 frame1,frame1 的 back frame 为 frame0
顶层 frame2, 进入 add 方法所在的 frame2,frame2 的 back frame 为 frame1
调试过程:
跟踪__main__所在的 frame(根 frame0), 在 20 行触发 line 事件
用户输入 unt 命令回车, frame0 在 21 行触发 line 事件, 行号等于上一次跟踪行号 + 1,stop_here 成立, 中断等待
用户输入 unt 命令回车, 同 2, 在 22 行中断
用户输入 unt 命令回车, 代码跟踪至 frame0 在 20 行触发 line 事件, 行号小于上一次跟踪行号 + 1(23),stop_here 不成立, 继续执行
在 24 行触发 line 事件, 行号大于上一次跟踪行号 + 1(23),stop_here 成立, 中断等待
用户输入 s 命令回车, 代码跟踪至 frame1 在 12 行触发 call 事件, step 执行粒度和 sys.settrace 一样, 在 12 行中断等待
用户设置 add 函数断点, 断点列表中会加入 add 函数的第一行 (第 7 行) 的断点
用户输入 c 命令回车, stop_here 总返回 false, 继续跟踪运行直到在第 8 行触发 line 事件, 虽然第 8 行不再断点列表中, 但当前函数帧 firstlineno 在, 并且有效, 所以在第 8 行中断等待
用户输入 r 命令回车, 后面的 line 事件处理中 stop_here 都返回 false, 直到在第 10 行触发 return 事件, 此时 returnframe 为当前 frame, 在 10 行中断等待
用户输入 up 命令, 栈帧向前切换索引, 回到上一帧 frame1, 也就是第 13 行 func 中调用 add 的地方
用户输入 down 命令, 栈帧向前后切换索引, 回到当前帧
用户输入 n 命令, 运行至下一次跟踪 14 行(line 事件), 这一次跟踪在 frame1 上, 能追溯到 botframe, 所以在 14 行中断
用户输入 n 命令, 运行至下一次跟踪 14 行(return 事件), 还在当前 frame1 中, 中断
用户输入 n 命令, 运行至下一次跟踪 24 行(return 事件), 这一次跟踪就是 botframe(frame0), 中断
用户输入 n 命令, frame0 执行结束.
小结
Python 标准库提供的 pdb 的实现并不复杂, 本文对源码中的核心的逻辑做了讲解, 如果你了解其原理, 也可以自己定制或重写一个 Python 调试器. 事实上, 业界的很多通用 IDE 如 pycharm,vscode 等都没有使用标准的 pdb, 他们开发了自己的 Python 调试器来更好的适配 IDE. 不过了解 pdb 原理, 在 pdb 上改写和定制调试器来满足调试需求, 也是一种成本低而有效的方式.
MNN 工作台对端侧的调试能力也是基于原生 pdb 实现的, 并且支持阿里巴巴集团内端计算的各种研发场景, 对算法的研发部署都有很大的效率提升. 点击阅读原文, 前往 www.mnn.zone 下载 MNN 工作台赶快体验吧.
关注我们, 每周 3 篇移动技术实践 & 干货给你思考!
来源: https://segmentfault.com/a/1190000040624913