我们上篇文章中讲了, RDD 的持久化是 spark 优化中必须掌握的, 并且, 在内存不足的情况下, 我们可以将持久化类型选择为 MEMORY_ONLY_SER, 减少内存的占用, 持久化更多的 partition, 并且不同的序列化方法也会影响序列化性能.
下面, 我们就来测试下, 持久化级别和序列化方法的选择对 RDD 持久化大小的影响.
我选择了一个 170.9MB 的日志文件, 传到了百度网盘 https://pan.baidu.com/s/1vXTl3ZfCF6sSfzOfWGWJUA
提取码: ffae
测试环境是 Windows,
IDEA 参数配置
MEMORY_ONLY
代码为
- case class CleanedLog(cdn:String,region:String,level:String,date:String,ip:String, domain:String, url:String, traffic:String)
- object KyroTest {
- def main(args: Array[String]) {
- val inputPath=new Path(args(0))
- val outputPath=new Path(args(1))
- val fsConf=new Configuration()
- val fs= FileSystem.get(fsConf)
- if (fs.exists(outputPath)) {
- fs.delete(outputPath,true)
- val path=args(1).toString
- println(s"已删除已存在的路径 $path")
- }
- val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")
- //conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- //conf.set("spark.kryo.registrationRequired", "true")
- val sc = new SparkContext(conf)
- val logs = sc.textFile(args(0))
- //logs.filter(_.split("\t").length==8).take(10).foreach(println(_))
- val logsCache=logsCahe(logs)
- // 序列化的方式将 rdd 存到内存
- saveAtLocal(logsCache,args(1))
- Thread.sleep(100000)
- }
- def logsCahe(logs:RDD[String]): RDD[CleanedLog] ={
- logs.filter(_.split("\t").length==8).map(x=>{
- val fields=x.split("\t")
- CleanedLog(fields(0),fields(1),fields(2),fields(3),fields(4),fields(5),fields(6),fields(7))
- }).cache()
- }
- def saveAtLocal(logsCache: RDD[CleanedLog], outputPath: String) = {
- logsCache.map(x=>{
- x.cdn+"\t"+x.region+"\t"+x.level+"\t"+x.date+"\t"+x.ip+"\t"+x.domain+"\t"+x.url+"\t"+x.traffic
- }).repartition(1).saveAsTextFile(outputPath)
- }
- }
代码逻辑就是输入是什么内容, 输就是什么内容, 在中间我将输入的文本 RDD 进行了 memory_only 持久化, 我们就看这个持久化内存占多少
显然, input size 大小是 170.9 MB, 但是持久化之后是 908.5 MB, 显然占据内存空间增大了好几倍, 如果在生产上, 内存资源不足的情况下, 这种方式显然缓存不了不少 partition
时间耗费 14s
MEMORY_ONLY_SER 未使用 kryo 序列化
- def logsCahe(logs:RDD[String]): RDD[CleanedLog] ={
- logs.filter(_.split("\t").length==8).map(x=>{
- val fields=x.split("\t")
- CleanedLog(fields(0),fields(1),fields(2),fields(3),fields(4),fields(5),fields(6),fields(7))
- }).persist(StorageLevel.MEMORY_ONLY_SER)
代码仅更改了 persist(StorageLevel.MEMORY_ONLY_SER)
显然, input size 大小是 170.9 MB, 但是持久化之后是有 204.9MB, 所以序列化对于节约内存空间是很有帮助的.
时间耗费 11s
MEMORY_ONLY_SER 使用 kryo 序列化未注册
- val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
与上一代码相比, 为 SparkConf 设置了开启 kryo 序列化, 不是默认的 java 序列化了, 但是没有进行具体的类注册!
显然, input size 大小是 170.9 MB, 但是持久化之后是有 230.8MB, 使用未注册的 kryo 序列化竟然比使用 java 序列化还臃肿! 原因是: 每一个对象实例的序列化结果都会包含一份完整的类名, 造成了大量的空间浪费!
时间是 9s, 比 java 序列化快了一些.
MEMORY_ONLY_SER 使用 kryo 序列化并注册
- val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- conf.registerKryoClasses(Array(classOf[CleanedLog], classOf[String]))
添加了 String 类和自定义样例类的 kryo 注册
显然, input size 大小是 170.9 MB, 使用注册的 kryo 序列化之后, 只有 175.7MB, 时间也才 9 秒, 很舒服!
所以在目前为止, 使用 kryo 序列化并注册是性能最好得了!!!
如果 CPU 还是那么悠闲的话, 我们还有另外一个进一步优化点!
注册 kryo 序列化并开启 RDD 压缩
注意: RDD 压缩只能存在于序列化的情况下
- val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- conf.registerKryoClasses(Array(classOf[CleanedLog], classOf[String]))
- conf.set("spark.rdd.compress","true")
持久化的大小仅有 45.6MB!!!
spark.rdd.compress
这个参数决定了 RDD Cache 的过程中, RDD 数据在序列化之后是否进一步进行压缩再储存到内存或磁盘上. 当然是为了进一步减小 Cache 数据的尺寸, 对于 Cache 在磁盘上而言, 绝对大小大概没有太大关系, 主要是考虑 Disk 的 IO 带宽. 而对于 Cache 在内存中, 那主要就是考虑尺寸的影响, 是否能够 Cache 更多的数据, 是否能减小 Cache 数据对 GC 造成的压力等.
这两者, 前者通常不会是主要问题, 尤其是在 RDD Cache 本身的目的就是追求速度, 减少重算步骤, 用 IO 换 CPU 的情况下. 而后者, GC 问题当然是需要考量的, 数据量小, 占用空间少, GC 的问题大概会减轻, 但是是否真的需要走到 RDD Cache 压缩这一步, 或许用其它方式来解决可能更加有效.
所以这个值默认是关闭的, 但是如果在磁盘 IO 的确成为问题或者 GC 问题真的没有其它更好的解决办法的时候, 可以考虑启用 RDD 压缩.
来源: http://www.bubuko.com/infodetail-3065094.html