我们在学习 ack 机制的时候,我们知道 Storm 的 Bolt 有 BaseBasicBolt 和 BaseRichBolt。
在 BaseBasicBolt 中,BasicOutputCollector 在 emit 数据的时候,会自动和输入的 tuple 相关联,而在 execute 方法结束的时候那个输入 tuple 会被自动 ack。
在使用 BaseRichBolt 需要在 emit 数据的时候,显示指定该数据的源 tuple 要加上第二个参数 anchor tuple,以保持 tracker 链路,即 collector.emit(oldTuple, newTuple); 并且需要在 execute 执行成功后调用 OutputCollector.ack(tuple), 当失败处理时,执行 OutputCollector.fail(tuple);
那么我们来看看 BasicBolt 的源码是不是这样的,不能因为看到别人的帖子说是这样的,我们就这样任务,以讹传讹,我们要 To see is to believe。
为了方便看源代码,我先上我们的继承类:
- public class SplitSentenceBolt extends BaseBasicBolt {
- public void prepare(Map stormConf, TopologyContext context) {
- super.prepare(stormConf, context);
- }
- //5:执行我们自己的逻辑处理方法,接收传入的参数。
- public void execute(Tuple input, BasicOutputCollector collector) {
- String sentence = (String) input.getValueByField("sentence");
- String[] words = sentence.split(" ");
- for (String word: words) {
- word = word.trim();
- word = word.toLowerCase();
- collector.emit(new Values(word, 1)); //这个地方就是调用OutputCollector的包装类,来发消息
- }
- }
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word", "num"));
- }
- }
通过打断点,我们发现,bolt 的 task 会创建这个类下面会标准执行顺序
- public class BasicBoltExecutor implements IRichBolt {
- public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);
- private IBasicBolt _bolt;
- private transient BasicOutputCollector _collector;
- //1:创建该对象,然后把我们写的SplitSentenceBolt对象赋给父类IBasicBolt。
- public BasicBoltExecutor(IBasicBolt bolt) {
- _bolt = bolt;
- }
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- _bolt.declareOutputFields(declarer); //这里就是调用SplitSentenceBolt对象的方法了。
- } //2:给BasicOutputCollector _collector字段赋值,BasicOutputCollector就是对OutputCollector类的包装。
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- _bolt.prepare(stormConf, context);
- _collector = new BasicOutputCollector(collector);
- } //3:然后程序执行该方法,input的值source: spout1:4, stream: default, id: {}, [+ - * % /]
- public void execute(Tuple input) {
- _collector.setContext(input); //把接收到的tuple值设置给BasicOutputCollector中inputTuple字段。
- try {
- _bolt.execute(input, _collector); //这个地方是调用我们实现类SplitSentenceBolt的ececute方法。
- _collector.getOutputter().ack(input); //这个地方就是响应
- } catch(FailedException e) {
- if (e instanceof ReportedFailedException) {
- _collector.reportError(e);
- }
- _collector.getOutputter().fail(input); //这个地方就是响应
- }
- }
- public void cleanup() {
- _bolt.cleanup();
- }
- public Map getComponentConfiguration() {
- return _bolt.getComponentConfiguration();
- }
- }
- public class BasicOutputCollector implements IBasicOutputCollector {
- private OutputCollector out;
- private Tuple inputTuple;
- public BasicOutputCollector(OutputCollector out) {
- this.out = out;
- }
- //4:把收到的tuple数据赋值给inputTuple,这个时候BasicOutputCollector对象的字段都具有值了。
- public void setContext(Tuple inputTuple) {
- this.inputTuple = inputTuple;
- } //6:这里我们发送新的(转换后的)tuple数据,看他内部的调用,其实他也会发送一个anchor tuple来保持tracker链路,
- 而这个anchor tuple就是bolt接收到转换前的源tuple数据。public List emit(List < Object > tuple) {
- return emit(Utils.DEFAULT_STREAM_ID, tuple);
- }
- public List emit(String streamId, List tuple) {
- return out.emit(streamId, inputTuple, tuple);
- }
- public void emitDirect(int taskId, String streamId, List tuple) {
- out.emitDirect(taskId, streamId, inputTuple, tuple);
- }
- public void emitDirect(int taskId, List tuple) {
- emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
- }
- protected IOutputCollector getOutputter() {
- return out;
- }
- public void reportError(Throwable t) {
- out.reportError(t);
- }
- }
这里大家不要纠结 bolt 的启动时从哪里开始的,我后面会讲的,这里我们关注的是,BasicBoltExecutor 对象创建后的执行过程,以这我们来看执行的过程。在 BasicBoltExecutor 的 execute 方法中,我们看到了 ack 和 fail 方法会被自动调用的,当我们的程序抛出异常则会执行 fail 方法的。
这个
来源: http://www.cnblogs.com/intsmaze/p/5924873.html