只是对阿里MQ的ajva-SDK做了个简单封装。
发布者直接调用:
封装类:MQProducerUtils
/**
/**
/**
* 指定时间发送MQ
* @param topic 可理解为一级类别
* @param tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在ONS服务器过滤
* @param body 消息体
* @param date 指定时间发送消息
* @return 消息的唯一ID 如果返回为null就表示发送失败,主流程不需要管返回
*/
监听者启动监听
MQConsumerListener
/**
* 集群订阅消息
* @param topic MQ主题
* @param tag 二级类别
* @param callBack 回调包名+类名
* @param consumerId 订阅者ID
*/
public static void MQListener(String topic, String tag, final MQCallBack callBack,String consumerId)
/**
* 广播的方式发送MQ
* @param topic MQ主题
* @param tag 二级类别
* @param callback 回调包名+类名
* @param consumerId 订阅者ID
*/
错误示例二:不同JVM同一订阅组(CID-ONS-FAQ)订阅的TOPIC相同,但Tag不同(分别NM-ONS-FAQ-1、NM-ONS-FAQ-2)
建议解决方案
请确保在不同JVM中使用相同的ConsumerId启动多个Consumer时,配置的Topic和Tag是一致的。
参见:
启动多个程序,CID topic tag 必须是一套
比方说您有四个程序启动
集群模式:(4个程序一起瓜分消息)
程序1:CID1 topic1 tag1
程序2:CID1 topic1 tag1
程序3:CID1 topic1 tag1
程序4:CID1 topic1 tag1
广播模式:(前面两个程序一组瓜分消息,后面两个程序一组瓜分消息,这两个组都能收到相同消息)
程序1:CID1 topic1 tag1
程序2:CID1 topic1 tag1
程序3:CID2 topic1 tag1
程序4:CID2 topic1 tag1
- import com.zcmall.core.config.AbstractFileConfig;
- /**
- * 邮件配置文件
- *
- * @author wupengfei wupf86@126.com
- *
- */
- public abstract class MQFileConfig extends AbstractFileConfig {
- private static final String FILE = "classpath*:config/mq/mq.properties";
- static {
- load(FILE, getKeyPrefix());
- }
- /**
- * 获取指定键的值
- *
- * @param config
- * @return
- */
- public static String getValue(MQConfigEnum config) {
- return AbstractFileConfig.getValue(getKeyPrefix() + config.getKey(), null);
- }
- /**
- * 获取配置文件中指定key的值,若==null则返回默认值
- *
- * @param config
- * @param defaultValue
- * @return
- */
- public static String getValue(MQConfigEnum config, String defaultValue) {
- return AbstractFileConfig.getValue(getKeyPrefix() + config.getKey(), defaultValue);
- }
- /**
- * 获取前辍
- *
- * @return
- */
- public static String getKeyPrefix() {
- return "mq#";
- }
- /**
- * 获取配置文件的路径
- *
- * @return
- */
- public static String getFile() {
- return FILE;
- }
- }
- /**
- * 阿里MQ
- *
- * @author gjl gjl@163.com
- *
- */
- public enum MQConfigEnum {
- /**
- * 发送端ID
- */
- PRODUCER_ID("producer.id"),
- /**
- * 消费端ID
- */
- CONSUMER_ID("consumer.id"),
- /**
- * 阿里MQ配置
- */
- ACCESS_KEY("access.key"),
- /**
- * MQ订阅者
- */
- LISTENER("listener"),
- /**
- * 阿里MQ配置
- */
- SECRET_KEY("secret.key");
- private String key;
- private MQConfigEnum(String key) {
- this.key = key;
- }
- public String getKey() {
- return key;
- }
- }
- import com.aliyun.openservices.ons.api.Message;
- import com.aliyun.openservices.ons.api.ConsumeContext;
- public interface MQCallBack {
- public boolean exec(Message message, ConsumeContext context);
- }
- import java.util.Properties;
- import com.aliyun.openservices.ons.api.Action;
- import com.aliyun.openservices.ons.api.ConsumeContext;
- import com.aliyun.openservices.ons.api.Consumer;
- import com.aliyun.openservices.ons.api.Message;
- import com.aliyun.openservices.ons.api.MessageListener;
- import com.aliyun.openservices.ons.api.ONSFactory;
- import com.aliyun.openservices.ons.api.PropertyKeyConst;
- import com.aliyun.openservices.ons.api.PropertyValueConst;
- import com.zcmall.core.log.Log;
- import com.zcmall.core.log.LogFactory;
- public class MQConsumerListener {
- private static Log log = LogFactory.getLog(MQProducerUtils.class);
- private static Properties props;
- private static String LOG_INFO = "启动集群订阅:topic=%s,tag=%s,consumerId=%s,callBack=%s";
- static {
- props = new Properties();
- props.put(PropertyKeyConst.AccessKey, MQFileConfig.getValue(MQConfigEnum.ACCESS_KEY));
- props.put(PropertyKeyConst.SecretKey, MQFileConfig.getValue(MQConfigEnum.SECRET_KEY));
- }
- /**
- * 集群订阅消息
- * @param topic MQ主题
- * @param tag 二级类别
- * @param callBack 回调包名+类名
- * @param consumerId 订阅者ID
- */
- public static void MQListener(String topic, String tag, final MQCallBack callBack,String consumerId){
- props.put(PropertyKeyConst.ConsumerId, consumerId);
- // props.put(PropertyKeyConst.MessageModel,PropertyValueConst.CLUSTERING);
- Consumer consumer = ONSFactory.createConsumer(props);
- System.out.println(consumer);
- consumer.subscribe(topic, tag, new MessageListener() {
- public Action consume(Message message, ConsumeContext context) {
- boolean bool = callBack.exec(message, context);
- Action result = Action.CommitMessage;
- if(!bool){
- result = Action.ReconsumeLater;
- }
- return result;
- }
- });
- consumer.start();
- log.info(String.format(LOG_INFO, topic,tag,consumerId,callBack));
- }
- /**
- * 广播的方式发送MQ
- * @param topic MQ主题
- * @param tag 二级类别
- * @param callBack 回调包名+类名
- * @param consumerId 订阅者ID
- */
- public static void MQPubSubListener(final String topic,final String tag, final MQCallBack callBack,String consumerId){
- props.put(PropertyKeyConst.MessageModel,PropertyValueConst.BROADCASTING);
- Consumer consumer = ONSFactory.createConsumer(props);
- consumer.subscribe(topic, tag, new MessageListener() {
- public Action consume(Message message, ConsumeContext context) {
- boolean bool = callBack.exec(message, context);
- Action result = Action.CommitMessage;
- if(!bool){
- result = Action.ReconsumeLater;
- }
- return result;
- }
- });
- consumer.start();
- log.info(String.format(LOG_INFO, topic,tag,consumerId,callBack));
- }
- }
- import java.util.Date;
- import java.util.Properties;
- import com.aliyun.openservices.ons.api.Message;
- import com.aliyun.openservices.ons.api.ONSFactory;
- import com.aliyun.openservices.ons.api.Producer;
- import com.aliyun.openservices.ons.api.PropertyKeyConst;
- import com.aliyun.openservices.ons.api.SendResult;
- import com.zcmall.core.log.Log;
- import com.zcmall.core.log.LogFactory;
- public class MQProducerUtils {
- private static Log log = LogFactory.getLog(MQProducerUtils.class);
- private static Properties props;
- static {
- props = new Properties();
- props.put(PropertyKeyConst.ProducerId, MQFileConfig.getValue(MQConfigEnum.PRODUCER_ID));
- props.put(PropertyKeyConst.AccessKey, MQFileConfig.getValue(MQConfigEnum.ACCESS_KEY));
- props.put(PropertyKeyConst.SecretKey, MQFileConfig.getValue(MQConfigEnum.SECRET_KEY));
- }
- /**
- * 发送集群MQ 多个消费者只会有一个消费者接收到。
- * @param topic 可理解为一级类别
- * @param tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在ONS服务器过滤
- * @param body 消息体
- * @return 消息的唯一ID 如果返回为null就表示发送失败,主流程不需要管返回
- */
- public static String sendMQ(String topic, String tag, String body) {
- Message msg = new Message(topic, tag, body.getBytes());
- return sendMQ(msg);
- }
- /**
- * 设置延迟发送MQ
- * @param topic 可理解为一级类别
- * @param tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在ONS服务器过滤
- * @param body 消息体
- * @param delayTime 延迟的时间单位毫秒
- * @return 消息的唯一ID 如果返回为null就表示发送失败,主流程不需要管返回
- */
- public static String sendMQDelayTime(String topic, String tag, String body,long delayTime){
- Message msg = new Message(topic, tag, body.getBytes());
- msg.setStartDeliverTime(System.currentTimeMillis() + delayTime);
- return sendMQ(msg);
- }
- /**
- * 指定时间发送MQ
- * @param topic 可理解为一级类别
- * @param tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在ONS服务器过滤
- * @param body 消息体
- * @param date 指定时间发送消息
- * @return 消息的唯一ID 如果返回为null就表示发送失败,主流程不需要管返回
- */
- public static String sendMqDelayDate(String topic, String tag, String body,Date date){
- Message msg = new Message(topic, tag, body.getBytes());
- msg.setStartDeliverTime(date.getTime());
- return sendMQ(msg);
- }
- /**
- * 发送消息
- * @param msg
- * @return
- */
- private static String sendMQ(Message msg) {
- try {
- Producer producer = ONSFactory.createProducer(props);
- // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
- producer.start();
- // 发送消息,只要不抛异常就是成功
- SendResult sendResult = producer.send(msg);
- producer.shutdown();
- return sendResult.getMessageId();
- } catch (Exception e) {
- // 吃掉这个异常不影响主流程
- log.error("###########消息队列发送失败:" + e.getMessage());
- }
- return null;
- }
- }
来源: https://www.oschina.net/code/snippet_2349285_58611