- final StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
- /*
- * Filter
- */
- DataStream<Long> input = streamExecutionEnvironment.generateSequence(-5, 5);
- input.filter(new FilterFunction<Long>() {
- @Override
- public boolean filter(Long value) throws Exception {
- // TODO Auto-generated method stub
- return value>= 0;
- }
- }).print();
- streamExecutionEnvironment.execute();
- /*
- * Connect
- */
- DataStream<Long> someStream = streamExecutionEnvironment.generateSequence(0, 10);
- DataStream<String> otherStream = streamExecutionEnvironment.fromElements(WordCountData.WORDS);
- ConnectedStreams<Long, String> connectedStreams = someStream.connect(otherStream);
- DataStream<String> result = connectedStreams.flatMap(new CoFlatMapFunction<Long, String, String>() {
- @Override
- public void flatMap1(Long value, Collector<String> out) throws Exception {
- // TODO Auto-generated method stub
- out.collect(value.toString());
- }
- @Override
- public void flatMap2(String value, Collector<String> out) throws Exception {
- // TODO Auto-generated method stub
- Arrays.asList(value.split("\\W+")).stream().forEachOrdered(str -> out.collect(str));
- }
- });
- result.print();
- streamExecutionEnvironment.execute();
- /*
- * KeyBy
- */
- DataStream<Tuple4<String, String, String, Integer>> input = streamExecutionEnvironment.fromElements(TRANSCRIPT);
- KeyedStream<Tuple4<String, String, String, Integer>, Tuple> keyedStream = input.keyBy("f0");
- keyedStream.print();
- keyedStream.maxBy("f3").print();
- streamExecutionEnvironment.execute();
- public static final Tuple4[] TRANSCRIPT = new Tuple4[] {
- Tuple4.of("class1","张三","语文",100),
- Tuple4.of("class1","李四","语文",78),
- Tuple4.of("class1","王五","语文",99),
- Tuple4.of("class2","赵六","语文",81),
- Tuple4.of("class2","钱七","语文",59),
- Tuple4.of("class2","马二","语文",97)
- };
- /*
- * Map
- */
- DataStream<Long> input = streamExecutionEnvironment.generateSequence(0, 10);
- DataStream<Long> plusOne = input.map(new MapFunction<Long, Long>() {
- @Override
- public Long map(Long value) throws Exception {
- // TODO Auto-generated method stub
- return value + 1;
- }
- });
- plusOne.print();
- streamExecutionEnvironment.execute();
- /*
- * Fold
- */
- DataStream<Tuple4<String, String, String, Integer>> input = streamExecutionEnvironment.fromElements(TRANSCRIPT);
- DataStream<String> result = input.keyBy(0).fold("Start", new FoldFunction<Tuple4<String, String, String, Integer>, String>() {
- @Override
- public String fold(String str, Tuple4<String, String, String, Integer> value) throws Exception {
- // TODO Auto-generated method stub
- return str + "=" + value.f1 + " ";
- }
- });
- result.print();
- streamExecutionEnvironment.execute();
- public static final Tuple4[] TRANSCRIPT = new Tuple4[] {
- Tuple4.of("class1","张三","语文",100),
- Tuple4.of("class1","李四","语文",78),
- Tuple4.of("class1","王五","语文",99),
- Tuple4.of("class2","赵六","语文",81),
- Tuple4.of("class2","钱七","语文",59),
- Tuple4.of("class2","马二","语文",97)
- };
- /**
- 1> Start = 赵六
- 1> Start = 赵六 = 钱七
- 1> Start = 赵六 = 钱七 = 马二
- 2> Start = 张三
- 2> Start = 张三 = 李四
- 2> Start = 张三 = 李四 = 王五
- */
- /*
- * Reduce
- */
- DataStream<Tuple4<String, String, String, Integer>> input = streamExecutionEnvironment.fromElements(TRANSCRIPT);
- KeyedStream<Tuple4<String, String, String, Integer>, Tuple> keyedStream = input.keyBy(0);
- keyedStream.reduce(new ReduceFunction<Tuple4<String, String, String, Integer>>() {
- @Override
- public Tuple4<String, String, String, Integer> reduce(Tuple4<String, String, String, Integer> value1,
- Tuple4<String, String, String, Integer> value2) throws Exception {
- // TODO Auto-generated method stub
- value1.f3 += value2.f3;
- return value1;
- }
- }).print();
- streamExecutionEnvironment.execute();
- /**
- 2> (class1, 张三, 语文, 100)
- 2> (class1, 张三, 语文, 178)
- 2> (class1, 张三, 语文, 277)
- 1> (class2, 赵六, 语文, 81)
- 1> (class2, 赵六, 语文, 140)
- 1> (class2, 赵六, 语文, 237)
- */
- /*
- * Project
- */
- DataStream<Tuple4<String, String, String, Integer>> input = streamExecutionEnvironment.fromElements(TRANSCRIPT);
- DataStream<Tuple2<String, Integer>> output = input.project(1, 3);
- output.print();
- streamExecutionEnvironment.execute();
- /**
- 4> (张三, 100)
- 4> (钱七, 59)
- 2> (王五, 99)
- 3> (赵六, 81)
- 1> (李四, 78)
- 1> (马二, 97)
- */
- /*
- * SplitAndSelect
- */
- DataStream<Long> input = streamExecutionEnvironment.generateSequence(0, 10);
- SplitStream<Long> splitStream = input.split(new OutputSelector<Long>() {
- @Override
- public Iterable<String> select(Long value) {
- // TODO Auto-generated method stub
- List<String> output = new ArrayList<>();
- if (value % 2 == 0) {
- output.add(EVEN);
- } else {
- output.add(ODD);
- }
- return output;
- }
- });
- // splitStream.print();
- DataStream<Long> even = splitStream.select(EVEN);
- DataStream<Long> odd = splitStream.select(ODD);
- DataStream<Long> all = splitStream.select(EVEN, ODD);
- odd.print();
- streamExecutionEnvironment.execute();
- /*
- * FlatMap
- */
- DataStream<String> input = streamExecutionEnvironment.fromElements(WordCountData.WORDS);
- DataStream<String> wordStream = input.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public void flatMap(String value, Collector<String> out) throws Exception {
- // TODO Auto-generated method stub
- Arrays.asList(value.toLowerCase().split("\\W+")).stream().filter(str -> str.length()> 0).forEach(str -> out.collect(str));
- }
- });
- wordStream.print();
- streamExecutionEnvironment.execute();
来源: http://www.bubuko.com/infodetail-3294420.html