今天来和大家聊一下如何使用 Spark SQL 进行流式数据的机器学习处理. 本文主要分为以下几个章节:
什么是流式机器学习
机器学习模型获取途径
系统演示
1. 什么是流式机器学习
通常, 当我们听到有人提到实时数据机器学习时, 其实他们是讨论:
他们希望有一个模型, 这个模型利用最近历史信息来进行预测分析. 举一个天气的例子, 如果最近几天都是晴天, 那么未来几天极小概率会出现雨雪和低温天气
这个模型还需要是可更新的. 当数据流经系统时, 模型是可以随之进化升级. 举个例子, 随着业务规模的扩大, 我们希望零售销售模型仍然保持准确.
第一个例子我们可以将它归为时序预测. 第二个例子中, 模型需要更新或者重新训练, 这是一个 non-stationarity 问题. 时序预测和 non-stationarity 数据分布是两类不同的问题. 本文主要关注第二类问题, 对于这类问题, 一般的解决方案主要有:
增量式算法: 有一些算法支持通过数据逐步学习. 也就是说, 每次进来一些新的数据时, 模型会被更新. SVM, 神经网络等算法都有增量式版本, 此外贝叶斯网络也可以用作增量学习.
周期重新学习: 一个更加直接的方法就是用一批最新数据重新训练我们的模型. 这种方法可以用到的绝大多数的算法上.
2. 机器学习模型获取途径
实时机器学习应用分成两块, 一部分是模型实时训练, 另一部分是数据实时预测分析. 现实中, 我们可能没法实现模型的实时训练, 只能退而求其次地使用已经训练好模型. 这些模型可能会周期性地使用历史数据训练更新一次. 所以, 我们可以根据实际的算法和模型时效性要求, 来选择实时训练模型还是使用预训练好的模型.
模型算法支持增量训练: 可以选择用流式数据实时训练更新
模型算法不自持增量训练: 可以选择用离线数据预先训练好模式
回到主题上, 我们要实现使用 Spark SQL 进行流式机器学习. 前面几篇文章已经简单介绍了 EMR 如何使用 Spark SQL 进行流式 ETL 处理. 既然要进行机器学习, 我们很自然地想到 Spark MLlib.DataBricks 有篇文档介绍了在 Spark Structured Streaming 进行机器学习, 大家有兴趣的可以看下. 如果想将 Spark MLlib 应用到 Spark SQL 上, 我们可以简单地将 MLlib 算法包装成 UDF 使用. 另外一个模型获取途径是利用阿里云上的一些在线机器学习服务, 我们可以将在线机器学习服务使用 UDF 封装后使用.
使用 UDF 封装现有的 Spark MLlib 算法
使用 UDF 封装阿里云在线机器学习服务
限于篇幅, 我会分两篇文章分别介绍这两个方式, 本文将简单介绍如何利用 Spark MLlib 进行流式机器学习.
3. 系统演示
本节, 我们将演示一下如何利用逻辑回归算法进行演示.
3.1 系统架构
下面这张图展示了整个实时监测系统的架构, 前端接 LogService 数据, 实时监测分析结果写入到 RDS, 最后通过 DataV 展示出来.
3.2 测试数据集
测试数据集使用 Spark 自带的 sample_libsvm_data.txt, 我们要做的是写一个数据生成器, 将数据集的数据不断地向 SLS 中发送, 模拟流式数据.
算法模型准备
Spark MLlib 提供了大量的机器学习算法实现, 可以方便的再 RDD 或者 DataFrame API 上使用, 但是无法直接用在 SQL API 上, 所以我们需要使用 UDF 来封装一下. 这里, 我们选用逻辑回归算法, 具体的实现就不细说了, 可以参考这里的代码: LogisticRegressionUDF.scala
3.4 部署测试
- CLI
- ## emr datasources 包还没有发布, 需要手动编译出来
- Git clone Git@GitHub.com:aliyun/aliyun-emapreduce-sdk.Git
- cd aliyun-emapreduce-sdk
- Git checkout -b master-2.x origin/master-2.x
- mvn clean package -DskipTests
- ## 编译完后, assembly/target 目录下会生成 emr-datasources_shaded_2.11-1.7.0-SNAPSHOT.jar,
- spark-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-1.7.0-SNAPSHOT.jar --driver-class-path emr-datasources_shaded_2.11-1.7.0-SNAPSHOT.jar
建表
- spark-sql> CREATE DATABASE IF NOT EXISTS default;
- spark-sql> USE default;
-- 测试数据源
- spark-sql> CREATE TABLE IF NOT EXISTS sls_dataset
- USING loghub
- OPTIONS (
- sls.project = "${logProjectName}",
- sls.store = "${logStoreName}",
- access.key.id = "${accessKeyId}",
- access.key.secret = "${accessKeySecret}",
- endpoint = "${endpoint}");
- spark-sql> DESC sls_dataset
- __logProject__ string NULL
- __logStore__ string NULL
- __shard__ int NULL
- __time__ timestamp NULL
- __topic__ string NULL
- __source__ string NULL
- label string NULL
- features string NULL
- __tag__hostname__ string NULL
- __tag__path__ string NULL
- __tag__receive_time__ string NULL
- Time taken: 0.058 seconds, Fetched 11 row(s)
-- 结果数据源
- spark-sql> CREATE TABLE IF NOT EXISTS rds_result
- USING jdbc2
- OPTIONS (
- url="${rdsUrl}",
- driver="com.mysql.jdbc.Driver",
- dbtable="${rdsTableName}",
- user="${user}",
- password="${password}",
- batchsize="100",
- isolationLevel="NONE");
- spark-sql> DESC rds_result;
- acc double NULL
- label double NULL
- time string NULL
- Time taken: 0.457 seconds, Fetched 3 row(s)
提交执行
- SET streaming.query.name=lr_prediction;
- SET spark.sql.streaming.checkpointLocation.lr_prediction=hdfs:///tmp/spark/lr_prediction;
- SET spark.sql.streaming.query.outputMode.lr_prediction=update;
-- 由于 DataSource 是基于 JDBC 实现的, 所以我们需要设置向 RDS 表插入数据的 SQL
-- 这里我的 RDS 表名是 `result`
- SET streaming.query.lr_prediction.sql=insert into `result`(`time`, `label`, `acc`) values(?, ?, ?);
- INSERT INTO
- rds_result
- SELECT
- Windows.start,
- label,
- sum(if(tb.predict = tb.label, 1, 0)) / count(tb.label) as acc
- FROM(
- SELECT
- default.Logistic_Regression("${LR_model_path}", concat_ws(" ", label, features)) as predict,
- label,
- __time__ as time
- FROM sls_dataset) tb
- GROUP BY TUMBLING(tb.time, interval 10 second), tb.label;
3.5 效果展示
在 DataV 中配置上面的 RDS 结果表, 使用折线图查看 label=1 的预测准确率, 如下:
4. 小结
本文简要介绍了流式机器学习面临的几个问题, 以及相应的解决方法. 并使用 Spark SQL 结合 Spark MLlib 演示了一个流式机器学习的案例. 下一篇, 我会简要介绍 Spark SQL 如何结合阿里云的在线机器学习服务来进行流式机器学习应用开发.
来源: https://yq.aliyun.com/articles/706563