以电信通话记录为例
移动呼叫及其持续时间将作为对 Apache Storm 的输入, Storm 将处理和分组在相同呼叫者和接收者之间的呼叫及其呼叫总数.
编程思想:
在 storm 中, 把对数据的处理过程抽象成一个 topology, 这个 topology 包含的组件主要是 spout,bolt, 以及以 tuple 形式在组件之间传输的数据流. 这个数据流在 topology 流一遍, 就是对数据的一次处理.
1, 创建 Spout 类
这一部分, 是创建数据流的源头.
创建一个类, 实现 IRichSpout 接口, 实现相应方法. 其中几个方法的含义:
open - 为 Spout 提供执行环境. 执行器将运行此方法来初始化喷头. 一般写一些第一次运行时要处理的逻辑
nextTuple - 通过收集器发出生成的数据. 核心, 用于生成数据流
close - 当 spout 将要关闭时调用此方法.
declareOutputFields - 声明元组的输出模式. 即, 声明了从此 spout 出去的流都的数据格式
ack - 确认处理了特定元组.
fail - 指定不处理和不重新处理特定元组.
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
conf - 为此 spout 提供 storm 配置.
context - 提供有关拓扑中的 spout 位置, 其任务 ID, 输入和输出信息的完整信息.
collector - 使我们能够发出将由 bolts 处理的元组.
nextTuple()
nextTuple() 从与 ack() 和 fail() 方法相同的循环中定期调用. 它必须释放线程的控制, 当没有工作要做, 以便其他方法有机会被调用. 因此, nextTuple 的第一行检查处理是否已完成. 如果是这样, 它应该休眠至少一毫秒, 以减少处理器在返回之前的负载.
declareOutputFields(OutputFieldsDeclarer declarer)
declarer - 它用于声明输出流 id, 输出字段等, 此方法用于指定元组的输出模式.
ack(Object msgId)
该方法确认已经处理了特定元组.
fail(Object o)
此方法通知特定元组尚未完全处理. Storm 将重新处理特定的元组
- package com.jing.calllogdemo;
- import org.apache.storm.spout.SpoutOutputCollector;
- import org.apache.storm.task.TopologyContext;
- import org.apache.storm.topology.IRichSpout;
- import org.apache.storm.topology.OutputFieldsDeclarer;
- import org.apache.storm.tuple.Fields;
- import org.apache.storm.tuple.Values;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.Random;
- /*
- spout 类, 负责产生数据流
- */
- public class CallLogSpout implements IRichSpout {
- //spout 输出收集器
- private SpoutOutputCollector collector;
- // 是否完成
- private boolean completed = false;
- // 上下文对象
- private TopologyContext context;
- // 随机发生器
- private Random randomGenerator = new Random();
- // 索引
- private Integer idx = 0;
- @Override
- public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
- // 第一次运行要做的事
- this.context = topologyContext;
- this.collector = spoutOutputCollector;
- }
- @Override
- public void close() {
- }
- @Override
- public void activate() {
- }
- @Override
- public void deactivate() {
- }
- @Override
- public void nextTuple() {
- // 产生第一条数据,
- if (this.idx <= 1000){
- List<String> mobileNumbers = new ArrayList<String>();
- mobileNumbers.add("1234123401");
- mobileNumbers.add("1234123402");
- mobileNumbers.add("1234123403");
- mobileNumbers.add("1234123404");
- Integer localIdx = 0;
- while (localIdx++ <100 && this.idx++ <1000){
- // 取出主叫
- String caller = mobileNumbers.get(randomGenerator.nextInt(4));
- // 取出被叫
- String callee = mobileNumbers.get(randomGenerator.nextInt(4));
- while (caller == callee){
- // 重新取出被叫
- callee = mobileNumbers.get(randomGenerator.nextInt(4));
- }
- // 模拟通话时长
- Integer duration = randomGenerator.nextInt(60);
- // 输出元祖
- this.collector.emit(new Values(caller,callee,duration));
- }
- }
- }
- @Override
- public void ack(Object o) {
- }
- @Override
- public void fail(Object o) {
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- // 声明输出字段, 定义元组的结构, 定义输出字段名称
- outputFieldsDeclarer.declare(new Fields("from", "to", "duration"));
- }
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
- }
- CallLogSpout
2, 创建 Bolt 类
这一部分是完成对数据流的处理, Bolt 把元组作为输入, 对元组进行处理后, 产生新的元组.
创建一个类, 实现 IRichBolt 接口, 实现相应方法.
prepare - 为 bolt 提供要执行的环境. 执行器将运行此方法来初始化 spout.
execute - 处理单个元组的输入
cleanup - 当 spout 要关闭时调用.
declareOutputFields - 声明元组的输出模式.
prepare(Map conf, TopologyContext context, OutputCollector collector)
conf - 为此 bolt 提供 Storm 配置.
context - 提供有关拓扑中的 bolt 位置, 其任务 ID, 输入和输出信息等的完整信息.
collector - 使我们能够发出处理的元组.
execute(Tuple tuple)
这是 bolt 的核心方法, 这里的元组是要处理的输入元组. execute 方法一次处理单个元组. 元组数据可以通过 Tuple 类的 getValue 方法访问. 不必立即处理输入元组. 多元组可以被处理和输出为单个输出元组. 处理的元组可以通过使用 OutputCollector 类发出.
- cleanup()
- declareOutputFields(OutputFieldsDeclarer declarer)
这个方法用于指定元组的输出模式, 参数 declarer 用于声明输出流 id, 输出字段等.
这里有两个 bolt
呼叫日志创建者 bolt 接收呼叫日志元组. 呼叫日志元组具有主叫方号码, 接收方号码和呼叫持续时间. 此 bolt 通过组合主叫方号码和接收方号码简单地创建一个新值. 新值的格式为 "来电号码 - 接收方号码", 并将其命名为新字段 "呼叫"
- package com.jing.calllogdemo;
- import org.apache.storm.task.OutputCollector;
- import org.apache.storm.task.TopologyContext;
- import org.apache.storm.topology.IRichBolt;
- import org.apache.storm.topology.OutputFieldsDeclarer;
- import org.apache.storm.tuple.Fields;
- import org.apache.storm.tuple.Tuple;
- import org.apache.storm.tuple.Values;
- import java.util.Map;
- /*
- 创建 calllog 日志的 bolt
- */
- public class CallLogCreatorBolt implements IRichBolt {
- private OutputCollector collector;
- @Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
- this.collector = outputCollector;
- }
- @Override
- public void execute(Tuple tuple) {
- // 处理新的同话记录
- String from = tuple.getString(0);
- String to = tuple.getString(1);
- Integer duration = tuple.getInteger(2);
- // 产生新的 tuple
- String fromTO = from + "-" + to;
- collector.emit(new Values(fromTO, duration));
- }
- @Override
- public void cleanup() {
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- // 设置输出字段的名称
- outputFieldsDeclarer.declare(new Fields("call", "duration"));
- }
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
- }
- CallLogCreatorBolt
呼叫日志创建者 bolt 接收呼叫日志元组. 呼叫日志元组具有主叫方号码, 接收方号码和呼叫持续时间. 此 bolt 通过组合主叫方号码和接收方号码简单地创建一个新值. 新值的格式为 "来电号码 - 接收方号码", 并将其命名为新字段 "呼叫".
- package com.jing.calllogdemo;
- import org.apache.storm.task.OutputCollector;
- import org.apache.storm.task.TopologyContext;
- import org.apache.storm.topology.IRichBolt;
- import org.apache.storm.topology.OutputFieldsDeclarer;
- import org.apache.storm.tuple.Fields;
- import org.apache.storm.tuple.Tuple;
- import java.util.HashMap;
- import java.util.Map;
- /*
- 通话记录计数器 bolt
- */
- public class CallLogCounterBolt implements IRichBolt {
- Map<String, Integer> counterMap;
- private OutputCollector collector;
- @Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
- this.counterMap = new HashMap<String, Integer>();
- this.collector = outputCollector;
- }
- @Override
- public void execute(Tuple tuple) {
- String call = tuple.getString(0);
- Integer duration = tuple.getInteger(1);
- if(!counterMap.containsKey(call)){
- counterMap.put(call, 1);
- }else {
- Integer c = counterMap.get(call) + duration;
- counterMap.put(call, c);
- }
- collector.ack(tuple);
- }
- @Override
- public void cleanup() {
- for(Map.Entry<String, Integer> entry : counterMap.entrySet()){
- System.out.println(entry.getKey() + ":" + entry.getValue());
- }
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- outputFieldsDeclarer.declare(new Fields("call"));
- }
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
- }
- CallLogCounterBolt
3, 创建执行入口类, 构建 Topology
Storm 拓扑基本上是一个 Thrift 结构. TopologyBuilder 类提供了简单而容易的方法来创建复杂的拓扑. TopologyBuilder 类具有设置 spout(setSpout) 和设置 bolt(setBolt) 的方法. 最后, TopologyBuilder 有 createTopology 来创建拓扑. 使用以下代码片段创建拓扑 -
- package com.jing.calllogdemo;
- import org.apache.storm.Config;
- import org.apache.storm.LocalCluster;
- import org.apache.storm.StormSubmitter;
- import org.apache.storm.generated.AlreadyAliveException;
- import org.apache.storm.generated.AuthorizationException;
- import org.apache.storm.generated.InvalidTopologyException;
- import org.apache.storm.topology.TopologyBuilder;
- import org.apache.storm.tuple.Fields;
- public class App {
- public static void main(String[] args) throws InterruptedException, InvalidTopologyException, AuthorizationException, AlreadyAliveException {
- TopologyBuilder builder = new TopologyBuilder();
- // 设置 spout
- builder.setSpout("spout", new CallLogSpout());
- // 设置 creator-bolt
- builder.setBolt("creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("spout");
- // 设置 countor-bolt
- builder.setBolt("counter-bolt", new CallLogCounterBolt()).
- fieldsGrouping("creator-bolt", new Fields("call"));
- Config config = new Config();
- config.setDebug(true);
- /* 本地模式
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
- Thread.sleep(10000);
- cluster.shutdown();
- */
- StormSubmitter.submitTopology("myTop", config, builder.createTopology());
- }
- }
- App
为了开发目的, 我们可以使用 "LocalCluster" 对象创建本地集群, 然后使用 "LocalCluster" 类的 "submitTopology" 方法提交拓扑. "submitTopology" 的参数之一是 "Config" 类的实例."Config" 类用于在提交拓扑之前设置配置选项. 此配置选项将在运行时与集群配置合并, 并使用 prepare 方法发送到所有任务 (spout 和 bolt). 一旦拓扑提交到集群, 我们将等待 10 秒钟, 集群计算提交的拓扑, 然后使用 "LocalCluster" 的 "shutdown" 方法关闭集群. 完整的程序代码如下 -
参考:
作者: raincoffee
链接: https://www.jianshu.com/p/7af9693d9ffc
来源: http://www.bubuko.com/infodetail-3093864.html