作者: 木艮, 阿里云 E-MapReduce 开发工程师
我在前一篇文章介绍过基于 Spark SQL 实现对 HDFS 操作的实时监控报警. 今天, 我再举例说明一下如何使用 Spark SQL 进行流式应用的开发. 本文主要分成三部分:
流式计算和 SQL
简要介绍 Spark SQL 流式开发语法
实时归档 SLS 数据到 HDFS
1. 流式计算和 SQL
数据的价值随着时间逐渐降低. 及时尽早的对数据进行处理提升了数据的价值, 所以流式计算系统的应用也越来越广泛. 目前常用的流式计算框架有 Storm,Spark Streaming 及 Flink 等, 也有 Kafka Streams 这类基于 Kafka 的流式处理类库. 各种流式处理框架都有其各自的 API, 开发者不可避免的需要学习如何使用这些 API. 如何提供简单而有效的开发工具, 从而把更多的精力投放在业务处理中. 所以, 各个流式处理系统都逐渐支持 SQL API 作为开发语言, 让使用者可以像处理 Table 一样处理 Stream. 例如 KSQL 支持使用 SQL 进行流式处理 Kafka 数据. Spark 同样提出来 Structured Streaming 作为最新一代的流式处理系统, 底层的处理引擎也是 Spark SQL. 不过在上层 SQL API, 缺少 Structured Streaming 必要的功能, 例如 Windows,watermark 等. EMR 在 Spark 开源版本上进行了功能扩展, 支持使用 SQL API 在 Spark 上进行完整的流式查询开发.
2. Spark SQL 流式开发入门
这节将简要介绍 Spark SQL 中关于流式开的概念和语法.
2.1 建表
当我们需要对流式数据源进行读写操作时, 需要首先创建一张表来表示这个数据源. 定义表的语法如下:
- CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]
- USING providerName
- OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*);
以上语法中, 针对特殊 source, 不要求一定指定表的列定义. 当不指定列定义时, 会自动识别数据源的 schema 信息. 举一个例子:
- CREATE TABLE driver_behavior
- USING kafka
- OPTIONS (
- kafka.Bootstrap.servers = "${BOOTSTRAP_SERVERS}",
- subscribe = "${TOPIC_NAME}",
- output.mode = "${OUTPUT_MODE}",
- kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
- kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
- kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");
当数据源是 Kafka 时, 会根据 Kafka Topic 名去到 Kafka Schema Registry 中查找 schema 信息. 当然, 我们也可以指定列定义, 例如:
- CREATE TABLE driverbehavior(deviceId string, velocity double)
- USING kafka
- OPTIONS (
- kafka.Bootstrap.servers = "${BOOTSTRAP_SERVERS}",
- subscribe = "${TOPIC_NAME}",
- output.mode = "${OUTPUT_MODE}",
- kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
- kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
- kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");
当指定列定义时, 要求必须和 Source 中的字段定义是一致的. 当执行完 CREATE TABLE 操作, 表的定义会保存到 Hive MetaStore 中.
2.2 CTAS
我们可以将创建表和将查询结果写入到表的语句合并到一起, 那么就是 CREATE TABLE ... AS SELECT ... 语法:
- CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]
- USING providerName
- OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*)
- AS
- queryStatement;
举一个例子 (引用自这里: q103):
- CREATE TABLE kafka_temp_table
- USING kafka
- OPTIONS (
- kafka.Bootstrap.servers = "${BOOTSTRAP_SERVERS}",
- subscribe = "${TOPIC_NAME}",
- output.mode = "${OUTPUT_MODE}",
- kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
- kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
- kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}") AS
- SELECT
- i_brand_id brand_id,
- i_brand brand,
- sum(ss_ext_sales_price) ext_price
- FROM date_dim, kafka_store_sales, item
- WHERE d_date_sk = ss_sold_date_sk
- AND ss_item_sk = i_item_sk
- AND i_manager_id = 28
- AND d_moy = 11
- AND d_year = 1999
- AND delay(ss_data_time) < '2 minutes'
- GROUP BY TUMBLING(ss_data_time, interval 1 minute), i_brand, i_brand_id
当执行完操作, 将创建出表并实际生成一个 StreamQuery 实例, 将查询结果写入到结果表中.
2.3 DML
流式查询 SQL 和离线 SQL 标准语法大部分是一样, 这边主要介绍 insert 操作. 流式查询是不允许单独进行 SELECT 操作, 必须将 SELECT 的查询结果写入到表中. 所以, 需要在 SELECT 操作之前执行 INSERT 操作.
- INSERT INTO tbName[(columnName[,columnName]*)]
- queryStatement;
以上语法为一次流式查询: 这个语句将实际生成一个 StreamQuery 实例, 将查询结果写入到结果表中. 举一个例子:
- INSERT INTO kafka_temp_table
- SELECT
- i_brand_id brand_id,
- i_brand brand,
- sum(ss_ext_sales_price) ext_price
- FROM date_dim, kafka_store_sales, item
- WHERE d_date_sk = ss_sold_date_sk
- AND ss_item_sk = i_item_sk
- AND i_manager_id = 28
- AND d_moy = 11
- AND d_year = 1999
- AND delay(ss_data_time) < '2 minutes'
- GROUP BY TUMBLING(ss_data_time, interval 1 minute), i_brand, i_brand_id
2.4 Windows 及 watermark
限于篇幅, 本文暂且不介绍 Spark SQL 中如何使用 Windows 和 watermak, 有兴趣的可以先看看资料, 后续会专门撰文介绍.
2.5 流式作业配置
使用 SQL 进行流式作业开发时, 有些必要的配置无法在 Query 表达出来, 需要单独进行设置. 这里我们使用 SET 操作进行流式作业必要参数配置, 当前有两个参数需要设置:
每一个流式查询实例前都需要进行配置, 也就是说, 当使用 CTAS 或者 Insert 操作时, 必须前置这两个配置. 一个 SQL 文件支持多个流式查询, 例如:
- -- test.sql
- SET streaming.query.name=query1;
- SET spark.sql.streaming.checkpointLocation.query1=/tmp/spark/query1
- INSERT INTO tbName1 [(columnName[,columnName]*)]
- queryStatement1;
- SET streaming.query.name=query2;
- SET spark.sql.streaming.checkpointLocation.query2=/tmp/spark/query2
- INSERT INTO tbName2 [(columnName[,columnName]*)]
- queryStatement2;
3. SLS 数据实时归档实战
假定一个场景, 现在通过 SLS 收集了业务服务器上的日志, 需要归档到 HDFS 中, 便于后续进行离线分析. 这里涉及到两个数据源: SLS 和 HDFS.HDFS 是 Spark 官方支持的数据源, 支持流和批的读写. SLS 是阿里云的服务, EMR 已经支持了流式读写.
环境准备
需要 E-MapReduce 3.21.0 以上版本集群环境, 当前正在发布准备中, 很快和大家见面, 敬请期待.
命令行
spark-sql --master yarn-client --conf spark.sql.streaming.datasource.provider=loghub --jars emr-logservice_shaded_2.11-1.7.0-SNAPSHOT.jar
注: emr-logservice_shaded_2.11-1.7.0-SNAPSHOT.jar 将会在 EMR SDK 1.7.0 版本发布出来.
分别创建两张表: sls_service_log 和 hdfs_service_log
- CREATE DATABASE IF NOT EXISTS default;
- USE default;
- DROP TABLE IF EXISTS hdfs_service_log;
- CREATE TABLE hdfs_service_log (instance_name string, ip string, content string)
- USING PARQUET
- LOCATION '/tmp/hdfs_service_log';
- DROP TABLE IF EXISTS sls_service_log;
- CREATE TABLE sls_service_log
- USING loghub
- OPTIONS (
- sls.project = "${logProjectName}",
- sls.store = "${logStoreName}",
- access.key.id = "${accessKeyId}",
- access.key.secret = "${accessKeySecret}",
- endpoint = "${endpoint}");
- 通过 Spark SQL 启动一个 Stream Query 将 SLS 数据实时同步到 HDFS 中
- set streaming.query.name=sync_sls_to_hdfs;
- set spark.sql.streaming.checkpointLocation.sync_sls_to_hdfs=hdfs:///tmp/spark/sync_sls_to_hdfs;
- INSERT INTO hdfs_service_log
- select
- __tag__hostname__ as instance_name,
- ip,
- content
- from sls_service_log;
查看 HDFS 数据归档情况
image
使用 Spark SQL 对归档的数据进行离线分析: 例如统计一共有多少个 IP
select distinct(ip) from hdfs_service_log;
4. 结语
以上, 我们介绍了 Spark SQL 在流式处理中的一个非常简单的例子. 其实, 我们还可以使用 Spark SQL 进行更加复杂的流式处理任务. 后续文章, 我将介绍窗口操作, watermark 等概念, 以及如何在流式数据上进行简单的机器学习运算.
来源: https://yq.aliyun.com/articles/705649