AI 前线导读: 近日, eBay 宣布正式开源 Accelerator, 一款久经考验的数据处理框架, 提供快速的数据访问, 并行执行以及自动组织源码, 输入数据和结果. 它可以用于日常数据分析, 也可以用在包含数十万大型数据文件的实时推荐系统上.
Accelerator 可运行在笔记本电脑或机架式服务器上, 轻松处理数十亿行数据, 井然有序地处理成千上万的输入文件, 计算和结果.
Accelerator 的数据吞吐量通常在每秒数百万行. 如果运行在高速计算机上, 每秒最多可处理几十亿行数据.
Accelerator 最初由瑞典人工智能公司 Expertmaker 开发, 于 2012 年正式发布, 从那以后, 它一直是众多研究项目和实时推荐系统的核心工具. 2016 年, Expertmaker 被 eBay 收购, 而 eBay 目前正在基于 Apache 许可协议第 2 版开源 Expertmaker Accelerator.
设计目标
Accelerator 的主要设计目标如下:
简化在多个 CPU 上并行处理数据.
数据吞吐量应尽可能快, 即使一台小型笔记本电脑也能轻松处理数百万行数据.
如果可能, 尽量重用计算结果, 而不是重新计算. 同样, 在多个用户之间共享结果应该是毫不费力的.
数据科学项目可能会有很多 (数十万) 输入文件和大量的源码和中间结果.
Accelerator 应该避免手动管理和记录数据文件, 计算, 结果以及它们之间的关系.
主要功能
Accelerator 主要的原子操作是创建作业. 创建作业是用输入数据和参数执行一些程序并将结果 (即输出) 以及计算所需的所有信息存储到磁盘上的过程. 作业目录将包含计算结果和计算结果所需的所有信息.
作业可以是简单或复杂的计算, 也可以是大型数据集的容器. 作业之间可以彼此链接, 新作业可以依赖于一个或多个旧作业.
关键特性
Accelerator 提供了两个关键功能, 结果重用和数据流.
结果重用
在创建新作业之前, Accelerator 会检查之前是否已经跑过相同的作业. 如果已经存在, Accelerator 不会创建这个作业, 而是将现有作业的链接返回. 这样不仅节省了执行时间, 而且有助于在用户之间共享结果. 更重要的是, 它提供了可见性和确定性.
Accelerator 提供了一种机制, 将会话中的作业信息保存到数据库中, 这样有助于管理作业和它们相互之间的关系.
数据流
将连续的数据流从磁盘传输到 CPU 比在数据库中执行随机查询效率更高. 流式传输是实现从磁盘到 CPU 高带宽的最佳途径. 它不需要缓存, 可以很好地利用操作系统的基于 RAM 的磁盘缓冲区.
整体架构
现在让我们来看看 Accelerator 的整体架构.
是一个基于客户端 / 服务器的应用程序. 在左侧有一个 runner 客户端, 在右边有两台服务器, 称为 daemon 和 urd, 其中 urd 是可选的. runner 通过执行脚本 (构建脚本) 在 daemon 服务器上启动作业. 此服务器将加载和存储使用基于 workdirs 文件系统的数据库执行的所有作业的信息和结果. 同时, 构建脚本中所有有关作业的信息将由 urd 服务器存储到作业日志文件系统的数据库中. urd 负责管理作业, 包括存储和检索之前执行过的相关作业的会话或清单.
作业
作业是通过执行称为 method 的小程序来创建的. method 用 Python 2 或 Python 3 编写, 有时也用 C 语言.
最简单的作业:"Hello, World"
我们通过一个简单的 "Hello World" 程序来说明如何创建一个作业(method):
- def synthesis():
- return "hello world"
这个程序不需要任何输入参数, 只是返回一个字符串并退出. 要执行它, 我们还需要创建一个构建脚本, 如下所示:
- def main(urd):
- jid = urd.build('hello_world')
当执行完这个方法之后, 用户会得到一个叫作 jobid 的链接. jobid 指向存储执行结果的目录, 以及运行作业所需的所有信息.
如果我们尝试再次执行这个作业, 它将不会被执行, 而是返回指向上一次执行作业的 jobid, 因为 Accelerator 记得之前已经执行过与此类似的作业. 要再次执行作业, 我们必须更改源代码或输入参数.
链接作业
我们假设刚刚创建的 hello_world 作业非常耗费计算资源, 并已经返回了我们想要的结果. 为了简单起见, 我们通过创建一个名为 print_result 的方法来演示其中的原理, 该方法只读取前一个作业的结果并将结果打印到 stdout.
- import blob
- jobids = ('hello_world_job',)
- def synthesis():
- x = blob.load(jobid=jobids.hello_world_job)
- print(x)
要创建这个作业, 我们需要扩展构建脚本:
- def main(urd):
- jid = urd.build('hello_world')
- urd.build('print_result', jobids=dict(hello_world_job=jid))
在执行构建脚本时, 只会创建 print_result 作业, 因为 hello_world 作业之前已经创建过了.
作业执行流程和结果传递
到目前为止, 我们已经知道如何创建, 链接和执行简单的作业. 现在我们将重点转向 method. 在执行 method 时, Accelerator 会调用三个函数, 它们分别是 prepare(),analysis() 和 synthesis(). 一个 method 可以同时调用这三个函数, 或者至少调用一个.
三个函数的返回值都可以存储在作业的目录中, 并被用在其他作业上.
数据集
数据集是 Accelerator 默认的存储类型, 专为并行处理和高性能而设计. 数据集建立在作业之上, 因此数据集通过 method 来创建, 并存储在作业目录中. 单个作业可以包含任意数量的数据集.
在内部, 数据集中的数据以行列格式存储. 所有列都可以被独立访问, 避免读取到不必要的数据. 数据也被分成固定数量的片段, 提供并行访问能力. 数据集可能会被散列, 散列函数将具有相同散列值的数据行组合到同一个片段中.
导入数据
让我们来看看导入文件 (创建数据集) 的常见操作. csvimport 方法可用于导入许多不同的文件类型, 它可以解析大量的 CSV 格式的文件, 并将数据存储为数据集. 创建的数据集存储在结果作业中, 数据集的名称默认为 jobid 加上字符串 default, 也可以使用自定义字符串.
链接数据集
就像作业一样, 数据集也可以相互链接. 由于数据集是建立在作业之上的, 所以链接数据集就很简单. 例如, 假设我们刚刚将 file0.txt 导入 imp-0, 并且 file1.txt 中存储了更多数据. 我们可以导入后一个文件并提供一个指向前一个数据集的链接. 由于数据集已链接, 现在可以使用 imp-1(或 imp-1/default)数据集引用来访问从这两个数据集导入的所有数据文件.
在处理随时间增长的数据 (如日志数据) 时, 使用链接十分方便. 我们可以通过链接扩展具有更多行的数据集, 这是一个非常轻量级的操作.
将新列添加到数据集
添加列是很常用操作, Accelerator 通过链接来处理新列.
原理很简单, 假设我们有一个 "源" 数据集, 我们要添加一个新列, 只需要创建一个只包含新列的新数据集, 并在创建它时让 Accelerator 将所有源数据集的列链接到新数据集.
并行执行
Accelerator 专为并行处理而设计, 主要通过分片数据集和并行 analysis() 调用组合来实现并行处理.
迭代器在 analysis() 函数内部运行, 该函数为每个数据集片段 fork 一次. analysis() 函数的返回值将作为 synthesis() 函数的输入. 我们可以显式地合并结果, 不过 analysis_res 带有一个相当神奇的方法 merge_auto(), 它根据数据类型将所有片段的结果合并为一个.
urd
我们已经看到 Accelerator 如何跟踪已经创建好的作业, 并在必要时重用作业. 这样节省了时间并将相关的计算链接在一起, 不过, 在这之上还有另一个层可以进一步提高可视性和作业重用性, 它就是 urd 服务器.
urd 将作业清单及其依赖关系存储在基于日志文件的数据库中. 在构建脚本中发生的所有事情都可以记录到 urd 中. 为了做到这一点, 我们需要一个名单来存储信息, 还需要一个密钥, 而在大多数情况下还需要一个日期, 方便日后查找.
性能测试
新作业的启动时间只有几分之一秒. 以下是一些不同作业类型的处理时间.
准备数据: 导入, 类型转换和散列
示例数据文件大小为 1.1TB(压缩后 280GB), 包含 63 亿行和 14 列. Accelerator 运行在具有 72 核心和快速磁盘的大型机上.
上述数值是基于全部数据得出的. 导入作业 (A): 导入 gz 压缩文件. 有趣的是, 导入比普通的 zcat file.gz> /dev/null 要快 30%. 在 FreeBSD 上, zcat 速度更快. 类型转换作业(B):5 个 json-list,5 个数字, 2 个日期和 2 个 unicode 列, 每行平均有 172 个字节. 该作业的读取速度超过每秒半千兆字节, 同时保存几乎相同数量的数据到磁盘上, 因此磁盘带宽高于每秒 1 千兆字节. 由于散列速度取决于被散列的列, 因此显示的值(C) 是四个散列作业的平均值.
处理数据
为了计算Σ(a×b×c), 我们通过一个 method 读取三个列, 将它们的值相乘并将结果写入新列. 第二个作业为新列添加值.
可以看到, 将三个 float64 相乘并写回到磁盘实际上是很快的 -- 每秒 7 千 7 百万行. 将这些值汇总在一起甚至更快 -- 每秒超过十亿个值. 而在 Python 中, 执行同样操作需要 6 秒.
结论
Accelerator 是一款用于快速数据处理的工具. 在单机上, 每秒可以处理数百万行数据, 如果任务简单, 可以每秒处理 10 亿行. 除了速度快之外, Accelerator 还可以减少手动管理源文件, 数据文件, 计算以及相关结果的工作. 它已被成功用在多个项目中, eBay 现在正式将其开源.
相关链接:
ExpertMaker Accelerator 代码仓库 (https://github.com/eBay/accelerator)
Installer 仓库 (https://github.com/eBay/accelerator-project_skeleton)
Accelerator 用户参考手册 (https://berkeman.github.io/pdf/acc_manual.pdf)
来源: https://juejin.im/post/5ae9626ff265da0b926564de