Spark Streaming 是核心 Spark API 的扩展, 可实现实时数据流的可伸缩, 高吞吐量, 容错流处理. 可以从许多数据源 (例如 Kafka,Flume,Kinesis 或 TCP sockets) 中提取数据, 并且可以使用复杂的算法处理数据, 这些算法用高级函数表示, 如 map,reduce,join 和 Windows. 最后, 可以将处理后的数据推送到文件系统, 数据库和实时仪表板. 实际上, 可以在数据流上应用 Spark 的机器学习和图形处理算法.
在内部, 它的工作方式如下. Spark Streaming 接收实时输入数据流, 并将数据分成批次, 然后由 Spark 引擎进行处理, 以生成批次的最终结果流.
Spark Streaming 提供了一种高级抽象, 称为离散流或 DStream, 它表示连续的数据流. DStreams 可以从 Kafka,Flume 和 Kinesis 等源的输入数据流创建, 也可以通过在其他 DStreams 上应用高级操作创建. 在内部, DStream 表示为 RDDs 序列.
1. 了解 Spark
Apache Spark 是一个用于大规模数据处理的统一分析引擎
特性:
快
将工作负载运行速度提高 100 倍
Apache Spark 使用最新的 DAG 调度程序, 查询优化器和物理执行引擎, 为批处理数据和流数据提供了高性能.
易用
可以使用 Java,Scala,Python,R 和 SQL 快速编写应用程序.
通用
结合 SQL, 流和复杂的分析
Spark 为包括 SQL 和 DataFrames, 用于机器学习的 MLlib,GraphX 和 Spark Streaming 在内的一堆库提供支持. 您可以在同一应用程序中无缝组合这些库.
到处运行
Spark 可在 Hadoop,Apache Mesos,Kubernetes, 独立或云中运行. 它可以访问各种数据源.
可以在 EC2,Hadoop YARN,Mesos 或 Kubernetes 上使用其独立集群模式运行 Spark. 访问 HDFS,Alluxio,Apache Cassandra,Apache HBase,Apache Hive 和数百种其他数据源中的数据.
2. 入门案例
统计单词出现的次数, 这个例子在 Hadoop 中用 MapReduce 也写过.
JavaStreamingContext 是 java 版的 StreamingContext. 它是 Spark Streaming 功能的主要入口点. 它提供了从输入源创建 JavaDStream 和 JavaPairDStream 的方法. 可以使用 context.sparkContext 访问内部的 org.apache.spark.API.java.JavaSparkContext. 在创建和转换 DStream 之后, 可以分别使用 context.start()和 context.stop()启动和停止流计算.
- public static void main(String[] args) throws InterruptedException {
- // Create a local StreamingContext with two working thread and batch interval of 1 second
- SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
- JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
- // Create a DStream that will connect to hostname:port, like localhost:9999
- JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
- // Split each line into words
- JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
- // Count each Word in each batch
- JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
- JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
- // Print the first ten elements of each RDD generated in this DStream to the console
- wordCounts.print();
- // Start the computation
- jssc.start();
- // Wait for the computation to terminate
- jssc.awaitTermination();
- }
3. 基本概念
3.1. Maven 依赖
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.12</artifactId>
- <version>2.4.5</version>
- <scope>provided</scope>
- </dependency>
为了从其它数据源获取数据, 需要添加相应的依赖项 spark-streaming-xyz_2.12. 例如:
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
- <version>2.4.5</version>
- </dependency>
3.2. 初始化 StreamingContext
为了初始化一个 Spark Streaming 程序, 必须创建一个 StreamingContext 对象, 该对象是所有 Spark Streaming 功能的主要入口点.
我们可以从 SparkConf 对象中创建一个 JavaStreamingContext 对象
- import org.apache.spark.SparkConf;
- import org.apache.spark.streaming.Duration;
- import org.apache.spark.streaming.API.java.JavaStreamingContext;
- SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
- JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
appName 参数是显示在集群 UI 上的你的应用的名字
master 参数是一个 Spark, Mesos 或 YARN 集群 URL, 或者也可以是一个特定的字符串 "local[*]" 表示以本地模式运行. 实际上, 当在集群上运行时, 肯定不希望对在程序中对 master 进行硬编码, 而希望通过 spark-submit 启动应用程序并在其中接收它. 然而, 对于本地测试, 你可以传 "local[*]" 来运行 Spark Streaming.
还可以从一个已存在的 JavaSparkContext 中创建一个 JavaStreamingContext 对象
- import org.apache.spark.streaming.API.java.*;
- JavaSparkContext sc = ... //existing JavaSparkContext
- JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));
在定义完 context 之后, 必须做以下事情:
通过创建 input DStreams 来定义 input sources
通过对 DStreams 应用 transformation(转换)和 output(输出)操作来定义流计算
用 streamingContext.start()来开始接收数据并处理它
用 streamingContext.awaitTermination()等待处理停止(手动停止或由于任何错误)
用 streamingContext.stop()可以手动停止
需要记住的点:
一旦启动上下文, 就无法设置新的流计算或将其添加到该流计算中
上下文一旦停止, 就无法重新启动
一个 JVM 中只能同时激活一个 StreamingContext
StreamingContext 中的 stop()也会停止 SparkContext. 但如果要仅停止 StreamingContext 的话, 设置 stop(false)
只要在创建下一个 StreamingContext 之前停止了上一个 StreamingContext(不停止 SparkContext), 就可以将 SparkContext 重用于创建多个 StreamingContext
3.3. DStreams(离散流)
Discretized Stream 或 DStream 是 Spark Streaming 提供的基本抽象. 它表示一个连续的数据流, 可以是从源接收的输入数据流, 也可以是通过转换输入流生成的已处理数据流. 在内部, DStream 由一系列连续的 RDD 表示, 这是 Spark 对不变的分布式数据集的抽象. DStream 中的每个 RDD 都包含来自特定间隔的数据, 如下图所示.
在 DStream 上执行的任何操作都转换为对基础 RDD 的操作. 例如, 最简单的将一行句子转换为单词的例子中, flatMap 操作应用于行 DStream 中的每个 RDD, 以生成单词 DStream 的 RDD. 如下图所示:
3.4. Input DStreams 和 Receivers
Input DStream 是表示从源接收的输入数据流. 在上图中, lines 是输入 DStream, 因为它表示从 netcat 服务器接收的数据流. 每一个输入 DStream 都关联着一个 Receiver 对象, 该对象从源接收数据并将其存储在 Spark 的内存中以进行处理.
Spark Streaming 提供了两类内置的 streaming 源:
Basic sources : 直接在 StreamingContext API 中可用的源. 例如, 文件系统和 socket 连接
Advanced sources : 像 Kafka,Flume,Kinesis 等这样的源, 可通过额外的程序类获得
如果要在流应用程序中并行接收多个数据流, 则可以创建多个输入 DStream. 这将创建多个 Receiver(接收器), 这些接收器将同时接收多个数据流. 重要的是要记住, 必须为 Spark Streaming 应用程序分配足够的内核(或线程, 如果在本地运行), 以处理接收到的数据以及运行接收器.
需要记住的点:
在本地运行 Spark Streaming 程序时, 请勿使用 "local" 或 "local [1]" 作为 master URL. 这两种方式均意味着仅一个线程将用于本地运行任务. 如果使用的是基于接收器的输入 DStream(例如套接字, Kafka,Flume 等), 则将使用单个线程来运行接收器, 而不会留下任何线程来处理接收到的数据. 因此, 在本地运行时, 请始终使用 "local [n]" 作为主 URL, 其中 n > 要运行的接收器数
为了将逻辑扩展到在集群上运行, 分配给 Spark Streaming 应用程序的内核数必须大于接收器数. 否则, 系统将接收数据, 但无法处理它.
Basic Sources
为了从文件中读取数据, 可以通过 StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]来创建一个 DStream
例如: streamingContext.textFileStream(dataDirectory);
Spark Streaming 将监视目录 dataDirectory 并处理在该目录中创建的所有文件
可以监视一个简单的目录, 例如:"hdfs://namenode:8040/logs/2017/*". 在这里, DStream 将由目录中与模式匹配的所有文件组成. 也就是说: 它是目录的模式, 而不是目录中的文件.
所有文件必须使用相同的数据格式
根据文件的修改时间而不是创建时间, 将其视为时间段的一部分
一旦已经被处理后, 在当前窗口中对文件的更改不会导致重新读取该文件. 即: 更新被忽略.
3.5. Transformations on DStreams
对 DStreams 做转换, 与 RDD 相似, 转换允许修改输入 DStream 中的数据. DStream 支持普通 Spark RDD 上可用的许多转换. 一些常见的方法如下:
map (func) | 通过将源 DStream 的每个元素传递给函数 func 来处理并返回新的 DStream |
flatMap (func) | 与 map 类似,但是每个输入项可以映射到 0 个或多个输出项 |
filter (func) | 过滤 |
repartition (numPartitions) | 通过创建更多或更少的分区来更改此 DStream 中的并行度 |
union (otherStream) | 将源 DStream 和另一个 DStream 中的元素合并在一起,返回一个新的 DStream。相当于 SQL 中的 union |
count () | 返回元素的个数 |
reduce (func) | 通过使用函数 func(接受两个参数并返回一个)来聚合源 DStream 的每个 RDD 中的元素,从而返回一个单元素 RDD 的新 DStream。 |
countByValue () |
|
reduceByKey (func, [numTasks]) | 在一个(K,V)形式的 DStream 上调用时,返回一个新的(K,V)DStream,其中使用给定的 reduce 函数汇总每个键的值 |
join (otherStream, [numTasks]) | 在(K,V)和(K,W)两个 DStream 上调用时,返回一个新的(K,(V,W))DStream |
cogroup (otherStream, [numTasks]) | 在(K,V)和(K,W)DStream 上调用时,返回一个新的(K,Seq [V],Seq [W])元组的 DStream |
transform (func) | 通过对源 DStream 的每个 RDD 应用 RDD-to-RDD 函数来返回新的 DStream。这可用于在 DStream 上执行任意 RDD 操作。 |
updateStateByKey (func) | 返回一个新的 “state” DStream |
其实, 这次操作跟 Java Stream 很像
Windows Operations(窗口操作)
Spark Streaming 还提供了窗口计算, 可以在数据的滑动窗口上应用转换. 下图说明了此滑动窗口:
如图所示, 每当窗口在源 DStream 上滑动时, 就会对落入窗口内的源 RDD 进行操作, 以生成窗口 DStream 的 RDD.
任何窗口函数所必须的两个参数:
窗口的长度
滑到的频率(或者说时间间隔)
举个例子, 我们来扩展前面的示例, 假设我们想要每 10 秒在数据的最后 30 秒生成一次单词次数统计. 为此, 必须在数据的最后 30 秒内对 (Word,1) 对的 DStream 对应用 reduceByKey 操作.
- import org.apache.spark.streaming.Durations;
- import org.apache.spark.streaming.API.java.JavaDStream;
- import org.apache.spark.streaming.API.java.JavaPairDStream;
- import scala.Tuple2;
- JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
- JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
- // Reduce last 30 seconds of data, every 10 seconds
- JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));
一些常见的窗口操作如下. 所有这些操作均采用上述两个参数: windowLength 和 slideInterval
window (windowLength, slideInterval) | 返回基于源 DStream 的窗口批处理计算的新 DStream |
countByWindow (windowLength, slideInterval) | 返回流中元素的滑动窗口数 |
reduceByWindow (func, windowLength, slideInterval) | 对窗口内的数据进行聚合操作 |
reduceByKeyAndWindow (func, windowLength, slideInterval, [numTasks]) | 在(K,V)DStream 上调用时,返回新的(K,V)DStream,其中使用给定的 reduce 函数 func 在滑动窗口中的批处理上汇总每个键的值 |
reduceByKeyAndWindow (func, invFunc, windowLength, slideInterval, [numTasks]) | |
countByValueAndWindow (windowLength, slideInterval, [numTasks]) |
3.6. Output Operations on DStreams
输出操作允许将 DStream 的数据输出到外部系统, 例如数据库或文件系统.
流式应用程序必须 24/7 全天候运行, 因此必须能够抵抗与应用程序逻辑无关的故障(例如, 系统故障, JVM 崩溃等). 为此, Spark Streaming 需要将足够的信息检查点指向容错存储系统, 以便可以从故障中恢复. 检查点有两种类型的数据.
元数据检查点 - 将定义流计算的信息保存到 HDFS 等容错存储中. 这用于从运行流应用程序的驱动程序的节点的故障中恢复.
数据检查点 - 将生成的 RDD 保存到可靠的存储中
完整代码:
- package com.example.demo;
- import org.apache.spark.SparkConf;
- import org.apache.spark.streaming.Durations;
- import org.apache.spark.streaming.API.java.JavaDStream;
- import org.apache.spark.streaming.API.java.JavaPairDStream;
- import org.apache.spark.streaming.API.java.JavaStreamingContext;
- import scala.Tuple2;
- import java.util.Arrays;
- import java.util.regex.Pattern;
- /**
- * @author ChengJianSheng
- */
- public class JavaWordCount {
- private static final Pattern SPACE = Pattern.compile(" ");
- public static void main(String[] args) {
- if (args.length <1) {
- System.err.println("Usage: JavaWordCount <file>");
- System.exit(1);
- }
- SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("JavaWordCount");
- JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
- JavaDStream<String> lines = jssc.textFileStream(args[0]);
- JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(SPACE.split(line)).iterator());
- JavaPairDStream<String, Integer> ones = words.mapToPair(Word -> new Tuple2<>(Word, 1));
- JavaPairDStream<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);
- counts.print();
- /*
- JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));
- JavaDStream<String> textFileStream = jsc.textFileStream("/data");
- textFileStream.flatMap(line->Arrays.asList(line.split(" ")).iterator())
- .mapToPair(Word->new Tuple2<>(Word, 1))
- .reduceByKey((a,b)->a+b)
- .print();
- jsc.start();
- */
- }
- }
- 4. Docs
- https://spark.apache.org/
- https://spark.apache.org/docs/latest/streaming-programming-guide.html
来源: https://www.cnblogs.com/cjsblog/p/12620974.html