图片来源于网络
场景
在工业互联网场景中, 工厂的现场积累了很多从设备上采集到的历史数据, 可以用于数据模型的训练, 同时通过现场的 OPC Server 和 Modbus Master 可以连接设备上的传感器进行实时数据的采集, 将采集到的实时流数据应用到训练的模型, 用于现场的设备故障预测. 本文主要讨论基于 Spark Structured Streaming 进行模型应用的实际应用.
方案
基于工厂的实际情况, 历史数据基于 Spark 的 ML 库进行模型的训练和模型的存储, 采集的实时数据会汇总到 Kafka 中, 同时使用 Spark Structured Streaming 加载存储的数据模型进行模型应用, 最终的预测数据重新写回到 Kafka 中, 通过 websocket 协议推送到前端进行可视化显示.
选择使用 Spark Streaming, 主要三个原因:
1, 由于 Spark 的 2.4 版本已经不再进行 Spark Streaming 的更新, 2.4 版本对于流式处理方面特性都是基于 Structured Streaming 的更新
2,Structured Streaming 提供了基于 Dataset/Dataframe 的统一 API, 使用更友好, 与 ML 库的结合更便捷.
3,Structured Streaming 不在是微批次的进行数据的处理, 可以进行毫秒级的实时数据处理.
实践
资源准备
- ,Kafka 2.12
- ,Spark 2.4
- ,Scala 2.11
4,CentOS 7.4 64 位
具体代码
创建 SparkSession, 建立与 Spark 集群的建立, 对于生产环节推荐使用集群模式的 Spark 集群, 最少也要是 Standalone 的方式.
SparkSessionspark = SparkSession.builder().master("spark://master:7077").appName("ModelApplicationTask").getOrCreate();
建立 source 完成与 kafka 的连接, 基于 Topic 进行数据的消费, 获取到的 DataSet 的 Scheml 格式如下:
Dataset<Row> df = spark.readStream().format("kafka").option("kafka.bootstrap.servers",bootstrapServers).option("subscribe",topic).option("startingOffsets","earliest").load();
整理 Dataset 中的数据, 生成模型应用需要的特征列
Dataset data = df.withColumn("splitcol",split(col("value"),",")).select(col("splitcol").getItem(0).as("dpname"),col("splitcol").getItem(1).as("time"),col("splitcol").getItem(2).as("dptype"),col("splitcol").getItem(3).as("dpvalue")).drop("splitcol");
根据数据点名称, 过滤数据集中的数据, 生成用于预测的数据集
- Dataset filterData = data.where("dpname ='"+dataPointName+"'");
- Dataset predict_data = filterData.selectExpr("CAST(time AS STRING)","CAST(dpname AS STRING)","CAST(dptype AS STRING)","CAST(dpvalue AS DOUBLE)");
- VectorAssembler assembler_for_predict =newVectorAssembler().setInputCols(newString[]{
- "dpvalue"
- }).setOutputCol("features");
- Dataset<Row> vecDF_for_predict = assembler_for_predict.transform(predict_data);
加载 hdfs 或本地磁盘加载训练后的模型, 如果没有 hdfs 环境, 可以通过 Spark 集群广播的方式将本地加载的模型, 广播到 Spark 集群中.
LinearRegressionModellrModel = LinearRegressionModel.load(modelPath);
设置模型特征列, 应用模型进行预测
Datasetprediction = lrModel.transform(vecDF_for_predict);
将预测结果, 重新组合成逗号分隔的字符串, 写回到 Kafka 中
- String separator =",";
- StreamingQuery ds = prediction.select(concat_ws(separator, col("dpname"), col("time"), col("dptype"), col("prediction")).cast("STRING").as("value")).writeStream().option("checkpointLocation", checkpointLocation).format("kafka").option("kafka.bootstrap.servers",bootstrapServers).option("topic","test").start();
- ds.awaitTermination();
打包成 jar 包, 通过 submit 提交到 spark 集群
sudobin/spark-submit --master spark://master:7077 --name test --total-executor-cores1--executor-memory600M--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0--class ModelApplicationTask ./jars/TestSpark.jar
通过消费 kakfa 中数据验证, 模型是否应用成功.
./kafka-console-consumer.sh --Bootstrap-server172.16.22.210:9092--topic test
问题
问题 1:java.lang.AssertionError: assertion failed: Concurrent update to thelog. Multiple streamingjobsdetectedfor1936
原因: 主要是 checkpointLocation 目录一样了, 导致更新 log 是出错啦
解决办法: 一个提交的 App 占用一个独立的 checkpointLocation 目录
问题 2:Please deploy the applicationasper the deployment sectionof"Structured Streaming + Kafka Integration Guide".
原因: 官方提供的 Spark 2.4 版本发布包的 jars 目录中缺少 kafka 的包
解决办法: 在 / spark-submit 是加入以上参数 --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0
问题 3:WARN TaskSchedulerImpl: Initial job has not accepted any resources;checkyour cluster UItoensure that workersareregisteredandhave sufficient resources
原因: Spark Structured Streaming 采用预分配资源的方式, 不支持动态资源调度, 长期占用系统资源, 无法最大化利用系统硬件资源
解决方法: 通过 total-executor-cores 和 executor-memory 参数, 限定每个任务占用的 core 和内存大小
总结
Spark Structured Streaming 结合 Kafka 数据源, 应用 ML 机器学习库可以快捷的实现数据模型的流式应用, 但是采用的预分配硬件资源的方式, 将长期占用系统资源, 无法最大化利用 Spark 集群的资源, 需要结合具体的情况进行权衡后选择.
来源: http://www.jianshu.com/p/b7db4d26361b