以下代码演示 function, filter, projection 的使用, 可结合注释
省略部分代码, 省略部分可参考: https://blog.csdn.net/nickta/article/details/79666918
- FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3,
- new Values("nickt1", 4),
- new Values("nickt2", 7),
- new Values("nickt3", 8),
- new Values("nickt4", 9),
- new Values("nickt5", 7),
- new Values("nickt6", 11),
- new Values("nickt7", 5)
- );
- spout.setCycle(false);
- TridentTopology topology = new TridentTopology();
- topology.newStream("spout1", spout)
- .shuffle()
- .each(new Fields("user"),new BaseFilter() {
- @Override
- public boolean isKeep(TridentTuple tuple) {
- if(tuple.getString(0).equals("nickt2")) {
- return false;
- }
- return true;
- }
- })// 过滤点 user 为 nickt2 的 tuple
- .each(new Fields("user", "score"),new Debug("filter print:"))
- .each(new Fields("score"), new BaseFunction() {
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
- collector.emit(new Values(tuple.getIntegerByField("score") + 100));
- }
- }, new Fields("sum"))// 把 score 加上 100 后, 生成新的 sum 字段, 并追加到原字段后面, 此步操作后就包括了 user/score/sum 三个字段
- .each(new Fields("user", "score", "sum"),new Debug("function print:"))
- .project(new Fields("user"))
- .each(new Fields("user"),new Debug("project print:"));//project 投射之后, 只有 user 字段了
输出:
- <Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(filter print:): [nickt1, 4]
- <Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(function print:): [nickt1, 4, 104]
- <Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(project print:): [nickt1]
- <Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(filter print:): [nickt3, 8]
- <Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(function print:): [nickt3, 8, 108]
- <Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(project print:): [nickt3]
- <Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(filter print:): [nickt4, 9]
- <Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(function print:): [nickt4, 9, 109]
- <Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(project print:): [nickt4]
- <Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(filter print:): [nickt5, 7]
- <Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(function print:): [nickt5, 7, 107]
- <Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(project print:): [nickt5]
- <Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(filter print:): [nickt6, 11]
- <Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(function print:): [nickt6, 11, 111]
- <Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(project print:): [nickt6]
- <Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(filter print:): [nickt7, 5]
- <Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(function print:): [nickt7, 5, 105]
- <Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(project print:): [nickt7]
来源: http://www.bubuko.com/infodetail-2537133.html