Introduction
Spark SQL supports many kinds of structured data sources and can easily read Row objects from them. These sources include Parquet, JSON, Hive tables, relational databases, and more.
When a query uses only a subset of the fields, Spark SQL can intelligently scan just those fields, instead of crudely reading all of the data the way the hadoopFile method does.
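As a minimal sketch of this column pruning (the file path here is hypothetical), the physical plan printed by explain() reports a ReadSchema containing only the selected column:

```scala
// Sketch: assuming a hypothetical Parquet file with columns (name, age),
// selecting only "name" makes the scan's ReadSchema contain just that column.
val df = spark.read.parquet("examples/people.parquet")
df.select("name").explain()
// == Physical Plan ==
// ... FileScan parquet [name#0] ... ReadSchema: struct<name:string>   (illustrative)
```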
Parquet
Parquet is a popular columnar storage format that can efficiently store records with nested fields. Parquet automatically preserves the types of the original data; when writing Parquet files, all columns are automatically converted to be nullable for compatibility reasons.
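A quick way to see this nullability conversion (a sketch, using a throwaway output path): a column that is non-nullable in the in-memory DataFrame comes back as nullable after a Parquet round trip.

```scala
// Sketch: the Int column is non-nullable in the source DataFrame,
// but nullable again after being written to and read back from Parquet.
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
df.printSchema()                        // id: integer (nullable = false)
df.write.parquet("/tmp/nullable-demo")  // hypothetical output path
spark.read.parquet("/tmp/nullable-demo").printSchema()
// id: integer (nullable = true)
```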
```scala
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the Parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
```
```java
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write().parquet("people.parquet");

// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    Encoders.STRING());
namesDS.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
```
```python
peopleDF = spark.read.json("examples/src/main/resources/people.json")

# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")

# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")

# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()
# +------+
# |  name|
# +------+
# |Justin|
# +------+
```
```sql
CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable
```
JSON
Spark SQL can automatically infer the schema of a JSON dataset and load it as a Dataset whose elements are Row objects.
Note that by default the JSON files Spark SQL reads are not ordinary JSON files: each line must contain a separate, self-contained, valid JSON object. For regular multi-line JSON files, set the multiLine option to true.
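A minimal sketch of the multiLine option (the file path here is hypothetical):

```scala
// Sketch: reading a regular multi-line (pretty-printed) JSON file by
// enabling the multiLine option; the path is a hypothetical example file.
val multiLineDF = spark.read
  .option("multiLine", true)
  .json("examples/multiline-people.json")
multiLineDF.show()
```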
```scala
// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
```
```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");

// The inferred schema can be visualized using the printSchema() method
people.printSchema();
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
people.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset<String> storing one JSON object per string.
List<String> jsonData = Arrays.asList(
    "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
```
```python
# spark is from the previous example.
sc = spark.sparkContext

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files
path = "examples/src/main/resources/people.json"
peopleDF = spark.read.json(path)

# The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

# SQL statements can be run by using the sql methods provided by spark
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
# +------+
# |  name|
# +------+
# |Justin|
# +------+

# Alternatively, a DataFrame can be created for a JSON dataset represented by
# an RDD[String] storing one JSON object per string
jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()
# +---------------+----+
# |        address|name|
# +---------------+----+
# |[Columbus,Ohio]| Yin|
# +---------------+----+
```
```sql
CREATE TEMPORARY VIEW jsonTable
USING org.apache.spark.sql.json
OPTIONS (
  path "examples/src/main/resources/people.json"
)

SELECT * FROM jsonTable
```
Hive
Spark SQL supports any storage format (SerDe) that Hive supports, including plain text files, RCFiles, ORC, Parquet, Avro, Protocol Buffers, and so on.
If you already have a Hive deployment, place hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) into the conf directory. Without an existing Hive deployment, Spark SQL automatically creates metastore_db and a spark-warehouse directory (controlled by the spark.sql.warehouse.dir setting). Note that the user who launches the Spark application must have write permission for these locations.
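As a small illustration of the storage-format support mentioned above (a sketch based on the fileFormat option documented for Spark's Hive source; the table name is hypothetical):

```scala
// Sketch: creating a Hive table stored as Parquet via the fileFormat option;
// 'textfile', 'orc', 'sequencefile', etc. also work. Table name is hypothetical.
spark.sql(
  "CREATE TABLE hive_parquet_src (key INT, value STRING) USING hive OPTIONS(fileFormat 'parquet')")
```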
```scala
import java.io.File

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |     500|
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|
// ...
```
```java
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static class Record implements Serializable {
  private int key;
  private String value;

  public int getKey() {
    return key;
  }

  public void setKey(int key) {
    this.key = key;
  }

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }
}

// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate();

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// |     500|
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");

// The items in DataFrames are of type Row, which lets you access each column by ordinal.
Dataset<String> stringsDS = sqlDF.map(
    (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
    Encoders.STRING());
stringsDS.show();
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
  Record record = new Record();
  record.setKey(key);
  record.setValue("val_" + key);
  records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");

// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// ...
```
```python
from os.path import expanduser, join, abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row

# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key|  value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...

# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# |     500|
# +--------+

# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

# The items in DataFrames are of type Row, which allows you to access each column by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
    print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...

# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")

# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# |  2| val_2|  2| val_2|
# |  4| val_4|  4| val_4|
# |  5| val_5|  5| val_5|
# ...
```
JDBC
Spark SQL can use JDBC to read from and write to relational databases. This approach is preferable to the JdbcRDD in Spark Core, because the result comes back as a DataFrame that is easy to process further.
```scala
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
import java.util.Properties

// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
```
```java
import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Saving data to a JDBC source
jdbcDF.write()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save();

jdbcDF2.write()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Specifying create table column data types on write
jdbcDF.write()
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
```
```python
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Saving data to a JDBC source
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})
```
```sql
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:postgresql:dbserver",
  dbtable "schema.tablename",
  user 'username',
  password 'password'
)

INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable
```
Source: http://www.bubuko.com/infodetail-2762133.html