上一篇介绍了编写 Flink 程序的基本步骤, 以及一些常见 API, 如: map,filter,keyBy 等, 重点介绍了 keyBy 方法. 本篇将继续介绍 Flink 中常用的 API, 主要内容为
指定 transform 函数
Flink 支持的数据类型
累加器
指定 transform 函数
许多 transform 操作需要用户自定义函数来实现, Flink 支持多种自定义 transform 函数, 接下来一一介绍.
实现接口
- /**
- * 实现 MapFunction 接口
- * 其中泛型的第一 String 代表输入类型, 第二个 Integer 代表输出类型
- */
- class MyMapFunction implements MapFunction<String, Integer> {
- @Override
- public Integer map(String value) { return Integer.parseInt(value); }
- }
- // 使用 transform 函数
- data.map(new MyMapFunction());
匿名类
- data.map(new MapFunction<String, Integer> () {
- public Integer map(String value) { return Integer.parseInt(value); }
- });
匿名类是 Java 语言定义的语法, 与 "实现接口" 的方式一样, 只不过不需要显示定义子类. 这种方式比 "实现接口" 更常见一些.
Java 8 Lambda 表达式
- data.map(s -> Integer.parseInt(s));
- // 或者
- data.map(Integer::parseInt);
Java 8 支持 Lambda 表达式, 用法与 Scala 语法很像, 写起来简洁, 并且容易维护, 推荐使用这种方式.
rich function
顾名思义, 比普通的 transform 函数要更丰富, 额外提供了 4 个方法: open,close,getRuntimeContext 和 setRuntimeContext. 它们可以用来创建 / 初始化本地状态, 访问广播变量, 访问累加器和计数器等. 感觉有点像 Hadoop 中的 Mapper 或者 Reducer 类. 实现上, 可以使用自定义类继承 RichMapFunction 类的方式
- /**
- * 与实现 MapFunction 接口类似, 这里是继承了 RichMapFunction 类
- * 同时可以实现父类更多的方法
- */
- class MyRichMapFunction extends RichMapFunction<String, Integer> {
- @Override
- public void open(Configuration parameters) throws Exception { super.open(parameters); }
- @Override
- public RuntimeContext getRuntimeContext() { return super.getRuntimeContext(); }
- @Override
- public void setRuntimeContext(RuntimeContext t) { super.setRuntimeContext(t); }
- @Override
- public Integer map(String value) throws Exception { return Integer.parseInt(value); }
- @Override
- public void close() throws Exception { super.close(); }
- }
- data.map(new MyRichMapFunction());
也可以使用匿名类的方式
- data.map (new RichMapFunction<String, Integer>() {
- @Override
- public void open(Configuration parameters) throws Exception { super.open(parameters); }
- @Override
- public RuntimeContext getRuntimeContext() { return super.getRuntimeContext(); }
- @Override
- public void setRuntimeContext(RuntimeContext t) { super.setRuntimeContext(t); }
- @Override
- public Integer map(String value) { return Integer.parseInt(value); }
- @Override
- public void close() throws Exception { super.close(); }
- });
如果在 rich function 中需要写较多的业务, 那么用匿名类的方式并不简洁, 并且可读性差.
Flink 支持的数据类型
目前 Flink 支持 6 种数据类型
Java Tuple 和 Scala Case Class
Java POJO
原子类型
普通类
Values
Hadoop Writable 类型
特殊类
Java Tuple 和 Scala Case Class
Tuple (元组)是一个混合类型, 包含固定数量的属性, 并且每个属性类型可以不同. 例如: 二元组有 2 个属性, 类名为 Tuple2; 三元组有 3 个属性, 类名为 Tuple3, 以此类推. Java 支持的元组为 Tuple1 - Tuple25. 访问属性可以通过属性名直接访问, 如: tuple.f4 代表 tuple 的第 5 个属性. 或者使用 tuple.getField(int position) 方法, 参数 position 从 0 开始.
- /**
- * Tuple2 二元组作为 DataStream 的输入类型
- */
- DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
- new Tuple2<String, Integer>("hello", 1),
- new Tuple2<String, Integer>("world", 2));
- wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
- @Override
- public Integer map(Tuple2<String, Integer> value) throws Exception {
- return value.f1;
- }
- });
- Java POJO
POJO(Plain Ordinary Java Object) 叫做简单的 Java 对象. 满足以下条件的 Java 或 Scala 类会被 Flink 看做 POJO 类型
类必须是 public
必须有一个 public 修饰的无参构造方法(默认构造器)
所有属性必须是 public 修饰或者通过 getter 和 setter 方法可以访问到
属性类型必须也是 Flink 支持的, Flink 使用 avro 对其序列化
POJO 类型更易使用, 且 Flink 更高效地处理 POJO 类型的数据.
- public class WordWithCount {
- public String Word;
- public int count;
- public WordWithCount() {}
- public WordWithCount(String Word, int count) {
- this.Word = Word;
- this.count = count;
- }
- }
- DataStream<WordWithCount> wordCounts = env.fromElements(
- new WordWithCount("hello", 1),
- new WordWithCount("world", 2));
原子类型
Flink 支持 Java 和 Scala 中所有的原子类型, 例如: Integer,String 和 Double 等.
普通类
不是 POJO 类型的类都会被 Flink 看做是普通的类类型. Flink 将它们视为黑盒且不会访问它们的内容, 普通类类型使用 Kryo 进行序列化与反序列化. 这里是第二次提到序列化与反序列化, 简单解释下这个概念. 因为在分布式计算的系统中, 不可避免要在不同机器之间传输数据, 因此为了高效传输数据且在不同语言之间互相转化, 需要通过某种协议 (protobuf,kryo,avro,JSON) 将对象转化成另外一种形式 (序列化), 其他机器接到序列化的数据后再转化成之前的对象(反序列化) 就可以正常使用了.
Values
不同于一般的序列化框架, Values 类型通过实现 org.apache.flinktypes.Value 接口里的 write 和 read 方法, 实现自己的序列化和反序列化逻辑. 当一般的序列化框架不够高效的时候, 可以使用 Values 类型. 例如: 对于一个用数组存储的稀疏向量. 由于数组大多数元素为 0 , 可以仅对非 0 元素进行特殊编码, 而一般的序列化框架会对所有元素进行序列化操作.
Flink 已经预定义了几种 Value 类型与基本数据类型相对应. 如: ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue. 这些 Value 类型可以看做是基本数据类型的变体, 他们的值是可变的, 允许程序重复利用对象, 减轻 GC 的压力. 例如: Java 基本数据类型 String 是不可变的, 但是 Flink 的 StringValue 类型是可变的.
Flink 定义的 Value 类型与 Hadoop Writable 类型相似, 本质都是通过改进基本数据类型的缺点, 提供系统整体性能.
Hadoop Writable
Hadoop Writable 类型也是手动实现了比较高效的序列化与反序列化的逻辑. Value 类型实现了 org.apache.finktypes.Value 接口, 而 Hadoop Writable 类型实现了 org.apache.hadoop.Writable 接口, 该接口定义了 write 和 readFields 方法用来手动实现序列化与反序列化逻辑.
特殊类型
特殊类型包括 Scala 中的 Either, Option, and Try 类型, 以及 Java API 中的 Either 类型.
累加器
累加器可以通过 add 操作, 对程序中的某些状态或者操作进行计数, job 结束后会返回计数的结果. 累加器可以用来调试或者记录信息.
可以自定义累加器, 需要实现 Accumulator 接口, 当然 Flink 提供了两种内置的累加器
IntCounter, LongCounter 和 DoubleCounter
Histogram: 统计分布
使用累加器的步骤如下:
在 transform 函数中定义累加器对象
private IntCounter numLines = new IntCounter();
注册累加器对象, 可以在 rich function 的 open 方法进行
getRuntimeContext().addAccumulator("num-lines", this.numLines);
在任何需要统计的地方使用累加器
this.numLines.add(1);
获取累加器结果
myJobExecutionResult.getAccumulatorResult("num-lines")
Job 结束后, 累加器的最终值存储在 JobExecutionResult 对象中, 可以通过 execute 方法返回值来获取 JobExecutionResult 对象. 但是对于批处理无法使用调用这个方法(官网没有提到), 可以通过 env.getLastJobExecutionResult 方法获取. 下面是使用累加器的完整示例
- public static void main(String[] args) throws Exception {
- // set up the batch execution environment
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<String> data = env.readTextFile("你的输入路径");
- // 使用 rich function transform 函数
- DataSet<Integer> dataSet = data.map(new MyRichMapFunction());
- // 执行程序
- dataSet.collect();
- // 获得 job 的结果
- JobExecutionResult jobExecutionResult = env.getLastJobExecutionResult();
- int res = jobExecutionResult.getAccumulatorResult("num-lines");
- // 输出累机器的值
- System.out.println(res);
- }
- // 自定义 rich function
- /**
- * 与实现 MapFunction 接口类似, 这里是继承了 RichMapFunction 类
- * 同时可以实现父类更多的方法
- */
- class MyRichMapFunction extends RichMapFunction<String, Integer> {
- /**
- * 定义累加器
- */
- private IntCounter numLines = new IntCounter();
- @Override
- public void open(Configuration parameters) throws Exception {
- // 注册累加器
- getRuntimeContext().addAccumulator("num-lines", this.numLines);
- }
- @Override
- public Integer map(String value) throws Exception {
- // 累加器自增, 记录处理的行数
- this.numLines.add(1);
- return Integer.parseInt(value);
- }
- }
总结
Flink 基本 API 的使用介绍完了, 本篇主要介绍了自定义的 transform 函数, Flink 支持的数据类型和累加器. 后续会详细介绍 Flink 的原理, 机制以及编程模型.
来源: https://www.cnblogs.com/duma/p/10992125.html