Spark 的官方文档再三强调那些将要作用到 RDD 上的操作, 不管它们是一个函数还是一段代码片段, 它们都是 "闭包",Spark 会把这个闭包分发到各个 worker 节点上去执行, 这里涉及到了一个容易被忽视的问题: 闭包的 "序列化".
显然, 闭包是有状态的, 这主要是指它牵涉到的那些自由变量以及自由变量依赖到的其他变量, 所以, 在将一个简单的函数或者一段代码片段 (就是闭包) 传递给类似 RDD.map 这样的操作前, Spark 需要检索闭包内所有的涉及到的变量(包括传递依赖的变量), 正确地把这些变量序列化之后才能传递到 worker 节点并反序列化去执行. 如果在涉及到的所有的变量中有任何不支持序列化或没有指明如何序列化自己时, 你就会遇到这样的错误:
org.apache.spark.SparkException: Task not serializable
在下面的例子中, 我们从 kafka 中持续地接收 JSON 消息, 并在 spark-streaming 中将字符串解析成对应的实体:
- object App {
- private val config = ConfigFactory.load("my-streaming.conf")
- case class Person (firstName: String,lastName: String)
- def main(args: Array[String]) {
- val zkQuorum = config.getString("kafka.zkQuorum")
- val myTopic = config.getString("kafka.myTopic")
- val myGroup = config.getString("kafka.myGroup")
- val conf = new SparkConf().setAppName("my-streaming")
- val ssc = new StreamingContext(conf, Seconds(1))
- val lines = KafkaUtils.createStream(ssc, zkQuorum, myGroup, Map(myTopic -> 1))
- //this val is a part of closure, and it's not serializable!
- implicit val formats = DefaultFormats
- def parser(JSON: String) = parse(JSON).extract[Person].firstName
- lines.map(_._2).map(parser).print
- ....
- ssc.start()
- ssc.awaitTerminationOrTimeout(10000)
- ssc.stop()
- }
- }
这段代码在执行时就会报如下错误:
- org.apache.spark.SparkException: Task not serializable
- Caused by: java.io.NotSerializableException: org.json4s.DefaultFormats$
问题的症结就在于: 闭包没有办法序列化. 在这个例子里, 闭包的范围是: 函数 parser 以及它所依赖的一个隐式参数: formats , 而问题就出在这个隐式参数上, 它的类型是 DefaultFormats, 这个类没有提供序列化和反序列自身的说明, 所以 Spark 无法序列化 formats, 进而无法将 task 推送到远端执行.
隐式参数 formats 是为 extract 准备的, 它的参数列表如下:
org.json4s.ExtractableJsonAstNode#extract[A](implicit formats: Formats, mf: scala.reflect.Manifest[A]): A = ...
找到问题的根源之后就好解决了. 实际上我们根本不需要序列化 formats, 对我们来说, 它是无状态的. 所以, 我们只需要把它声明为一个全局静态的变量就可以绕过序列化. 所以改动的方法就是简单地把 implicit val formats = DefaultFormats 的声明从方法内部迁移到 App Object 的字段位置上即可.
- object App {
- private val config = ConfigFactory.load("my-streaming.conf")
- case class Person (firstName: String,lastName: String)
- //As Object field, global, static, no need to serialize
- implicit val formats = DefaultFormats
- def main(args: Array[String]) {
- val zkQuorum = config.getString("kafka.zkQuorum")
- val myTopic = config.getString("kafka.myTopic")
- val myGroup = config.getString("kafka.myGroup")
- val conf = new SparkConf().setAppName("my-streaming")
- val ssc = new StreamingContext(conf, Seconds(1))
- val lines = KafkaUtils.createStream(ssc, zkQuorum, myGroup, Map(myTopic -> 1))
- def parser(JSON: String) = parse(JSON).extract[Person].firstName
- lines..map(_._2).map(parser).print
- ....
- ssc.start()
- ssc.awaitTerminationOrTimeout(10000)
- ssc.stop()
- }
- }
这里再提供另外一个很好的例子:
这个例子很好演示了解决类似问题的方案:"把类成员变量拷贝一份到闭包中" , 不然整个对象都需要被序列化!
最后我们来总结一下应该如何正确的处理 Spark Task 闭包的序列化问题. 首先你需要对 Task 涉及的闭包的边界要有一个清晰的认识, 要尽量地控制闭包的范围和牵涉到的自由变量, 一个非常值得警惕的地方是: 尽量不要在闭包中直接引用一个类的成员变量和函数, 这样会导致整个类实例被序列化. 这样的例子在 Spark 文档中也有提及, 如下:
- class MyClass {
- def func1(s: String): String = { ... }
- def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
- }
然后, 一个好的组织代码的方式是: 除了那些很短小的函数, 尽量把复杂的操作封装到全局单一的函数体: 全局静态方法或者函数对象
如果确实需要某个类的实例参与到计算过程中, 则要作好相关的序列化工作.
来源: http://www.bubuko.com/infodetail-3306605.html