本文主要讲述 ActiveMQ 与 spring 整合的方案.介绍知识点包括 spring,jms,activemq 基于配置文件模式管理消息,消息监听器类型,消息转换类介绍,spring 对 JMS 事物管理.
1.spring 整合 activemq 配置文件说明
1.1 配置 ConnectionFactory
ConnectionFactory 是用于产生到 JMS 服务器的链接的,Spring 提供了多个 ConnectionFactory,有 SingleConnectionFactory 和 CachingConnectionFactory.SingleConnectionFactory 对于建立 JMS 服务器链接的请求会一直返回同一个链接,并且会忽略 Connection 的 close 方法调用.CachingConnectionFactory 继承了 SingleConnectionFactory,所以它拥有 SingleConnectionFactory 的所有功能,同时它还新增了缓存功能,它可以缓存 Session,MessageProducer 和 MessageConsumer.这里使用 SingleConnectionFactory 来作为示例.
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"/>
Spring 提供的 ConnectionFactory 只是 Spring 用于管理 ConnectionFactory 的,真正产生到 JMS 服务器链接的 ConnectionFactory 还得是由 JMS 服务厂商提供,并且需要把它注入到 Spring 提供的 ConnectionFactory 中.这里使用的是 ActiveMQ 实现的 JMS,所以在这里真正的可以产生 Connection 的就应该是由 ActiveMQ 提供的 ConnectionFactory. ActiveMQ 提供了一个 PooledConnectionFactory,通过往里面注入一个 ActiveMQConnectionFactory 可以用来将 Connection,Session 和 MessageProducer 池化,这样可以大大的减少资源消耗.当使用 PooledConnectionFactory 时,在定义一个 ConnectionFactory 时应该是如下定义:
<!-- ActiveMQ 连接工厂 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
<!-- Spring Caching连接工厂 -->
<amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://192.168.3.3:61616"
userName="admin" password="admin" />
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- 同上,同理 -->
<!-- <constructor-arg ref="amqConnectionFactory" /> -->
<!-- Session缓存数量 -->
1.2 配置生产者
<property name="sessionCacheSize" value="100" />
</bean>
配置好 ConnectionFactory 之后就需要配置生产者.生产者负责产生消息并发送到 JMS 服务器,这通常对应的是一个业务逻辑服务实现类.但是服务实现类是怎么进行消息的发送的呢?这通常是利用 Spring 提供的 JmsTemplate 类来实现的,所以配置生产者其实最核心的就是配置进行消息发送的 JmsTemplate.对于消息发送者而言,它在发送消息的时候要知道自己该往哪里发,为此,在定义 JmsTemplate 的时候需要往里面注入一个 Spring 提供的 ConnectionFactory 对象.
<!-- Spring提供的JMS工具类,它可以进行消息发送,接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
在真正利用 JmsTemplate 进行消息发送的时候,需要知道消息发送的目的地,即 destination.在 Jms 中有一个用来表示目的地的 Destination 接口,它里面没有任何方法定义,只是用来做一个标识而已.当在使用 JmsTemplate 进行消息发送时没有指定 destination 的时候将使用默认的 Destination.默认 Destination 可以通过在定义 jmsTemplate bean 对象时通过属性 defaultDestination 或 defaultDestinationName 来进行注入,defaultDestinationName 对应的就是一个普通字符串.在 ActiveMQ 中实现了两种类型的 Destination,一个是点对点的 ActiveMQQueue,另一个就是支持订阅 / 发布模式的 ActiveMQTopic.在定义这两种类型的 Destination 时我们都可以通过一个 name 属性来进行构造,如:
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!--这个是队列目的地,点对点的-->
<!--这个是主题目的地,一对多的-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>queue</value>
</constructor-arg>
</bean>
1.3 配置消费者
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic" />
</bean>
生产者往指定目的地 Destination 发送消息后,接下来就是消费者对指定目的地的消息进行消费了.那么消费者是如何知道有生产者发送消息到指定目的地 Destination 了呢?这是通过 Spring 封装的消息监听容器 MessageListenerContainer 实现的,它负责接收信息,并把接收到的信息分发给真正的 MessageListener 进行处理.每个消费者对应每个目的地都需要有对应的 MessageListenerContainer.对于消息监听容器而言,除了要知道监听哪个目的地之外,还需要知道到哪里去监听,也就是说它还需要知道去监听哪个 JMS 服务器,这是通过在配置 MessageConnectionFactory 的时候往里面注入一个 ConnectionFactory 来实现的.所以在配置一个 MessageListenerContainer 的时候有三个属性必须指定:
一个是表示从哪里监听的 ConnectionFactory;一个是表示监听什么的 Destination;一个是接收到消息以后进行消息处理的 MessageListener.
Spring 一共提供了多种类型 MessageListenerContainer,SimpleMessageListenerContainer 和 DefaultMessageListenerContainer.SimpleMessageListenerContainer 会在一开始的时候就创建一个会话 session 和消费者 Consumer,并且会使用标准的 JMS MessageConsumer.setMessageListener() 方法注册监听器让 JMS 提供者调用监听器的回调函数.它不会动态的适应运行时需要和参与外部的事务管理.兼容性方面,它非常接近于独立的 JMS 规范,但一般不兼容 Java EE 的 JMS 限制.大多数情况下使用的 DefaultMessageListenerContainer,跟 SimpleMessageListenerContainer 相比,DefaultMessageListenerContainer 会动态的适应运行时需要,并且能够参与外部的事务管理.它很好的平衡了对 JMS 提供者要求低,先进功能如事务参与和兼容 Java EE 环境.
<!--这个是队列目的地-->
<!-- 消息监听器 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>queue</value>
</constructor-arg>
</bean>
<bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>
<!-- 消息监听容器 -->
1.4 定义处理消息的 MessageListener
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="consumerMessageListener" />
</bean>
要定义处理消息的 MessageListener 只需要实现 JMS 规范中的 MessageListener 接口就可以了.MessageListener 接口中只有一个方法 onMessage 方法,当接收到消息的时候会自动调用该方法.
至此生成者和消费者都配置完成了,这也就意味着 spring 整合 ActiveMQ 已经完成了.整个 ActiveMQ.xml 文件配置如下:
View Code
1.5 实例分析
编写一个 sessionAwareQueue 目的队列,向改队列发送消息,接受消息成功后,并回复一条消息.监控消息函数为:ConsumerSessionAwareMessageListener.
xml 配置文件
<!-- Spring提供的JMS工具类,它可以进行消息发送,接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory" />
<!-- 消息转换器 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<!-- <property name="messageConverter" ref="emailMessageConverter"/>
-->
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.210.128:61616" />
</bean>
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<!--这个是队列目的地-->
<property name="targetConnectionFactory" ref="targetConnectionFactory"
/>
</bean>
<!--这个是sessionAwareQueue目的地-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>
queue
</value>
</constructor-arg>
</bean>
<!-- 消息监听器 -->
<bean id="sessionAwareQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>
sessionAwareQueue
</value>
</constructor-arg>
</bean>
<!-- 可以获取session的MessageListener -->
<bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener">
<property name="messageConverter" ref="emailMessageConverter" />
</bean>
<!-- 消息监听容器 -->
<bean id="consumerSessionAwareMessageListener" class="com.tiantian.springintejms.listener.ConsumerSessionAwareMessageListener">
<property name="destination" ref="queueDestination" />
</bean>
消息生产者定义发送消息方法
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="consumerMessageListener" />
</bean>
<bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="sessionAwareQueue" />
<property name="messageListener" ref="consumerSessionAwareMessageListener"
/>
</bean>
</beans>
消费者定义接收消息方法
@Component
public class ProducerServiceImpl implements ProducerService {
@Autowired
private JmsTemplate jmsTemplate;
private Destination responseDestination;
public void sendMessage(Destination destination, final String message) {
System.out.println("---------------生产者发送消息-----------------");
System.out.println("---------------生产者发了一个消息:" + message);
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
public void sendMessage(final Destination destination, final Serializable obj) {
jmsTemplate.convertAndSend(destination, obj);
}
}
测试发送消息
public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener < TextMessage > {
private Destination destination;
public void onMessage(TextMessage message, Session session) throws JMSException {
System.out.println("收到一条消息");
System.out.println("消息内容是:" + message.getText());
MessageProducer producer = session.createProducer(destination);
System.out.println("发送一条回复消息");
Message textMessage = session.createTextMessage("回复消息内容:ConsumerSessionAwareMessageListener...");
producer.send(textMessage);
}
public Destination getDestination() {
return destination;
}
public void setDestination(Destination destination) {
this.destination = destination;
}
}
示例运行截图:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/applicationContext.xml")
public class ProducerConsumerTest {
@Autowired
private ProducerService producerService;
@Autowired
@Qualifier("queueDestination")
private Destination destination;
@Autowired
@Qualifier("sessionAwareQueue")
private Destination sessionAwareQueue;
@Test
public void testSessionAwareMessageListener() {
producerService.sendMessage(sessionAwareQueue, "测试SessionAwareMessageListener");
}
}
2. 消息监听器 MessageListener 介绍
在 Spring 整合 JMS 的应用中,在定义消息监听器的时候一共可以定义三种类型的消息监听器,分别是 MessageListener,SessionAwareMessageListener 和 MessageListenerAdapter.下面就分别来介绍一下这几种类型的区别.
2.1 消息监听器 MessageListener
MessageListener 是最原始的消息监听器,它是 JMS 规范中定义的一个接口.其中定义了一个用于处理接收到的消息的 onMessage 方法,该方法只接收一个 Message 参数.示例代码如下:
2.2 消息监听器 SessionAwareMessageListener
public class ConsumerMessageListener implements MessageListener {
public void onMessage(Message message) {
//这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage
TextMessage textMsg = (TextMessage) message;
System.out.println("接收到一个纯文本消息.");
try {
System.out.println("消息内容是:" + textMsg.getText());
} catch(JMSException e) {
e.printStackTrace();
}
}
}
SessionAwareMessageListener 是 Spring 为提供的,它不是标准的 JMS MessageListener.MessageListener 的设计只是纯粹用来接收消息的,假如在使用 MessageListener 处理接收到的消息时我们需要发送一个消息通知对方我们已经收到这个消息了,这个时候就需要在代码里面去重新获取一个 Connection 或 Session.SessionAwareMessageListener 的设计就是为了方便在接收到消息后发送一个回复的消息,它同样提供了一个处理接收到的消息的 onMessage 方法,但是这个方法可以同时接收两个参数,一个是表示当前接收到的消息 Message,另一个就是可以用来发送消息的 Session 对象.
2.3MessageListenerAdapter
MessageListenerAdapter 类实现了 MessageListener 接口和 SessionAwareMessageListener 接口,它的主要作用是将接收到的消息进行类型转换,然后通过反射的形式把它交给一个普通的 Java 类进行处理.MessageListenerAdapter 会把接收到的消息做如下转换:
TextMessage 转换为 String 对象;
BytesMessage 转换为 byte 数组;
MapMessage 转换为 Map 对象;
ObjectMessage 转换为对应的 Serializable 对象.
在 xml 配置文件中可定义普通的 java 处理类,样例代码如下:
<!-- 消息监听适配器 -->
<!-- 消息监听适配器 方案2 -->
<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="com.tiantian.springintejms.listener.ConsumerListener" />
</constructor-arg>
</bean>
3. 消息转换器 MessageConverter 介绍
<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<property name="delegate">
<bean class="com.tiantian.springintejms.listener.ConsumerListener" />
</property>
<property name="defaultListenerMethod" value="receiveMessage" />
</bean>
MessageConverter 的作用主要有两方面,一方面它可以把非标准化 Message 对象转换成目标 Message 对象,这主要是用在发送消息的时候;另一方面它又可以把的 Message 对象转换成对应的目标对象,这主要是用在接收消息的时候.MessageConverter 可用 spring 提供的简单模型或者自己编写转换定义类.Spring 在初始化 JmsTemplate 的时候指定了其对应的 MessageConverter 为一个 SimpleMessageConverter,所以如果平常没有什么特殊要求的时候可以直接使用 JmsTemplate 的 convertAndSend 系列方法进行消息发送,而不必繁琐的在调用 send 方法时自己 new 一个 MessageCreator 进行相应 Message 的创建.可在源码中查看 SimpleMessageConverter 的定义,如果觉得它不能满足业务的要求,那可以对它里面的部分方法进行重写,或者是完全实现自定义的 MessageConverter.
示例代码
4.spring 对 JMS 的事务管理
//方案1:调用对象转换函数发送对象
TestMqBean bean = new TestMqBean();
bean.setAge(13);
for (int i = 0; i < 10; i++) {
bean.setName("send to data -" + i);
producer.send(session.createObjectMessage(bean));
}
//方案2:调用JMSconvertAndSend方法发送对象
public void sendMessage(final Destination destination, final Serializable obj) {
jmsTemplate.convertAndSend(destination, obj);
}
//接收消息时,调用message.getObject()获取发送的消息对象
public void receiveMessage(ObjectMessage message) throws JMSException {
System.out.println(message.getObject());
}
Spring 提供了一个 JmsTransactionManager 用于对 JMS ConnectionFactory 做事务管理.这将允许 JMS 应用利用 Spring 的事务管理特性.JmsTransactionManager 在执行本地资源事务管理时将从指定的 ConnectionFactory 绑定一个 ConnectionFactory/Session 这样的配对到线程中.JmsTemplate 会自动检测这样的事务资源,并对它们进行相应操作.在 Java EE 环境中,ConnectionFactory 会池化 Connection 和 Session,这样这些资源将会在整个事务中被有效地重复利用.在一个独立的环境中,使用 Spring 的 SingleConnectionFactory 时所有的事务将公用一个 Connection,但是每个事务将保留自己独立的 Session.JmsTemplate 可以利用 JtaTransactionManager 和能够进行分布式的 JMS ConnectionFactory 处理分布式事务. 在 Spring 整合 JMS 的应用中,如果要进行本地的事务管理的话只需要在定义对应的消息监听容器时指定其 sessionTransacted 属性为 true,如:
该属性值默认为 false,这样 JMS 在进行消息监听的时候就会进行事务控制,当在接收消息时监听器执行失败时 JMS 就会对接收到的消息进行回滚,对于 SessionAwareMessageListener 在接收到消息后发送一个返回消息时也处于同一事务下,但是对于其他操作如数据库访问等将不属于该事务控制. 如果想接收消息和数据库访问处于同一事务中,可配置一个外部的事务管理同时配置一个支持外部事务管理的消息监听容器(如 DefaultMessageListenerContainer).要配置这样一个参与分布式事务管理的消息监听容器,可以配置一个 JtaTransactionManager,当然底层的 JMS ConnectionFactory 需要能够支持分布式事务管理,并正确地注册 JtaTransactionManager.这样消息监听器进行消息接收和对应的数据库访问就会处于同一数据库控制下,当消息接收失败或数据库访问失败都会进行事务回滚操作.配置示例文件如下:
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="consumerMessageListener" />
<property name="sessionTransacted" value="true"/>
</bean>
5. 参考网址
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource" ref="dataSource" />
</bean>
<jee:jndi-lookup jndi-name="jdbc/mysql" id="dataSource" />
<bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"
/>
<tx:annotation-driven transaction-manager="jtaTransactionManager" />
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="consumerMessageListener" />
<property name="transactionManager" ref="jtaTransactionManager" />
</bean>
<bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"
/>
Spring 整合 JMS(四)——事务管理 Spring 整合 JMS(二)——三种消息监听器 Spring 整合 JMS(一)——基于 ActiveMQ 实现
5. 源码下载
在 Git 上面下载: https://github.com/wuya11/SpringinteJMSonActiveMQ
来源: https://www.cnblogs.com/wlandwl/p/activemqtwo.html