Akka 是 Scala 语言实现的一套基于 Actor 模型的异步通信框架, 可用于构建高并发, 分布式, 可容错, 事件驱动的基于 JVM 的应用, 在 Spark 中曾被用于实现进程, 节点间通信, 在实际项目中协助我们成功搭建了满足业务需求的模型部署平台.
项目背景
某国内大型连锁餐饮企业旗下拥有大量门店. 餐厅门店的每日生产, 订货, 排班都依赖于每日客单量预估的合理性, 其内部数据团队实现了一套预估模型, 需要 TalkingData 帮助构建一个工程化平台以支撑模型的训练和部署, 从而将模型真正地应用到实际生产环节中.
经过交流, 我们发现在实际生产环境中, 在各方面存在一些问题:
异步: 所有门店的前日销售, 业务等数据均由各自门店的店长负责整合上传. 上传的开始时间, 结束时间, 数据的完整性等均不确定. 而模型训练和预测均依赖这部分数据, 这就意味这无法为模型训练和预测设置统一的开始入口.
高并发: 除了一些特殊类型的门店, 绝大多数门店的营业时间相对固定, 从店长决定整理上传销售数据, 到准备物料, 排班准备次日营业, 留给模型训练和模型预测回吐预测结果的时间大概为 3 小时. 如果每个门店的预测指标有 2 至 3 项, 那么需要有足够的调度能力在规定时间内完成大概 2 万次模型训练加预测流程.
容错: 由于门店数量众多且情况各不相同, 仍然有很多潜在的因素可能导致流程出错或失败. 原则上, 某次流程的失败不应该对其他流程造成任何影响, 每个流程在平台层面应该成为互相独立的任务.
因此, 我们需要一套轻量化的分布式服务框架, 来实现满足上述需求的模型训练预测平台, 并在一定程度上保证平台的可拓展性. 结合此前团队内的技术积累, 最终选择了 Akka 框架用于实现平台的内部通信.
选型过程
消息驱动方式 -- 流程异步化
一次完整的预测任务包括: 训练数据准备→模型训练→模型结果导出→预测数据准备→预测结果导出, 其中数据准备步骤在时间上不确定, 模型相关步骤在执行结果上不确定, 如果采用同步模型, 将会产生大量的等待线程, 占用浪费大量资源. 在 Actor 模型中, 每个 Actor 作为一个基本计算单元, 回应接收到的消息, 同时并行的:
发送有限数量的消息给其他 Actor
创建有限数量的新 Actor
指定接受到下一个消息时的行为
上述操作没有顺序执行的假设, 因此可以并行进行. 发送者与已经发送的消息解耦, 可以进行无需等待的异步通信.
Actor 模型通信方式
Akka 中的 Actor 本质上就是接收消息并采取行动处理消息的对象, 是封装状态和行为的对象, 它们唯一的通信方式是交换消息 -- 把消息存放在接收方的邮箱里. Actor 自然形成树形结构, 这种结构的精髓在于任务被拆开, 委托, 直到任务小到可以被完整地处理. 因此, 我们将预测任务的各个步骤拆分抽象, 并创建类型消息与步骤对应, 将每个步骤交给线程级别的 Actor 执行处理, 通过发送不同类型的消息来触发创建不同操作的 Actor, 让整个预测流程无需等待.
结构 -- 应对高并发
由于绝大多数门店的营业时间大致相同, 平台在流量上会有明显的峰值和低谷, 在低谷期间平台需要尽可能减少资源占有量, 而在流量峰值来临时平台要能够及时响应, 保证足够的可用性.
经过讨论, 我们确定了采用 Master-Worker 模式的平台结构, Master 负责接收与分配任务, Worker 负责处理执行具体的模型任务.
Master 和 Worker 均为独立的 ActorSystem, 管理内部不同操作逻辑的 Actor, 在空闲状态下占有资源很小. Actor 为线程级别, 同样仅占用极少量资源, 生命周期由 ActorSystem 统一管理. 少量请求时, Actor 线程具有很高的复用率, 请求并发高时, ActorSystem 会创建大量的 Actor 线程用来承接请求, 保证可用性.
Akka 中 Actor 的生命周期
子 Actor-- 模块化提高容错
每个预测任务的模型相关步骤均存在失败的可能性, 此外, 数据准备过程中的网络波动, 内容校验出错等情况, 都会导致当前预测任务的失败. 对于失败的任务, 我们希望能够尽可能记录错误信息, 为重跑提供先决条件.
在 Akka 中, 构建了父子 Actor 的树形监督结构, 提供 Actor 的监督机制以保证容错性, 把处理响应错误的责任交给出错对象以外的实体. 父 Actor 创建子 Actor 来委托处理子任务, 同时便会自动地监管它们. 子 Actor 列表维护在父 Actor 的上下文中, 父 Actor 可以访问它.
Akka 中的 Actor 结构
通过更进一步的拆分细化, 我们将 Worker 端的 Actor 分为 Prepare 和 Executor 两种, Prepare 为主要负责数据准备步骤, Executor 负责模型相关步骤, 统一由 Worker 端的父 Actor 管理, 错误和异常均向上层抛出, 由 Worker 端的父 Actor 记录并发送给的错误收集模块统一处理.
实践应用
ActorSystem
创建 ActorSystem 时, 默认将在 classpath 中寻找 application.conf, application.JSON 和 application.properties, 并自动加载:
- valsystem=ActorSystem("RsModelActorSystem")
- valsystem=ActorSystem("RsModelActorSystem", ConfigFactory.load())// 同上
如果想要使用自己的配置文件, 可以通过 ConfigFactory 来配置加载:
- valsystem =ActorSystem("UniversityMessageSystem",
- ConfigFactory.load("own-application.conf"))
- valconfig =ConfigFactory.parseString(
- s"""
- |akka.remote.netty.tcp.hostname = $host
- |akka.actor.provider = akka.remote.RemoteActorRefProvider
- |akka.remote.enabled-transport = akka.remote.netty.tcp
- |akka.remote.netty.tcp.port = 2445
- """.stripMargin)
- valsystem =ActorSystem("RsModelActorSystem",
- config.withFallback(ConfigFactory.load()))// 同上
ActorSystem 的配置参数中有大量参数可以自定义, 需要根据实际需要修改, 例如在该项目中, 后期单个算法任务对象大小超过了 Akka remote 默认包大小 128000 bytes, 需要修改参数 akka.remote.netty.tcp.maximum-frame-size
Actor
一个 Actor 包含了状态, 行为, 一个邮箱, 子 Actor 和一个监管策略, 所有这些封装在一个 Actor 引用里. Actor 对象通常包含一些变量来反映其所处的可能状态, Akka-actor 自身的轻量线程与系统的其他部分完全隔离, 因此无须担心并发问题. 每当一个消息被处理, 它会与 Actor 的当前行为进行匹配. 行为是一个函数, 它定义了在某个时间点处理当前消息所要采取的动作, 需要结合实际需求编写具体逻辑. Actor 的邮箱是连接发送者与接收者的纽带, 每个 Actor 有且仅有一个邮箱, 所有的发来的消息都在邮箱里排队. 可以有不同策略的邮箱实现供选择, 缺省时为 FIFO.
编写逻辑
在 Actor 类中, 主要逻辑均在 receive 方法中实现, 通过偏函数方法, 执行并返回对应的逻辑:
- //ActorLogging 提供 Actor 内部的日志输出
- classRsActorextendsActorwithActorLogging{
- overridedefreceive:Receive= {
- caseMapMessage(parameters) =>
- println(parameters.get("code"))
- caseMapKeyMessage(parameters, key) =>
- println(parameters.get(key))
- caseStringMessage(msg) =>
- println(msg.getBytes().length)
- caseo:Object=>
- println(o.getClass)
- case_:AnyRef=>
- println("233")
- }
- }
生成引用
生成一个可以接收消息的 Actor 实例主要有两个方法:
- // 生成一个基于本地类的 Actor 实例
- valrsActor = system.actorOf(Props[RsActor],"rsActor")
- // 生成一个基于远程地址的 Actor 实例
- valrmActor =
- system.actorSelection("akka.tcp://RsModelAkkaSystem@192.168.1.9:2445/user/rsActor")
- // 使用! 向对应的 Actor 实例发送消息
- rsActor !StringMessage("test")
- rmActor !MapMessage(Map("code"->"233"))
- Message
Akka 中对传递的消息内容并没有太严格要求, 可以是基本数据类型, 也可以是支持序列化的对象:
- //scala 的 case class 便于简洁地创建消息类
- caseclassStringMessage(msg:String)extendsSerializable
- caseclassMapMessage(parameters:Map[String,String])extendsSerializable
- caseclassMapKeyMessage(parameters:Map[String,String], key:String)extendsSerializable
其他
Akka 作为一款被广泛使用的开源工具, 在实际项目中体现出了很多的优势, 异步的消息驱动方式也给我们提供了一套新的思路和实现方法.
来源: http://www.tuicool.com/articles/JRfuaef