前言
Rabbitmq 是一个开源的消息代理软件, 是 AMQP 协议的实现. 核心作用就是创建消息队列, 异步发送和接收消息. 通常用来在高并发中处理削峰填谷, 延迟处理, 解耦系统之间的强耦合, 处理秒杀订单. 入门 rabbitmq 之前主要是想了解下秒杀排队订单入库后, 异步通知客户端秒杀结果.
基础知识
1, 基本概念(角色)
了解 rabbitmq 之前先要了解 3 个基本概念: 生产者, 消费者, 代理(队列). rabbitmq 在生产者和代理中间做了一层抽象. 这样消息生产者和队列就没有直接联系, 在中间加入了一层交换器(Exchange). 这样消息生产者把消息交给交换器, 交换器根据路由策略再把消息转发给对应队列.
2, 消息发送原理
首先要发送消息必须先连接到 rabbitmq-server. 那怎么连接和发送消息呢? 首先你的应用程序和 rabbitmq 会创建一个 TCP 链接. 一旦 TCP 链接并通过认证. 认证就是你试图连接 rabbitmq 时服务器的用户名和密码. 认证通过, 你的应用程序和 rabbitmq 之间就创建了一条 AMQP 信道(Channel), 后续所有的消息都是基于这个信道完成.
3, 为什么不直接通过 TCP 直接发送消息
对于操作系统来说创建和销毁 TCP 连接是非常昂贵的开销, 而在并发高峰期时再去处理 TCP 创建与销毁显然是不合适的. 这就造成了 TCP 的巨大浪费, 而且操作系统每秒创建 TCP 的能力也是有限的, 因此直接通过 TCP 发送消息会很快遇到瓶颈.
交换器(Exchange)
前面提到 rabbitmq 在你的应用程序和队列中间增加了一层代理, 代理根据路由策略把消息路由到指定队列. 交换器分为 4 类:
- direct(默认)
- headers
- fanout
- topic
1,direct. 是交换器的默认实现, 根据路由规则匹配上就会把消息投递到对应队列.
2,headers. 是一个自定义匹配规则类型, 在队列和交换器绑定时, 会设置一组键值对, 消息中也包含一组键值对, 当这些键值对匹配上, 则会投递消息到对应队列.
3,fanout. 是一种发布订阅模式, 当你发送一条消息时, 交换器会把消息广播到所有附加到这个交换器的队列上. 对于 fanout 来说 routingkey 是无效的.
4,topic. 可以更灵活的匹配自己想订阅的消息. 也是 routingkey 用处最大的一种. 类似我们配置 request mapping 中的通配符.
安装 rabbitmq
我是本机 ubuntu16 中安装, 没有配置软件源, 安装速度倒还能接收. rabbitmq 是 erlang 开发, 所以先安装 erlang, 再安装 rabbitmq-server
- sudo apt-get install erlang
- sudo apt-get install rabbitmq-server
安装完成后查看运行状态 systemctl status rabbitmq-server
启动 service rabbitmq-server start
停止 serivce rabbitmq-server stop
重启 service rabbitmq-server restart
安装好 rabbitmq 后, 启用 web 客户端 rabbitmq-plugines enable rabbitmq_management.
启动后默认使用 guest/guest 访问, 仅支持 localhost 访问: http://localhost:15672. Web 站点默认端口 15672. rabbitmq 默认端口 5672
在 SpringBoot 中集成 rabbitmq
1, 配置信息
- sprimg.rabbitmq.host=localhost
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=guest
- spring.rabbitmq.password=guest
2, 默认交换器 (direct) 实现
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- @Configuration
- public class DirectConfig {
- @Bean
- public Queue directQueue(){
- return new Queue("direct",false); // 队列名字, 是否持久化
- }
- @Bean
- public DirectExchange directExchange(){
- return new DirectExchange("direct",false,false);// 交换器名称, 是否持久化, 是否自动删除
- }
- @Bean
- Binding binding(Queue queue, DirectExchange exchange){
- return BindingBuilder.bind(queue).to(exchange).with("direct");
- }
- }
消息生产者(发送者)
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- /**
- * 消息发送 -- 生产消息
- */
- @Component
- public class Sender {
- @Autowired
- AmqpTemplate rabbitmqTemplate;
- public void send(String message){
- System.out.println("发送消息:"+message);
- rabbitmqTemplate.convertAndSend("direct",message);
- }
- }
消息消费者(接收者)
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- @Component
- @RabbitListener(queues = "direct")
- public class Receiver {
- @RabbitHandler
- public void handler(String message){
- System.out.println("接收消息:"+message);
- }
- }
OK, 来测试下, 默认情况下, 只能本机访问, 我本地是在 Ubuntu 虚拟机中, 我在虚拟机中运行 demo
- import com.zhangfei.mq.Sender;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Controller;
- import org.springframework.Web.bind.annotation.RequestMapping;
- import org.springframework.Web.bind.annotation.ResponseBody;
- @Controller
- @RequestMapping("/rabbitmq")
- public class MyRabbitmqController {
- @Autowired
- Sender sender;
- @RequestMapping("/sender")
- @ResponseBody
- public String sender(){
- System.out.println("send string:hello world");
- sender.send("hello world");
- return "sending...";
- }
- }
运行结果
参考资料
https://www.cnblogs.com/vipstone/p/9950434.html [推荐. 作者: 王磊] 这里基础概念里有示意图, 可以对 rabbit 涉及到的基础概念和流程有一个直观的认识
来源: http://www.bubuko.com/infodetail-2927926.html