前言:
本篇介绍用两种方式实现 activemq 的 queue 模式和 topic 模式。queue 模式下, 连接在同一个目标队列上的消费者共享消息, 即 n 个消费者平分消息队列中的消息; topic 模式下, 每个消费者在订阅该 destination 后, 都可以获取 topic 中所有的消息。
1.linux 环境下安装 ActiveMQ
到官网下载 linux 环境下的压缩包, 在 linux 任意目录下解压
切换到 bin 目录执行./activemq start
执行 ps -ef|grep activemq 查看 activemq 是否运行起来了
2.java 程序集成 activemq 实现 queue 和 topic 两种模式
maven 项目的 pom.xml
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-all</artifactId>
- <version>5.9.0</version>
- </dependency>
在使用 activemq 的时候, 一般就是先获取 Connection, 启动 connection, 通过连接创建会话 session, 然后再创建消费者 / 生产者
queue 模式下的生产者代码
- package com.yzz.jms.queen;
- import org.apache.activemq.ActiveMQConnectionFactory;
- import javax.jms.*;
- /**
-  * Created by yzz on 2018/3/18.
-  * mail:yzzstyle@163.com
-  * Message producer: sends 100 text messages to the queue named by {@link #queueName}.
-  */
- public class AppProducer {
-     public static final String url = "tcp://192.168.1.2:61616";
-     public static final String queueName = "queue-test";
-     /**
-      * Connects to the broker at {@link #url}, sends the messages and closes the connection.
-      *
-      * @param args unused
-      * @throws JMSException if connecting, creating JMS objects or sending fails
-      */
-     public static void main(String[] args) throws JMSException {
-         // Create the connection factory
-         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
-         // Create the connection
-         Connection connection = connectionFactory.createConnection();
-         try {
-             // Start the connection
-             connection.start();
-             // Create a non-transacted, auto-acknowledge session (used from this single thread)
-             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-             // Create the destination
-             Destination destination = session.createQueue(queueName);
-             // Create the producer
-             MessageProducer producer = session.createProducer(destination);
-             // Publish the messages
-             for (int i = 0; i < 100; i++) {
-                 TextMessage textMessage = session.createTextMessage("jms-test" + i);
-                 producer.send(textMessage);
-                 System.out.println(textMessage.getText()+"已发送");
-             }
-         } finally {
-             // Close the connection even when session creation or a send fails part-way
-             connection.close();
-         }
-     }
- }
queue 模式下的消费者代码
- package com.yzz.jms.queen;
- import org.apache.activemq.ActiveMQConnectionFactory;
- import javax.jms.*;
- /**
-  * Created by yzz on 2018/3/18.
-  * mail:yzzstyle@163.com
-  * Message consumer: prints the text of every message received from the queue named by {@link #queueName}.
-  */
- public class AppConsumer {
-     public static final String url = "tcp://192.168.1.2:61616";
-     public static final String queueName = "queue-test";
-     /**
-      * Connects to the broker at {@link #url} and registers a listener on the queue.
-      * The connection is deliberately left open so the listener keeps receiving messages.
-      *
-      * @param args unused
-      * @throws JMSException if connecting or creating JMS objects fails
-      */
-     public static void main(String[] args) throws JMSException {
-         // Create the connection factory
-         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
-         // Create the connection
-         Connection connection = connectionFactory.createConnection();
-         // Create a non-transacted, auto-acknowledge session
-         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         // Create the destination
-         Destination destination = session.createQueue(queueName);
-         // Create the consumer
-         MessageConsumer consumer = session.createConsumer(destination);
-         // Register the listener
-         consumer.setMessageListener(new MessageListener() {
-             @Override
-             public void onMessage(Message message) {
-                 // Only text messages are expected; skip anything else instead of failing on the cast
-                 if (!(message instanceof TextMessage)) {
-                     System.err.println("Ignoring non-text message: " + message);
-                     return;
-                 }
-                 try {
-                     TextMessage textMessage = (TextMessage) message;
-                     System.out.println(textMessage.getText()+"i");
-                 } catch (JMSException e) {
-                     e.printStackTrace();
-                 }
-             }
-         });
-         // Start delivery only after the listener is in place
-         connection.start();
-     }
- }
topic 模式下的生产者代码
- package com.yzz.jms.topic;
- import org.apache.activemq.ActiveMQConnectionFactory;
- import javax.jms.*;
- /**
-  * Created by yzz on 2018/3/18.
-  * mail:yzzstyle@163.com
-  * Message producer: publishes 100 text messages to the topic named by {@link #topicName}.
-  */
- public class AppProducer {
-     public static final String url = "tcp://192.168.1.2:61616";
-     public static final String topicName = "topic-test";
-     /**
-      * Connects to the broker at {@link #url}, publishes the messages and closes the connection.
-      *
-      * @param args unused
-      * @throws JMSException if connecting, creating JMS objects or sending fails
-      */
-     public static void main(String[] args) throws JMSException {
-         // Create the connection factory
-         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
-         // Create the connection
-         Connection connection = connectionFactory.createConnection();
-         try {
-             // Start the connection
-             connection.start();
-             // Create a non-transacted, auto-acknowledge session (used from this single thread)
-             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-             // Create the destination
-             Destination destination = session.createTopic(topicName);
-             // Create the producer
-             MessageProducer producer = session.createProducer(destination);
-             // Publish the messages
-             for (int i = 0; i < 100; i++) {
-                 TextMessage textMessage = session.createTextMessage("jms-test" + i);
-                 producer.send(textMessage);
-                 System.out.println(textMessage.getText()+"已发送");
-             }
-         } finally {
-             // Close the connection even when session creation or a send fails part-way
-             connection.close();
-         }
-     }
- }
topic 模式下的消费者代码
- package com.yzz.jms.topic;
- import org.apache.activemq.ActiveMQConnectionFactory;
- import javax.jms.*;
- /**
-  * Created by yzz on 2018/3/18.
-  * mail:yzzstyle@163.com
-  * Message consumer: prints the text of every message received from the topic named by {@link #topicName}.
-  */
- public class AppConsumer {
-     public static final String url = "tcp://192.168.1.2:61616";
-     public static final String topicName = "topic-test";
-     /**
-      * Connects to the broker at {@link #url} and registers a listener on the topic.
-      * The connection is deliberately left open so the listener keeps receiving messages.
-      *
-      * @param args unused
-      * @throws JMSException if connecting or creating JMS objects fails
-      */
-     public static void main(String[] args) throws JMSException {
-         // Create the connection factory
-         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
-         // Create the connection
-         Connection connection = connectionFactory.createConnection();
-         // Create a non-transacted, auto-acknowledge session
-         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         // Create the destination
-         Destination destination = session.createTopic(topicName);
-         // Create the consumer
-         MessageConsumer consumer = session.createConsumer(destination);
-         // Register the listener
-         consumer.setMessageListener(new MessageListener() {
-             @Override
-             public void onMessage(Message message) {
-                 // Only text messages are expected; skip anything else instead of failing on the cast
-                 if (!(message instanceof TextMessage)) {
-                     System.err.println("Ignoring non-text message: " + message);
-                     return;
-                 }
-                 try {
-                     TextMessage textMessage = (TextMessage) message;
-                     System.out.println(textMessage.getText()+"i");
-                 } catch (JMSException e) {
-                     e.printStackTrace();
-                 }
-             }
-         });
-         // Start delivery only after the listener is in place
-         connection.start();
-     }
- }
3.spring 集成 activemq 实现 queue 和 topic 两种模式
先引入 spring 所需的 jar 包
引入 spring-jms 和 activemq-core(引入 activemq-core 时需排除其传递依赖的 spring-context)
queue 模式
新建 ProducerServiceImpl 类来负责发消息, 通过 JmsTemplate 来发送
- package com.yzz.jms.queue.producer.impl;
- import com.yzz.jms.queue.producer.IProducerService; import
- org.springframework.beans.factory.annotation.Autowired; import
- org.springframework.jms.core.JmsTemplate; import
- org.springframework.jms.core.MessageCreator;
- import javax.annotation.Resource; import javax.jms.Destination;
- import javax.jms.JMSException; import javax.jms.Message; import
- javax.jms.Session;
- /** * Created by yzz on 2018/3/18. * mail:yzzstyle@163.com */
- public class ProducerServiceImpl implements IProducerService{
- @Autowired
- JmsTemplate jmsTemplate;
- @Resource(name="queueDestination")
- Destination destination;
- public void sendMessage(final String message) {
- jmsTemplate.send(destination, new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- return session.createTextMessage(message);
- }
- });
- } }
新建 spring 配置文件 common.xml,consumer.xml,producer.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:context="http://www.springframework.org/schema/context"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
- <!-- Shared configuration imported by the consumer and producer configuration files -->
- <!-- Enable annotation-based configuration -->
- <context:annotation-config/>
- <!-- Connection factory provided by ActiveMQ -->
- <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
- <property name="brokerURL" value="tcp://192.168.1.2:61616"></property>
- </bean>
- <!-- Connection factory provided by Spring JMS, wrapping the ActiveMQ factory above. -->
- <!-- NOTE(review): SingleConnectionFactory reuses one shared connection; it is not a connection pool. -->
- <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
- <property name="targetConnectionFactory" ref="targetConnectionFactory"></property>
- </bean>
- <!-- Queue destination named "queue" -->
- <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
- <constructor-arg value="queue"/>
- </bean>
- <!-- Topic destination named "topic" -->
- <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
- <constructor-arg value="topic"/>
- </bean>
- </beans>
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
- <!-- Consumer configuration: reuses the connection factory and destinations from common.xml -->
- <import resource="common.xml"/>
- <!-- Message listener -->
- <bean id="consumerMessageListener" class="com.yzz.jms.queue.consumer.ConsumerMessageListener"/>
- <!-- Message listener container -->
- <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
- <!-- Connection factory -->
- <property name="connectionFactory" ref="connectionFactory"/>
- <!-- Destination to listen on -->
- <property name="destination" ref="queueDestination"/>
- <!-- Listener that receives the messages -->
- <property name="messageListener" ref="consumerMessageListener"/>
- </bean>
- </beans>
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
- <!-- Producer configuration: reuses the connection factory and destinations from common.xml -->
- <import resource="common.xml"/>
- <!-- JmsTemplate used by the producer service to send messages -->
- <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
- <property name="connectionFactory" ref="connectionFactory"></property>
- </bean>
- <!-- Producer service bean -->
- <bean class="com.yzz.jms.queue.producer.impl.ProducerServiceImpl"/>
- </beans>
生产者 main 方法
- package com.yzz.jms.topic.producer.impl;
- import org.springframework.context.support.ClassPathXmlApplicationContext;
- /**
-  * Created by yzz on 2018/3/18.
-  * mail:yzzstyle@163.com
-  * Loads topicProducer.xml and sends 100 messages through {@code ProducerServiceImpl}.
-  */
- public class AppProducerLaunch {
-     /**
-      * Starts the Spring context, sends the messages and closes the context.
-      *
-      * @param args unused
-      */
-     public static void main(String[] args){
-         ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("topicProducer.xml");
-         try {
-             ProducerServiceImpl producerService = context.getBean(ProducerServiceImpl.class);
-             for (int i = 0; i < 100; i++) {
-                 producerService.sendMessage("哈哈"+i);
-                 System.out.println("============"+i);
-             }
-         } finally {
-             // Close the context even when the bean lookup or a send fails
-             context.close();
-         }
-     }
- }
消费者 main 方法
- /**
-  * Loads topicConsumer.xml and leaves the context open; the consumer side is configured entirely in that file.
-  */
- public class AppConsumerLaunch {
-     /**
-      * Starts the Spring context.
-      *
-      * @param args unused
-      */
-     public static void main(String[] args){
-         ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("topicConsumer.xml");
-         // The context is never closed explicitly, so close it when the JVM shuts down
-         context.registerShutdownHook();
-     }
- }
topic 模式下的消费者代码
在 common.xml 添加 topic 目的地的 bean, 构造参数是 topic,class 是 org.apache.activemq.command.ActiveMQTopic
将生产者的 destination 换成 topic 的 destination
将消费者的 listenerContainer 的 destination 换成 topic 的 destination
来源: http://blog.csdn.net/qq_22271479/article/details/79604986