那么本篇就介绍下如何利用 Ui 做性能分析,因为本人的经验也不是很丰富,所以只能作为一个入门的介绍。
如果是单机版本,在单机调试的时候输出信息中已经提示了 UI 的入口:
- 17/02/26 13:55:48 INFO SparkEnv: Registering OutputCommitCoordinator
- 17/02/26 13:55:49 INFO Utils: Successfully started service 'SparkUI' on port 4040.
- 17/02/26 13:55:49 INFO SparkUI: Started SparkUI at http://192.168.1.104:4040
- 17/02/26 13:55:49 INFO Executor: Starting executor ID driver on host localhost
单机调试的时候,可以直接登陆:
如果是集群模式,可以通过 Spark 日志服务器 xxxxx:18088 者 yarn 的 UI 进入到应用 xxxx:8088, 进入相应的 Spark UI 界面。
上面就是 Spark 的 UI 主页,首先进来能看到的是 Spark 当前应用的 job 页面,在上面的导航栏:
下面挨个介绍一下各个页面的使用方法和实践,为了方便分析,我这里直接使用了分布式计算里面最经典的 helloworld 程序——WordCount, 这个程序用于统计某一段文本中一个单词出现的次数。原始的文本如下:
- for the shadow of lost knowledge at least protects you from many illusions
这句话也是我最喜欢的一句,意思是:"知识,哪怕是知识的幻影,也会成为你的铠甲,保护你不被愚昧反噬"(来自知乎——《为什么读书?》)
程序代码如下:
- public static void main(String[] args) throws InterruptedException {
- SparkConf sparkConf = new SparkConf();
- sparkConf.setMaster("local[2]");
- sparkConf.setAppName("test-for-spark-ui");
- JavaSparkContext sc = new JavaSparkContext(sparkConf);
- //知识,哪怕是知识的幻影,也会成为你的铠甲,保护你不被愚昧反噬。
- JavaPairRDD<String,Integer> counts = sc.textFile( "C:\\Users\\xinghailong\\Desktop\\你为什么要读书.txt" )
- .flatMap(line -> Arrays.asList(line.split(" ")))
- .mapToPair(s -> new Tuple2<String,Integer>(s,1))
- .reduceByKey((x,y) -> x+y);
- counts.cache();
- List<Tuple2<String,Integer>> result = counts.collect();
- for(Tuple2<String,Integer> t2 : result){
- System.out.println(t2._1+" : "+t2._2);
- }
- sc.stop();
- }
这个程序首先创建了 SparkContext,然后读取文件,先使用 ` ` 进行切分,再把每个单词转换成二元组,再根据 key 进行累加,最后输出打印。为了测试 storage 的使用,我这对计算的结果添加了缓存。
主页可以分为两部分,一部分是
,另一部分是进行中和完成的 job 任务。
- event timeline
第一部分
展开后,可以看到 executor 创建的时间点,以及某个 action 触发的算子任务,执行的时间。通过这个时间图,可以快速的发现应用的执行瓶颈,以及触发了多少个 action。
- event timeline
第二部分的图表,显示了触发 action 的 job 名字,它通常是某个 count,collect 等操作。有 spark 基础的人都应该知道,在 spark 中 rdd 的计算分为两类,一类是 transform 转换操作,一类是 action 操作,只有 action 操作才会触发真正的 rdd 计算。具体的有哪些 action 可以触发计算,可以参考 api。
描述了 action 的名字和所在的行号,这里的行号是精准匹配到代码的,所以通过它可以直接定位到任务所属的代码,这在调试分析的时候是非常有帮助的。
- collect at test2.java:28
显示了该 action 的耗时,通过它也可以对代码进行专门的优化。最后的进度条,显示了该任务失败和成功的次数,如果有失败的就需要引起注意,因为这种情况在生产环境可能会更普遍更严重。点击能进入该 action 具体的分析页面,可以看到 DAG 图等详细信息。
- Duration
在 Spark 中 job 是根据 action 操作来区分的,另外任务还有一个级别是 stage,它是根据宽窄依赖来区分的。
窄依赖是指前一个 rdd 计算能出一个唯一的 rdd,比如 map 或者 filter 等;宽依赖则是指多个 rdd 生成一个或者多个 rdd 的操作,比如 groupbykey reducebykey 等,这种宽依赖通常会进行 shuffle。
因此 Spark 会根据宽窄依赖区分 stage,某个 stage 作为专门的计算,计算完成后,会等待其他的 executor,然后再统一进行计算。
stage 页面的使用基本上跟 job 类似,不过多了一个 DAG 图。这个 DAG 图也叫作血统图,标记了每个 rdd 从创建到应用的一个流程图,也是我们进行分析和调优很重要的内容。比如上面的 wordcount 程序,就会触发 acton,然后生成一段 DAG 图:
从这个图可以看出,wordcount 会生成两个 dag,一种一个是从读数据到切分到生成二元组,第二个进行了 reducebykey,产生 shuffle。
点击进去还可以看到详细的 DAG 图,鼠标移到上面,可以看到一些简要的信息。
storage 页面能看出目前使用的缓存,点击进去可以看到具体在每个机器上,使用的 block 的情况。
这个页面一般不太用,因为环境基本上不会有太多差异的,不用时刻关注它。
这个页面比较常用了,一方面通过它可以看出来每个 excutor 是否发生了数据倾斜,另一方面可以具体分析目前的应用是否产生了大量的 shuffle,是否可以通过数据的本地性或者减小数据的传输来减少 shuffle 的数据量。
在 Spark 应用里面可以直接使用 System.out.println 把信息输出出来,系统会直接拦截 out 输出到 spark 的日志。像我们使用的 yarn 作为资源管理系统,在 yarn 的日志中就可以直接看到这些输出信息了。这在数据量很大的时候,做一些 show()(默认显示 20),count() 或者 take(10) 的时候会很方便。
当任务失败,收到 sparkContext shutdown 的信息时,基本都是执行者的内存不够。这个时候,一方面可以调大 --excutor-memory 参数,另一方面还是得回去看看程序。如果受限于系统的硬件条件,无法加大内存,可以采用局部调试法,检查是在哪里出现的内存问题。比如,你的程序分成几个步骤,一步一步的打包运行,最后检查出现问题的点就可以了。
线程池不够,这个是因为 --excutor-core 给的太少了,出现线程池不够用的情况。这个时候就需要调整参数的配置了。
这种问题一般是 driver memory 不够导致的,driver memory 通常存储了以一些调度方面的信息,这种情况很有可能是你的调度过于复杂,或者是内部死循环导致。
在 Spark 的计算中,不太建议直接使用 cache,万一 cache 的量很大,可能导致内存溢出。可以采用 persist 的方式,指定缓存的级别为 MEMORY_AND_DISK, 这样在内存不够的时候,可以把数据缓存到磁盘上。另外,要合理的设计代码,恰当地使用广播和缓存,广播的数据量太大会对传输带来压力,缓存过多未及时释放,也会导致内存占用。一般来说,你的代码在需要重复使用某一个 rdd 的时候,才需要考虑进行缓存,并且在不使用的时候,要及时 unpersist 释放。
这个点,在优化的过程中是很重要的。比如你需要把两个 rdd 按照某个 key 进行 groupby,然后在进行 leftouterjoin,这个时候一定要考虑大小表的问题。如果把大表关联到小表,那么性能很可能会很惨。而只需要简单的调换一下位置,性能就可能提升好几倍。
大数据计算总是充满了各种神奇的色彩,节点之间的分布式,单节点内多线程的并行化,只有多去了解一些原理性的东西,才能用好这些工具。
最后还是献上最喜欢的那句话——
- 知识,哪怕是知识的幻影,也会成为你的铠甲,保护你不被愚昧反噬。
来源: http://www.cnblogs.com/xing901022/p/6445254.html