1, 实现功能
希望使用一套 API, 实现 Topic (发布 / 订阅) 和 Queue (点对点) 两种模式下的消息发送和接收功能, 方便业务程序调用
1, 发送 Topic
2, 发送 Queue
3, 接收 Topic
4, 接收 Queue
2, 接口设计
根据功能设计公共调用接口
- /**
- * 数据分发接口 (用于发送, 接收消息队列数据)
- *
- * @author eguid
- *
- */
- public interface MsgDistributeInterface {
- /**
- * 发送到主题
- *
- * @param topicName - 主题
- * @param data - 数据
- * @return
- */
- public boolean sendTopic(String topicName, byte[] data);
- /**
- * 发送到主题
- * @param topicName - 主题
- * @param data - 数据
- * @param offset - 偏移量
- * @param length - 长度
- * @return
- */
- boolean sendTopic(String topicName, byte[] data, int offset, int length);
- /**
- * 发送到队列
- *
- * @param queueName - 队列名称
- * @param data - 数据
- * @return
- */
- public boolean sendQueue(String queueName, byte[] data);
- /**
- * 发送到队列
- * @param queueName - 队列名称
- * @param data - 数据
- * @param offset
- * @param length
- * @return
- */
- public boolean sendQueue(String queueName, byte[] data, int offset, int length);
- /**
- * 接收队列消息
- * @param queueName 队列名称
- * @param listener
- * @throws JMSException
- */
- void receiveQueue(String queueName, MessageListener listener) throws JMSException;
- /**
- * 订阅主题
- * @param topicName - 主题名称
- * @param listener
- * @throws JMSException
- */
- void receiveTopic(String topicName, MessageListener listener) throws JMSException;
- }
3, 基于 ActiveMQ 的接口实现
- /**
- * 基于 activeMQ 的消息生产者 / 消费者实现 (初始化该对象时即初始化连接消息队列, 如果无法连接到消息队列, 立即抛出异常)
- *
- * @author eguid
- *
- */
- public class ActiveMQImpl implements MsgDistributeInterface {
- private String userName;
- private String password;
- private String brokerURL;
- private boolean persistentMode; // 持久化模式
- // 连接工厂
- ConnectionFactory connectionFactory;
- // 发送消息的线程
- Connection connection;
- // 事务管理
- Session session;
- // 存放各个线程订阅模式生产者
- ThreadLocal < MessageProducer > topicThreadLocal = new ThreadLocal < MessageProducer > ();
- // 存放各个线程队列模式生产者
- ThreadLocal < MessageProducer > queueThreadLocal = new ThreadLocal < MessageProducer > ();
- public ActiveMQImpl(String userName, String password, String brokerURL) throws JMSException {
- this(userName, password, brokerURL, true);
- }
- public ActiveMQImpl(String userName, String password, String brokerURL, boolean persistentMode) throws JMSException {
- this.userName = userName;
- this.password = password;
- this.brokerURL = brokerURL;
- this.persistentMode = persistentMode;
- init();
- }
- public void init() throws JMSException {
- try {
- // 创建一个链接工厂
- connectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerURL);
- // 从工厂中创建一个链接
- connection = connectionFactory.createConnection();
- // 开启链接
- connection.start();
- // 创建一个事务 (订阅模式, 事务采用自动确认方式)
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- } catch(JMSException e) {
- throw e;
- }
- }@Override public boolean sendTopic(String topicName, byte[] data) {
- return sendTopic(topicName, data, 0, data.length);
- }@Override public boolean sendTopic(String topicName, byte[] data, int offset, int length) {
- return send(true, topicName, data, offset, length);
- }@Override public boolean sendQueue(String queueName, byte[] data) {
- return sendQueue(queueName, data, 0, data.length);
- }@Override public boolean sendQueue(String queueName, byte[] data, int offset, int length) {
- return send(false, queueName, data, offset, length);
- }
- /**
- * 发送数据
- *
- * @param name
- * @param data
- * @param offset
- * @param length
- * @param type
- * - 类型
- * @return
- */
- private boolean send(boolean type, String name, byte[] data, int offset, int length) {
- try {
- MessageProducer messageProducer = getMessageProducer(name, type);
- BytesMessage msg = createBytesMsg(data, offset, length);
- System.err.println(Thread.currentThread().getName() + "发送消息");
- // 发送消息
- messageProducer.send(msg);
- } catch(JMSException e) {
- return false;
- }
- return false;
- }
- public void receive(String topicName) throws JMSException {
- final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- Topic topic = session.createTopic(topicName);
- MessageConsumer consumer = session.createConsumer(topic);
- consumer.setMessageListener(new MessageListener() {@Override public void onMessage(Message message) {
- BytesMessage msg = (BytesMessage) message;
- System.err.println(Thread.currentThread().getName() + "收到消息:" + msg.toString());
- }
- });
- }
- /**
- * 创建字节数组消息
- *
- * @param data
- * @param offset
- * @param length
- * @return
- * @throws JMSException
- */
- private BytesMessage createBytesMsg(byte[] data, int offset, int length) throws JMSException {
- BytesMessage msg = session.createBytesMessage();
- msg.writeBytes(data, offset, length);
- return msg;
- }
- /**
- * 创建对象序列化消息
- * @param obj
- * @return
- * @throws JMSException
- */
- private ObjectMessage createMapMsg(Serializable obj) throws JMSException {
- // MapMessage msg = session.createMapMessage();//key-value 形式的消息
- ObjectMessage msg = session.createObjectMessage(obj);
- return msg;
- }
- /**
- * 创建字符串消息
- * @param text
- * @return
- * @throws JMSException
- */
- private TextMessage createTextMsg(String text) throws JMSException {
- TextMessage msg = session.createTextMessage(text);
- return msg;
- }
- /**
- * 获取创建者
- *
- * @param name - 名称 (主题名称和队列名称)
- * @param type - 类型 (true:topic,false:queue)
- * @return
- * @throws JMSException
- */
- private MessageProducer getMessageProducer(String name, boolean type) throws JMSException {
- return type ? getTopicProducer(name) : getQueueProducer(name);
- }
- /**
- * 创建或获取队列
- * @param queueName
- * @return
- * @throws JMSException
- */
- private MessageProducer getQueueProducer(String queueName) throws JMSException {
- MessageProducer messageProducer = null;
- if ((messageProducer = queueThreadLocal.get()) == null) {
- Queue queue = session.createQueue(queueName);
- messageProducer = session.createProducer(queue);
- // 是否持久化 (1 - 不持久化 (如果没有消费者, 消息就也会自动失效),2 - 持久化 (如果没有消费者进行消费, 消息队列也会缓存消息等待消费者进行消费))
- messageProducer.setDeliveryMode(persistentMode ? DeliveryMode.PERSISTENT: DeliveryMode.NON_PERSISTENT);
- queueThreadLocal.set(messageProducer);
- }
- return messageProducer;
- }
- /**
- * 创建或获取主题
- * @param topicName
- * @return
- * @throws JMSException
- */
- private MessageProducer getTopicProducer(String topicName) throws JMSException {
- MessageProducer messageProducer = null;
- if ((messageProducer = topicThreadLocal.get()) == null) {
- Topic topic = session.createTopic(topicName);
- messageProducer = session.createProducer(topic);
- // 是否持久化 (1 - 不持久化 (如果没有消费者, 消息就也会自动失效),2 - 持久化 (如果没有消费者进行消费, 消息队列也会缓存消息等待消费者进行消费))
- messageProducer.setDeliveryMode(persistentMode ? DeliveryMode.PERSISTENT: DeliveryMode.NON_PERSISTENT);
- topicThreadLocal.set(messageProducer);
- }
- return messageProducer;
- }
- public String getPassword() {
- return password;
- }
- public void setPassword(String password) {
- this.password = password;
- }@Override public void receiveQueue(String queueName, MessageListener listener) throws JMSException {
- final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- Queue topic = session.createQueue(queueName);
- MessageConsumer consumer = session.createConsumer(topic);
- consumer.setMessageListener(listener);
- }@Override public void receiveTopic(String topicName, MessageListener listener) throws JMSException {
- final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- Topic topic = session.createTopic(topicName);
- MessageConsumer consumer = session.createConsumer(topic);
- consumer.setMessageListener(listener);
- }
4, 测试一下 Topic 和 Queue
- public static void main(String[] args) throws JMSException {
- // 如果创建失败会立即抛出异常
- MsgDistributeInterface producter = new ActiveMQImpl("system", "manager", "tcp://127.0.0.1:61616");
- Test testMq = new Test();
- try {
- Thread.sleep(1000);
- } catch(InterruptedException e) {
- e.printStackTrace();
- }
- //Thread 1
- new Thread(testMq.new ProductorMq(producter)).start();
- //Thread 2
- new Thread(testMq.new ProductorMq(producter)).start();
- //Thread 3
- new Thread(testMq.new ProductorMq(producter)).start();
- //Thread 4
- new Thread(testMq.new ProductorMq(producter)).start();
- //Thread 5
- new Thread(testMq.new ProductorMq(producter)).start();
- //Thread 6
- new Thread(testMq.new ProductorMq(producter)).start();
- // 订阅接收线程 Thread 1
- new Thread(new Runnable() {@Override public void run() {
- try {
- producter.receiveTopic("eguid-topic", new MessageListener() {@Override public void onMessage(Message message) {
- BytesMessage msg = (BytesMessage) message;
- System.err.println(Thread.currentThread().getName() + "订阅主题消息:" + msg.toString());
- }
- });
- } catch(JMSException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }).start();
- // 订阅接收线程 Thread 2
- new Thread(new Runnable() {@Override public void run() {
- try {
- producter.receiveTopic("eguid-topic", new MessageListener() {@Override public void onMessage(Message message) {
- BytesMessage msg = (BytesMessage) message;
- System.err.println(Thread.currentThread().getName() + "订阅主题消息:" + msg.toString());
- }
- });
- } catch(JMSException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }).start();
- // 队列消息生产线程 Thread-1
- new Thread(testMq.new QueueProductor(producter)).start();
- // 队列消息生产线程 Thread-2
- new Thread(testMq.new QueueProductor(producter)).start();
- // 队列接收线程 Thread 1
- new Thread(new Runnable() {@Override public void run() {
- try {
- producter.receiveQueue("eguid-queue", new MessageListener() {@Override public void onMessage(Message message) {
- BytesMessage msg = (BytesMessage) message;
- System.err.println(Thread.currentThread().getName() + "收到队列消息:" + msg.toString());
- }
- });
- } catch(JMSException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }).start();
- // 队列接收线程 Thread2
- new Thread(new Runnable() {@Override public void run() {
- try {
- producter.receiveQueue("eguid-queue", new MessageListener() {@Override public void onMessage(Message message) {
- BytesMessage msg = (BytesMessage) message;
- System.err.println(Thread.currentThread().getName() + "收到队列消息:" + msg.toString());
- }
- });
- } catch(JMSException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }).start();
- }
- private class ProductorMq implements Runnable {
- Jtt809MsgProducter producter;
- public ProductorMq(Jtt809MsgProducter producter) {
- this.producter = producter;
- }@Override public void run() {
- while (true) {
- try {
- String wang = Thread.currentThread().getName() + "Hello eguid! This is topic.";
- producter.sendTopic("eguid-topic", wang.getBytes());
- Thread.sleep(2000);
- } catch(InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- private class QueueProductor implements Runnable {
- Jtt809MsgProducter producter;
- public QueueProductor(Jtt809MsgProducter producter) {
- this.producter = producter;
- }@Override public void run() {
- while (true) {
- try {
- String eguid = Thread.currentThread().getName() + "Hello eguid! This is queue.";
- producter.sendQueue("eguid-queue", eguid.getBytes());
- Thread.sleep(2000);
- } catch(InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }-------------------End--------------------
来源: http://www.bubuko.com/infodetail-2771958.html