代码参考:
生产者, 根据某个标识将消息放到同一个队列中
- public class Producer {
- public static void main(String[] args) throws MQClientException {
- DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
- producer.setNamesrvAddr("10.130.41.36:9876");
- producer.setInstanceName("Producer");
- producer.setVipChannelEnabled(false);
- producer.start();
- String[] tags = {"tagA","tagB"};
- for (int i = 1; i <= 10; i++) {
- try {
- Message msg = new Message("TopicTest",tags[i%tags.length],"key1"+i,("订单一号" + i).getBytes());
- SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(),1);
- System.out.println(sendResult);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- for (int i = 1; i <= 10; i++) {
- try {
- Message msg = new Message("TopicTest",tags[i%tags.length],"key2"+i,("订单二号" + i).getBytes());
- SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(),2);
- System.out.println(sendResult);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- for (int i = 1; i <= 10; i++) {
- try {
- Message msg = new Message("TopicTest",tags[i%tags.length],"key3"+i,("订单三号" + i).getBytes());
- SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(),3);
- System.out.println(sendResult);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- producer.shutdown();
- }
- }
Topic 队列中的内容:
消费者:
一. 顺序消费
使用 MessageListenerOrderly, 顺序消费同一个队列中的数据, 只有第一个数据消费成功了才会消费第二个数据.
模拟在消费某个数据时出现了阻塞状态.
- public class ConsumerOrderly {
- public static void main(String[] args) throws InterruptedException,
- MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
- consumer.setNamesrvAddr("10.130.41.36:9876");
- consumer.setInstanceName("Consumer1");
- consumer.setMessageModel(MessageModel.CLUSTERING);
- consumer.subscribe("TopicTest", "*");
- consumer.registerMessageListener(new MessageListenerOrderly() {
- @Override
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
- // 设置自动提交, 如果不设置自动提交就算返回 SUCCESS, 消费者关闭重启 还是会重复消费的
- context.setAutoCommit(true);
- try {
- for (MessageExt msg:msgs) {
- String msgKey = msg.getKeys();
- if(msgKey.equals("key13") || msgKey.equals("key22")){
- Thread.sleep(1000);
- }
- System.out.println("消费者 1 ==> 当前线程:"+Thread.currentThread().getName()+",quenuID:"+msg.getQueueId()+ ",content:" + new String(msg.getBody()));
- }
- } catch (Exception e) {
- e.printStackTrace();
- // 如果出现异常, 消费失败, 挂起消费队列一会会, 稍后继续消费
- return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
- // 消费成功
- return ConsumeOrderlyStatus.SUCCESS;
- }
- });
- /**
- * Consumer 对象在使用之前必须要调用 start 初始化, 初始化一次即可
- */
- consumer.start();
- System.out.println("C1 Started.");
- }
- }
测试结果如下:
当 "订单一号 3" 没有消费时, 他所在队列中后面的数据是不能被消费的."订单二号 2" 也是同样的情况.
二. 并发消费
使用 MessageListenerConcurrently, 并发消费同一个队列中的数据, 不能保证消费的顺序.
模拟在消费某个数据时出现了阻塞状态.
- public class ConsumerConcurrently {
- public static void main(String[] args) throws InterruptedException,
- MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
- consumer.setNamesrvAddr("10.130.41.36:9876");
- consumer.setInstanceName("Consumer1");
- consumer.setMessageModel(MessageModel.CLUSTERING);
- consumer.subscribe("TopicTest", "*");
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- try {
- for (MessageExt msg:msgs) {
- String msgKey = msg.getKeys();
- if(msgKey.equals("key13") || msgKey.equals("key22")){
- Thread.sleep(1000);
- }
- System.out.println("消费者 1 ==> 当前线程:"+Thread.currentThread().getName()+",quenuID:"+msg.getQueueId()+ ",content:" + new String(msg.getBody()));
- }
- } catch (Exception e) {
- e.printStackTrace();
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- // 消费成功
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- System.out.println("C1 Started.");
- }
- }
测试结果如下
当消费 "订单二号 3" 阻塞时, 会将后面的数据交给其他线程消费, 所以 "订单一号 4" 在 "订单一号 3" 之前消费了.
三. 集群消费
不同消费者设置成相同的组名, 在 MessageModel.CLUSTERING 模式下, 不同消费者会消费不同的队列, 同一个消费者中保证顺序
消费者 1
- public class ConsumerOrderly_1 {
- public static void main(String[] args) throws InterruptedException,
- MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
- consumer.setNamesrvAddr("10.130.41.36:9876");
- consumer.setInstanceName("Consumer1");
- consumer.setMessageModel(MessageModel.CLUSTERING);
- consumer.subscribe("TopicTest", "*");
- consumer.registerMessageListener(new MessageListenerOrderly() {
- @Override
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
- // 设置自动提交, 如果不设置自动提交就算返回 SUCCESS, 消费者关闭重启 还是会重复消费的
- context.setAutoCommit(true);
- try {
- for (MessageExt msg:msgs) {
- String msgKey = msg.getKeys();
- if(msgKey.equals("key13")){
- Thread.sleep(1000);
- }
- System.out.println("消费者 1 ==> 当前线程:"+Thread.currentThread().getName()+",quenuID:"+msg.getQueueId()+ ",content:" + new String(msg.getBody()));
- }
- } catch (Exception e) {
- e.printStackTrace();
- // 如果出现异常, 消费失败, 挂起消费队列一会会, 稍后继续消费
- return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
- // 消费成功
- return ConsumeOrderlyStatus.SUCCESS;
- }
- });
- /**
- * Consumer 对象在使用之前必须要调用 start 初始化, 初始化一次即可
- */
- consumer.start();
- System.out.println("C1 Started.");
- }
- }
消费者 2
- public class ConsumerOrderly_2 {
- public static void main(String[] args) throws InterruptedException,
- MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
- consumer.setNamesrvAddr("10.130.41.36:9876");
- consumer.setInstanceName("Consumer2");
- consumer.setMessageModel(MessageModel.CLUSTERING);
- consumer.subscribe("TopicTest", "*");
- consumer.registerMessageListener(new MessageListenerOrderly() {
- @Override
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
- // 设置自动提交, 如果不设置自动提交就算返回 SUCCESS, 消费者关闭重启 还是会重复消费的
- context.setAutoCommit(true);
- try {
- for (MessageExt msg:msgs) {
- String msgKey = msg.getKeys();
- if(msgKey.equals("key22")){
- Thread.sleep(1000);
- }
- System.out.println("消费者 2 ==> 当前线程:"+Thread.currentThread().getName()+",quenuID:"+msg.getQueueId()+ ",content:" + new String(msg.getBody()));
- }
- } catch (Exception e) {
- e.printStackTrace();
- // 如果出现异常, 消费失败, 挂起消费队列一会会, 稍后继续消费
- return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
- // 消费成功
- return ConsumeOrderlyStatus.SUCCESS;
- }
- });
- /**
- * Consumer 对象在使用之前必须要调用 start 初始化, 初始化一次即可
- */
- consumer.start();
- System.out.println("C2 Started.");
- }
- }
测试结果如下:
消费者 1 负责队列 1, 并保证队列 1 中的所有消息是按照顺序消费的
消费者 2 负责队列 2 和队列 3, 根据 "订单二号 2" 可以看出, 他保证了队列 2 和队列 3 的顺序消费.
四. 消费者 A 和消费者 B 同组, 消费者 A 消费 tagA, 消费者 B 消费 tagB 如图
在这种情况下, 因为集群中订阅消息不一致, 导致消费出现问题, 最后启动的消费者才可以正常消费消息.
要解决这个问题, 需要保证集群中的消费者拥有统一的订阅消息, Topic 和 Tag 要一致才可以.
参考:
- https://www.jianshu.com/p/524ef06ce25a
- https://mp.weixin.qq.com/s/HbIS0yEJsCPMYwwYDBIvMQ
五. 消费者 A 和消费者 B 不同组, 消费者 A 消费 tagA, 消费者 B 消费 tagB 如图
在消费者 1 中, 能保证 tagA1,tagA2 顺序的消费, 消费者 2 中能保证 tagB1,tagB2 顺序的消费.
但是不能保证 tagA1 和 tagB1 的消费顺序.
测试代码:
消费者 1
- public class ConsumerOrderly_1 {
- public static void main(String[] args) throws InterruptedException,
- MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
- consumer.setNamesrvAddr("10.130.41.36:9876");
- consumer.setInstanceName("Consumer1");
- consumer.setMessageModel(MessageModel.CLUSTERING);
- consumer.subscribe("TopicTest", "tagA");
- consumer.registerMessageListener(new MessageListenerOrderly() {
- @Override
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
- // 设置自动提交, 如果不设置自动提交就算返回 SUCCESS, 消费者关闭重启 还是会重复消费的
- context.setAutoCommit(true);
- try {
- for (MessageExt msg:msgs) {
- System.out.println("消费者 1 ==> 当前线程:"+Thread.currentThread().getName()+",quenuID:"+msg.getQueueId()+ ",content:" + new String(msg.getBody()));
- }
- } catch (Exception e) {
- e.printStackTrace();
- // 如果出现异常, 消费失败, 挂起消费队列一会会, 稍后继续消费
- return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
- // 消费成功
- return ConsumeOrderlyStatus.SUCCESS;
- }
- });
- /**
- * Consumer 对象在使用之前必须要调用 start 初始化, 初始化一次即可
- */
- consumer.start();
- System.out.println("C1 Started.");
- }
- }
消费者 2
- public class ConsumerOrderly_2 {
- public static void main(String[] args) throws InterruptedException,
- MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName1");
- consumer.setNamesrvAddr("10.130.41.36:9876");
- consumer.setInstanceName("Consumer2");
- consumer.setMessageModel(MessageModel.CLUSTERING);
- consumer.subscribe("TopicTest", "tagB");
- consumer.registerMessageListener(new MessageListenerOrderly() {
- @Override
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
- // 设置自动提交, 如果不设置自动提交就算返回 SUCCESS, 消费者关闭重启 还是会重复消费的
- context.setAutoCommit(true);
- try {
- for (MessageExt msg:msgs) {
- String msgKey = msg.getKeys();
- if(msgKey.equals("key11")){
- Thread.sleep(1000);
- }
- System.out.println("消费者 2 ==> 当前线程:"+Thread.currentThread().getName()+",quenuID:"+msg.getQueueId()+ ",content:" + new String(msg.getBody()));
- }
- } catch (Exception e) {
- e.printStackTrace();
- // 如果出现异常, 消费失败, 挂起消费队列一会会, 稍后继续消费
- return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
- // 消费成功
- return ConsumeOrderlyStatus.SUCCESS;
- }
- });
- /**
- * Consumer 对象在使用之前必须要调用 start 初始化, 初始化一次即可
- */
- consumer.start();
- System.out.println("C2 Started.");
- }
- }
测试结果:
消费者 1
消费者 2
"订单一号 2" 在 "订单一号 1" 前被消费了.
来源: https://www.cnblogs.com/Sicwen/p/10528201.html