- /**
- * MQ 消息拉取者
- *
- */
- class MqPuller{
- private static final Logger logger = Logger.getLogger(MqPuller.class);
- private Ice.Communicator ic;
- private MQPollPrx mqPoll;
- private String proxy;//要连接的ICE服务
- private String token;//权限令牌
- private String uri;//分配URL
- private String queueNames;//消息类型
- private int count;//每次拉取多少条消息
- private boolean debugMode;
- /**
- * MQ 客户端初始化
- *
- * Properties default value is:
- *
- * jq.proxy=MQPollS:tcp -p 10000 -h 10.10.224.63
- * jq.token=qazwsx
- * jq.uri=test
- * jq.queueNames=31.30,31.40,31.50
- * jq.count=100
- * jq.debugMode=true
- *
- * @param prop
- */
- public void init(Properties prop) throws Exception{
- if (prop == null) {
- logger.error("init() parameter prop is null.");
- throw new IllegalArgumentException("prop cannot null!");
- }
- logger.info("init()...");
- proxy = prop.getProperty("jq.proxy", "MQPollS:tcp -p 10000 -h 10.10.224.63");
- token = prop.getProperty("jq.token", "654321");
- uri = prop.getProperty("jq.uri", "test");
- queueNames = prop.getProperty("jq.queueNames", "31.30,31.40,31.50");
- count = Integer.parseInt(prop.getProperty("jq.count", "100"));
- debugMode = Boolean.parseBoolean(prop.getProperty("jq.debugMode", "true"));
- logger.info("jq.proxy=" + proxy );
- logger.info("jq.token=" + token );
- logger.info("jq.uri=" + uri );
- logger.info("jq.queueNames=" + queueNames );
- logger.info("jq.count=" + count );
- logger.info("jq.debugMode=" + debugMode );
- if(ic == null || ic.isShutdown()){
- ic = Ice.Util.initialize("--Ice.MessageSizeMax=2000".split(";"));
- Ice.ObjectPrx objPrx = ic.stringToProxy(proxy);
- mqPoll = MQPollPrxHelper.checkedCast(objPrx);
- Map<String, String> map = new HashMap<String, String>();
- map.put("token", token);
- mqPoll = (MQPollPrx) mqPoll.ice_context(map);
- logger.info("init() complete!");
- }
- else{
- logger.info("init: ic is running !");
- }
- }
- /**
- * 从MQ拉取消息
- *
- * @return List<String[]> {queueName, msgId, msgContent}
- */
- public List<String[]> pullMessage() throws Exception{
- List<String[]> result = new ArrayList<String[]>();
- String[] queueNamesArray = queueNames.split(",");
- for (String queueName : queueNamesArray) {
- List<Integer> ids = mqPoll.pollIds(queueName, uri, count, 0);
- if(ids != null && ids.size()>0){
- for (Integer msgId : ids) {
- String msgContent = mqPoll.pollMsg(msgId);
- if(msgContent != null && msgContent.trim().length()>0){
- String arr[] = {queueName, msgId.toString(), msgContent};
- result.add(arr);
- }
- }
- }
- }
- return result;
- }
- public boolean commitMessage(String msgid) throws Exception{
- return mqPoll.CommitMsg(msgid);
- }
- public boolean isInit() throws Exception{
- return (ic != null && !ic.isShutdown());
- }
- public boolean isDebugMode() {
- return debugMode;
- }
- //该片段来自于http://www.codesnippet.cn/detail/160720134617.html
来源: http://www.codesnippet.cn/detail/160720134617.html