Flink 的编程模型
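1. 获取 Flink 执行环境(上下文):
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
2. 加载或创建数据,得到数据集 DataSet(例如 env.fromElements(...));
DataSet
3. 对数据进行转换(Transformation,例如 flatMap、groupBy、sum);
Transformation
4. 指定数据结果的存放位置(Sink,例如 print() 或 writeAsText(...));
5. 触发执行:
env.execute();
(注意:DataSet 的 print() 会立即触发作业执行,因此只使用 print() 输出时无需再调用 env.execute()。)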
下面是使用 Flink DataSet API 实现 WordCount(单词计数)并输出结果的示例:
- import org.apache.flink.API.common.functions.FlatMapFunction;
- import org.apache.flink.API.java.DataSet;
- import org.apache.flink.API.java.ExecutionEnvironment;
- import org.apache.flink.API.java.tuple.Tuple2;
- import org.apache.flink.util.Collector;
- public class FlinkMain {
- @SuppressWarnings("serial")
- public static class LineSplit implements FlatMapFunction<String,Tuple2<String, Integer>>{
- @SuppressWarnings("rawtypes")
- @Override
- /**
- * @param value 原数据
- * @param out 输出的数据
- */
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
- String[] tokens = value.split(" ");
- for (String token : tokens) {
- if(token!=null && token.length()>0){
- Tuple2 t = new Tuple2<String, Integer>(token,1);
- out.collect(t);
- }
- }
- }
- }
- public static void main(String[] args) throws Exception {
- // 创建 flink 上下文
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- // 创建数据集
- DataSet<String> text = env.fromElements("to be","or no to be","is question");
- // 对数据集转换
- DataSet<Tuple2<String, Integer>> count = text.flatMap(new LineSplit());
- // 输出转换后的数据集 (print 中包含了 env.execute 执行)
- count.print();
- System.out.println("-----------------------");
- // 对数据集分组统计转换, 0,1 是下标, 对应 Tuple2 类中的参数
- count = count.groupBy(0).sum(1);
- // 控制台输出数据集
- count.print();
- System.out.println("-----------------------");
- }
- }
使用 Flink SQL(Table API)方式转换数据
- import java.util.ArrayList;
- import java.util.List;
- import org.apache.flink.API.java.DataSet;
- import org.apache.flink.API.java.ExecutionEnvironment;
- import org.apache.flink.table.API.Table;
- import org.apache.flink.table.API.TableEnvironment;
- import org.apache.flink.table.API.java.BatchTableEnvironment;
- public class FlinkMain2 {
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public static void main(String[] args) throws Exception {
- // 创建 flink 上下文
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
- List<WordCount> list = new ArrayList();
- String workStr = "to be or no to be is question";
- String[] tokens = workStr.split(" ");
- for (String token : tokens) {
- if(token!=null && token.length()>0){
- list.add( new WordCount(token,1));
- }
- }
- // 创建数据集
- DataSet<WordCount> input = env.fromCollection(list);
- // 注册为数据表 wordCount 为数据库表, Word,frequency 为 wordCount 表字段
- tEnv.registerDataSet("wordCount", input, "word, frequency");
- Table table = tEnv.sqlQuery("SELECT word, SUM(frequency) as frequency FROM wordCount GROUP BY word" );
- DataSet<WordCount> res = tEnv.toDataSet(table, WordCount.class);
- // 控制台输出
- res.print();
- }
- public static class WordCount {
- public String Word;
- public long frequency;
- public WordCount(){}
- public WordCount(String Word, long frequency) {
- this.Word = Word;
- this.frequency = frequency;
- }
- @Override
- public String toString() {
- return "词语:" + Word + ", 词频:" + frequency;
- }
- }
- }
来源: http://www.bubuko.com/infodetail-3190568.html