综述:
在 Flink 中 DataStream 程序是在数据流上实现了转换的常规程序.
1. 示范程序
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.util.Collector;
- public class WindowWordCount { public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<Tuple2<String, Integer>> dataStream = env
- .socketTextStream("localhost", 9999)
- .flatMap(new Splitter())
- .keyBy(0)
- .timeWindow(Time.seconds(5))
- .sum(1);
- dataStream.print();
- env.execute("Window WordCount");
- }
- public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
- @Override
- public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
- for (String word: sentence.split(" ")) {
- out.collect(new Tuple2<String, Integer>(word, 1));
- }
- }
- }
- }
- nc -lk 9999
2. 数据源
程序从源读取输入. 可以通过 StreamExecutionEnvironment.addSource(sourceFunction)给程序附上源.
在 StreamExecutionEnvironment 中有一些可访问的预定义的流数据源: readTextFile(path) 逐行作为字符串读取文本文件 readFile(fileInputFormat, path) 通过指定的文件输入格式 (the specified file input format) 读取文件 readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) 这是一个被前两个方法内部调用的方法. 它基于给定 fileInputFormat 在 path 下读取文件, 根据提供的
watchType, 这个源会定期监测 (每 interval ms) 新数据的路径.
基于套接字的 socketTextStream 从套接字读取. 元素可以由一个分隔符分开.
基于集合的 fromCollection(Collection) 从 Java Java.util.Collection 创建一个数据流, 集合中的所有元素必须是相同类型的. fromCollection(Iterator, Class) 从一个迭代器创建一个数据流, 类指定迭代器返回的元素的数据类型. fromElements(T ...) 从给定的对象的序列创建一个数据流, 所有对象必须是相同类型的. fromParallelCollection(SplittableIterator, Class) 在并行执行中, 从一个迭代器创建一个数据流, 类指定迭代器返回的元素的数据类型. generateSequence(from, to) 在给定的时间间隔内, 生成的数字序列, 并行执行.
自定义的 addSource 附上一个新的源函数. 例如要从 Apache Kafka 读取, 可以用 addSource(new FlinkKafkaConsumer08<>(...)).
3.DataStream Transformations 参照运算符.
4.Data Sinks 数据接收
来源: http://www.bubuko.com/infodetail-2717223.html