准备工作
1, 安装查看 Java 的版本号, 推荐使用 Java 8.
安装 Flink
2, 在 Mac OS X 上安装 Flink 是非常方便的. 推荐通过 homebrew 来安装.
brew install apache-flink
3, 检查安装:
flink --version
结果:
Version: 1.6.0, Commit ID: ff472b4
4, 启动 flink
- zhisheng@zhisheng /usr/local/Cellar/apache-flink/1.6.0/libexec/bin ./start-cluster.sh
- Starting cluster.
- Starting standalonesession daemon on host zhisheng.
- Starting taskexecutor daemon on host zhisheng.
接着就可以进入 web 页面 ( http://localhost:8081/ ) 查看
demo
1, 新建一个 maven 项目
创建一个 SocketTextStreamWordCount 文件, 加入以下代码:
- package com.zhisheng.flink;
- import org.apache.flink.API.common.functions.FlatMapFunction;
- import org.apache.flink.API.java.tuple.Tuple2;
- import org.apache.flink.streaming.API.datastream.DataStreamSource;
- import org.apache.flink.streaming.API.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.API.environment.StreamExecutionEnvironment;
- import org.apache.flink.util.Collector;
- /**
- * Created by zhisheng_tian on 2018/9/18
- */
- public class SocketTextStreamWordCount {
- public static void main(String[] args) throws Exception {
- // 参数检查
- if (args.length != 2) {
- System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
- return;
- }
- String hostname = args[0];
- Integer port = Integer.parseInt(args[1]);
- // set up the streaming execution environment
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // 获取数据
- DataStreamSource<String> stream = env.socketTextStream(hostname, port);
- // 计数
- SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
- .keyBy(0)
- .sum(1);
- sum.print();
- env.execute("Java WordCount from SocketTextStream Example");
- }
- public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
- @Override
- public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
- String[] tokens = s.toLowerCase().split("\\W+");
- for (String token: tokens) {
- if (token.length()> 0) {
- collector.collect(new Tuple2<String, Integer>(token, 1));
- }
- }
- }
- }
- }
接着进入工程目录, 使用以下命令打包.
mvn clean package -Dmaven.test.skip=true
然后我们开启监听 9000 端口:
nc -l 9000
最后进入 flink 安装目录 bin 下执行以下命令跑程序:
flink run -c com.zhisheng.flink.SocketTextStreamWordCount /Users/zhisheng/IdeaProjects/flink/Word-count/target/original-Word-count-1.0-SNAPSHOT.jar 127.0.0.1 9000
注意换成你自己项目的路径.
执行完上述命令后, 我们可以在 webUI 中看到正在运行的程序:
我们可以在 nc 监听端口中输入 text, 比如:
然后我们通过 tail 命令看一下输出的 log 文件, 来观察统计结果. 进入目录 apache-flink/1.6.0/libexec/log, 执行以下命令:
tail -f flink-zhisheng-taskexecutor-0-zhisheng.out
注意: 切换成你自己的路径和查看自己的目录.
总结
本文描述了如何在 Mac 电脑上安装 Flink, 及运行它. 接着通过一个简单的 Flink 程序来介绍如何构建及运行 Flink 程序.
来源: https://juejin.im/post/5be99ea2f265da613a538e41