本文要点
针对客户操作, 操作仪表板, 在线分析等应用场景, 使用 Apache Kafka 和 KSQL 构建数据集成和处理应用程序.
流处理的主要好处包括: 数据增强一次性完成, 低延迟处理, 向客户 Ops 团队实时发送通知.
你可以使用变化数据捕获 (CDC) 工具把数据库的数据以及任何后续的变化镜像到 Kafka 主题.
使用 KSQL, 很容易把业务数据流和源自数据库, 在 Kafka 主题中维护的相关信息合流.
扩展应用程序使其可以处理更多的通知, 而不必修改过滤逻辑.
这是文章 "使用 Apache Kafka 和 KSQL 实现流处理普及化" 的第二部分. 第一部分在这里.
在本文中, 我们将介绍如何使用 Apache Kafka® 和 KSQL 构建数据集成和处理应用程序. 这是一个来自电商领域的简单示例: 在一个网站上, 通过一系列事件跟踪用户评论. 关于这些用户的信息, 如姓名, 联系方式, 尊贵客户俱乐部资深会员, 保存在数据库的某个地方. 对于这类评论数据, 至少有三种用途:
客户操作 -- 如果一个尊贵客户俱乐部资深会员留下了差评, 我们希望可以马上做些事情, 降低流失这类客户的风险. 我们希望应用程序在出现满足此条件的评论时立即通知我们. 这样, 我们就可以马上为客户提供服务, 这远远好于我们等待一段时间后才运行的批处理为我们标记出需要联系的用户.
操作仪表板 实时展示评论输入, 滚动聚合, 计算数量和平均分数, 等等, 并按用户区域划分.
结合其他数据(不论在数据湖中, 还是在数据仓库中) 在线分析 分析评论数据. 这可以扩展到更广泛的数据科学实践和机器学习应用. 所有这些都需要访问评论信息及用户的详细信息.
我们将介绍下如何使用一种更为现代化的模式基于流平台实现上述功能. 我们将使用开源项目 Apache Kafka 和 KSQL 来实现. KSQL 是一个面向 Apache Kafka 的流 SQL 引擎, 基于 Kafka Streams API 实现, 后者是 Apache Kafka 的一个组成部分.
下图展示了流应用程序示例的工作原理.
图 1. 流数据应用程序
事件是用户提交到网站的评论, 它们被以流的方式直接传递给 Kafka. 从这里, 它们可以实时和用户信息联系起来, 经过充实的结果数据会写回到 Kafka. 转换完成后, 这些数据就可以用于驱动上述应用和目标了. 转换逻辑只需要执行一次. 数据一次性从源系统提取. 转换后的数据可以供不相关的应用程序多次使用. 不用对现有组件做任何修改, 就可以添加新的源和目标. 所有这些操作的延迟都非常低.
因此, 高层设计是这样的:
web 应用直接向 Kafka 发送评论;
Kafka Connect 把数据库用户数据快照以流的方式发送给 Kafka, 并且直接与 CDC 保持同步;
流处理把用户数据添加到评论事件, 并写回到一个新的 Kafka 主题;
流处理会针对 VIP 用户的差评筛选出充实后的 Kafka 主题, 并写入一个新的 Kafka 主题;
事件驱动应用会监听 Kafka 主题, 在 VIP 用户留下差评后立即推送通知;
Kafka Connect 把数据以流的方式传入 Elasticsearch, 供操作仪表板使用;
Kafka Connect 把数据以流的方式传入 S3, 供长期在线分析使用以及和其他数据集一起使用.
其中, 主要好处包括:
数据增强一次性完成, 可供任何应用程序消费;
数据处理延迟低;
可以在 VIP 客户留下差评后立即通知客户 Ops 团队 -- 提供更好的客户体验, 增加业务保留机会;
容易扩展, 可以按需增加新节点, 实现更大的吞吐量.
实现
让我们看一下构建这个应用程序的详细过程. GitHub https://GitHub.com/confluentinc/infoq-kafka-ksql 上提供了所有示例的代码以及 docker-compose 文件.
把数据写入 Kafka
Web 应用程序有多种方式可以使事件流入 Kafka.
许多客户端库都提供了 Producer API , 面向的语言包括 Java,.NET,Python,C/C++,Go,node.JS 等;
开源 REST 代理 , 可以发起 HTTP 调用把数据发送到 Kafka.
在我们的例子中, 应用程序使用了 Producer API.
Web 应用程序发送给 Kafka 主题 "评级(ratings)" 的消息格式如下:
- {
- "rating_id": 604087,
- "user_id": 7,
- "stars": 1,
- "route_id": 2777,
- "rating_time": 1528800546808,
- "channel": "Android",
- "message": "thank you for the most friendly, helpful experience today at your new lounge"
- }
使 Kafka 可以访问数据库中的数据
在构建应用程序的时候, 经常需要使用存储在数据库中的数据. 在我们的例子中, 用户数据保存在 MySQL 中, 不过, 设计模式都是一样的, 与采用哪种具体的 RDBMS 技术无关.
在使用 Kafka 编写流处理应用程序时, 集成保存在数据库中的数据的标准方法是, 确保数据本身在 Kafka 中保存和维护. 这比听上去简单 -- 我们只需要使用数据变化捕获 (CDC) 工具把数据库中的数据和任何后续的变化镜像到一个 Kafka 主题.
这样做的好处是隔离了数据库和流处理. 这主要有两个好处: 数据库不会因为我们的请求增加开销, 我们可以自由使用我们选取的数据, 而又不会使我们的开发和部署流程和数据库所有者的相耦合.
CDC 技术和工具不止一种 , 我们这里就不介绍了. 由于数据在 MySQL 中, 我们使用 Debezium http://debezium.io/ 项目作为我们的 CDC 工具. 它会把用户表的内容快照到 Kafka, 并使用 MySQL 的 binlog 即时检测后续 MySQL 中数据的变化并复制到 Kafka.
图 2 详细展示了数据变化捕获过程的数据流动.
图 2. 流应用程序变化数据捕获
从数据库流出, 流入 Kafka 主题 asgard.demo.CUSTOMERS 的消息格式如下:
- {
- "id": 1,
- "first_name": "Rica",
- "last_name": "Blaisdell",
- "email": "rblaisdell0@rambler.ru",
- "gender": "Female",
- "club_status": "bronze",
- "comments": "Universal optimal hierarchy",
- "create_ts": "2018-06-12T11:47:30Z",
- "update_ts": "2018-06-12T11:47:30Z",
- "messagetopic": "asgard.demo.CUSTOMERS",
- "messagesource": "Debezium CDC from MySQL on asgard"
- }
使用数据库信息充实事件流
使用 KSQL, 很容易就可以把源于数据库, 在 Kafka 主题中维护的相关信息合并到评级中.
合并细节如图 3 所示:
第一步是确保客户主题中的消息以关联列为键, 在这个例子中是客户 ID. 我们实际上可以使用 KSQL 进行重新分区. KSQL CREATE STREAM 的输出被写入一个 Kafka 主题, 在默认情况下, 会以流本身的名称命名:
-- 处理流中所有现有的数据以及将来的数据
SET 'auto.offset.reset' = 'earliest';
-- 声明源流
- CREATE STREAM CUSTOMERS_SRC WITH \
- (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');
-- 在 ID 列上重新分区, 设置目标主题, 使其分区数量与作为源的评级主题一致:
- CREATE STREAM CUSTOMERS_SRC_REKEY WITH (PARTITIONS=1) AS \
- SELECT * FROM CUSTOMERS_SRC PARTITION BY ID;
现在, 到达 asgard.demo.CUSTOMERS 主题的每条信息都将写入正确设置了消息键的 Kafka 主题 CUSTOMERS_SRC_REKEY . 注意, 我们不一定要声明任何模式, 因为我们在使用 Avro.KSQL 和 Kafka Connect 都无缝集成了开源的 Confluent Schema Registry, 序列化 / 反序列化 Avro 数据, 并在 Schema Registry 中保存 / 检索模式.
为了进行合并, 我们使用标准的 SQL 联合查询语法:
-- 把 CUSTOMER 注册为一张 KSQL 表,
-- 源自重新分区后的主题
- CREATE TABLE CUSTOMERS WITH \
- (KAFKA_TOPIC='CUSTOMERS_SRC_REKEY', VALUE_FORMAT ='AVRO', KEY='ID');
-- 把 RATINGS 数据注册到一个 KSQL 流, 源自 ratings 主题
CREATE STREAM RATINGS WITH (KAFKA_TOPIC='ratings',VALUE_FORMAT='AVRO');
-- 执行联合查询, 写入新主题 -- 注意, 主题名称是显式设置的.
-- 如果移除 KAFKA_TOPIC 参数, 那么目标主题将使用所创建的流或者表的名称
- CREATE STREAM RATINGS_ENRICHED WITH \
- (KAFKA_TOPIC='ratings-with-customer-data', PARTITIONS=1) AS \
- SELECT R.RATING_ID, R.CHANNEL, R.STARS, R.MESSAGE, \
- C.ID, C.CLUB_STATUS, C.EMAIL, \
- C.FIRST_NAME, C.LAST_NAME \
- FROM RATINGS R \
- LEFT JOIN CUSTOMERS C \
- ON R.USER_ID = C.ID \
- WHERE C.FIRST_NAME IS NOT NULL ;
我们可以查看这条查询处理的消息数量:
- ksql> DESCRIBE EXTENDED RATINGS_ENRICHED;
- Name : RATINGS_ENRICHED
- Type : STREAM
- Key field : R.USER_ID
- Key format : STRING
- Timestamp field : Not set - using <ROWTIME>
- Value format : AVRO
- Kafka topic : ratings-with-customer-data (partitions: 4, replication: 1)
- [...]
- Local runtime statistics
- ------------------------
- messages-per-sec: 3.61 total-messages: 2824 last-message: 6/12/18 11:58:27 AM UTC
- failed-messages: 0 failed-messages-per-sec: 0 last-failed: n/a
- (本地 KSQL 服务器与 Kafka 主题 ratings-with-customer-data 的交互统计)
实际上, 这条 SQL 语句本身就是一个应用程序, 就像我们在 Java,Python,C...... 中编写的代码一样. 它不断地执行, 接收输入数据, 处理数据, 输出数据. 我们在上面看到的输出是该应用程序的运行时指标.
使用 KSQL 过滤数据流
我们前面创建的 JOIN 查询其输出是一个 Kafka 主题, 在源自源主题 ratings 的事件的驱动下实时填充, 如下图 4 所示:
我们可以构建第二个 KSQL 应用程序, 由这个派生主题所驱动, 并对数据做进一步地处理. 这里, 我们将简单地过滤所有评级流, 识别那些同时满足如下两个条件的评级:
差评(评级范围 1 到 5, 小于 3 即为差评)
"铂金" 客户留下的评级
SQL 给出的语义几乎可以从字面上表达上述需求. 我们可以首先使用 KSQL CLI 验证该查询:
- SELECT CLUB_STATUS, EMAIL, STARS, MESSAGE \
- FROM RATINGS_ENRICHED \
- WHERE STARS <3 \
- AND CLUB_STATUS = 'platinum';
- platinum | ltoopinc@icio.us | 1 | worst. flight. ever. #neveragain
- platinum | italboyd@imageshack.us | 2 | (expletive deleted)
然后, 和以前一样, 这个持续查询的结果可以持久化到一个 Kafka 主题, 只需为语句加上 CREATE STREAM ... AS (通常使用缩写 CSAS)前缀. 注意, 我们可以选择所有的源列( SELECT * ), 或者创建一个可用字段的子集( SELECT COL1, COL2 ), 使用哪一个取决于创建流的目的. 此外, 我们将把目标消息写成 JSON 格式:
- CREATE STREAM UNHAPPY_PLATINUM_CUSTOMERS \
- WITH (VALUE_FORMAT='JSON', PARTITIONS=1) AS \
- SELECT CLUB_STATUS, EMAIL, STARS, MESSAGE \
- FROM RATINGS_ENRICHED \
- WHERE STARS < 3 \
- AND CLUB_STATUS = 'platinum';
查看生成的 Kafka 主题, 我们可以看到, 它只包含我们感兴趣的事件. 再次强调一下, 这是一个 Kafka 主题 -- 我们可以使用 KSQL 查询它 -- 这里, 我将跳过 KSQL, 使用流行的 https://GitHub.com/edenhill/kafkacat/ 工具查看它:
- kafka-console-consumer \
- --Bootstrap-server kafka:9092 \
- --topic UNHAPPY_PLATINUM_CUSTOMERS | jq '.'
- {
- "CLUB_STATUS": {
- "string": "platinum"
- },
- "EMAIL": {
- "string": "italboyd@imageshack.us"
- },
- "STARS": {
- "int": 1
- },
- "MESSAGE": {
- "string": "Surprisingly good, maybe you are getting your mojo back at long last!"
- }
- }
在离开 KSQL 之前, 我们给自己提个醒, 我们实际上仅写了三个流应用程序:
- ksql> SHOW QUERIES;
- Query ID | Kafka Topic | Query String
- ------------------------------------------------------------------------------------------------------------
- CSAS_CUSTOMERS_SRC_REKEY_0 | CUSTOMERS_SRC_REKEY | CREATE STREAM CUSTOMERS_SRC_REKEY [...]
- CSAS_RATINGS_ENRICHED_1 | RATINGS_ENRICHED | CREATE STREAM RATINGS_ENRICHED [...]
- CSAS_UNHAPPY_PLATINUM_CUSTOMERS_2 | UNHAPPY_PLATINUM_CUSTOMERS | CREATE STREAM UNHAPPY_PLATINUM_CUSTOMERS [...]
由 Kafka 主题驱动的推送通知
我们在上面创建的主题 UNHAPPY_PLATINUM_CUSTOMERS 可以用于驱动一个应用程序, 如果有重要客户留下了差评, 它就会给客户运营团队发送警报. 这里的关键是, 我们基于一个刚刚发生的事件驱动了一个实时的动作. 基于批处理的分析下周才告诉我们, 上周我们让一位客户失望了, 这就没用了. 我们希望现在就知道, 以便我们现在就可以采取行动, 向那位客户提供更好的体验.
Kafka 客户端库有面向各种语言的 -- 你几乎可以选择任何语言. 这里, 我们使用 面向 Python 的开源 Confluent Kafka 库 . 这是一个构建事件驱动应用程序的简单例子. 它在一个 Kafka 主题上监听事件, 然后生成一个推送通知. 我们将使用 Slack 作为我们的通知发送平台. 为了简化说明, 下面的代码片段删除了所有的错误处理代码. 我们可以把一个 API(如 Slack 的 API) https://API.Slack.com/Web 和一个 Kafka 主题集成, 在这个主题上监听事件, 从而触发一个动作.
- from slackclient import SlackClient
- from confluent_kafka import Consumer, KafkaError
- sc = SlackClient('API-token-xxxxxxx')
- settings = {
- 'Bootstrap.servers': 'localhost:9092',
- 'group.id': 'python_kafka_notify.py',
- 'default.topic.config': {'auto.offset.reset': 'largest'}
- }
- c = Consumer(settings)
- c.subscribe(['UNHAPPY_PLATINUM_CUSTOMERS'])
- while True:
- msg = c.poll(0.1)
- if msg is None:
- continue
- else:
- email=app_msg['EMAIL']
- message=app_msg['MESSAGE']
- channel='unhappy-customers'
- text=('`%s` just left a bad review :disappointed:\n> %s\n\n_Please contact them immediately and see if we can fix the issue *right here, right now*_' % (email, message))
- sc.api_call('chat.postMessage', channel=channel,
- text=text, username='KSQL Notifications',
- icon_emoji=':rocket:')
- finally:
- c.close()
下图 5 展示了使用 Slack API 发送用户通知.
[点击查看大图]
这里有必要重申一下, 我们正在构建的应用程序 (如果你愿意, 可以把它称为微服务) 是事件驱动的. 就是说, 该应用程序会等待一个事件, 然后执行动作. 它不是尝试处理所有数据并查找特定的条件, 也不是一个响应某个命令的同步请求 - 响应服务. 我们已经分离出了这些职责:
根据确定的条件过滤实时事件流 是由 KSQL 完成的(使用我们前面介绍的
CREATE STREAM UNHAPPY_PLATINUM_CUSTOMERS
语句), 匹配的事件被写入一个 Kafka 主题;
通知服务 有一个唯一的职责, 它负责从 Kafka 主题获得事件, 并基于它生成一个推送通知. 这是异步完成的.
这样做的好处很明显:
我们可以横向扩展应用程序, 使其处理更多通知, 而不必修改过滤逻辑;
我们可以使用可选的其他应用程序替换这个应用程序, 而不必修改过滤逻辑;
我们可以替换或修改过滤逻辑, 而不必触及通知应用程序.
Kafka 和请求 / 响应模式
对于基于 Kafka 平台编写应用程序, 有一种常见的质疑, 就是事件驱动模式不适用于应用程序的流程, 并由此推论, Kafka 也不适合. 这种观点是错误的, 有两个关键点需要记住:
事件驱动模式和请求 / 响应模式都完全可以使用 -- 它们不是互斥的, 有些需求需要使用请求 / 响应模式;
决定因素应该是需求; 应该挑战现有方法的惯性. 在部分或全部应用程序的消息传递中使用事件驱动架构, 你可以从它带来的异步性, 可扩展性以及与 Kafka 的集成中受益, 其他所有使用 Kafka 的系统和应用程序也是如此.
要了解有关这个问题的进一步讨论, 可以查阅 Ben Stopford 的 系列文章 及其最新著作《 事件驱动系统设计 》.
使数据从 Kafka 流入 Elasticsearch, 用于操作分析
使用 Kafka Connect 很容易就可以使数据从 Kafka 流入 Elasticsearch. 它提供了一个由配置文件控制的可扩展的流集成. 有一个开源的 Elasticsearch 连接器, 既可以 单独存在 , 也可以作为 Confluent 平台 https://www.confluent.io/download/ 的一部分. 这里, 我们将使原始评级及警告信息流入 Elasticsearch:
- "name": "es_sink",
- "config": {
- "connector.class": "io.confluent.connect.Elasticsearch.ElasticsearchSinkConnector",
- "topics": "ratings-with-customer-data,UNHAPPY_PLATINUM_CUSTOMERS",
- "connection.url": "http://Elasticsearch:9200"
- [...]
- }
- }
在从 Kafka Connect 到 Elasticsearch 的数据流上, 使用 Kibana 很容易在经过充实, 过滤的数据上构建一个实时仪表板, 如图 6 所示.
[点击查看大图]
使数据从 Kafka 流入数据湖
最后, 我们将使充实后的评级流入数据湖. 在这里, 它可以用于在线分析, 训练机器学习模型和数据科学项目, 等等.
Kafka 中的数据可以流入 使用 Kafka Connect 的各种类型的目标 https://hub.confluent.io/ . 这里, 我们将看下 S3 和 BigQuery, 但是, 使用 HDFS,GCS,Redshift,Snowflake DB 等也同样简单.
就像前面介绍的使数据从 Kafka 流入 Elasticsearch 一样, 针对每项目标技术的设置只是一个简单的配置文件设置:
- "name": "s3-sink-ratings",
- "config": {
- "connector.class": "io.confluent.connect.s3.S3SinkConnector",
- "topics": "ratings-with-customer-data",
- "s3.region": "us-west-2",
- "s3.bucket.name": "rmoff-demo-ratings",
数据流入 S3 后, 我们可以在桶里查看, 如图 7 所示.
[点击查看大图]
我们还可以使同样的数据流入谷歌的 BigQuery:
- "name": "gbq-sink-ratings",
- "config": {
- "connector.class":"com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
- "topics": "ratings-with-customer-data",
- "project":"rmoff",
- "datasets":".*=ratings",
[点击查看大图]
谷歌的 Data Studio 是众多可以用于分析这些来自云对象存储的数据的应用程序之一:
[点击查看大图]
这里的重点不是上面介绍的具体技术, 不管你选择使用什么样的数据湖技术, 使用 Kafka Connect 都很容易使数据流入它.
和 KSQL 及流平台一起走向未来
在这篇文章中, 我们已经看了把流平台作为数据架构核心组成部分的其中多个有力的论据. 它提供了一个可扩展的基础, 由于其解耦特性, 使系统可以灵活地集成和演进. 分析工作可以从流平台的强大集成能力中获益. 它是流平台, 因此, 实时不是其主要动因. 应用程序可以从流平台获益, 因为它是实时的, 而且也因为它的集成能力.
借助 KSQL, 可以使用许多开发人员都熟悉的语言编写流处理应用程序. 这些应用程序可以是简单的 Kafka 事件流过滤器, 也可以是复杂的充实模式, 从包括数据库在内的其他系统获取数据.
要了解更多有关 KSQL 的信息, 你可以 观看教程 并自己 试一下 . 文档中介绍了调整和部署实践. 在 Confluent Community Slack 群组 http://cnfl.io/Slack 中, 有一个与此相关的活跃社区. GitHub https://GitHub.com/confluentinc/infoq-kafka-ksql 上提供了本文的示例.
来源: http://www.tuicool.com/articles/uM7raum