其中用到了滑动窗口函数: 窗口大小 30 秒, 滑动间隔 15 秒, 且发送时间晚于窗口起始时间 10 秒及以上的数据会被丢弃 (实际业务中这三个值应为 10 分钟, 1 分钟, 5 分钟). 代码先记录一下:
- public static void main(String[] arg) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().enableSysoutLogging();// 开启 Sysout 打日志
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 设置窗口的时间单位为 process time
- Properties props = new Properties();
- props.put("bootstrap.servers", "kafkaip:9092");
- props.put("group.id", "metric-group4");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key 反序列化
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("auto.offset.reset", "earliest"); //value 反序列化
- DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>(
- "im-message-topic3", //kafka topic
- new SimpleStringSchema(), // String 序列化
- props)).setParallelism(1);
- DataStream<Message> bean3DataStream = dataStreamSource.map(new MapFunction<String, Message>() {
- @Override
- public Message map(String value) throws Exception {
- logger.info("receive msg:"+value);
- JSONObject jsonObject =JSONObject.parseObject(value);
- Message s= new Message(
- jsonObject.getString("sessionId"),
- jsonObject.getString("fromUid"),
- jsonObject.getString("toUid"),
- jsonObject.getString("chatType"),
- jsonObject.getString("type"),
- jsonObject.getString("msgId"),
- jsonObject.getString("msg"),
- jsonObject.getLong("timestampSend")
- );
- return s;
- }
- });
- // 设置水印, 并过滤数据
- DataStream<Message> bean3DataStreamWithAssignTime =
- bean3DataStream.assignTimestampsAndWatermarks(new TruckTimestamp()).timeWindowAll(Time.seconds(30),Time.seconds(15)).apply(new AllWindowFunction<Message, Message,TimeWindow>() {
- @Override
- public void apply(TimeWindow Windows, Iterable<Message> values, Collector<Message> out)
- throws Exception {
- for (Message t: values) {
- logger.info("window start time:"+new Date(Windows.getStart()).toString());
- logger.info("real time:"+new Date(t.getTimestampSend()).toString());
- if(t.getTimestampSend()<Windows.getStart()+1000*10) {
- logger.info("yes");
- out.collect(t);
- }else {
- logger.info("no");
- }
- }
- }
- });
- //bean3DataStreamWithAssignTime.addSink(new Sink());
- //bean3DataStreamWithAssignTime.writeAsText("/usr/local/whk3", WriteMode.OVERWRITE);
- StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
- tableEnv.registerDataStream("myTable", bean3DataStreamWithAssignTime, "sessionId, fromUid,toUid,chatType,type,msgId,msg,timestampSend,rowtime.rowtime");
- Table temp=tableEnv.scan("myTable");
- System.out.println("schema is:");
- temp.printSchema();
- // Table tb3 = tableEnv.sqlQuery("select * from myTable");
- // DataStream<Row> appendStream =tableEnv.toAppendStream(tb3, Row.class);
- // appendStream.addSink(new Sink());
- // 对过滤后的数据, 使用正则匹配数据
- Table tb2 = tableEnv.sqlQuery(
- "SELECT" +
- "*" +
- "FROM myTable" +
- " " +
- "MATCH_RECOGNIZE (" +
- "PARTITION BY sessionId" +
- "ORDER BY rowtime" +
- "MEASURES" +
- "e2.timestampSend as answerTime,"+
- "LAST(e1.timestampSend) as customer_event_time," +
- "e2.fromUid as empUid," +
- "e1.timestampSend as askTime," +
- "1 as total_talk" +
- "ONE ROW PER MATCH" +
- "AFTER MATCH SKIP TO LAST e2" +
- "PATTERN (e1+ e2+?)" +
- "DEFINE" +
- "e1 as e1.type ='yonghu'," +
- "e2 as e2.type ='guanjia' " +
- ")"+
- ""
- );
- DataStream<Row> appendStream2 =tableEnv.toAppendStream(tb2, Row.class);
- appendStream2.addSink(new Sink2());
- env.execute("msg v5");
- }
- public static class TruckTimestamp extends AscendingTimestampExtractor<Message> {
- private static final long serialVersionUID = 1L;
- @Override
- public long extractAscendingTimestamp(Message element) {
- return element.getTimestampSend();
- }
- }
- public static class Sink implements SinkFunction<Row> {
- /**
- *
- */
- private static final long serialVersionUID = 1L;
- @Override
- public void invoke(Row value) throws Exception {
- System.out.println(new Date().toString()+"orinal time:"+value.toString());
- }
- }
- public static class Sink2 implements SinkFunction<Row> {
- /**
- *
- */
- private static final long serialVersionUID = 1L;
- @Override
- public void invoke(Row value) throws Exception {
- System.out.println(new Date().toString()+"new time:"+value.toString());
- }
- }
来源: http://www.bubuko.com/infodetail-3118688.html