前言
前篇文章 《从 0 到 1 学习 Flink》-- Data Sink 介绍 介绍了 Flink Data Sink, 也介绍了 Flink 自带的 Sink, 那么如何自定义自己的 Sink 呢? 这篇文章将写一个 demo 教大家将从 Kafka Source 的数据 Sink 到 MySQL 中去.
准备工作
我们先来看下 Flink 从 Kafka topic 中获取数据的 demo, 首先你需要安装好了 FLink 和 Kafka .
运行启动 Flink,Zookepeer,Kafka,
好了, 都启动了!
数据库建表
- DROP TABLE IF EXISTS `student`;
- CREATE TABLE `student` (
- `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
- `name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
- `password` varchar(25) COLLATE utf8_bin DEFAULT NULL,
- `age` int(10) DEFAULT NULL,
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
实体类
Student.java
- package com.zhisheng.flink.model;
- /**
- * Desc:
- * weixin: zhisheng_tian
- * blog: http://www.54tianzhisheng.cn/
- */
- public class Student {
- public int id;
- public String name;
- public String password;
- public int age;
- public Student() {
- }
- public Student(int id, String name, String password, int age) {
- this.id = id;
- this.name = name;
- this.password = password;
- this.age = age;
- }
- @Override
- public String toString() {
- return "Student{" +
- "id=" + id +
- ", name='" + name + '\'' +
- ", password='" + password + '\'' +
- ", age=" + age +
- '}';
- }
- public int getId() {
- return id;
- }
- public void setId(int id) {
- this.id = id;
- }
- public String getName() {
- return name;
- }
- public void setName(String name) {
- this.name = name;
- }
- public String getPassword() {
- return password;
- }
- public void setPassword(String password) {
- this.password = password;
- }
- public int getAge() {
- return age;
- }
- public void setAge(int age) {
- this.age = age;
- }
- }
工具类
工具类往 kafka topic student 发送数据
- import com.alibaba.fastjson.JSON;
- import com.zhisheng.flink.model.Metric;
- import com.zhisheng.flink.model.Student;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Properties;
- /**
- * 往 kafka 中写数据
- * 可以使用这个 main 函数进行测试一下
- * weixin: zhisheng_tian
- * blog: http://www.54tianzhisheng.cn/
- */
- public class KafkaUtils2 {
- public static final String broker_list = "localhost:9092";
- public static final String topic = "student"; //kafka topic 需要和 flink 程序用同一个 topic
- public static void writeToKafka() throws InterruptedException {
- Properties props = new Properties();
- props.put("bootstrap.servers", broker_list);
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- KafkaProducer producer = new KafkaProducer<String, String>(props);
- for (int i = 1; i <= 100; i++) {
- Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i);
- ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, JSON.toJSONString(student));
- producer.send(record);
- System.out.println("发送数据:" + JSON.toJSONString(student));
- }
- producer.flush();
- }
- public static void main(String[] args) throws InterruptedException {
- writeToKafka();
- }
- }
- SinkToMySQL
该类就是 Sink Function, 继承了 RichSinkFunction , 然后重写了里面的方法. 在 invoke 方法中将数据插入到 MySQL 中.
- package com.zhisheng.flink.sink;
- import com.zhisheng.flink.model.Student;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.API.functions.sink.RichSinkFunction;
- import java.sql.Connection;
- import java.sql.DriverManager;
- import java.sql.PreparedStatement;
- /**
- * Desc:
- * weixin: zhisheng_tian
- * blog: http://www.54tianzhisheng.cn/
- */
- public class SinkToMySQL extends RichSinkFunction<Student> {
- PreparedStatement ps;
- private Connection connection;
- /**
- * open() 方法中建立连接, 这样不用每次 invoke 的时候都要建立连接和释放连接
- *
- * @param parameters
- * @throws Exception
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- connection = getConnection();
- String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
- ps = this.connection.prepareStatement(sql);
- }
- @Override
- public void close() throws Exception {
- super.close();
- // 关闭连接和释放资源
- if (connection != null) {
- connection.close();
- }
- if (ps != null) {
- ps.close();
- }
- }
- /**
- * 每条数据的插入都要调用一次 invoke() 方法
- *
- * @param value
- * @param context
- * @throws Exception
- */
- @Override
- public void invoke(Student value, Context context) throws Exception {
- // 组装数据, 执行插入操作
- ps.setInt(1, value.getId());
- ps.setString(2, value.getName());
- ps.setString(3, value.getPassword());
- ps.setInt(4, value.getAge());
- ps.executeUpdate();
- }
- private static Connection getConnection() {
- Connection con = null;
- try {
- Class.forName("com.mysql.jdbc.Driver");
- con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "root123456");
- } catch (Exception e) {
- System.out.println("-----------mysql get connection has exception , msg ="+ e.getMessage());
- }
- return con;
- }
- }
Flink 程序
这里的 source 是从 kafka 读取数据的, 然后 Flink 从 Kafka 读取到数据 (JSON) 后用阿里 fastjson 来解析成 student 对象, 然后在 addSink 中使用我们创建的 SinkToMySQL, 这样就可以把数据存储到 MySQL 了.
- package com.zhisheng.flink;
- import com.alibaba.fastjson.JSON;
- import com.zhisheng.flink.model.Student;
- import com.zhisheng.flink.sink.SinkToMySQL;
- import org.apache.flink.API.common.serialization.SimpleStringSchema;
- import org.apache.flink.streaming.API.datastream.DataStreamSource;
- import org.apache.flink.streaming.API.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.API.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.API.functions.sink.PrintSinkFunction;
- import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
- import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
- import java.util.Properties;
- /**
- * Desc:
- * weixin: zhisheng_tian
- * blog: http://www.54tianzhisheng.cn/
- */
- public class Main3 {
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092");
- props.put("zookeeper.connect", "localhost:2181");
- props.put("group.id", "metric-group");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("auto.offset.reset", "latest");
- SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>(
- "student", // 这个 kafka topic 需要和上面的工具类的 topic 一致
- new SimpleStringSchema(),
- props)).setParallelism(1)
- .map(string -> JSON.parseObject(string, Student.class)); //Fastjson 解析字符串成 student 对象
- student.addSink(new SinkToMySQL()); // 数据 sink 到 MySQL
- env.execute("Flink add sink");
- }
- }
结果
运行 Flink 程序, 然后再运行 KafkaUtils2.java 工具类, 这样就可以了.
如果数据插入成功了, 那么我们查看下我们的数据库:
数据库中已经插入了 100 条我们从 Kafka 发送的数据了. 证明我们的 SinkToMySQL 起作用了. 是不是很简单?
项目结构
怕大家不知道我的项目结构, 这里发个截图看下:
最后
本文主要利用一个 demo, 告诉大家如何自定义 Sink Function, 将从 Kafka 的数据 Sink 到 MySQL 中, 如果你项目中有其他的数据来源, 你也可以换成对应的 Source, 也有可能你的 Sink 是到其他的地方或者其他不同的方式, 那么依旧是这个套路: 继承 RichSinkFunction 抽象类, 重写 invoke 方法.
关注我
转载请务必注明原创地址为:
微信公众号: zhisheng
另外我自己整理了些 Flink 的学习资料, 目前已经全部放到微信公众号 (zhisheng) 了, 你可以回复关键字: Flink 即可无条件获取到. 另外也可以加我微信 你可以加我的微信: yuanblog_tzs, 探讨技术!
更多私密资料请加入知识星球!
GitHub 代码仓库
https://github.com/zhisheng17/flink-learning/
以后这个项目的所有代码都将放在这个仓库里, 包含了自己学习 flink 的一些 demo 和博客
博客
1,Flink 从 0 到 1 学习 -- Apache Flink 介绍
2,Flink 从 0 到 1 学习 -- Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门
3,Flink 从 0 到 1 学习 -- Flink 配置文件详解
4,Flink 从 0 到 1 学习 -- Data Source 介绍
5,Flink 从 0 到 1 学习 -- 如何自定义 Data Source ?
6,Flink 从 0 到 1 学习 -- Data Sink 介绍
7,Flink 从 0 到 1 学习 -- 如何自定义 Data Sink ?
8,Flink 从 0 到 1 学习 -- Flink Data transformation(转换)
9,Flink 从 0 到 1 学习 -- 介绍 Flink 中的 Stream Windows
10,Flink 从 0 到 1 学习 -- Flink 中的几种 Time 详解
11,Flink 从 0 到 1 学习 -- Flink 读取 Kafka 数据写入到 Elasticsearch
12,Flink 从 0 到 1 学习 -- Flink 项目如何运行? http://www.54tianzhisheng.cn/2019/01/05/Flink-run/
13,Flink 从 0 到 1 学习 -- Flink 读取 Kafka 数据写入到 Kafka
14,Flink 从 0 到 1 学习 -- Flink JobManager 高可用性配置
15,Flink 从 0 到 1 学习 -- Flink parallelism 和 Slot 介绍
16,Flink 从 0 到 1 学习 -- Flink 读取 Kafka 数据批量写入到 MySQL
17,Flink 从 0 到 1 学习 -- Flink 读取 Kafka 数据写入到 RabbitMQ
18,Flink 从 0 到 1 学习 -- Flink 读取 Kafka 数据写入到 HBase https://t.zsxq.com/zV7MnuJ
19,Flink 从 0 到 1 学习 -- Flink 读取 Kafka 数据写入到 HDFS https://t.zsxq.com/zV7MnuJ
20,Flink 从 0 到 1 学习 -- Flink 读取 Kafka 数据写入到 Redis https://t.zsxq.com/zV7MnuJ
21,Flink 从 0 到 1 学习 -- Flink 读取 Kafka 数据写入到 Cassandra https://t.zsxq.com/uVbi2nq
22,Flink 从 0 到 1 学习 -- Flink 读取 Kafka 数据写入到 Flume https://t.zsxq.com/zV7MnuJ
23,Flink 从 0 到 1 学习 -- Flink 读取 Kafka 数据写入到 InfluxDB https://t.zsxq.com/zV7MnuJ
24,Flink 从 0 到 1 学习 -- Flink 读取 Kafka 数据写入到 RocketMQ https://t.zsxq.com/zV7MnuJ
25,Flink 从 0 到 1 学习 -- 你上传的 jar 包藏到哪里去了
26,Flink 从 0 到 1 学习 -- 你的 Flink job 日志跑到哪里去了 https://t.zsxq.com/zV7MnuJ
27, 阿里巴巴开源的 Blink 实时计算框架真香 http://www.54tianzhisheng.cn/2019/02/28/blink/
28,Flink 从 0 到 1 学习 -- Flink 中如何管理配置?
29,Flink 从 0 到 1 学习 -- Flink 不可以连续 Split(分流)?
30,Flink 从 0 到 1 学习 -- 分享四本 Flink 国外的书和二十多篇 Paper 论文
31,Flink 架构, 原理与部署测试
32, 为什么说流处理即未来?
33,OPPO 数据中台之基石: 基于 Flink SQL 构建实时数据仓库
34, 流计算框架 Flink 与 Storm 的性能对比
35,Flink 状态管理和容错机制介绍
36,Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理
37,360 深度实践: Flink 与 Storm 协议级对比
38, 如何基于 Flink+TensorFlow 打造实时智能异常检测平台? 只看这一篇就够了
39,Apache Flink 1.9 重大特性提前解读
40,Flink 全网最全资源(视频, 博客, PPT, 入门, 实战, 源码解析, 问答等持续更新)
41,Flink 灵魂两百问, 这谁顶得住? https://mp.weixin.qq.com/s/ok-YwuVbwAVtJz7hUCiZxg
42,Flink 从 0 到 1 学习 -- 如何使用 Side Output 来分流?
43, 你公司到底需不需要引入实时计算引擎?
44, 一文让你彻底了解大数据实时计算引擎 Flink http://www.54tianzhisheng.cn/2019/08/19/flink/
源码解析
1,Flink 源码解析 -- 源码编译运行
2,Flink 源码解析 -- 项目结构一览
3,Flink 源码解析 -- local 模式启动流程 http://www.54tianzhisheng.cn/tags/Flink/
4,Flink 源码解析 -- standalone session 模式启动流程
5,Flink 源码解析 -- Standalone Session Cluster 启动流程深度分析之 Job Manager 启动
6,Flink 源码解析 -- Standalone Session Cluster 启动流程深度分析之 Task Manager 启动
7,Flink 源码解析 -- 分析 Batch WordCount 程序的执行过程
8,Flink 源码解析 -- 分析 Streaming WordCount 程序的执行过程
9,Flink 源码解析 -- 如何获取 JobGraph?
10,Flink 源码解析 -- 如何获取 StreamGraph?
11,Flink 源码解析 -- Flink JobManager 有什么作用?
12,Flink 源码解析 -- Flink TaskManager 有什么作用?
13,Flink 源码解析 -- JobManager 处理 SubmitJob 的过程
14,Flink 源码解析 -- TaskManager 处理 SubmitJob 的过程
15,Flink 源码解析 -- 深度解析 Flink Checkpoint 机制
16,Flink 源码解析 -- 深度解析 Flink 序列化机制
17,Flink 源码解析 -- 深度解析 Flink 是如何管理好内存的?
18,Flink Metrics 源码解析 -- Flink-metrics-core
19,Flink Metrics 源码解析 -- Flink-metrics-datadog
20,Flink Metrics 源码解析 -- Flink-metrics-dropwizard
21,Flink Metrics 源码解析 -- Flink-metrics-graphite
22,Flink Metrics 源码解析 -- Flink-metrics-influxdb
23,Flink Metrics 源码解析 -- Flink-metrics-jmx
24,Flink Metrics 源码解析 -- Flink-metrics-slf4j
25,Flink Metrics 源码解析 -- Flink-metrics-statsd
26,Flink Metrics 源码解析 -- Flink-metrics-prometheus
26,Flink Annotations 源码解析
27,Flink 源码解析 -- 如何获取 ExecutionGraph ?
28, 大数据重磅炸弹 -- 实时计算框架 Flink https://t.zsxq.com/UvrRNJM
29,Flink Checkpoint-轻量级分布式快照 https://t.zsxq.com/QVFqjea
30,Flink Clients 源码解析
来源: https://www.cnblogs.com/zhisheng/p/11562566.html