本文主要是翻译 Spark 官网 Quick Start. 只能保证大概意思, 尽量保证细节. 英文水平有限, 如果有错误的地方请指正, 轻喷
快速入门(Quick Start)
使用 Spark Shell 交互式编程
基本操作
更多关于 Dataset 的操作
缓存
独立的应用程序
下一步
这个指南提供了使用 Spark 的快速介绍. 我们会首先介绍 Spark 交互式编程 (使用 Python 或者 Scala) 的 API, 然后展示如何用 Java,Scala 和 Python 来编写应用程序.
为了使用这个指南, 您需要先从 Spark 网页 http://spark.apache.org/downloads.html 下载打包发布的 Spark 安装包. 由于我们将不会 (在指南中) 使用 HDFS, 您可以下载任意版本的 Hadoop 安装包.
需要注意的是, Spark2.0 之前, Spark 的主要编程接口是弹性分布式数据集(Resilient Distributed Dataset (RDD)).Spark2.0 之后, RDD 被 Dataset 取代, Dataset 和 RDD 一样是强类型, 但是在底层进行了更多的优化. Spark2.0 之后仍然支持 RDD 接口, 并且您可以从 RDD 编程指南中 获取更详细的参考. 当然, 我们强烈建议您选择使用 Dataset, 因为它的性能比 RDD 更好. 查看 SQL 编程指南 以得到更多关于 Dataset 的信息.
使用 Spark Shell 交互式编程
基本操作
Spark Shell 提供了一个简单的方式去学习 API, 同时也提供了一个强大的交互式数据分析工具. 它可以基于 Scala(一种在 java 虚拟机上运行并因此可以很好地使用已有的 java 库的编程语言)或 Python 使用. 在 Spark 目录下运行以下内容来开始(Sprk Shell):
Scala 版
./bin/pyspark
Python 版
./bin/pyspark
如果你当前环境使用 pip 下载了 PySpark, 可以使用如下下方式调用
pyspark
Spark 主要的抽象是一个被叫做 Dataset 的分布式集合. Dataset 可以通过 Hadoop InputFormat(比如 HDFS 文件)或者 转换其他 Dataset 中创建. 让我们通过 Spark 源目录下的 README 文件内容创建一个新的 Dataset:
Scala 版
- scala> val textFile = spark.read.textFile("README.md")
- textFile: org.apache.spark.sql.Dataset[String] = [value: string]
Python 版
>>> textFile = spark.read.text("README.md")
你可以直接从 Dataset 中, 通过调用一些操作或者转化 Dataset 以获得一个新的 Dataset 来获取它的值. 请阅读 API 文档(Scala / Python) 以获取更多细节
Scala 版
- scala> textFile.count() // 该 Dataset 中的成员数量
- res0: Long = 126 // 由于 README.md 会随着时间的推移不断改变, 所以结果可能会有所不同, 其他输出也有类似情况
- scala> textFile.first() // 该 Dataset 的第一个成员
- res1: String = # Apache Spark
Python 版
- >>> textFile.count() # 该 DataFrame 中的行数
- 126
- >>> textFile.first() # 该 DataFrame 的第一行
- Row(value=u'# Apache Spark')
现在让我们使用该 Dataset 来转换成一个新的 Dataset. 我们调用 filter 来返回一个新的 Dataset, 其中包含这个文件内容的子集.
Scala 版
- scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
- linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
Python 版
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
我们可以将数据集转换和数据集操作串接在一起
Scala 版
- scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
- res3: Long = 15
Python 版
- >>> textFile.filter(textFile.value.contains("Spark")).count() # How many lines contain "Spark"?
- 15
更多关于 Dataset 的操作
Dataset 操作和转换可以用来做更复杂的计算. 假设我们想要找到单词数量最多的那行:
Scala 版
- scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a> b) a else b)
- res4: Long = 15
这首先将文件中的一行映射成一个整数值, 并创建一个新的 Dataset. 调用该 Dataset 的 reduce 方法以找到最大的单词计数. map 和 reduce 的参数是 Scala 的函数字面量(闭包), 并且可以使用任何语言的特性或者 Scala/Java 库. 比如, 我们可以很荣誉地调用任何地方声明地函数(方法). 我们将使用 Math.max() 方法以使这段代码易于理解:
- scala> import java.lang.Math
- import java.lang.Math
- scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
- res5: Int = 15
MapReduce 是一种常见的数据流格式, 这是由 Hadoop 推广的. Spark 可以很容易地实现 MapReduce 流:
- scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
- wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
这里, 我们调用 flatMap 来将一个行级 (以文本中的一行为一个成员(Item)) 的 Dataset 转换成一个 单词 级 的 Dataset, 然后串接调用 groupByKey 和 count 方法 来计算文件中的每个单词的数量作为 (String, Long) 数据对形式 的 Dateset. 为了在我们的 shell 中统计出单词的数量, 我们可以调用 collect 方法:
- scala> wordCounts.collect()
- res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
Python 版
- >>> from pyspark.sql.functions import *
- >>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect()
- [Row(max(numWords)=15)]
这首先将文件中的一行映射成一个整数值 并取一个为 "numWords" 的别名, 同时创建一个新的 DataFrame. 调用该 Dataset 的 agg 方法以找到最大的单词计数. select 和 agg 的参数都是 , 我们可以使用 df.colName 方法来从一个 DataFrame 中获得一个 colum. 我们同样可以导入 pyspark.sql.functions, 它提供了很多简易的方法从一个已有的 Colum 构建一个新的 Colum.
MapReduce 是一种常见的数据流格式, 这是由 Hadoop 推广的. Spark 可以很容易地实现 MapReduce 流:
>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()
这里, 我们在 select 方法中使用了 explode 方法来将一个行级 (以文本中的一行为一个成员(Item)) 的 Dataset 转换成一个 单词 级 的 Dataset. 然后串接调用 groupByKey 和 count 方法 来计算文件中的每个单词的数量作为一个拥有两个 Colum:"word" 和 "count" 的 DataFrame. 为了在我们的 shell 中统计出单词的数量, 我们可以调用 collect 方法:
- >>> wordCounts.collect()
- [Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]
缓存(Caching)
Spark 同样支持将数据集加入到一个集群中的内存缓存中. 当数据被重复访问时, 这是非常有用的. 比如查询一个小的热点数据集 或者 运行像 PageRank 这样的迭代算法. 让我们标记我们的 linesWithSpark 作为缓存数据 来作为一个例子:
Scala 版
- scala> linesWithSpark.cache()
- res7: linesWithSpark.type = [value: string]
- scala> linesWithSpark.count()
- res8: Long = 15
- scala> linesWithSpark.count()
- res9: Long = 15
Python 版
- >>> linesWithSpark.cache()
- >>> linesWithSpark.count()
- 15
- >>> linesWithSpark.count()
- 15
使用 Spark 来探索和缓存一个 100 行的文本文件看起来很蠢. 有趣的是, 这些方法同样可以作用在非常大的数据集中, 哪怕它们被分布在数十个或上百个节点中. 正如 RDD 编程指南 中描述的那样, 您可以通过连接 bin/spark-shell 到一个集群中来进行以上交互式操作.
独立的应用程序
假设我们希望使用 Spark API 编写一个独立的 应用程序. 我们将分别使用 Scala(带 sbt),Java(带 Maven) 和 Python(pip) 编写一个简单的应用程序.
Scala
我们将在 Scala 中创建一个 Spark 应用程序 -- 非常简单. 实际上, 它被命名为 SimleApp.scala
- /* SimpleApp.scala */
- import org.apache.spark.sql.SparkSession
- object SimpleApp {
- def main(args: Array[String]) {
- val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
- val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
- val logData = spark.read.textFile(logFile).cache()
- val numAs = logData.filter(line => line.contains("a")).count()
- val numBs = logData.filter(line => line.contains("b")).count()
- println(s"Lines with a: $numAs, Lines with b: $numBs")
- spark.stop()
- }
- }
注意, 这个应用程序需要定义一个 main() 方法 而不是 继承 scala.App. scala.App 的子类可能无法正常地工作.
这个程序只是统计 Spark README 文件中包含 "a" 的行数和 包含 "b" 的行数. 注意, 您需要使用 Spark 的安装位置 来代替 YOUR_SPARK_HOME. 与之前 Spark Shell 中的例子不同的是, Spark Shell 初始化它自己的 SparkSession, 而我们初始化一个 SparkSeesion 作为程序的一部分.
我们调用 SparkSession.builder 来构造一个 [SparkSession], 然后设置应用的名字, 最后调用 getOrCreate 方法获取一个 [SparkSession]实例.
我们的应用程序取决于 Spark API, 所以我们同样需要一个 sbt 配置文件, build.sbt, 这表示 Spark 是一个依赖组件.
- name := "Simple Project"
- version := "1.0"
- scalaVersion := "2.11.8"
- libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"
为了使 sbt 能够正常工作, 我们需要根据经典的目录结构布局 SimpleApp.scala 和 build.sbt. 一旦完成这些, 我们就可以创建一个包含这个应用程序源代码的 JAR 包, 然后使用 spark-submit 脚本运行我们的程序.
- # Your directory layout should look like this
- $ find .
- .
- ./build.sbt
- ./src
- ./src/main
- ./src/main/scala
- ./src/main/scala/SimpleApp.scala
- # Package a jar containing your application
- $ sbt package
- ...
- [info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar
- # Use spark-submit to run your application
- $ YOUR_SPARK_HOME/bin/spark-submit \
- --class "SimpleApp" \
- --master local[4] \
- target/scala-2.11/simple-project_2.11-1.0.jar
- ...
- Lines with a: 46, Lines with b: 23
- Java
这个例子将会使用 Maven 编译一个 JAR 应用程序, 但是很多类似的构建系统都可以工作.
我们将创建一个简单的 Spark 应用程序, SimpleApp.java
- /* SimpleApp.java */
- import org.apache.spark.sql.SparkSession;
- import org.apache.spark.sql.Dataset;
- public class SimpleApp {
- public static void main(String[] args) {
- String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
- SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
- Dataset<String> logData = spark.read().textFile(logFile).cache();
- long numAs = logData.filter(s -> s.contains("a")).count();
- long numBs = logData.filter(s -> s.contains("b")).count();
- System.out.println("Lines with a:" + numAs + ", lines with b:" + numBs);
- spark.stop();
- }
- }
这个程序只是统计 Spark README 文件中包含 "a" 的行数和 包含 "b" 的行数. 注意, 您需要使用 Spark 的安装位置 来代替 YOUR_SPARK_HOME. 与之前 Spark Shell 中的例子不同的是, Spark Shell 初始化它自己的 SparkSession, 而我们初始化一个 SparkSeesion 作为程序的一部分.
为了构建这个程序, 我们同样要编写一个 Maven pom.xml 文件, 这个文件将 Spark 列为一个依赖组件. 请注意, Spark 构件 被标记为 Scala 版本
- <project>
- <groupId>edu.berkeley</groupId>
- <artifactId>simple-project</artifactId>
- <modelVersion>4.0.0</modelVersion>
- <name>Simple Project</name>
- <packaging>jar</packaging>
- <version>1.0</version>
- <dependencies>
- <dependency> <!-- Spark dependency -->
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
- <version>2.3.1</version>
- </dependency>
- </dependencies>
- </project>
我们根据规范的 Maven 目录结构列出这些文件
- $ find .
- ./pom.xml
- ./src
- ./src/main
- ./src/main/java
- ./src/main/java/SimpleApp.java
现在, 我们可以使用 Maven 打包这个应用程序并且 通过 ./bin/spark-submit. 执行
- # Package a JAR containing your application
- $ mvn package
- ...
- [INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
- # Use spark-submit to run your application
- $ YOUR_SPARK_HOME/bin/spark-submit \
- --class "SimpleApp" \
- --master local[4] \
- target/simple-project-1.0.jar
- ...
- Lines with a: 46, Lines with b: 23
- Python
这里我们将展示如何使用 Python API(PySpark)来编写一个应用程序
如果你正构建一个打包的 PySpark 应用程序或库, 你可以将它添加到你的 setup.py 文件中, 如下:
- install_requires=[
- 'pyspark=={site.SPARK_VERSION}'
- ]
作为示例, 我们将创建一个简单的 Spark 应用程序, SimpleApp.py:
- """SimpleApp.py"""
- from pyspark.sql import SparkSession
- logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system
- spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
- logData = spark.read.text(logFile).cache()
- numAs = logData.filter(logData.value.contains('a')).count()
- numBs = logData.filter(logData.value.contains('b')).count()
- print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
- spark.stop()
这个程序只是统计 Spark README 文件中包含 "a" 的行数和 包含 "b" 的行数. 注意, 您需要使用 Spark 的安装位置 来代替 YOUR_SPARK_HOME. 与之前 Spark Shell 中的例子不同的是, Spark Shell 初始化它自己的 SparkSession, 而我们初始化一个 SparkSeesion 作为程序的一部分. 和 Scala 和 Java 例子一样, 我们使用 SparkSession 来创建 Dataset . 对于使用自定义类或者第三方库的应用程序, 我们同样可以通过它的 --py-- files 参数将代码和依赖打包成 zip 文件 (使用 spark-submit --help 查看细节) 的形式 添加到 spark-submit. SimpleApp 足够简单, 所以我们不用指定任何代码依赖组件.
我们使用 bin/spark-submit 脚本运行这个程序
- # Use spark-submit to run your application
- $ YOUR_SPARK_HOME/bin/spark-submit \
- --master local[4] \
- SimpleApp.py
- ...
- Lines with a: 46, Lines with b: 23
如果您将 PySpark 通过 pip 安装到了您的环境中(eg. pip install pyspark), 根据您的喜好, 可以使用常规的 Python 解释器 或者 使用 spark-submit 来运行您的程序
- # Use the Python interpreter to run your application
- $ python SimpleApp.py
- ...
- Lines with a: 46, Lines with b: 23
下一步
祝贺您运行了您的第一个 Spark 应用程序
关于 API 的深入概述, 请从 RDD 编程指南 和 SQL 编程指南 开始, 或者 查看编程指南菜单 以了解其他组件
关于使用集群运行应用程序, 请移步 部署概述
最后, Spark 包含了几个简单的例子, 它们被保存在 example 目录下(Scala, Java, Python, R), 你可以按照以下方式运行它们:
- # For Scala and Java, use run-example:
- ./bin/run-example SparkPi
- # For Python examples, use spark-submit directly:
- ./bin/spark-submit examples/src/main/python/pi.py
- # For R examples, use spark-submit directly:
- ./bin/spark-submit examples/src/main/r/dataframe.R
来源: https://www.cnblogs.com/yeyeck/p/9652117.html