- public Consumer2() {
- try {
- String group_name = "test_model_consumer_name";
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
- consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
- consumer.subscribe("test_model_topic2", "TagA");
- consumer.setMessageModel(MessageModel.CLUSTERING);
- //consumer.setMessageModel(MessageModel.BROADCASTING);
- consumer.registerMessageListener(new Listener());
- consumer.start();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- class Listener implements MessageListenerConcurrently {
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- try {
- for(MessageExt msg : msgs){
- String topic = msg.getTopic();
- String msgBody = new String(msg.getBody(),"utf-8");
- String tags = msg.getTags();
- //if(tags.equals("TagA")) {
- System.out.println("收到消息:" + "topic :" + topic + ",tags :" + tags + ",msg :" + msgBody);
- //
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- }
- public static void main(String[] args) {
- Consumer2 c2 = new Consumer2();
- System.out.println("c2 start..");
- }
- }
来源: http://www.bubuko.com/infodetail-2944650.html