1 foreachRDD
output operation 算子, 必须对抽取出来的 RDD 执行 action 类算子, 代码才能执行.
代码: 见上个随笔例子
2 transform
transformation 类算子
可以通过 transform 算子, 对 Dstream 做 RDD 到 RDD 的任意操作.
代码:
- /**
- * 过滤黑名单
- * transform 操作
- * DStream 可以通过 transform 做 RDD 到 RDD 的任意操作.
- * @author root
- *
- */
- public class TransformOperator {
- public static void main(String[] args) {
- SparkConf conf = new SparkConf();
- conf.setMaster("local[2]").setAppName("transform");
- JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5));
- // 黑名单
- List<String> list = Arrays.asList("zhangsan");
- final Broadcast<List<String>> bcBlackList = jsc.sparkContext().broadcast(list);
- // 接受 socket 数据源
- JavaReceiverInputDStream<String> nameList = jsc.socketTextStream("node5", 9999);
- JavaPairDStream<String, String> pairNameList =
- nameList.mapToPair(new PairFunction<String, String, String>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Tuple2<String, String> call(String s) throws Exception {
- return new Tuple2<String, String>(s.split(" ")[1], s);
- }
- });
- /**
- * transform 可以拿到 DStream 中的 RDD, 做 RDD 到 RDD 之间的转换, 不需要 Action 算子触发, 需要返回 RDD 类型.
- * 注意: transform call 方法内, 拿到 RDD 算子外的代码 在 Driver 端执行, 也可以做到动态改变广播变量.
- */
- JavaDStream<String> transFormResult =
- pairNameList.transform(new Function<JavaPairRDD<String,String>, JavaRDD<String>>() {
- private static final long serialVersionUID = 1L;
- @Override
- public JavaRDD<String> call(JavaPairRDD<String, String> nameRDD)
- throws Exception {
- JavaPairRDD<String, String> filter = nameRDD.filter(new Function<Tuple2<String,String>, Boolean>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Boolean call(Tuple2<String, String> tuple) throws Exception {
- return !bcBlackList.value().contains(tuple._1);
- }
- });
- JavaRDD<String> map = filter.map(new Function<Tuple2<String,String>, String>() {
- private static final long serialVersionUID = 1L;
- @Override
- public String call(Tuple2<String, String> tuple) throws Exception {
- return tuple._2;
- }
- });
- // 返回过滤好的结果
- return map;
- }
- });
- transFormResult.print();
- jsc.start();
- jsc.awaitTermination();
- jsc.stop();
- }
- }
- 3 updateStateByKey
transformation 算子
updateStateByKey 作用:
1) 为 SparkStreaming 中每一个 Key 维护一份 state 状态, state 类型可以是任意类型的, 可以是一个自定义的对象, 更新函数也可以是自定义的.
2) 通过更新函数对该 key 的状态不断更新, 对于每个新的 batch 而言, SparkStreaming 会在使用 updateStateByKey 的时候为已经存在的 key 进行 state 的状态更新.
使用到 updateStateByKey 要开启 checkpoint 机制和功能.
多久会将内存中的数据写入到磁盘一份?
如果 batchInterval 设置的时间小于 10 秒, 那么 10 秒写入磁盘一份. 如果 batchInterval 设置的时间大于 10 秒, 那么就会 batchInterval 时间间隔写入磁盘一份.
代码
- public class UpdateStateByKeyDemo {
- public static void main(String[] args) {
- /*
- * 第一步: 配置 SparkConf:
- * 1, 至少 2 条线程: 因为 Spark Streaming 应用程序在运行的时候, 至少有一条
- * 线程用于不断的循环接收数据, 并且至少有一条线程用于处理接受的数据(否则的话无法
- * 有线程用于处理数据, 随着时间的推移, 内存和磁盘都会不堪重负);
- * 2, 对于集群而言, 每个 Executor 一般肯定不止一个 Thread, 那对于处理 Spark Streaming 的
- * 应用程序而言, 每个 Executor 一般分配多少 Core 比较合适? 根据我们过去的经验, 5 个左右的
- * Core 是最佳的(一个段子分配为奇数个 Core 表现最佳, 例如 3 个, 5 个, 7 个 Core 等);
- */
- SparkConf conf = new SparkConf().setMaster("local[2]").
- setAppName("UpdateStateByKeyDemo");
- /*
- * 第二步: 创建 SparkStreamingContext:
- * 1, 这个是 SparkStreaming 应用程序所有功能的起始点和程序调度的核心
- * SparkStreamingContext 的构建可以基于 SparkConf 参数, 也可基于持久化的 SparkStreamingContext 的内容
- * 来恢复过来(典型的场景是 Driver 崩溃后重新启动, 由于 Spark Streaming 具有连续 7*24 小时不间断运行的特征,
- * 所有需要在 Driver 重新启动后继续上衣系的状态, 此时的状态恢复需要基于曾经的 Checkpoint);
- * 2, 在一个 Spark Streaming 应用程序中可以创建若干个 SparkStreamingContext 对象, 使用下一个 SparkStreamingContext
- * 之前需要把前面正在运行的 SparkStreamingContext 对象关闭掉, 由此, 我们获得一个重大的启发 SparkStreaming 框架也只是
- * Spark Core 上的一个应用程序而已, 只不过 Spark Streaming 框架箱运行的话需要 Spark 工程师写业务逻辑处理代码;
- */
- JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
- // 报错解决办法做 checkpoint, 开启 checkpoint 机制, 把 checkpoint 中的数据放在这里设置的目录中,
- // 生产环境下一般放在 HDFS 中
- jsc.checkpoint("/usr/local/tmp/checkpoint");
- /*
- * 第三步: 创建 Spark Streaming 输入数据来源 input Stream:
- * 1, 数据输入来源可以基于 File,HDFS,Flume,Kafka,Socket 等
- * 2, 在这里我们指定数据来源于网络 Socket 端口, Spark Streaming 连接上该端口并在运行的时候一直监听该端口
- * 的数据(当然该端口服务首先必须存在), 并且在后续会根据业务需要不断的有数据产生(当然对于 Spark Streaming
- * 应用程序的运行而言, 有无数据其处理流程都是一样的);
- * 3, 如果经常在每间隔 5 秒钟没有数据的话不断的启动空的 Job 其实是会造成调度资源的浪费, 因为并没有数据需要发生计算, 所以
- * 实例的企业级生成环境的代码在具体提交 Job 前会判断是否有数据, 如果没有的话就不再提交 Job;
- */
- JavaReceiverInputDStream lines = jsc.socketTextStream("hadoop100", 9999);
- /*
- * 第四步: 接下来就像对于 RDD 编程一样基于 DStream 进行编程!!! 原因是 DStream 是 RDD 产生的模板(或者说类), 在 Spark Streaming 具体
- * 发生计算前, 其实质是把每个 Batch 的 DStream 的操作翻译成为对 RDD 的操作!!!
- * 对初始的 DStream 进行 Transformation 级别的处理, 例如 map,filter 等高阶函数等的编程, 来进行具体的数据计算
- * 第 4.1 步: 讲每一行的字符串拆分成单个的单词
- */
- JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { // 如果是 Scala, 由于 SAM 转换, 所以可以写成 val words = lines.flatMap { line => line.split(" ")}
- @Override
- public Iterable<String> call(String line) throws Exception {
- return Arrays.asList(line.split(" "));
- }
- });
- /*
- * 第四步: 对初始的 DStream 进行 Transformation 级别的处理, 例如 map,filter 等高阶函数等的编程, 来进行具体的数据计算
- * 第 4.2 步: 在单词拆分的基础上对每个单词实例计数为 1, 也就是 Word => (Word, 1)
- */
- JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String Word) throws Exception {
- return new Tuple2<String, Integer>(Word, 1);
- }
- });
- /*
- * 第四步: 对初始的 DStream 进行 Transformation 级别的处理, 例如 map,filter 等高阶函数等的编程, 来进行具体的数据计算
- * 第 4.3 步: 在这里是通过 updateStateByKey 来以 Batch Interval 为单位来对历史状态进行更新,
- * 这是功能上的一个非常大的改进, 否则的话需要完成同样的目的, 就可能需要把数据保存在 Redis,
- * Tagyon 或者 HDFS 或者 HBase 或者数据库中来不断的完成同样一个 key 的 State 更新, 如果你对性能有极为苛刻的要求,
- * 且数据量特别大的话, 可以考虑把数据放在分布式的 Redis 或者 Tachyon 内存文件系统中;
- * 当然从 Spark1.6.x 开始可以尝试使用 mapWithState,Spark2.X 后 mapWithState 应该非常稳定了.
- */
- JavaPairDStream<String, Integer> wordsCount = pairs.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { // 对相同的 Key, 进行 Value 的累计(包括 Local 和 Reducer 级别同时 Reduce)
- @Override
- public Optional<Integer> call(List<Integer> values, Optional<Integer> state)
- throws Exception {
- Integer updatedValue = 0 ;
- if(state.isPresent()){
- updatedValue = state.get();
- }
- for(Integer value: values){
- updatedValue += value;
- }
- return Optional.of(updatedValue);
- }
- });
- /*
- * 此处的 print 并不会直接出发 Job 的执行, 因为现在的一切都是在 Spark Streaming 框架的控制之下的, 对于 Spark Streaming
- * 而言具体是否触发真正的 Job 运行是基于设置的 Duration 时间间隔的
- * 诸位一定要注意的是 Spark Streaming 应用程序要想执行具体的 Job, 对 Dtream 就必须有 output Stream 操作,
- *output Stream 有很多类型的函数触发, 类 print,saveAsTextFile,saveAsHadoopFiles 等, 最为重要的一个
- * 方法是 foraeachRDD, 因为 Spark Streaming 处理的结果一般都会放在 Redis,DB,DashBoard 等上面, foreachRDD
- * 主要就是用用来完成这些功能的, 而且可以随意的自定义具体数据到底放在哪里!!!
- */
- wordsCount.print();
- /*
- * Spark Streaming 执行引擎也就是 Driver 开始运行, Driver 启动的时候是位于一条新的线程中的, 当然其内部有消息循环体, 用于
- * 接受应用程序本身或者 Executor 中的消息;
- */
- jsc.start();
- jsc.awaitTermination();
- jsc.close();
- }
4 窗口操作
窗口操作理解图:
假设每隔 5s 1 个 batch, 上图中窗口长度为 15s, 窗口滑动间隔 10s.
窗口长度和滑动间隔必须是 batchInterval 的整数倍. 如果不是整数倍会检测报错.
优化后的 Windows 窗口操作示意图:
优化后的 Windows 操作要保存状态所以要设置 checkpoint 路径, 没有优化的 Windows 操作可以不设置 checkpoint 路径.
代码:
- /**
- * 基于滑动窗口的热点搜索词实时统计
- * @author root
- *
- */
- public class WindowOperator {
- public static void main(String[] args) {
- SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("WindowHotWord");
- JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
- /**
- * 设置日志级别为 WARN
- *
- */
- jssc.sparkContext().setLogLevel("WARN");
- /**
- * 注意:
- * 没有优化的窗口函数可以不设置 checkpoint 目录
- * 优化的窗口函数必须设置 checkpoint 目录
- */
- // jssc.checkpoint("hdfs://node1:9000/spark/checkpoint");
- jssc.checkpoint("./checkpoint");
- JavaReceiverInputDStream<String> searchLogsDStream = jssc.socketTextStream("node5", 9999);
- JavaDStream<String> Windows = searchLogsDStream.Windows(Durations.seconds(15), Durations.seconds(5));
- //Word 1
- JavaDStream<String> searchWordsDStream = searchLogsDStream.flatMap(new FlatMapFunction<String, String>() {
- /**
- *
- */
- private static final long serialVersionUID = 1L;
- @Override
- public Iterable<String> call(String t) throws Exception {
- return Arrays.asList(t.split(" "));
- }
- });
- // 将搜索词映射为 (searchWord, 1) 的 tuple 格式
- JavaPairDStream<String, Integer> searchWordPairDStream = searchWordsDStream.mapToPair(
- new PairFunction<String, String, Integer>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Tuple2<String, Integer> call(String searchWord)
- throws Exception {
- return new Tuple2<String, Integer>(searchWord, 1);
- }
- });
- /**
- * 每隔 10 秒, 计算最近 60 秒内的数据, 那么这个窗口大小就是 60 秒, 里面有 12 个 rdd, 在没有计算之前, 这些 rdd 是不会进行计算的.
- * 那么在计算的时候会将这 12 个 rdd 聚合起来, 然后一起执行 reduceByKeyAndWindow 操作 ,
- * reduceByKeyAndWindow 是针对窗口操作的而不是针对 DStream 操作的.
- */
- // JavaPairDStream<String, Integer> searchWordCountsDStream =
- //
- // searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
- //
- // private static final long serialVersionUID = 1L;
- //
- // @Override
- // public Integer call(Integer v1, Integer v2) throws Exception {
- // return v1 + v2;
- // }
- // }, Durations.seconds(15), Durations.seconds(5));
- /**
- * Windows 窗口操作优化:
- */
- JavaPairDStream<String, Integer> searchWordCountsDStream =
- searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Integer call(Integer v1, Integer v2) throws Exception {
- return v1 + v2;
- }
- },new Function2<Integer, Integer, Integer>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Integer call(Integer v1, Integer v2) throws Exception {
- return v1 - v2;
- }
- }, Durations.seconds(15), Durations.seconds(5));
- searchWordCountsDStream.print();
- jssc.start();
- jssc.awaitTermination();
- jssc.close();
- }
- }
spark 记录(0)SparkStreaming 算子操作
来源: http://www.bubuko.com/infodetail-2976393.html