1.JMS 简介
JMS 的全称是 Java Message Service,即 Java 消息服务.它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息.把它应用到实际的业务需求中可以在特定的时候利用生产者生成消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑.对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应;另一种是发布 / 订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收.
JMS 编程模型
(1) ConnectionFactory
创建 Connection 对象的工厂,针对两种不同的 jms 消息模型,分别有 QueueConnectionFactory 和 TopicConnectionFactory 两种.可以通过 JNDI 来查找 ConnectionFactory 对象.
(2) Destination
Destination 的意思是消息生产者的消息发送目标或者说消息消费者的消息来源.对于消息生产者来说,它的 Destination 是某个队列(Queue)或某个主题(Topic); 对于消息消费者来说,它的 Destination 也是某个队列或主题(即消息来源).所以,Destination 实际上就是两种类型的对象:Queue,Topic 可以通过 JNDI 来查找 Destination.
(3) Connection
Connection 表示在客户端和 JMS 系统之间建立的链接(对 TCP/IP socket 的包装).Connection 可以产生一个或多个 Session.跟 ConnectionFactory 一样,Connection 也有两种类型:QueueConnection 和 TopicConnection.
(4) Session
Session 是操作消息的接口.可以通过 session 创建生产者,消费者,消息等.Session 提供了事务的功能.当需要使用 session 发送 / 接收多个消息时,可以将这些发送 / 接收动作放到一个事务中.同样,也分 QueueSession 和 TopicSession.
(5) 消息的生产者
消息生产者由 Session 创建,并用于将消息发送到 Destination.同样,消息生产者分两种类型:QueueSender 和 TopicPublisher.可以调用消息生产者的方法(send 或 publish 方法)发送消息.
(6) 消息消费者
消息消费者由 Session 创建,用于接收被发送到 Destination 的消息.两种类型:QueueReceiver 和 TopicSubscriber.可分别通过 session 的 createReceiver(Queue) 或 createSubscriber(Topic) 来创建.当然,也可以 session 的 creatDurableSubscriber 方法来创建持久化的订阅者.
(7) MessageListener
消息监听器.如果注册了消息监听器,一旦消息到达,将自动调用监听器的 onMessage 方法.EJB 中的 MDB(Message-Driven Bean)就是一种 MessageListener.
2.ActiveMQ 简介
ActiveMQ 是 Apache 软件基金下的一个开源软件,它遵循 JMS 规范(Java Message Service),是消息驱动中间件软件(MOM).它为企业消息传递提供高可用,出色性能,可扩展,稳定和安全保障.ActiveMQ 使用 Apache 许可协议.因此,任何人都可以使用和修改它而不必反馈任何改变.这对于商业上将 ActiveMQ 用在重要用途的人尤为关键.MOM 的工作是在分布式的各应用之间调度事件和消息,使之到达指定的接收者.所以高可用,高性能,高可扩展性尤为关键.
ActiveMQ 特性
⒈支持多种语言客户端, 如: Java,C,C++,C#,Ruby,Perl,Python,PHP.应用协议有 OpenWire,Stomp REST,WS Notification,XMPP,AMQP.
⒉ 完全支持 JMS1.1 和 J2EE1.4 规范, 它们包括同步和异步消息传递,一次和只有一次的消息传递,对于预订者的持久消息等.依附于 JMS 规范意味着,不论 JMS 消息提供者是谁,同样的基本特性(持久化,XA 消息,事务) 都是有效的.
⒊ 对 Spring 的支持,ActiveMQ 可以很容易内嵌到使用 Spring 的系统里面去.
⒋ 通过了常见 J2EE 服务器(如 Geronimo,JBoss 4,GlassFish,webLogic) 的测试,其中通过 JCA 1.5 resource adaptors 的配置,可以让 ActiveMQ 可以自动的部署到任何兼容 J2EE 1.4 商业服务器上.
⒌ ActiveMQ 提供各种连接选择,包括 HTTP,HTTPS,IP 多点传送,SSL,STOMP,TCP,UDP,XMPP 等.大量的连接协议支持使之具有更好的灵活性.很多现有的系统使用一种特定协议并且不能改变,所以一个支持多种协议的消息平台降低了使用的门槛.虽然连接很重要,但是和其他容器集成也同样重要.
6.ActiveMQ 提供多种持久性方案可供选择,也可以完全按自己需求定制验证和授权.例如,ActiveMQ 通过 KahaDB 提供自己的超快速消息持久方案(ultra-fast message persistence),但也支持标准的 JDBC 方案.ActiveMQ 可以通过配置文件提供简单的验证和授权,也提供标准的 JAAS 登陆模块.
7.ActiveMQ 是为开发者设计的.它并不需要专门的管理工具,因为它提供各种易用且强大的管理特性.有很多方法去监控 ActiveMQ 的各个方面,可以通过 JMX 使用 JConsole 或 ActiveMQ web console;可以运行 ActiveMQ 消息报告;可以用命令行脚本;可以通过日志.
8. 代理器集群(Broker clustering)---- 为了利于扩展,多个 ActiveMQ broker 能够联合工作.这个方式就是 network of brokers 并且能支持多种拓扑结构; 支持客户端 - 服务器,点对点.
9. 支持 Ajax, 支持与 Axis 的整合
ActiveMQ 优势
1. 与 OpenJMS,JbossMQ 等开源 jms provider 相比,ActiveMQ 有 Apache 的支持,持续发展的优势明显.
2. 消息处理速度很快
3. 提高系统资源的利用率,主要是任务的派发不是 24 小时平均的,而是高峰时期任务量很多,比如 1 秒 1000 多个,有的时候很低,比如十几秒钟才来一个.应用服务通过 JMS 队列一个一个的取任务,做完一个再领一个,使系统资源的运用趋于平均.比如 ActiveMQ 在赛扬(2.40GHz)机器上能够达到 2000/s,消息大小为 1-2k.好一些的服务器可以达到 2 万以上 / 秒.
3.ActiveMQ 安装
ActiveMQ 在 linux 服务上安装操作如下:
1. 在官网下载 activemq 安装文件.地址: http://activemq.apache.org/download.html
2. 上传下载的 tar.gz 安装文件到 linux 服务器上,并解压到指定目录:如 tar -xf apache-activemq-5.15.2-bin.tar.gz
3. 运行 activemq,进入到解压的 apache-activemq-5.15.2/bin 目录,执行命令:activemq start
4. 开放端口 8161,61616,保证端口可访问.
运行 activemq 截图如下:
本机访问启动成功的 activemq 截图如下:
4.ActiveMQ 类别及开发流程
1),Point-to-Point (点对点) 消息模式开发流程
1,生产者(producer)开发流程:
1.1 创建 Connection: 根据 url,user 和 password 创建一个 jms Connection.
1.2 创建 Session: 在 connection 的基础上创建一个 session,同时设置是否支持事务和 ACKNOWLEDGE 标识.
1.3 创建 Destination 对象: 需指定其对应的主题(subject)名称,producer 和 consumer 将根据 subject 来发送 / 接收对应的消息.
1.4 创建 MessageProducer: 根据 Destination 创建 MessageProducer 对象,同时设置其持久模式.
1.5 发送消息到队列(Queue): 封装 Message 消息,使用 MessageProducer 的 send 方法将消息发送出去.
2,消费者(consumer)开发流程:
2.1 实现 MessageListener 接口: 消费者类必须实现 MessageListener 接口,然后在 onMessage() 方法中监听消息的到达并处理.
2.2 创建 Connection: 根据 url,user 和 password 创建一个 jms Connection,如果是 durable 模式,还需要给 connection 设置一个 clientId.
2.3 创建 Session 和 Destination: 与 ProducerTool.java 中的流程类似,不再赘述.
2.4 创建 replyProducer【可选】:可以用来将消息处理结果发送给 producer.
2.5 创建 MessageConsumer: 根据 Destination 创建 MessageConsumer 对象.
2.6 消费 message: 在 onMessage() 方法中接收 producer 发送过来的消息进行处理,并可以通过 replyProducer 反馈信息给 producer
样例代码:
在消息生产者中定义一个队列,destination_request, 提供消息,同时定义一个监听消息的队列拥有接受消费者回复的消息,destination_response.
View Code
在消息消费者中定义一个队列,destination_request, 用于接受消息,同时定义一个回复收到消息的队列回复生产者已经收到消息,destination_response.
View Code
消息消费者收到消息,并打印出来,同时发送回复消息.截图如下:
消息生产者生产消息,同时接受到消费者回复的消息并打印出来.截图如下:
2),Publisher/Subscriber(发布 / 订阅者) 消息模式开发流程
1,订阅者(Subscriber)开发流程:
1.1 实现 MessageListener 接口: 在 onMessage() 方法中监听发布者发出的消息队列,并做相应处理.
1.2 创建 Connection: 根据 url,user 和 password 创建一个 jms Connection.
1.3 创建 Session: 在 connection 的基础上创建一个 session,同时设置是否支持事务和 ACKNOWLEDGE 标识.
1.4 创建 Topic: 创建 2 个 Topic, topictest.messages 用于接收发布者发出的消息,topictest.control 用于向发布者发送消息,实现双方的交互.
1.5 创建 consumer 和 producer 对象:根据 topictest.messages 创建 consumer,根据 topictest.control 创建 producer.
1.6 接收处理消息:在 onMessage() 方法中,对收到的消息进行处理,可直接简单在本地显示消息,或者根据消息内容不同处理对应的业务逻辑(比如:数据库更新,文件操作等等),并且可以使用 producer 对象将处理结果返回给发布者.
2,发布者(Publisher)开发流程:
2.1 实现 MessageListener 接口:在 onMessage() 方法中接收订阅者的反馈消息.
2.2 创建 Connection: 根据 url,user 和 password 创建一个 jms Connection.
2.3 创建 Session: 在 connection 的基础上创建一个 session,同时设置是否支持事务和 ACKNOWLEDGE 标识.
2.4 创建 Topic: 创建 2 个 Topic,topictest.messages 用于向订阅者发布消息,topictest.control 用于接 收订阅者反馈的消息.这 2 个 topic 与订阅者开发流程中的 topic 是一一对应的.
2.5 创建 consumer 和 producer 对象: 根据 topictest.messages 创建 publisher; 根据 topictest.control 创建 consumer,同时监听订阅者反馈的消息.
2.6 给所有订阅者发送消息,并接收反馈消息. 注:可同时运行多个订阅者测试查看此模式效果 .
样例代码:
消息发布者发布消息,定义一个主题 example.A
View Code
消息订阅者接收消息,定义一个与发布者相对应的主题 example.A.
View Code
消息发布者发布消息,并打印截图如下:
消息订阅者接受消息并打印截图如下:(消息订阅者需在发布者之前启动,可保证能取到订阅的消息)
来源: https://www.cnblogs.com/wlandwl/p/activemq.html