首先和大家一起回顾一下 Java 消息服务,在我之前的博客中,我为大家分析了:
然后在另一篇博客中,和大家一起从 0 到 1 的开启了一个 ActiveMq 的项目,在项目开发的过程中,我们对 ActiveMq 有了一定的了解:
在接下来的这篇博客中,我会和大家一起来整合 Spring 和 ActiveMq,这篇博文, 我们基于 Spring+JMS+ActiveMQ+Tomcat,实现了 Point-To-Point 的异步队列消息和 PUB/SUB(发布 / 订阅)模型,简单实例,不包含任何业务。
IDE 选择了 IDEA(建议大家使用),为了避免下载 jar 的各种麻烦,底层使用 maven 搭建了一个项目,整合了 Spring 和 ActiveMq
View Code
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>
- 4.0.0
- </modelVersion>
- <groupId>
- Crawl-Page
- </groupId>
- <artifactId>
- Crawl-Page
- </artifactId>
- <packaging>
- war
- </packaging>
- <version>
- 1.0-SNAPSHOT
- </version>
- <name>
- Crawl-Page Maven Webapp
- </name>
- <url>
- http://maven.apache.org
- </url>
- <!-- 版本管理 -->
- <properties>
- <springframework>
- 4.1.8.RELEASE
- </springframework>
- </properties>
- <dependencies>
- <dependency>
- <groupId>
- junit
- </groupId>
- <artifactId>
- junit
- </artifactId>
- <version>
- 4.10
- </version>
- <scope>
- test
- </scope>
- </dependency>
- <!-- JSP相关 -->
- <dependency>
- <groupId>
- jstl
- </groupId>
- <artifactId>
- jstl
- </artifactId>
- <version>
- 1.2
- </version>
- </dependency>
- <dependency>
- <groupId>
- javax.servlet
- </groupId>
- <artifactId>
- servlet-api
- </artifactId>
- <scope>
- provided
- </scope>
- <version>
- 2.5
- </version>
- </dependency>
- <!-- spring -->
- <dependency>
- <groupId>
- org.springframework
- </groupId>
- <artifactId>
- spring-core
- </artifactId>
- <version>
- ${springframework}
- </version>
- </dependency>
- <dependency>
- <groupId>
- org.springframework
- </groupId>
- <artifactId>
- spring-context
- </artifactId>
- <version>
- ${springframework}
- </version>
- </dependency>
- <dependency>
- <groupId>
- org.springframework
- </groupId>
- <artifactId>
- spring-tx
- </artifactId>
- <version>
- ${springframework}
- </version>
- </dependency>
- <dependency>
- <groupId>
- org.springframework
- </groupId>
- <artifactId>
- spring-webmvc
- </artifactId>
- <version>
- ${springframework}
- </version>
- </dependency>
- <dependency>
- <groupId>
- org.springframework
- </groupId>
- <artifactId>
- spring-jms
- </artifactId>
- <version>
- ${springframework}
- </version>
- </dependency>
- <!-- xbean 如<amq:connectionFactory />
- -->
- <dependency>
- <groupId>
- org.apache.xbean
- </groupId>
- <artifactId>
- xbean-spring
- </artifactId>
- <version>
- 3.16
- </version>
- </dependency>
- <!-- activemq -->
- <dependency>
- <groupId>
- org.apache.activemq
- </groupId>
- <artifactId>
- activemq-core
- </artifactId>
- <version>
- 5.7.0
- </version>
- </dependency>
- <dependency>
- <groupId>
- org.apache.activemq
- </groupId>
- <artifactId>
- activemq-pool
- </artifactId>
- <version>
- 5.12.1
- </version>
- </dependency>
- <!-- 自用jar包,可以忽略-->
- <dependency>
- <groupId>
- commons-httpclient
- </groupId>
- <artifactId>
- commons-httpclient
- </artifactId>
- <version>
- 3.1
- </version>
- </dependency>
- </dependencies>
- <build>
- <finalName>
- Crawl-Page
- </finalName>
- <plugins>
- <plugin>
- <groupId>
- org.apache.tomcat.maven
- </groupId>
- <artifactId>
- tomcat7-maven-plugin
- </artifactId>
- <configuration>
- <port>
- 8080
- </port>
- <path>
- /
- </path>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </project>
因为这里 pom.xml 文件有点长,就不展开了。
我们可以看到其实依赖也就几个,1、Spring 核心依赖 2、ActiveMq core 和 pool(这里如果同学们选择导入 jar,可以直接导入我们上一篇博客中说道的那个 activemq-all 这个 jar 包)3、java servlet 相关依赖
这里面我们选择的 ActiveMq pool 的依赖版本会和之后的 dtd 有关系,需要版本对应,所以同学们等下配置 activemq 文件的时候,需要注意 dtd 版本选择
web.xml 也大同小异,指定 Spring 配置文件,springMvc 命名,编码格式
- <?xml version="1.0" encoding="UTF-8" ?>
- <web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
- http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" version="3.0">
- <display-name>
- Archetype Created Web Application
- </display-name>
- <!-- 加载spring的配置文件,例如hibernate、jms等集成 -->
- <context-param>
- <param-name>
- contextConfigLocation
- </param-name>
- <param-value>
- classpath:applicationContext*.xml;
- </param-value>
- </context-param>
- <listener>
- <listener-class>
- org.springframework.web.context.ContextLoaderListener
- </listener-class>
- </listener>
- <servlet>
- <servlet-name>
- springMVC
- </servlet-name>
- <servlet-class>
- org.springframework.web.servlet.DispatcherServlet
- </servlet-class>
- <init-param>
- <param-name>
- contextConfigLocation
- </param-name>
- <param-value>
- classpath:spring-mvc.xml
- </param-value>
- </init-param>
- <load-on-startup>
- 1
- </load-on-startup>
- </servlet>
- <servlet-mapping>
- <servlet-name>
- springMVC
- </servlet-name>
- <url-pattern>
- /
- </url-pattern>
- </servlet-mapping>
- <!-- 处理编码格式 -->
- <filter>
- <filter-name>
- characterEncodingFilter
- </filter-name>
- <filter-class>
- org.springframework.web.filter.CharacterEncodingFilter
- </filter-class>
- <init-param>
- <param-name>
- encoding
- </param-name>
- <param-value>
- UTF-8
- </param-value>
- </init-param>
- <init-param>
- <param-name>
- forceEncoding
- </param-name>
- <param-value>
- true
- </param-value>
- </init-param>
- </filter>
- <filter-mapping>
- <filter-name>
- characterEncodingFilter
- </filter-name>
- <url-pattern>
- /*
- </url-pattern>
- </filter-mapping>
- </web-app>
这里面的 SpringMVC 没什么特别,有需要的同学可以参考一下:
View Code
- <?xml version="1.0" encoding="UTF-8"?>
- <!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:aop="http://www.springframework.org/schema/aop"
- xmlns:context="http://www.springframework.org/schema/context"
- xmlns:mvc="http://www.springframework.org/schema/mvc"
- xmlns:tx="http://www.springframework.org/schema/tx"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/aop
- http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
- http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
- http://www.springframework.org/schema/context
- http://www.springframework.org/schema/context/spring-context-4.0.xsd
- http://www.springframework.org/schema/mvc
- http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
- http://www.springframework.org/schema/tx
- http://www.springframework.org/schema/tx/spring-tx-4.0.xsd">
- <!-- 启用MVC注解 -->
- <mvc:annotation-driven />
- <!-- 指定Sping组件扫描的基本包路径 -->
- <context:component-scan base-package="com.Jayce" >
- <!-- 这里只扫描Controller,不可重复加载Service -->
- <context:include-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
- </context:component-scan>
- <!-- JSP视图解析器-->
- <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
- <property name="prefix" value="/WEB-INF/views/" />
- <property name="suffix" value=".jsp" />
- <!-- 定义其解析视图的order顺序为1 -->
- <property name="order" value="1" />
- </bean>
- </beans>
applicationContext.xml 主要使用来装载 Bean,我们项目中并没有什么特别的 Java Bean,因此只用来指出包扫描路径:
View Code
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:amq="http://activemq.apache.org/schema/core"
- xmlns:jms="http://www.springframework.org/schema/jms"
- xmlns:context="http://www.springframework.org/schema/context"
- xmlns:mvc="http://www.springframework.org/schema/mvc"
- xsi:schemaLocation="
- http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
- http://www.springframework.org/schema/context
- http://www.springframework.org/schema/context/spring-context-4.1.xsd
- http://www.springframework.org/schema/mvc
- http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
- http://www.springframework.org/schema/jms
- http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
- http://activemq.apache.org/schema/core
- http://activemq.apache.org/schema/core/activemq-core-5.14.3.xsd">
- <bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
- <!-- 配置扫描路径 -->
- <context:component-scan base-package="com.Jayce">
- <!-- 只扫描Service,也可以添加Repostory,但是要把Controller排除在外,Controller由spring-mvc.xml去加载 -->
- <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
- </context:component-scan>
- </beans>
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:amq="http://activemq.apache.org/schema/core"
- xmlns:jms="http://www.springframework.org/schema/jms"
- xmlns:context="http://www.springframework.org/schema/context"
- xmlns:mvc="http://www.springframework.org/schema/mvc"
- xsi:schemaLocation="
- http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
- http://www.springframework.org/schema/context
- http://www.springframework.org/schema/context/spring-context-4.1.xsd
- http://www.springframework.org/schema/mvc
- http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
- http://www.springframework.org/schema/jms
- http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
- http://activemq.apache.org/schema/core
- http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd"
- >
- <context:component-scan base-package="com.Jayce" />
- <mvc:annotation-driven />
- <amq:connectionFactory id="amqConnectionFactory"
- brokerURL="tcp://192.168.148.128:61616"
- userName="admin"
- password="admin" />
- <!-- 配置JMS连接工长 -->
- <bean id="connectionFactory"
- class="org.springframework.jms.connection.CachingConnectionFactory">
- <constructor-arg ref="amqConnectionFactory" />
- <property name="sessionCacheSize" value="100" />
- </bean>
- <!-- 定义消息队列(Queue) -->
- <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
- <!-- 设置消息队列的名字 -->
- <constructor-arg>
- <value>Jaycekon</value>
- </constructor-arg>
- </bean>
- <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
- <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
- <property name="connectionFactory" ref="connectionFactory" />
- <property name="defaultDestination" ref="demoQueueDestination" />
- <property name="receiveTimeout" value="10000" />
- <!-- true是topic,false是queue,默认是false,此处显示写出false -->
- <property name="pubSubDomain" value="false" />
- </bean>
- <!-- 配置消息队列监听者(Queue) -->
- <bean id="queueMessageListener" class="com.Jayce.Filter.QueueMessageListener" />
- <!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
- <bean id="queueListenerContainer"
- class="org.springframework.jms.listener.DefaultMessageListenerContainer">
- <property name="connectionFactory" ref="connectionFactory" />
- <property name="destination" ref="demoQueueDestination" />
- <property name="messageListener" ref="queueMessageListener" />
- </bean>
- </beans>
这里和大家讲解一下这个配置文件,如果大家能够从上述配置文件中看懂,可以跳过。同学们也可以在中的查看。
1、ActiveMq 中的 DTD,我们在声明相关配置之前,我们需要先导入 ActiveMq 中的 DTD,不然 Spring 并不理解我们的标签是什么意思。
http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd
我们在 pom.xml 文件中有配置了 activemq 的版本依赖我们这里的版本,需要和依赖的版本一样,不然是找不到相关的 dtd
2、amq:connectionFactory:很直白的一个配置项,用于配置我们链接工厂的地址和用户名密码,这里需要注意的是选择 tcp 连接而不是 http 连接
3、jmsTemplate:比较重要的一个配置,这里指定了连接工厂,默认消息发送目的地,还有连接时长,发布消息的方式
- package com.Jayce.Service;
- import org.springframework.jms.core.JmsTemplate;
- import org.springframework.jms.core.MessageCreator;
- import org.springframework.stereotype.Service;
- import javax.annotation.Resource;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.Session;
- /**
- * Created by Administrator on 2017/1/5.
- */
- @Service
- public class ProducerService {
- @Resource(name="jmsTemplate")
- private JmsTemplate jmsTemplate;
- public void sendMessage(Destination destination,final String msg){
- System.out.println(Thread.currentThread().getName()+" 向队列"+destination.toString()+"发送消息---------------------->"+msg);
- jmsTemplate.send(destination, new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- return session.createTextMessage(msg);
- }
- });
- }
- public void sendMessage(final String msg){
- String destination = jmsTemplate.getDefaultDestinationName();
- System.out.println(Thread.currentThread().getName()+" 向队列"+destination+"发送消息---------------------->"+msg);
- jmsTemplate.send(new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- return session.createTextMessage(msg);
- }
- });
- }
- }
将消息生产者做成一个服务,当我们需要发送消息的时候,只需要调用 ProducerService 实例中的 sendMessage 方法就可以向默认目的发送一个消息。
这里提供了两个发送方式,一个是发送到默认的目的地,一个是根据目的地发送消息。
有兴趣的同学可以和我上一篇文章中 ActiveMq 发送消息的方式对比一下,可以发现一些不同。
- package com.Jayce.Service;
- import org.springframework.jms.core.JmsTemplate;
- import org.springframework.stereotype.Service;
- import javax.annotation.Resource;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.TextMessage;
- /**
- * Created by Administrator on 2017/1/5.
- */
- @Service
- public class ConsumerService {
- @Resource(name="jmsTemplate")
- private JmsTemplate jmsTemplate;
- public TextMessage receive(Destination destination){
- TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
- try{
- System.out.println("从队列" + destination.toString() + "收到了消息:\t"
- + textMessage.getText());
- } catch (JMSException e) {
- e.printStackTrace();
- }
- return textMessage;
- }
- }
因为我们项目中并没有什么业务,所以的话对消息的处理也就是打印输出。我们只需要调用 jmsTemplate 中的 receive 方法,就可以从里面获取到一条消息。
再和我们上一篇博客对比一下,上一篇博客中,我们接受到信息之后需要手动确认事务,这样 ActiveMQ 中才会确定这条消息已经被正确读取了。而整合了 Spring 之后,事务将由 Spring 来管理。
- package com.Jayce.Controller;
- import com.Jayce.Service.ConsumerService;
- import com.Jayce.Service.ProducerService;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Controller;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RequestMethod;
- import org.springframework.web.bind.annotation.ResponseBody;
- import javax.annotation.Resource;
- import javax.jms.Destination;
- import javax.jms.TextMessage;
- /**
- * Created by Administrator on 2017/1/5.
- */
- @Controller
- public class MessageController {
- private Logger logger = LoggerFactory.getLogger(MessageController.class);
- @Resource(name = "demoQueueDestination")
- private Destination destination;
- //队列消息生产者
- @Resource(name = "producerService")
- private ProducerService producer;
- //队列消息消费者
- @Resource(name = "consumerService")
- private ConsumerService consumer;
- @RequestMapping(value = "/SendMessage", method = RequestMethod.POST)
- @ResponseBody
- public void send(String msg) {
- logger.info(Thread.currentThread().getName()+"------------send to jms Start");
- producer.sendMessage(msg);
- logger.info(Thread.currentThread().getName()+"------------send to jms End");
- }
- @RequestMapping(value= "/ReceiveMessage",method = RequestMethod.GET)
- @ResponseBody
- public Object receive(){
- logger.info(Thread.currentThread().getName()+"------------receive from jms Start");
- TextMessage tm = consumer.receive(destination);
- logger.info(Thread.currentThread().getName()+"------------receive from jms End");
- return tm;
- }
- }
控制层里面需要注入我们的生产者和消费者(实际开发中,生产者和消费者肯定不会在同一个项目中的,不然就消息服务这个东西就没有意义了)。
现在服务层和控制层都好了,接下来我们就进行一个简单的测试
先确定你的 ActiveMQ 服务已经开启。
项目使用了 Tomcat 插件,避免了本地再下载 Tomcat 的麻烦,有需要的同学可以使用一下。
- <plugins>
- <plugin>
- <groupId>
- org.apache.tomcat.maven
- </groupId>
- <artifactId>
- tomcat7-maven-plugin
- </artifactId>
- <configuration>
- <port>
- 8080
- </port>
- <path>
- /
- </path>
- </configuration>
- </plugin>
- </plugins>
这里用了 Chrome 的一个插件 PostMan 有兴趣的同学可以了解一下,在 Chrome 拓展程序中可以找到,避免了后端的同学去弄页面!
我们发送了一个 post 请求之后,看一下服务器的效果:
我们可以看到,已经向队列发送了一条消息。我们看一下 ActiveMq 现在的状态:
我们可以看到,一条消息已经成功发送到了 ActiveMq 中。
使用 get 请求访问服务器后台:
服务的输出:
ActiveMq 服务器状态:
我们可以看到,消费者已经消费了一条信息,并且没有断开与 ActiveMq 之间的链接。
在实际项目中,我们很少会自己手动去获取消息,如果需要手动去获取消息,那就没有必要使用到 ActiveMq 了,可以用一个 Redis 就足够了。
不能手动去获取消息,那么我们就可以选择使用一个监听器来监听是否有消息到达,这样子可以很快的完成对消息的处理。
在上面的配置文件中,我们已经默认的添加了这段监听器的配置文件,如果同学们不想使用这个监听器,可以直接注释掉。
- <!-- 配置消息队列监听者(Queue) -->
- <bean id="queueMessageListener" class="com.Jayce.Filter.QueueMessageListener"
- />
- <!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器
- -->
- <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
- <property name="connectionFactory" ref="connectionFactory" />
- <property name="destination" ref="demoQueueDestination" />
- <property name="messageListener" ref="queueMessageListener" />
- </bean>
我们需要创建一个类实现 MessageListener 接口:
- package com.Jayce.Filter;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.MessageListener;
- import javax.jms.TextMessage;
- /**
- * Created by Administrator on 2017/1/5.
- */
- public class QueueMessageListener implements MessageListener {
- public void onMessage(Message message) {
- TextMessage tm = (TextMessage) message;
- try {
- System.out.println("QueueMessageListener监听到了文本消息:\t"
- + tm.getText());
- //do something ...
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- }
实现接口的 onMessage 方法,我们将需要的业务操作在里面解决,这样子,就完成了我们生产者 - 中间件 - 消费者,这样一个解耦的操作了。
和上面一样,使用 postMan 发送 post 请求,我们可以看到控制台里面,消息马上就能打印出来:
再看看 ActiveMQ 服务器的状态:
我们可以看到,使用监听器的效果,和手动接收消息的效果是一样的。
这样子一整个项目下来,我们已经成功的整合了 Spring 和 ActiveMQ。
这里其实也算不上什么压力测试,在配置 pom.xml 文件的时候,大家有看到一个 commons-httpclient 的依赖,接下来我们使用 httpClient 不停的想服务器发送消息,看一下服务器解决消息的速度如何:
- package com.Jaycekon.test;
- import org.apache.commons.httpclient.HttpClient;
- import org.apache.commons.httpclient.methods.PostMethod;
- import org.junit.Test;
- import java.io.IOException;
- import java.util.concurrent.atomic.AtomicInteger;
- /**
- * Created by Administrator on 2017/1/5.
- */
- public class Client {
- @Test
- public void test() {
- HttpClient httpClient = new HttpClient();
- new Thread(new Sender(httpClient)).start();
- }
- }
- class Sender implements Runnable {
- public static AtomicInteger count = new AtomicInteger(0);
- HttpClient httpClient;
- public Sender(HttpClient client) {
- httpClient = client;
- }
- public void run() {
- try {
- System.out.println(Thread.currentThread().getName()+"---Send message-"+count.getAndIncrement());
- PostMethod post = new PostMethod("http://127.0.0.1:8080/SendMessage");
- post.addParameter("msg", "Hello world!");
- httpClient.executeMethod(post);
- System.out.println(Thread.currentThread().getName()+"---Send message Success-"+count.getAndIncrement());
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
这里面用了 HttpClient 来向服务器发送 Post 请求,然后计数输出,有兴趣的同学可以自己测试一下,可以多开几个线程,这里只开了一个线程。
github:
有兴趣的同学可以下载下来,该项目后续会继续整合 Redis 以及其他一些架构相关的技术
来源: