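Key Takeaways
Most stream processing technologies require developers to write code in a programming language such as Java or Scala.
KSQL is a streaming SQL engine for Apache Kafka. It lets you implement stream processing tasks with SQL statements instead of writing large amounts of code.
KSQL is built on Kafka's Streams API and supports stream processing operations such as filtering, transformation, aggregation, joins, windowing, and sessionization (capturing all the stream events that occur within a single session).
Use cases for KSQL include real-time reports and dashboards, infrastructure and IoT device monitoring, anomaly detection, and fraud alerting.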
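Would you cross the street based on what the traffic lights showed a minute ago? Of course not! Modern enterprises face the same requirement today, whether because of competitive pressure or because their customers have higher expectations of how they interact with a product or service.
If people can rent and watch the latest movie at the tap of a button on an iPad, why should they still have to wait hours when their bank account is running low?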

    CREATE STREAM fraudulent_payments AS
      SELECT * FROM payments_kafka_stream
      WHERE fraud_probability > 0.8;
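
    // Using Kafka's Streams API (Scala, kafka-streams-scala module).
    // Assumes a Payment type and an implicit Serde[Payment] are defined elsewhere;
    // the package that holds Serdes differs between Kafka versions.
    import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.Serdes._
    import org.apache.kafka.streams.scala.StreamsBuilder
    import org.apache.kafka.streams.scala.kstream.KStream

    object FraudFilteringApplication extends App {
      val builder: StreamsBuilder = new StreamsBuilder()
      // Keep only payments whose fraud probability exceeds 0.8
      val fraudulentPayments: KStream[String, Payment] = builder
        .stream[String, Payment]("payments-kafka-topic")
        .filter((_, payment) => payment.fraudProbability > 0.8)
      fraudulentPayments.to("fraudulent-payments-topic")

      val config = new java.util.Properties
      config.put(StreamsConfig.APPLICATION_ID_CONFIG, "fraud-filtering-app")
      config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")

      val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
      streams.start()
    }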

    CREATE STREAM vip_users_clickstream AS
      SELECT c.user_id, user_country, web_page, action
      FROM website_clickstream c
      LEFT JOIN users u ON u.user_id = c.user_id
      WHERE u.level = 'Platinum';

    CREATE STREAM anonymized_vip_clickstream AS
      SELECT user_country, web_page, action
      FROM vip_users_clickstream;

    CREATE TABLE possibly_failing_vehicles AS
      SELECT vehicle, COUNT(*)
      FROM vehicle_monitoring_stream
      WINDOW TUMBLING (SIZE 5 MINUTES)
      WHERE event_type = 'ERROR'
      GROUP BY vehicle
      HAVING COUNT(*) >= 3;

    CREATE TABLE geo_location_checkins_per_user AS
      SELECT username, COUNT(*)
      FROM geo_location_updates
      GROUP BY username;

    CREATE TABLE orders_hourly_aggregates AS
      SELECT
        order_status,
        COUNT(*) AS order_count,
        MAX(ORDER_TOTAL) AS max_order_total,
        MIN(ORDER_TOTAL) AS min_order_total,
        SUM(ORDER_TOTAL) AS sum_order_total
      FROM orders
      WINDOW TUMBLING (SIZE 1 HOUR)
      GROUP BY order_status;
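
The key takeaways list sessionization among the operations KSQL supports, but none of the queries above use it. As a rough sketch, a session-windowed count over the website_clickstream stream from the earlier join example might look like the following. The table name clicks_per_user_session, the click_count alias, and the 30-minute inactivity gap are illustrative assumptions, not taken from the original article.

```sql
-- Hypothetical example: count clicks per user session.
-- A session stays open while events for the same user_id keep arriving;
-- it closes after 30 minutes of inactivity (an assumed gap, tune as needed).
CREATE TABLE clicks_per_user_session AS
  SELECT user_id, COUNT(*) AS click_count
  FROM website_clickstream
  WINDOW SESSION (30 MINUTES)
  GROUP BY user_id;
```

Unlike the tumbling windows used above, a session window has no fixed size: its boundaries are determined by the gaps in each user's activity, so each user_id can have sessions of different lengths.
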
Source: http://www.infoq.com/cn/articles/democratizing-stream-processing-apache-kafka-ksql