一, 消息中间件的介绍
介绍
消息队列 是指利用 高效可靠 的 消息传递机制 进行与平台无关的 数据交流, 并基于 数据通信 来进行分布式系统的集成.
特点(作用)
应用解耦
异步通信
流量削峰
(海量)日志处理
消息通讯
......
应用场景
根据消息队列的特点, 可以衍生出很多场景, 或者说很多场景都能用到. 下面举几个例子:
1)异步通信
注册时的短信, 邮件通知, 减少响应时间;
2)应用解耦
信息发送者和消息接受者无需耦合, 比如调用第三方;
3)流量削峰
例如秒杀系统;
二, 消息中间件的对比
1.ActiveMQ
官网: http://activemq.apache.org/
简介:
ActiveMQ 是 Apache 出品, 最流行的, 能力强劲的开源消息总线. ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现, 尽管 JMS 规范出台已经是很久的事情了, 但是 JMS 在当今的 J2EE 应用中间仍然扮演着特殊的地位.
特点:
支持来自 Java,C,C ++,C#,Ruby,Perl,Python,PHP 的各种跨语言客户端和协议
完全支持 JMS 客户端和 Message Broker 中的企业集成模式
支持许多高级功能, 如消息组, 虚拟目标, 通配符和复合目标
完全支持 JMS 1.1 和 J2EE 1.4, 支持瞬态, 持久, 事务和 XA 消息
Spring 支持, 以便 ActiveMQ 可以轻松嵌入到 Spring 应用程序中, 并使用 Spring 的 xml 配置机制进行配置
专为高性能集群, 客户端 - 服务器, 基于对等的通信而设计
CXF 和 Axis 支持, 以便 ActiveMQ 可以轻松地放入这些 web 服务堆栈中以提供可靠的消息传递
可以用作内存 JMS 提供程序, 非常适合单元测试 JMS
支持可插拔传输协议, 例如 in-VM,TCP,SSL,NIO,UDP, 多播, JGroups 和 JXTA 传输
使用 JDBC 和高性能日志支持非常快速的持久性
2.RabbitMQ
官网: http://www.rabbitmq.com/
简介:
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现. RabbitMQ 轻巧且易于部署在云端. 它支持多种消息传递协议. RabbitMQ 可以部署在分布式和联合配置中, 以满足高规模, 高可用性需求. RabbitMQ 可运行在许多操作系统和云环境中, 并为大多数流行语言提供广泛的开发工具.(来自官网翻译)
AMQP (Advanced MessageQueue): 高级消息队列协议. 它是应用层协议的一个开放标准, 为面向消息的中间件设计, 基于此协议的客户端与消息中间件可传递消息, 并不受产品, 开发语言等条件的限制.
RabbitMQ 最初广泛应用于金融行业, 根据官网描述, 它具有如下特点:
特点:
异步消息传递: 支持多种消息协议, 消息队列, 传送确认, 灵活的路由到队列, 多种交换类型;
支持几乎所有最受欢迎的编程语言: Java,C,C ++,C#,Ruby,Perl,Python,PHP 等等;
可以部署为高可用性和吞吐量的集群; 跨多个可用区域和区域进行联合;
可插入的身份验证, 授权, 支持 TLS 和 LDAP.;
提供了一个易用的用户界面, 使得用户可以监控和管理消息 Broker 的许多方面;
提供了许多插件, 来从多方面进行扩展, 也可以编写自己的插件.
3. Kafka
官网: http://kafka.apache.org/
简介:
Kafka 是由 Apache 软件基金会开发的一个开源流处理平台, 由 Scala 和 Java 编写. Kafka 是一种高吞吐量的分布式发布订阅消息系统, 它可以处理消费者规模的网站中的所有动作流数据. 这种动作 (网页浏览, 搜索和其他用户的行动) 是在现代网络上的许多社会功能的一个关键因素. 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决. 对于像 Hadoop 的一样的日志数据和离线分析系统, 但又要求实时处理的限制, 这是一个可行的解决方案. Kafka 的目的是通过 Hadoop 的并行加载机制来统一线上和离线的消息处理, 也是为了通过集群来提供实时的消息.
Kafka 它主要用于处理活跃的流式数据, 因此 Kafaka 在大数据系统中使用较多.
特点:
同时为发布和订阅提供高吞吐量. 据了解, Kafka 每秒可以生产约 25 万消息(50 MB), 每秒处理 55 万消息(110 MB).
可进行持久化操作. 将消息持久化到磁盘, 因此可用于批量消费, 例如 ETL, 以及实时应用程序. 通过将数据持久化到硬盘以及 replication 防止数据丢失.
分布式系统, 易于向外扩展. 所有的 producer,broker 和 consumer 都会有多个, 均为分布式的. 无需停机即可扩展机器.
消息被处理的状态是在 consumer 端维护, 而不是由 server 端维护. 当失败时能自动平衡.
支持 online 和 offline 的场景.
4. RocketMQ
官网: http://rocketmq.apache.org/
简介:
RocketMQ 是阿里开源的消息中间件, 目前在 Apache 孵化, 使用纯 Java 开发, 具有高吞吐量, 高可用性, 适合大规模分布式系统应用的特点. RocketMQ 思路起源于 Kafka, 但并不是简单的复制, 它对消息的可靠传输及事务性做了优化, 目前在阿里集团被广泛应用于交易, 充值, 流计算, 消息推送, 日志流式处理, binglog 分发等场景, 支撑了阿里多次双十一活动.
特点:
支持发布 / 订阅 (Pub/Sub) 和点对点 (P2P) 消息模型
在一个队列中可靠的先进先出 (FIFO) 和严格的顺序传递
支持拉 (pull) 和推 (push) 两种消息模式
单一队列百万消息的堆积能力
支持多种消息协议, 如 JMS,MQTT 等
分布式高可用的部署架构, 满足至少一次消息传递语义
提供 docker 镜像用于隔离测试和云集群部署
提供配置, 指标和监控等功能丰富的 Dashboard
三, ActiveMQ 的安装
1. 安装步骤
activemq 在各个系统下都有对应的安装包. 以下来演示 Linux 系统下安装 activemq.
进入 apache-activemq-5.15.8/bin 目录, 启动 activemq./activemq start
输出以上信息, 表示启动成功.
2. 安装遇到的问题
在安装过程中, 通过查看 activemq 的运行状态,
显示以上.
通过./bin/activemq console 命令查看运行日志:
主机名中包含非法字符;
那么解决办法就很简单了, 改主机名:
1, 方法一使用 hostnamectl 命令
hostnamectl set-hostname 主机名
2, 方法二: 修改配置文件 /etc/hostname 保存退出
修改完成之后重启即可, 这里我使用的是方法一:
hostnamectl set-hostname activemq
查看运行状态:
五, ActiveMQ 页面介绍
待 ActiveMQ 安装启动好, 访问 http://ip:8161/admin, 登录名和密码都是 admin(在配置文件中可修改), 进入 ActiveMQ 的主页:
下面来介绍每个菜单的功能:
1.Queue 消息队列页面
Name: 消息队列的名称.
Number Of Pending Messages: 未被消费的消息数目.
Number Of Consumers: 消费者的数量.
Messages Enqueued: 进入队列的消息 ; 进入队列的总消息数目, 包括已经被消费的和未被消费的. 这个数量只增不减.
Messages Dequeued: 出了队列的消息, 可以理解为是被消费掉的消息数量. 在 Queues 里它和进入队列的总数量相等(因为一个消息只会被成功消费一次), 如果暂时不等是因为消费者还没来得及消费.
2.Topic 主题页面
Name: 主题名称.
Number Of Pending Messages: 未被消费的消息数目.
Number Of Consumers: 消费者的数量.
Messages Enqueued: 进入队列的消息 ; 进入队列的总消息数目, 包括已经被消费的和未被消费的. 这个数量只增不减.
Messages Dequeued: 出了队列的消息, 可以理解为是被消费掉的消息数量. 在 Topics 里, 因为多消费者从而导致数量会比入队列数高.
3.Subscribers 查看订阅者页面
查看订阅者信息, 只在 Topics 消息类型中这个页面才会有数据.
4.Connections 查看连接数页面
六, 简单使用
引入 jar 包:
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-core</artifactId>
- <version>5.7.0</version>
- </dependency>
1. 点对点 (P2P) 模型
点对点模型, 采用的是队列 (Queue) 作为消息载体. 在该模式中, 一条消息只能被一个消费者消费, 没有被消费的, 只能留在队列中, 等待被消费, 或者超时. 举个例子, 如果队列中有 10 条消息, 有两个消费者, 就是一个消费者消费 5 条信息, 你一条我一条. 以下以代码演示.
消息发布者:
- public static void main(String[] args) throws JMSException {
- /*
- * 实现步骤
- * 1. 建立 ConnectionFactory 工厂对象, 需要填入用户名, 密码, 连接地址(一般使用默认, 如果没有修改的话)
- * 2. 通过 ConnectionFactory 对象创建一个 Connection 连接, 并且调用 Connection 的 start 方法开启连接, Connection 方法默认是关闭的
- * 3. 通过 Connection 对象创建 Session 会话(上下文环境对象), 用于接收消息, 参数 1 是是否启用事物, 参数 2 是签收模式, 一般设置为自动签收
- * 4. 通过 Session 对象创建 Destination 对象, 指的是一个客户端用来制定生产消息目标和消费消息来源的对象. 在 PTP 的模式中, Destination 被称作队列, 在 Pub/Sub 模式中, Destination 被称作主题(Topic)
- * 5. 通过 Session 对象创建消息的发送和接收对象(生产者和消费者)
- * 6. 通过 MessageProducer 的 setDeliverMode 方法为其设置持久化或者非持久化特性
- * 7. 使用 JMS 规范的 TextMessage 形式创建数据(通过 Session 对象), 并用 MessageProducer 的 send 方法发送数据. 客户端同理. 记得关闭
- */
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
- ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://94.191.49.192:61616");
- Connection connection = connectionFactory.createConnection();
- connection.start();
- Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createQueue("queue");
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- for (int i=0;i<=5;i++) {
- TextMessage textMessage = session.createTextMessage();
- textMessage.setText("我是第"+i+"消息");
- producer.send(textMessage);
- }
- if(connection!=null){
- connection.close();
- }
- }
消息消费者:
- public static void main(String[] args) throws JMSException {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
- ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://94.191.49.192:61616");
- Connection connection = connectionFactory.createConnection();
- connection.start();
- Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createQueue("queue");
- MessageConsumer consumer = session.createConsumer(destination);
- while (true){
- TextMessage message = (TextMessage) consumer.receive();
- if (message==null){
- break;
- }
- System.out.println(message.getText());
- }
- if(connection!=null){
- connection.close();
- }
- }
先启动两个消费者, 在启动发布者:
2. 发布 / 订阅 (Pub/Sub) 模型
发布 / 订阅模型采用的是主题 (Topic) 作为消息通讯载体. 该模式类似微信公众号的模式. 发布者发布一条信息, 然后将该信息传递给所有的订阅者. 注意: 订阅者想要接收到该信息, 必须在该信息发布之前订阅.
发布者发布信息:
public static void main(String[] args) throws JMSException, IOException { // 创建一个 ConnectionFactory 对象连接 MQ 服务器 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://94.191.49.192:61616"); // 创建一个连接对象 Connection connection; connection = connectionFactory.createConnection(); // 开启连接 connection.start(); // 使用 Connection 对象创建一个 Session 对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建一个 Destination 对象. topic 对象 Topic topic = session.createTopic("test-topic"); // 使用 Session 对象创建一个消费者对象. MessageConsumer consumer = session.createConsumer(topic); // 接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // 打印结果 TextMessage textMessage = (TextMessage) message; String text; try { text = textMessage.getText(); System.out.println("这是接收到的消息:" + text); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic 消费者启动...."); // 等待接收消息 System.in.read(); // 关闭资源 consumer.close(); session.close(); connection.close(); }
订阅者订阅信息:
public static void main(String[] args) throws JMSException { // 1, 创建一个连接工厂对象, 需要指定服务的 ip 及端口. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://94.191.49.192:61616"); // 2, 使用工厂对象创建一个 Connection 对象. Connection connection = connectionFactory.createConnection(); // 3, 开启连接, 调用 Connection 对象的 start 方法. connection.start(); // 4, 创建一个 Session 对象. // 第一个参数: 是否开启事务. 如果 true 开启事务, 第二个参数无意义. 一般不开启事务 false. // 第二个参数: 应答模式. 自动应答或者手动应答. 一般自动应答. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5, 使用 Session 对象创建一个 Destination 对象. 两种形式 queue,topic, 现在应该使用 topic Topic topic = session.createTopic("test-topic"); // 6, 使用 Session 对象创建一个 Producer 对象. MessageProducer producer = session.createProducer(topic); // 7, 创建一个 Message 对象, 可以使用 TextMessage. for (int i = 0; i < 50; i++) { TextMessage textMessage = session.createTextMessage("第" + i + "一个 ActiveMQ 队列目的地的消息"); // 8, 发送消息 producer.send(textMessage); } // 9, 关闭资源 producer.close(); session.close(); connection.close(); }
订阅者要提前订阅, 所以先运行订阅者.
3. 两种模式对比
1)由以上, 我们可以总结出 ActiveMQ 的实现步骤:
建立 ConnectionFactory 工厂对象, 需要填入用户名, 密码, 连接地址
通过 ConnectionFactory 对象创建一个 Connection 连接
通过 Connection 对象创建 Session 会话
通过 Session 对象创建 Destination 对象; 在 P2P 的模式中, Destination 被称作队列(Queue), 在 Pub/Sub 模式中, Destination 被称作主题(Topic)
通过 Session 对象创建消息的发送和接收对象
发送消息
关闭资源
2)可以看出, P2P 模式和 Pub/Sub 模式, 在实现上的区别是通过 Session 创建的 Destination 对象不一样, 在 P2P 的模式中, Destination 被称作队列(Queue), 在 Pub/Sub 模式中, Destination 被称作主题(Topic)
七, 参考
https://www.jianshu.com/p/0363ac9ff574 https://juejin.im/post/5adaaae351882567356415eb
来源: https://www.cnblogs.com/yanfei1819/p/10615605.html