本篇博客主要是 sparksql 从初始开发注意的一些基本点以及力所能及的可优化部分的介绍:
所使用 spark 版本: 2.0.0 scala 版本: 2.11.8
1. SparkSession 的初始化:
val sparkSession = SparkSession.builder().master("local[*]").appName("AppName").config("spark.sql.warehouse.dir", "file:///D:/XXXX/XXXX/spark-warehouse").config("spark.sql.shuffle.partitions", 50).getOrCreate()
注意点:
a. spark.sql.warehouse.dir 需要显示设置, 否则会抛出 Exception in thread "main" java.lang.IllegalArgumentException: java.NET.URISyntaxException: Relative path in absolute URI: file:... 错误
b. spark.sql.shuffle.partitions 指定 Shuffle 时 Partition 个数, 也即 Reducer 个数. 根据业务数据量测试调整最佳结果
Partition 个数不宜设置过大:
Reducer(代指 Spark Shuffle 过程中执行 Shuffle Read 的 Task) 个数过多, 每个 Reducer 处理的数据量过小. 大量小 Task 造成不必要的 Task 调度开销与可能的资源调度开销 (如果开启了 Dynamic Allocation)
Reducer 个数过大, 如果 Reducer 直接写 HDFS 会生成大量小文件, 从而造成大量 addBlock RPC,Name node 可能成为瓶颈, 并影响其它使用 HDFS 的应用
过多 Reducer 写小文件, 会造成后面读取这些小文件时产生大量 getBlock RPC, 对 Name node 产生冲击
Partition 个数不宜设置过小:
每个 Reducer 处理的数据量太大, Spill 到磁盘开销增大
Reducer GC 时间增长
Reducer 如果写 HDFS, 每个 Reducer 写入数据量较大, 无法充分发挥并行处理优势
2. 将非结构化数据转换为结构化数据 DataFrame(本人用的自定义模式):
- val rdd= sparkSession.sparkContext.textFile(path, 250) // 默认 split 为 2
- val schemaString = "time hour lic" // 结构化数据的列名, 可理解为关系型数据库的列名
- val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)) // 字段名 字段类型 是否可为空
- val schema = StructType(fields) // 上两步组装最终 createDataFrame 时需要的 schema
- val rowRDD = citySECRDD.map(_.split(",")).filter(attributes => attributes.length>= 6 && attributes(1).equals("2")&& attributes(0).split("").length> 1 && attributes(0).split(" ")(1).split(":").length> 1).map(attributes => {Row(attributes(0).trim,attributes(0).split(" "(1).split(":")(0).trim,attributes(2).trim,attributes(3).trim,attributes(4).trim,attributes(5).trim)}) // 自定义一些过滤条件 以及组装最终的 row 类型的 RDD
- val df= sparkSession.createDataFrame(rowRDD, schema) // 将 rdd 装换成 DataFrame
3. 两种缓存使用方式:
- 1)df.persist(StorageLevel.MEMORY_ONLY) // 后续如果需要反复使用 DF[DataFrame 的简称], 则就把此 DF 缓存起来
- df.unpersist() // 释放缓存
常用的两种序列化方式: MEMORY_ONLY-> 不加工在内存中存储 MEMORY_ONLY_SER-> 在内存中序列化存储 (占用内存空间较小)
- 2)df.createOrReplaceTempView("table")
- sparkSession.sql("cache table table") // 以 sql 形式缓存 DF
- sparkSession.sql("uncache table table") // 释放缓存
4.spark 整合 Hbase 快速批量插入
将计算结果写入 Hbase:
注意: 1) 如果是带有 shuffle 过程的, shuffle 计算之前使用 select() 提出只需要的字段然后再进行计算, 因为 shuffle 特别耗费时间, 写磁盘的过程, 所以要能少写就少写.
- df.foreachPartition(partition => {
- val hconf = HBaseConfiguration.create();
- hconf.set(zkClientPort, zkClientPortValue) //zk 端口
- hconf.set(zkQuorum, zkQuorumValue) //zk 地址
- hconf.set(hbaseMaster, hbaseMasterValue) //hbase master
- val myTable = new HTable(hconf, TableName.valueOf(tableName))
- myTable.setAutoFlush(false, false) // 关键点 1
- myTable.setWriteBufferSize(5 * 1024 * 1024) // 关键点 2
- partition.foreach(x => {
- val column1 = x.getAs[String]("column1") // 列 1
- val column2 = x.getAs[String]("column2") // 列 2
- val column3 = x.getAs[Double]("column3") // 列 3
- val date = dateStr.replace("-", "") // 格式化后的日期
- val rowkey = MD5Hash.getMD5AsHex(Bytes.toBytes(column1+ date)) + Bytes.toBytes(hour)
- val put = new Put(Bytes.toBytes(rowkey))
- put.add("c1".getBytes(), "column1".getBytes(), licPlateNum.getBytes()) // 第一列族 第一列
- put.add("c1".getBytes(), "column2".getBytes(), hour.getBytes()) // 第一列族 第二列
- put.add("c1".getBytes(), "column3".getBytes(), interval.toString.getBytes()) // 第一列族 第三列
- put.add("c1".getBytes(), "date".getBytes(), date.getBytes()) // 第一列族 第四列
- myTable.put(put)
- })
- myTable.flushCommits() // 关键点 3
- /*
- * 关键点 1_: 将自动提交关闭, 如果不关闭, 每写一条数据都会进行提交, 是导入数据较慢的做主要因素.
- 关键点 2: 设置缓存大小, 当缓存大于设置值时, hbase 会自动提交. 此处可自己尝试大小, 一般对大数据量, 设置为 5M 即可, 本文设置为 3M.
- 关键点 3: 每一个分片结束后都进行 flushCommits(), 如果不执行, 当 hbase 最后缓存小于上面设定值时, 不会进行提交, 导致数据丢失.
- 注: 此外如果想提高 Spark 写数据如 Hbase 速度, 可以增加 Spark 可用核数量.
- */
5. spark 任务提交 shell 脚本:
- spark-submit --jars /XXX/XXX/hbase/latest/lib/hbase-protocol-0.96.1.1-cdh5.0.2.jar \
- --master yarn\
- --num-executors 200 \
- --conf "spark.driver.extraClassPath=/share/apps/hbase/latest/lib/hbase-protocol-0.96.1.1-cdh5.0.2.jar" \
- --conf "spark.executor.extraClassPath=/share/apps/hbase/latest/lib/hbase-protocol-0.96.1.1-cdh5.0.2.jar" \
- --conf spark.driver.cores=2 \
- --conf spark.driver.memory=10g \
- --conf spark.driver.maxResultSize=2g \
- --conf spark.executor.cores=6 \
- --conf spark.executor.memory=10g \
- --conf spark.shuffle.blockTransferService=nio \
- --conf spark.memory.fraction=0.8 \
- --conf spark.shuffle.memoryFraction=0.4 \
- --conf spark.default.parallelism=1000 \
--conf spark.sql.shuffle.partitions=400 \ 默认 200, 如果项目中代码设置了此选项, 则代码设置级别优先, 会覆盖此处设置
- --conf spark.shuffle.consolidateFiles=true \
- --conf spark.shuffle.io.maxRetries=10 \
- --conf spark.scheduler.listenerbus.eventqueue.size=1000000 \
--class XXXXX\ 项目启动主类引用
--name zzzz \
/data/XXX/XXX-jar-with-dependencies.jar \ 项目 jar 包
"参数 1" "参数 2"
注: 红色部分是 Hbase 需要的配置, 同时需要在 spark 集群的 spark-defaults.conf 里面配置
spark.driver.extraClassPath 和 spark.executor.extraClassPath 直指 hbase-protocol-0.96.1.1-cdh5.0.2.jar 路径
先写到这里吧, 后续会继续完善通过 sparkUi 优化细节以及提交 spark 任务的时候 如何分配 executor.cores 和 executor.memory.
来源: https://www.cnblogs.com/ityuanmanito/p/9945447.html