spark(spark-2.10)算子 (如 map,filter 等) 的源码实现, 都会调用 ClosureCleaner.clean 对传入的 function 进行检查和清理. 其中有一步检查是, 如果 function 包含了 return, 则直接失败. 有关代码如下:
- // Fail fast if we detect return statements in closures
- getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)
比如说, 写下如下代码:
- val rdd = sc.parallelize(1 to 10)
- println(rdd.map(x => return x * 2).collect())
你会发现运行时抛出如下异常:
Exception in thread "main" org.apache.spark.util.ReturnStatementInClosureException: Return statements aren't allowed in Spark closures
可是为什么 spark 会做这个检查呢? 这和 scala 中对 return 的实现有关.
先来聊一聊 anonymous function 中的 return
scala 中的 return 和 java 中的 return, 是不一样的. 在 scala 的 anonymous function(后面也简写为 anonymous func)中使用 return 时, 代码并不像看起来的那样.
先看一段代码:
- def main(args: Array[String]) {
- def d1(n:Int): Int = n * 2 // 定义 d1, 将输入值乘以 2, 未使用 return
- def d2(n:Int): Int = return n * 2 // 定义 d2, 将输入值乘以 2, 使用了 return
- val list = List(1, 2, 3)
- println(list.map(d1)) // 调用 d1 将 list 中的值都乘以 2
- println(list.map(d2)) // 调用 d2 将 list 中的值都乘以 2
- println(list.map(x => x * 2)) // 使用 anonymous func 将 list 中的值乘以 2, 没 return
- println(list.map(x => return x * 2)) // 使用 anonymous func 将 list 中的值都乘以 2, 有 return
- }
输出如下:
- List(2, 4, 6)
- List(2, 4, 6)
- List(2, 4, 6)
只输出了三个预期结果, 预期的第四个结果不见了.
这是为什么呢?
anonymous func 中 return 的实现方式: NonLocalReturnException
scala 是用抛出异常的方式来实现 anonymous func 中的 return 的, Scala Language Specification(下文简称 SLS) 6.20 Return Expressions 中的原文如下:
Returning from a nested anonymous function is implemented by throwing and catching a scala.runtime.NonLocalReturnException.
将上一节中的代码编译后生成的 class 文件, 反编译成 java 代码, 可以看到:
对于具名方法, 如 d1 和 d2, 无论是否显式的用了 return, 在反编译出来的 java 代码中, 都是用 java return 实现的. 下面以 d1 的反编译代码为例:
- //println(list.map(d1))
- // 创建了一个 anonymous function, 并在其 apply 方法中调用 d1
- Predef..MODULE$.println(list.map(new AbstractFunction1.mcII.sp() {
- public static
- final long serialVersionUID = 0L;
- public int apply$mcII$sp(int n) {
- return TestMain..MODULE$.com$iflytek$gnome$data$tmpsupport$main$TestMain$$d1$1(n);
- }
- public
- final int apply (int n) {
- return apply$mcII$sp(n);
- }
- }, List..MODULE$.canBuildFrom()));
- // 方法 d1, 仍是 java 的 return
- public
- final int com$iflytek$gnome$data$tmpsupport$main$TestMain$$d1$1 (int n) {
- return n * 2;
- }
对于未包含 return 的 anonymous function, 在反编译后的代码中, 则也是创建了一个 anonymous func, 并使用了 java 的 return.
- // 创建了 anonymous func, 并在其 apply 方法中直接处理 x * 2 的逻辑.
- Predef..MODULE$.println(list.map(new AbstractFunction1.mcII.sp()
- {
- public static final long serialVersionUID = 0L;
- public int apply$mcII$sp(int x)
- {
- return x * 2;
- }
- public final int apply(int x)
- {
- return apply$mcII$sp(x);
- }
- }, List..MODULE$.canBuildFrom()));
对于包含 return 的 anonymous function, 则是抛出了异常.
- Predef..MODULE$.println(list.map(new AbstractFunction1()
- {
- public static final long serialVersionUID = 0L;
- private final Object nonLocalReturnKey1$1;
- public final Nothing. apply(int x)
- {
- // 这里有个 x * 2, 但是没有作为返回值, 后面紧接着抛出异常了.
- (x * 2);throw new NonLocalReturnControl.mcV.sp(this.nonLocalReturnKey1$1, BoxedUnit.UNIT);
- }
- }, List..MODULE$.canBuildFrom()));
执行流程并不如我们所想, 代入 1 执行返回后, 再代入 2 执行. 而是在代入 1 后, 就直接抛出了异常, 这就是为什么最后一句 println 没有输出. 其实, 我们可以试着在最后一句 println 之后再加几句 println("test")之类的, 会发现也打印不出来.
到这里, 我们知道了 ,anonymous function 中的 return 是以 NonLocalReturnException 实现的. 可, 啥是 non-local return?
non-local return
引用内容及有关代码来自知乎 - 怎么理解 non-local return https://www.zhihu.com/question/22240354/answer/64673094 .
Non-local return 是 non-local control-flow 的一种.
这里的 non-local 指的是: 控制流并不转移到当前函数内的某个地方(return 的话, 并不从当前函数返回到其直接调用者), 而是转移到更外层的调用者去. 举例:
- local return
- function f() {
- g()
- return
- }
- function g() {
- return
- }
这样从 f()调用 g(),g()里的 return 就属于 local return--f()是 g()的直接调用者, g()里的 return 将控制流转移到其直接调用者处.
- non-local return
- function a() {
- b() // non-local return if returns to after this call
- return
- }
- function b() {
- c(function () { // 类似 map(x => return x * 2)
- return // <- the return in question
- })
- return
- }
这个例子里
如果 b()里的匿名函数出现的 return 在执行后是返回到 c()里, 那就是 local return.
如果 b()里的匿名函数出现的 return 在执行后是返回到 a()里, 那就是 non-local return.
我们再来回顾一下前文中, 在 anonymous func 中使用 return 的代码:
- def main(args: Array[String]) {
- val list = List(1, 2, 3)
- println(list.map(x => return x * 2)) // 使用 anonymous func 将 list 中的值都乘以 2, 有 return
- }
在反编译后的 java 代码中可以看到, 异常打断了 println, 直接返回到 main 中. 异常之后会被 main 的 catch 捕捉到. 代码的大概流程是下面这个样子:
- public void main(String[] args) {
- Predef..MODULE$.println(list.map(new AbstractFunction1() {
- ...
- throw new NonLocalReturnControl.mcV.sp(...);
- ...
- }
- catch (NonLocalReturnControl localNonLocalReturnControl) {}
也就是说, scala 采用抛出异常的方式, 实现了 non-local return.
我们再从 SLS 来理解下这个 non-local return.
SLS 6.20 Return Expressions 说, 一个 return 表达式, 必须发生在一个 named method 里. 可 anonymous function 明显是没名字的嘛.
你说等等, 反编译后的 java 版本的 anonymous function 明明有个 apply 方法啊. 可 SLS 还说了, 这个 apply 方法是不算数的. 所以, 还是要往上层找啊. 直到找到了 main, 因为 main, 是离这个 return 最近的 named method(下文中将这个 named method 称为 return 的 innermost enclosing method). 也就是说, return 要直接把 main 中止掉, 也就造就了 non-local return.
可是, 怎么中止啊? java 的 return(这里强调是 java 中的关键字 return)是不行的, 不过 Exception 很好使啊. 所以, anonymous function 中的 return, 反编译到 java 后, 就成了异常了.
SLS 中有关原文如下:
A return expression return e must occur inside the body of some enclosing named method or function.
An apply method which is generated by the compiler as an expansion of an anonymous function does not count as a named function in the source program, and therefore is never the target of a return expression.
原文中是 named method or function, 而我阐述时, 只取了 named method, 而且我在全文中使用 method 和 function 比较多, 而不是 "方法" 和 "函数", 具体原因可参见文末 function VS method 一节.
找到 return 的 innermost enclosing method
在上文的例子中, return 的 innermost enclosing method 不就是 main 么? 为啥这里还要 "找到"?
再看一遍这个代码:
- def main(args: Array[String]) {
- val list = List(1, 2, 3)
- println(list.map(x => return x * 2)) // 使用 anonymous func 将 list 中的值乘以 2, 有 return
- }
我钻了个牛角尖. 离 return 更近的, 不是还有 map 和 println 嘛? 它们都是 named method 啊? 为啥它们不是 return 的 innermost enclosing method 呢?
map 和 println 是第三方的 lib,class 文件都是编译好的, 再重新编译它们的 class 文件, 将 Exception 加进去么? 匪夷所思了.
这里的 innermost enclosing method, 是在 definition 层面的 enclosing, 而非 execution 层面的 enclosing.
type of innermost enclosing method
我还想聊一聊, 有关这个 main 的返回值类型问题. SLS 说, 这个 innermost enclosing method 的返回值类型, 必须和 return e 中的 e 保持一致. 不过有时候, e 可能会被忽略, return e 会被直接当成 return ().
我们来看几个例子:
return e 被 当成 return(). 直接拿上文的例子. return 的是整数, 不过 main 的返回值是 unit, 这种情况下, return x * 2 就被当成 return()了.
- def main(args: Array[String]) {
- val list = List(1, 2, 3)
- println(list.map(x => return x * 2)) // 使用 anonymous func 将 list 中的值乘以 2, 有 return
- }
明确定义 main 的返回值类型为 Int. 这样编译也能通过, 不过因为要保证所有的分支都返回整型, 所以最后加了个 1(不过这段代码 run 不起来, 因为最外面的那个 main 返回必须是 unit(void), 才能 run 起来).
- def main(args: Array[String]): Int = {
- val list = List(1, 2, 3)
- println(list.map(x => return x * 2)) // 使用 anonymous func 将 list 中的值乘以 2, 有 return
- 1
- }
明确定义 main 的返回值类型为 String. 这下编译就出错了.
- def main(args: Array[String]): String = {
- val list = List(1, 2, 3)
- println(list.map(x => return x * 2)) // 使用 anonymous func 将 list 中的值乘以 2, 有 return
- 1
- }
spark 失败在 return 的原因
说了这么多, 最开始的问题, 虽没有明确回答, 却已现端倪.
猜测 1
像 rdd.map(return x => x * 2)这种东西, 若真能执行的话, 那么在代入第一条记录后, 就会直接退出整批记录的处理了, 明显与用户期望相去甚远啊. 干脆毙了你.
在看猜测 2 之前, 我们又得引用一下 SLS 了. 它说, scala 会确保这个异常一定是被 return 的 innermost enclosing method 捕捉到, 如果不是, 就会往外面传播. 可有的时候, return 执行时(也就是抛异常时), 这个 innermost enclosing method 说不定已经先执行完了. 那么, 这个异常就只能一直往外抛了, 直到把程序抛跪.
原文如下:
Returning from a nested anonymous function is implemented by throwing and catching a scala.runtime.NonLocalReturnException. Any exception catches between the point of return and the enclosing methods might see the exception. A key comparison makes sure that these exceptions are only caught by the method instance which is terminated by the return.
If the return expression is itself part of an anonymous function, it is possible that the enclosing instance of f has already returned before the return expression is executed. In that case, the thrown scala.runtime.NonLocalReturnException will not be caught, and will propagate up the call stack.
一个参考 stackoverflow-Is non-local return in Scala new? https://stackoverflow.com/questions/6915701/is-non-local-return-in-scala-new 的例子.
异常被成功捕获
执行下面的代码, 安安静静就结束了. 程序第一次调用 g()的时候就退出了, 根本没后面代码的事情. 此时, return 的 innermost enclosing method 是 main,main 捕获到异常后, 一看,"啊, 是我的异常", 然后就把异常默默吞掉了, 然后就世界和平了.
- def main(args: Array[String]) {
- var g: () => Unit = () => return
- g() // 执行到这, 程序就退出了
- def f() { g = () => return }
- f() // set g
- g() // scala.runtime.NonLocalReturnControl$mcI$sp
- }
异常抛出来了
把 1 中的代码的第一个 g()去掉, 如下. 执行后你会发现, console 里躺着一句:
Exception in thread "main" scala.runtime.NonLocalReturnControl$mcV$sp
- .
- def main(args: Array[String]) {
- var g: () => Unit = () => return // 第一个 anonymous function
- def f() { g = () => return } // 第二个 anonymous function
- f() // set g
- g() // scala.runtime.NonLocalReturnControl$mcI$sp
- }
其实, 这段代码里有两个 anonymous function, 别看两个长得一模一样, 看反编译后的代码就知道了, 它们可不是同一个. 而 g 呢? 不过是引用了 anonymous function 的一个 variable 而已.
当程序最后调用 g()的时候, 执行是第二个 anonymous function 中的 return.
问题来了. 这第二个 anonymous function 中的 return 的 inner most enclosing method 是谁? 是 f. 可 f 已经跑完了(f 重新设置了 g, 却没有调用 g). 此时, 异常被抛给了 main,main 一看,"哎呀, 这不是我的异常, 继续扔吧". 然后, 异常就躺在 console 里了.
猜测 2
所以说, return 还会导致另一个问题, 编译的时候好好的, 运行时却会抛出致命的异常(致命的意思就是把我们的程序搞跪了).
而 spark 算子里的 function 实际都要分发到 executor 上才会执行. 那会儿, 在 definition 阶段给 return 找的 innermost enclosing method, 早不知去哪了吧.
不管怎样, 到这里, 也算是对本文开头提出的疑问, 有一个交代了.
最后, 聊一聊 scala 里的 function 和 method.
function VS method
function: 函数.
method: 方法.
我们在用这两个词的时候, 尤其是在 java 里, 大部分情况下没啥区别. 在 SLS 里, 有时候, 这两个词, 用的也挺混的.
嗯, 下面的内容又是参考的 stackoverflow-Difference between method and function in Scala https://stackoverflow.com/questions/2529184/difference-between-method-and-function-in-scala .
先来了解几个 SLS 中的概念:
A Function Type is (roughly) a type of the form (T1, ..., Tn) => U.
An Anonymous Function is an instance of a Function Type.
A method type is a def declaration - everything about a def except its body.
A method value actually has a Function Type
method 就像是 java 中类的方法, 而 function 则更倾向于用来指代一个 object(类似 java 中的 class).This object has an apply method which receives N parameters of types T1, T2, ..., TN, and returns something of type R.
我想, 可没有所谓的 named function. 对了, 上文中有这样的代码:
var g: () => Unit = () => return
这里的 g, 只是个 Function Type 的 variable 而已, 可不是什么 function.
来源: http://www.jianshu.com/p/2053634328d3