1, 避免创建重复的 RDD 和不必要的内存空间浪费
错误代码:
- val rdd1 = sc.textFile("D:\\abc\\wordcount\\input\\hello.txt")
- rdd1.map(...)
- val rdd2 = sc.textFile("D:\\abc\\wordcount\\input\\hello.txt")
- rdd2.reduce(...)
错误解析:
这种情况下, Spark 需要从文件中加载两次 hello.txt 文件的内容, 并创建两个单独的 RDD; 第二次加载 HDFS 文件以及创建 RDD 的性能开销, 很明显是白白浪费掉的
正确代码:
- val rdd1 = sc.textFile("D:\\abc\\wordcount\\input\\hello.txt")
- rdd1.map(...)
- rdd1.reduce(...)
这种写法很明显比上一种写法要好多了, 因为我们对于同一份数据只创建了一个 RDD, 然后对这一个 RDD 执行了多次算子操作
2, 尽最大可能复用同一个 RDD
错误代码:
- val rdd1 = sc.textFile("D:\\abc\\wordcount\\input\\hello.txt")
- Val rdd2 = rdd1.map(...)
- // 分别对 rdd1 和 rdd2 进行了不同的算子操作
- rdd1.reduceByKey(...)
- rdd2.map(...)
错误解析:
上面这个案例中, 其实 rdd1 和 rdd2 的区别无非就是数据格式不同而已, rdd2 的数据完全就是 rdd1 的子集而已, 却创建了两个 rdd, 并对两个 rdd 都执行了一次算子操作. 此时会因为对 rdd1 执行 map 算子来创建 rdd2, 而多执行一次算子操作, 进而增加性能开销.
正确代码:
- val rdd1 = sc.textFile("D:\\abc\\wordcount\\input\\hello.txt")
- rdd1.reduceByKey(...)
- rdd1.map(tuple._2...)
在进行第二个 map 操作时, 只使用每个数据的 tuple._2, 也就是 rdd1 中的 value 值, 即可. 第二种方式相较于第一种方式而言, 很明显减少了一次 rdd2 的计算开销
3, 对多次使用的 RDD 进行持久化操作
错误代码:
- val rdd1 = sc.textFile("D:\\abc\\wordcount\\input\\hello.txt")
- rdd1.map(...)
- .reduce(...)
- .reduceByKey(...)
错误解析:
Spark 中对于一个 RDD 执行多次算子的默认原理是这样的: 每次你对一个 RDD 执行一个算子操作时, 都会重新从源头处计算一遍, 计算出那个 RDD 来, 然后再对这个 RDD 执行你的算子操作. 这种方式的性能是很差的
正确代码:
- val rdd1 = sc.textFile("D:\\abc\\wordcount\\input\\hello.txt").cache()
- rdd1.map(...)
- rdd1.reduce(...)
或者
- Val rdd1=sc.textFile("D:\\abc\\wordcount\\input\\hello.txt").persist
- (StorageLevel.MEMORY_AND_DISK_SER)
- rdd1.map(...)
- rdd1.reduce(...)
如果要对一个 RDD 进行持久化, 只要对这个 RDD 调用 cache()和 persist()即可.
cache()方法表示: 使用非序列化的方式将 RDD 中的数据全部尝试持久化到内存中. 此时再对 rdd1 执行两次算子操作时, 只有在第一次执行 map 算子时, 才会将这个 rdd1 从源头处计算一次.
第二次执行 reduce 算子时, 就会直接从内存中提取数据进行计算, 不会重复计算一个 rdd.
persist()方法表示: 手动选择持久化级别, 并使用指定的方式进行持久化. 比如说, StorageLevel.MEMORY_AND_DISK_SER 表示, 内存充足时优先持久化到内存中, 内存不充足时持久化到磁盘文件中.
而且其中的_SER 后缀表示, 使用序列化的方式来保存 RDD 数据, 此时 RDD 中的每个 partition 都会序列化成一个大的字节数组, 然后再持久化到内存或磁盘中.
序列化的方式可以减少持久化的数据对内存 / 磁盘的占用量, 进而避免内存被持久化数据占用过多, 从而发生频繁 GC.(总共有 12 中方式源码)
4, 尽量避免使用 shuffle 算子
错误代码:
val rdd3 = rdd1.join(rdd2)
错误分析:
Spark 作业运行过程中, 最消耗性能的地方就是 shuffle 过程. 所以能避免则尽可能避免使用 reduceByKey,join,distinct,repartition 等会进行 shuffle 的算子, 尽量使用 map 类的非 shuffle 算子. 这样的话, 没有 shuffle 操作或者仅有较少 shuffle 操作的 Spark 作业, 可以大大减少性能开销
正确代码:
- val rdd2 = rdd2.collect()
- val rdd2Broadcast = sc.broadcast(rdd2)
Broadcast+map 的 join 操作, 不会导致 shuffle 操作. 使用 Broadcast 将一个数据量较小的 RDD 作为广播变量. 以上操作, 建议仅仅在 rdd2 的数据量比较少 (比如几百 M, 或者一两 G) 的情况下使用
因为每个 Executor 的内存中, 都会驻留一份 rdd2 的全量数据
5, 使用性能较高的算子
groupByKey 代码:
- val Data1 = lines.flatMap(_.split(" "))
- .map((_,1))
- .groupByKey().mapValues(_.sum)
reduceByKey 代码:
- val Data2 = lines.flatMap(_.split(" "))
- .map((_, 1))
- .reduceByKey(_ + _)
使用 reduceByKey 替代 groupByKey, 因为 reduceByKey 会先进行一次局部聚合
再比如使用 mapPartitions 替代普通 map
mapPartitions 类的算子, 一次函数调用会处理一个 partition 所有的数据, 而不是一次函数调用处理一条, 性能相对来说会高一些, 使用 mapPartitions 会出现 OOM(内存溢出)的问题, 所以使用这类操作时要慎重
6, 使用 Kryo 优化序列化性能
代码:
- object YouHua {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf()
- .setAppName("haha")
- .setMaster("local[*]")
- .set("spark.testing.memory", "2147480000")
- .registerKryoClasses(Array(classOf[Student]))
- val sc = new SparkContext(conf)
- }
- }
- // spark 默认 Java 的序列化
- class Student{}
Kryo 序列化机制比 Java 序列化机制, 性能高 10 倍左右, 但是 spark 默认的是 java 的序列化, 因为这个不需要自己手动注册. 而 Kryo 序列化机制我们只要设置序列化类, 再注册要序列化的自定义类型即可, 注意有一个类需要注册一次所以说比较费劲
7, 解决数据倾斜
如何定位导致数据倾斜的代码:
数据倾斜只会发生在 shuffle 过程中. 这里给大家罗列一些常用的并且可能会触发 shuffle 操作的算子: distinct,groupByKey,reduceByKey,aggregateByKey,join,cogroup,repartition 等. 出现数据倾斜时, 可能就是你的代码中使用了这些算子中的某一个所导致的
解决办法
(1)加盐
- val lines = sc.makeRDD(List("ni ni aa aa aa aa aa aa aa aa aa aa aa aa aa aa aa aa aa aa aa aa aa aa aa aa aa aa"))
- lines.flatMap(_.split(" "))
- // 首先我们拼一个时间也可以拼随机数
- .map(x => (System.currentTimeMillis().toString + "-" + x,1))
- // 按照 key 聚合一下, 这样就会打散, 均匀分发各个分区
- .reduceByKey(_+_)
- // 去掉我们拼的前缀
- .map(x => (x._1.substring(14),x._2))
- // 最后在聚合得到我们想要的结果
- .reduceByKey(_+_)
- .mapPartitionsWithIndex(func1)
(2)自定义分区
- val func1 = (index:Int,iter:Iterator[(String,Int)]) => {
- iter.map(index +":" + _)
- }
- lines.flatMap(_.split(" "))
- .map(x => (x,1))
- // 按照自定义的分区随机分发数据
- .reduceByKey(new MyPartitioner(),_+_)
- // 在按照 HashPartitioner 聚合数据
- .reduceByKey(new HashPartitioner(4),_+_)
- .mapPartitionsWithIndex(func1)
- .foreach(println)
- class MyPartitioner extends Partitioner{
- override def numPartitions: Int = 4
- override def getPartition(key: Any): Int = {
- // 这里我们给返回随机数
- Random.nextInt(numPartitions)
- }
- }
(1)采样倾斜 key 并分拆 join 操作
如果出现数据倾斜, 是因为其中某一个 RDD/Hive 表中的少数几个 key 的数据量过大, 而另一个 RDD/Hive 表中的所有 key 都分布比较均匀, 那么采用这个解决方案是比较合适的
实现思路
举例: 一号 RDD 中多个 Key 都有数据倾斜, 二号 RDD 相对均衡
先从一号 RDD 中利用 sample 进行抽样 Val sampledRDD = rdd1.sample(false, 0.1)
对样本数据 RDD 统计出每个 key 的出现次数, 并按出现次数降序排序. 对降序排序后的数据, 取出 top 10 或者 top 100 的数据, 也就是 key 最多的前 n 个数据. 具体取出多少个数据量最多的 key, 由大家自己决定
从一号 RDD 中分离出来导致数据倾斜的 key, 形成新的 RDD
从一号 RDD 中拆出不导致数据倾斜的普通 key, 形成独立的 RDD
先给一号 RDD 中导致数据倾斜的 key 加盐 (随机数(或者自己想要的前缀)) 再从二号 RDD 中, 前面获取到的 key 对应的数据, 过滤出来, 分拆成单独的 rdd, 并对 rdd 中的数据也都加盐, 加上前面一号 RDD 的所有盐. 然后将这个一号 RDD 中分拆出来的独立 rdd, 与上面二号 RDD 分拆出来的独立 rdd, 进行 join
举例说明:
6. 将一号 RDD 中分拆出来的包含不倾斜 key 的独立 rdd, 直接与 rdd2 进行 join
7. 将倾斜 key join 后的结果与普通 key join 后的结果, uinon 起来就是最终的结果
来源: http://www.bubuko.com/infodetail-3259735.html