维基百科在 IRC 频道上记录 Wiki 被修改的日志, 我们可以通过监听这个 IRC 频道, 来实时监控给定时间窗口内的修改事件. Apache Flink 作为流计算引擎, 非常适合处理流数据, 并且, 类似于 Hadoop MapReduce 等框架, Flink 提供了非常良好的抽象, 使得业务逻辑代码编写非常简单. 我们通过这个简单的例子来感受一下 Flink 的程序的编写.
通过 Flink Quickstart 构建 Maven 工程
Flink 提供了
flink-quickstart-java
和
flink-quickstart-scala
插件, 允许使用 Maven 的开发者创建统一的项目模版, 应用项目模板可以规避掉很多部署上的坑.
构建这次工程的命令如下
$ mvn archetype:generate \
- -DarchetypeGroupId=org.apache.flink \
- -DarchetypeArtifactId=flink-quickstart-java \
- -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \
- -DarchetypeVersion=1.6-SNAPSHOT \
- -DgroupId=wiki-edits \
- -DartifactId=wiki-edits \
- -Dversion=0.1 \
- -Dpackage=wikiedits \
- -DinteractiveMode=false
注意高版本的 Maven 不支持 -DarchetypeCatalog 参数, 可以将第一行改为
mvn org.apache.maven.plugins:maven-archetype-plugin:2.4::generate \
或者去掉 -DarchetypeCatalog 行, 并将 .m2/settings.xml 修改如下, 其中主要是在
//profiles/profile/repositories
下设置好搜索 archetype 的仓库地址
- <settings xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
- http://maven.apache.org/xsd/settings-1.0.0.xsd">
- <profiles>
- <profile>
- <id>acme</id>
- <repositories>
- <repository>
- <id>archetype</id>
- <name>Apache Development Snapshot Repository</name>
- <url>https://repository.apache.org/content/repositories/snapshots/</url>
- <releases>
- <enabled>false</enabled>
- </releases>
- <snapshots>
- <enabled>true</enabled>
- </snapshots>
- </repository>
- </repositories>
- </profile>
- </profiles>
<activeProfiles>
<activeProfile>acme</activeProfile>
- </activeProfiles>
- </settings>
成功下载项目模板后, 在当前目录下应当能看到 wiki-edit 目录. 执行命令
rm wiki-edits/src/main/java/wikiedits/*.java
清除模板自带的 Java 文件.
为了监听维基百科的 IRC 频道, 在 pom.xml 文件下添加如下依赖, 分别是 Flink 的客户端和 WikiEdit 的连接器
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-wikiedits_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
编写 Flink 程序
接下来的代码编写工作假定你是在 IDE 下编写的, 主要是为了避免啰嗦的 import 语句. 包含 import 等模板代码的全部代码在末尾给出.
首先我们创建用于运行的主程序代码
- src/main/java/wikiedits/WikipediaAnalysis.java
- package wikiedits;
- public class WikipediaAnalysis {
- public static void main(String[] args) throws Exception {
- }
- }
流处理的 Flink 程序的第一步是创建流处理执行上下文
StreamExecutionEnvironment
, 它类似于其他框架内的 Configuration 类, 用于配制 Flink 程序和运行时的各个参数, 对应的语句如下
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
下一步我们以维基百科 IRC 频道的日志作为数据源创建连接
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
这个语句创建了填充 WikipediaEditEvent 的 DataStream, 拿到数据流之后我们就可以对它做进一步的操作了.
我们的目标是统计给定时间窗口内, 比如说五秒内, 用户对维基百科的修改字节数. 因此我们对每个 WikipediaEditEvent 以用户名作为键来标记(keyed).Flink 兼容 Java 1.6 版本, 因此古老的版本中 Flink 提供 KeySelector 函数式接口来标记
- KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
- .keyBy(new KeySelector<WikipediaEditEvent, String>() {
- @Override
- public String getKey(WikipediaEditEvent event) {
- return event.getUser();
- }
- });
当前版本的 Flink 主要支持的是 Java 8 版本, 因此我们也可以用 Lambda 表达式来改写这段较为繁琐的代码
- KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
- .keyBy(WikipediaEditEvent::getUser);
这个语句定义了 keyedEdits 变量, 它是一个概念上形如
(String, WikipediaEditEvent)
的数据流, 即以字符串 (用户名) 为键, WikipediaEditEvent 为值的数据的流. 这一步骤类似于 MapReduce 的 Shuffle 过程, 针对 keyedEdits 的处理将自动按照键分组, 因此我们可以直接对数据进行 fold 操作以折叠聚合同一用户名的修改字节数
- DataStream<Tuple2<String, Long>> result = keyedEdits
- .timeWindow(Time.seconds(5))
- .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
- @Override
- public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
- acc.f0 = event.getUser();
- acc.f1 += event.getByteDiff();
- return acc;
- }
- });
在新版的 Flink 中, FoldFunction 因为无法支持部分聚合被废弃了, 如果对程序有强迫症, 我们可以采用类似于 MapReduce 的办法来改写上边的代码, 各个方法调用的作用与它们的名字一致, 其中, 为了绕过类型擦除导致的问题使用了 returns 函数
- DataStream<Tuple2<String, Long>> result = keyedEdits
- .map((event) -> new Tuple2<>(event.getUser(), Long.valueOf(event.getByteDiff())))
- .returns(new TypeHint<Tuple2<String, Long>>(){})
- .timeWindowAll(Time.seconds(5))
- .reduce((acc, a) -> new Tuple2<>(a.f0, acc.f1+a.f1));
经过处理后的数据流 result 中就包含了我们所需要的信息, 具体地说是填充了
Tuple2<String, Long>
, 即 (用户名, 修改字节数) 元组的流, 我们可以使用 result.print() 来打印它.
程序至此主要处理逻辑就写完了, 但是 Flink 还需要在
StreamExecutionEnvironment
类型的变量上调用 execute 方法以实际执行整个 Flink 程序, 该方法执行时将整个 Flink 程序转化为任务图并提交到 Flink 集群中.
整个程序的代码, 包括模板代码, 如下所示
- package wikiedits;
- import org.apache.flink.api.common.typeinfo.TypeHint;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.KeyedStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
- import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
- import org.apache.flink.api.java.tuple.Tuple2;
- public class WikipediaAnalysis {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
- KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
- .keyBy(WikipediaEditEvent::getUser);
- DataStream<Tuple2<String, Long>> result = keyedEdits
- .map((event) -> new Tuple2<>(event.getUser(), Long.valueOf(event.getByteDiff())))
- .returns(new TypeHint<Tuple2<String, Long>>(){})
- .timeWindowAll(Time.seconds(5))
- .reduce((acc, a) -> new Tuple2<>(a.f0, acc.f1+a.f1));
- result.print();
- see.execute();
- }
- }
可以通过 IDE 运行程序, 在控制台看到类似下面格式的输出, 每一行前面的数字代表了这是由 print 的并行实例中的编号为几的实例运行的结果
- 1> (LilHelpa,1966)
- 2> (1.70.80.5,2066)
- 3> (Beyond My Ken,-6550)
- 4> (Aleksandr Grigoryev,725)
- 1> (6.77.155.31,1943)
- 2> (Serols,1639)
- 3> (ClueBot NG,1907)
- 4> (GSS,3155)
来源: https://www.cnblogs.com/wander4096/p/9011989.html