一, 什么是 Spark SQL
Spark SQL 是 Spark 用来处理结构化数据的一个模块, 它提供了两个编程抽象分别叫做 DataFrame 和 DataSet, 它们用于作为分布式 SQL 查询引擎. 从下图可以查看 RDD,DataFrames 与 DataSet 的关系.
image
二, 为什么要学习 Spark SQL?
我们已经学习了 Hive, 它是将 Hive SQL 转换成 MapReduce 然后提交到集群上执行, 大大简化了编写 MapReduce 的程序的复杂性, 由于 MapReduce 这种计算模型执行效率比较慢. 所以 Spark SQL 的应运而生, 它是将 Spark SQL 转换成 RDD, 然后提交到集群执行, 执行效率非常快! 所以我们类比的理解: Hive---SQL-->MapReduce,Spark SQL---SQL-->RDD. 都是一种解析传统 SQL 到大数据运算模型的引擎, 属于数据分析的范围.
三, 什么是 DataFrame 和 DataSet?
首先, 最简单的理解我们可以认为 DataFrame 就是 Spark 中的数据表(类比传统数据库),DataFrame 的结构如下:
DataFrame(表)= Schema(表结构) + Data(表数据)
总结: DataFrame(表)是 Spark SQL 对结构化数据的抽象. 可以将 DataFrame 看做 RDD.
DataFrame
DataFrame 是组织成命名列的数据集. 它在概念上等同于关系数据库中的表, 但在底层具有更丰富的优化. DataFrames 可以从各种来源构建,
例如:
结构化数据文件(JSON)
外部数据库或现有 RDDs
DataFrame API 支持的语言有 Scala,Java,Python 和 R.
image
从上图可以看出, DataFrame 相比 RDD 多了数据的结构信息, 即 schema.RDD 是分布式的 Java 对象的集合. DataFrame 是分布式的 Row 对象的集合. DataFrame 除了提供了比 RDD 更丰富的算子以外, 更重要的特点是提升执行效率, 减少数据读取以及执行计划的优化.
DataSet
Dataset 是数据的分布式集合. Dataset 是在 Spark 1.6 中添加的一个新接口, 是 DataFrame 之上更高一级的抽象. 它提供了 RDD 的优点 (强类型化) 以及 Spark SQL 优化后的执行引擎的优点. 一个 Dataset 可以从 JVM 对象构造, 然后使用函数转换 (map, flatMap,filter 等) 去操作. Dataset API 支持 Scala 和 Java. Python 不支持 Dataset API.
四, 测试数据
1)启动 hdfs
start-dfs.sh
我们使用 2 个 CSV 文件作为部分测试数据:
dept.CSV 信息:
- 10,ACCOUNTING,NEW YORK
- 20,RESEARCH,DALLAS
- 30,SALES,CHICAGO
- 40,OPERATIONS,BOSTON
emp.CSV 信息:
- 7369,SMITH,CLERK,7902,1980/12/17,800,,20
- 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
- 7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
- 7566,JONES,MANAGER,7839,1981/4/2,2975,,20
- 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
- 7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
- 7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
- 7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
- 7839,KING,PRESIDENT,,1981/11/17,5000,,10
- 7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
- 7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
- 7900,JAMES,CLERK,7698,1981/12/3,950,,30
- 7902,FORD,ANALYST,7566,1981/12/3,3000,,20
- 7934,MILLER,CLERK,7782,1982/1/23,1300,,10
将这 2 个 CSV 文件 put 到 HDFS 的 hdfs://bigdata111:9000/input/csvFiles/ 目录以便后面使用
- [root@hadoop3 ~]# hadoop fs -put ./dept.CSV /input
- [root@hadoop3 ~]# hadoop fs -put ./emp.CSV /input
再启动 spark-shell 之前, 需要启动 spark
[root@hadoop3 ~]# spark-start-all.sh
方式 1: 使用 case class 定义表
(1) 定义 case class 代表表的结构 schema
scala>case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)
(2) 导入 emp.CSV 文件(导入数据)
scala>val lines = sc.textFile("/root/temp/csv/emp.csv").map(_.split(","))// 读取 Linux 本地数据
或者
scala>val lines = sc.textFile("hdfs://192.168.60.128:8020/input/csvFiles/emp.csv").map(_.split(","))// 读取 HDFS 数据
(3) 生成表: DataFrame
scala>val allEmp = lines.map(x=>Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))
(4)由 allEmp 直接生成表
scala>val empDF = allEmp.toDF
(4) 操作: DSL 语句
- scala>empDF.show ----> select * from emp
- scala>empDF.printSchema ----> desc emp
结果如下:
image.PNG
方式 2: 使用 SparkSession 对象创建 DataFrame
什么是 SparkSession?
Apache Spark 2.0 引入了 SparkSession, 其为用户提供了一个统一的切入点来使用 Spark 的各项功能, 并且允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写 Spark 程序. 最重要的是, 它减少了用户需要了解的一些概念, 使得我们可以很容易地与 Spark 交互.
在 2.0 版本之前, 与 Spark 交互之前必须先创建 SparkConf 和 SparkContext. 然而在 Spark 2.0 中, 我们可以通过 SparkSession 来实现同样的功能, 而不需要显式地创建 SparkConf, SparkContext 以及 SQLContext, 因为这些对象已经封装在 SparkSession 中.
通过 SparkSession 可以访问 Spark 所有的模块!
使用 Sparksession 创建 DataFrame 过程:
(1)加载结构化数据
scala>val lines = sc.textFile("/root/temp/csv/emp.csv").map(_.split(","))// 读取 Linux 数据
或者
scala>val lines = sc.textFile("hdfs://192.168.60.1128:8020/input/emp.csv").map(_.split(","))// 读取 HDFS 数据
(2) 定义 schema:StructType
- scala>import org.apache.spark.sql._
- scala>import org.apache.spark.sql.types._
- scala>val myschema = StructType(List(StructField("empno", DataTypes.IntegerType)
- , StructField("ename", DataTypes.StringType)
- ,StructField("job", DataTypes.StringType)
- ,StructField("mgr", DataTypes.StringType)
- ,StructField("hiredate", DataTypes.StringType)
- ,StructField("sal", DataTypes.IntegerType)
- ,StructField("comm", DataTypes.StringType)
- ,StructField("deptno", DataTypes.IntegerType)))
(3)把读入的每一行数据映射成一个个 Row
scala>val rowRDD = lines.map(x=>Row(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))
(4) 使用 SparkSession.createDataFrame 创建表
scala>val df = spark.createDataFrame(rowRDD,myschema)
然后:
df.show
结果如下:
image.PNG
方式 3: 直接读取格式化的文件 (JSON,CSV) 等 - 最简单
前提: 数据文件本身一定具有格式, 这里我们选取 JSON 格式的数据, JSON 文件可以使用 spark 例子中提供的 people.JSON. 你也可以使用任意 JSON 文件进行操作.
测试数据如下:
- [root@bigdata111 resources]# pwd
- /root/training/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources
- [root@bigdata111 resources]# ls
- full_user.avsc kv1.txt people.JSON people.txt user.avsc users.avro users.parquet
- [root@bigdata111 resources]# more people.JSON
- {
- "name":"Michael"
- }
- {
- "name":"Andy", "age":30
- }
- {
- "name":"Justin", "age":19
- }
然后进入 spark-shell
使用 SparkSession 对象直接读取 JSON 文件
spark>val peopleDF = spark.read.JSON("hdfs://bigdata111:9000/input/people.json")
创建完毕 DF 之后就可以直接查看表的信息, 十分的简单:
peopleDF.show
如图:
image.PNG
六, 操作 DataFrame(DSL+SQL)
DataFrame 操作也称为无类型的 Dataset 操作. 操作的 DataFrame 是方法 1 创建的 empDF.
>1.DSL(domain-specific language)操作 DataFrame
1. 查看所有的员工信息 ===selec * from empDF;
scala>empDF.show
image
2. 查询所有的员工姓名 ($ 符号添加不加功能一样)===select ename,deptno from empDF;
- scala>empDF.select("ename","deptno").show
- scala>empDF.select([图片上传失败...(image-d58b13-1552381233151)]
- "deptno").show
image
3. 查询所有的员工姓名和薪水, 并给薪水加 100 块钱 ===select ename,sal,sal+100 from empDF;
- scala>empDF.select([图片上传失败...(image-d832d6-1552381233151)]
- "sal",$"sal"+100).show
image
4. 查询工资大于 2000 的员工 ===select * from empDF where sal>2000;
scala>empDF.filter($"sal"> 2000).show
image
5. 分组 ===select deptno,count(*) from empDF group by deptno;
- scala>empDF.groupBy([图片上传失败...(image-c0111e-1552381233151)]
- "deptno").avg().show
- scala>empDF.groupBy($"deptno").max().show
image
2.SQL 操作 DataFrame
(1)前提条件: 需要把 DataFrame 注册成是一个 Table 或者 View
scala>empDF.createOrReplaceTempView("emp")
(2)使用 SparkSession 执行从查询
- scala>spark.sql("select * from emp").show
- scala>spark.sql("select * from emp where deptno=10").show
image
(3)求每个部门的工资总额
scala>spark.sql("select deptno,sum(sal) from emp group by deptno").show
image
七, 视图(临时和全局视图)
在使用 SQL 操作 DataFrame 的时候, 有一个前提就是必须通过 DF 创建一个表或者视图: empDF.createOrReplaceTempView("emp")
在 SparkSQL 中, 如果你想拥有一个临时的 view, 并想在不同的 Session 中共享, 而且在 application 的运行周期内可用, 那么就需要创建一个全局的临时 view. 并记得使用的时候加上 global_temp 作为前缀来引用它, 因为全局的临时 view 是绑定到系统保留的数据库 global_temp 上.
1 创建一个普通的 view 和一个全局的 view
- scala>empDF.createOrReplaceTempView("emp1")
- scala>empDF.createGlobalTempView("emp2")
image
2 在当前会话中执行查询, 均可查询出结果.
- scala>spark.sql("select * from emp1").show
- scala>spark.sql("select * from global_temp.emp2").show
image
3 开启一个新的会话, 执行同样的查询
- scala>spark.newSession.sql("select * from emp1").show (运行出错)
- scala>spark.newSession.sql("select * from global_temp.emp2").show
image
八, 使用数据源
在介绍 parquet 文件的时候我们使用的是 Spark 例子文件夹中提供的 users.parquet 文件:
- [root@bigdata111 resources]# pwd
- /root/training/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources
- [root@bigdata111 resources]# ls
- full_user.avsc kv1.txt people.JSON people.txt temp user.avsc users.avro users.parquet
1, 通用的 Load/Save 函数
(*)什么是 parquet 文件?
Parquet 是列式存储格式的一种文件类型, 列式存储有以下的核心:
可以跳过不符合条件的数据, 只读取需要的数据, 降低 IO 数据量.
压缩编码可以降低磁盘存储空间. 由于同一列的数据类型是一样的, 可以使用更高效的压缩编码 (例如 Run Length Encoding 和 Delta Encoding) 进一步节约存储空间.
只读取需要的列, 支持向量运算, 能够获取更好的扫描性能.
Parquet 格式是 Spark SQL 的默认数据源, 可通过 spark.sql.sources.default 配置
(*)通用的 Load/Save 函数
load 函数读取 Parquet 文件: scala>val userDF = spark.read.load("hdfs://bigdata111:9000/input/users.parquet")
对比如下语句:
- scala>val peopleDF = spark.read.JSON("hdfs://bigdata111:9000/input/people.json")
- scala>val peopleDF = spark.read.format("json").load("hdfs://bigdata111:9000/input/people.json")
查询 Schema 和数据: scala>userDF.show
image
save 函数保存数据, 默认的文件格式: Parquet 文件(列式存储文件)
- scala>userDF.select([图片上传失败...(image-1f551d-1552381233150)]
- "favorite_color").write.save("/root/temp/result1")
- scala>userDF.select([图片上传失败...(image-d37c5e-1552381233150)]
- "favorite_color").write.format("csv").save("/root/temp/result2")
- scala>userDF.select([图片上传失败...(image-2d08c-1552381233150)]
- "favorite_color").write.CSV("/root/temp/result3")
image
image
(*)显式指定文件格式: 加载 JSON 格式
直接加载: val usersDF = spark.read.load("/root/resources/people.json")
会出错
val usersDF = spark.read.format("json").load("/root/resources/people.json")
(*)存储模式(Save Modes)
可以采用 SaveMode 执行存储操作, SaveMode 定义了对数据的处理模式. 需要注意的是, 这些保存模式不使用任何锁定, 不是原子操作. 此外, 当使用 Overwrite 方式执行时, 在输出新数据之前原数据就已经被删除. SaveMode 详细介绍如下:
默认为 SaveMode.ErrorIfExists 模式, 该模式下, 如果数据库中已经存在该表, 则会直接报异常, 导致数据不能存入数据库. 另外三种模式如下:
SaveMode.Append 如果表已经存在, 则追加在该表中; 若该表不存在, 则会先创建表, 再插入数据;
SaveMode.Overwrite 重写模式, 其实质是先将已有的表及其数据全都删除, 再重新创建该表, 最后插入新的数据;
SaveMode.Ignore 若表不存在, 则创建表, 并存入数据; 在表存在的情况下, 直接跳过数据的存储, 不会报错.
Demo:
usersDF.select($"name").write.save("/root/result/parquet1")
--> 出错: 因为 / root/result/parquet1 已经存在
usersDF.select($"name").write.mode("overwrite").save("/root/result/parquet1")
5 读写 MySQL
5.1 JDBC
Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame, 通过对 DataFrame 一系列的计算后, 还可以将数据再写回关系型数据库中.
5.1.1 从 MySQL 中加载数据库(Spark Shell 方式)
启动 Spark Shell, 必须指定 MySQL 连接驱动 jar 包
spark-shell --master spark://hadoop1:7077 --jars MySQL-connector-java-5.1.35-bin.jar --driver-class-path MySQL-connector-java-5.1.35-bin.jar
从 MySQL 中加载数据
- val jdbcDF = sqlContext.read.format("jdbc").options(
- Map("url"->"jdbc:mysql://hadoop1:3306/bigdata",
- "driver"->"com.mysql.jdbc.Driver",
- "dbtable"->"person", // "dbtable"->"(select * from person where id = 12) as person",
- "user"->"root",
- "password"->"123456")
- ).load()
执行查询
jdbcDF.show()
5.1.2 将数据写入到 MySQL 中(打 jar 包方式)
编写 Spark SQL 程序
- import java.util.Properties
- import org.apache.spark.sql.{Row, SQLContext, SparkSession}
- import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
- import org.apache.spark.{SparkConf, SparkContext}
- /**
- * @author y15079
- * @create 2018-05-12 2:50
- * @desc
- **/
- object JdbcDFDemo {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setAppName("MysqlDemo").setMaster("local[2]")
- val sc = new SparkContext(conf)
- // 创建 SQLContext spark1.6.1 以下的写法
- //val sqlContext = new SQLContext(sc)
- //spark2.0 以上的写法
- val sqlContext = SparkSession.builder().config(conf).getOrCreate()
- // 通过并行化创建 RDD
- val personRDD = sc.parallelize(Array("1 tom 5", "2 jerry 3", "3 kitty 6")).map(_.split(" "))
- // 通过 StructType 直接指定每个字段的 schema
- val schema = StructType(
- List(
- StructField("id", IntegerType, true),
- StructField("name", StringType, true),
- StructField("age", IntegerType, true)
- )
- )
- // 将 RDD 映射到 rowRDD
- val rowRDD = personRDD.map(p=>Row(p(0).toInt, p(1).trim, p(2).toInt))
- // 将 schema 信息应用到 rowRDD 上
- val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
- // 创建 Properties 存储数据库相关属性
- val prop = new Properties()
- prop.put("user", "root")
- prop.put("password", "123456")
- // 将数据追加到数据库
- personDataFrame.write.mode("append").jdbc("jdbc:mysql://localhost:3306/bigdata","bigdata.person", prop)
- sc.stop()
- }
- }
用 maven-shade-plugin 插件将程序打包
将 jar 包提交到 spark 集群
- spark-submit
- --class cn.itcast.spark.sql.jdbcDF
- --master spark://hadoop1:7077
- --jars MySQL-connector-java-5.1.35-bin.jar
- --driver-class-path MySQL-connector-java-5.1.35-bin.jar
- /root/demo.jar
来源: http://www.jianshu.com/p/2997c90982d4