还没关注?
快动动手指!
聊技术, 论职场!
为 IT 人打造一个 "有温度" 的 狸猫技术窝
目录
面向 EDA(事件驱动架构) 的方式来设计你的消息
AMQP routing key 的设计
RabbitMQ cluster 搭建
Mirror queue policy 设置
两个不错的 RabbitMQ plugin 大型应用插件 (Sharding,Rederation)
Queue 镜像失败手动同步
各集群配置同步方式 (RabbitMQ export\import)
客户端连接方式 (尽量采用 AMQP 组来动态链接)
RabbitMQ 产线二次产品化封装 (消息补偿, 发送消息持久化, 异常处理, 监控页面, 重复消息剔除)
1. 面向 EDA 来设计你的消息
在通常情况下你在使用消息中间件的时候, 都是未经设计的使用, 你没有把应用架构和系统架构边界搞清楚.
消息中间件只是一个纯粹的技术工具, 当你引入的时候是站在应用架构的角度引入时, 你就不会用到最后, 发现越来越混乱, 而且也无法结合软件模式, 方法论, 最佳实践来综合提升系统的架构能力.
EDA(Event Driven Architecture, EDA ) 事件驱动架构, 它是一种用来在 SOA 或者 Micro service 中进行的架构模式. 它的好处有几个, 柔性具有很高的伸缩性.
既然要 EDA 就要规划好你当前的系统边界之内有多少业务实体, 这些实体是围绕着领域模型而得来.
所以这里不要很主观的就定义一些你认为的事件, 这些事件要根据业务实体中的对象来设计. 业务实体起码是有唯一 Identity 的.
比如, 订单, 商品, 围绕着这些实体展开, 订单可能有几个状态是比较常用的, 创建, 支付, 配送, 取消. 商品可能有价格, 关键属性修改等等. 这些实体的抽象和提炼取决于你当前的业务.
这些是相对理论的指导思想, 有了这些之后你可以落地你的 Rabbitmq, 这样你就不会跑偏了.
比如, 你的消息名称不会是看起来没结构和层次的, deliveryMssage(配送消息).
而是应该, order.delivery.ondeliveryEvented(订单. 配送. 配送完成事件) 这样的结构.
当你的层次结构不满足业务需求的时候, 你可能还需要进一步明确事件范围, order.viporder.delivery.ondeliveryEvented(订单. VIP 订单. 配送. 配送完成事件).
上图是一个事件驱动的基本场景, 它最瞩目的几个特性就是这几个, 首先是异步化的, 可以大大提高系统的抗峰值能力.
然后就是解耦, 这不用说了, 设计模式里的观察者模式没有人不知道它的好处.
伸缩性, 可以按需 scaleout, 比如 rabbitmq 的 node 可以很方便的加入. 最终一致性解决了分布式系统的 CAP 定理的问题.
2.AMQP routing key 的设计
AMQP 协议中约定了 routing key 的设计和交互. 为了实现订阅发布功能, 我们需要某种方式能够订阅自己所感兴趣的事件.
所以在 AMQP 中的 Binding 中, 可以根据 routing key 来进行模式匹配. 所以, 这里可以结合 amqp routingkey 与领域事件, 发出来的事件就相当于 amqp 中的 routingkey, 这样可以完美的结合起来.
你的事件肯定是随着业务发展逐渐增加的, 而这个事件集合也没办法在一开始就定义清楚, 所以这里有一个需要注意的就是, 绑定的时候千万不要写死具体的 routing key.
比如, order.delivery.OnDeliveryEvented 是订单配送, 此时你 Binding 的时候 routingkey 就写成了 "order.delivery.OnDeliveryEvented".
未来订单事件一扩展, 就会很麻烦, 不相关的事件都被订阅到, 无法细化或者事件你无法获取到, 因为 routingkey 改变了.
所以在绑定的时候记住具体点绑定, 也就是借助字符串的模式匹配绑定, 比如,*.delivery.*,*.onDeliveryEvented" 这样. 将来越来越多的 routingkey 和 event 出来都不会影响你的绑定. 你只需要根据自己的关心程度, 绑定在事件的不同层级上即可.
上图中, orderBinding 绑定了 order 事件, 它订阅了顶级事件, 也就是说未来任何类型的订单都可以被订阅到
比如, order.normalorder.delivery.onDeliveryEvent 也可以被订阅到. 而 viporderBinding 订阅了 viporder 事件, 如果发送了一个 order.normalorder.delivery.onDeliveryEvent 就跟它没关系了.
3.RabbitMQ cluster 搭建
搞清楚了应用架构的事情, 我们开始着手搭建 RabbitMQ cluster.rabbitmq 这款 AMQP 产品是用 erlang 开发的, 那么我们稍微介绍下 erlang.
我第一次正式接触 erlang 就是从 rabbitmq 开始的, 一开始并没有太多感觉到特别的地方, 后来才明白越明白越发现挺喜欢这门语言的.
喜欢的理由就是, 它是天然的分布式语言. 这句话说起来好像挺平常的, 但是当你明白了. erlang.cookie 机制之后才恍然大悟. 瞬间顿悟了, 为什么要用 erlang 来搞 rabbitmq, 而是它真的很适合信息交换之类的软件.
erlang 是爱立信公司开发的专门用来开发高性能信息交换机的, 想想也会觉得那些软件的性能和稳定性要求是极高的.
RabbitMQ 的节点发现和互连真的很方便, 这在 erlang 的虚拟机中就集成了, 而且具有高度容错能力. 反正我对它很有好感.
还有一点值得骄傲的是 RabbitMQ 是伟大的 pivotal 公司的, 你应该知道 pivotal 公司是干什么的, 如果你还不清楚建议你立刻 google 下.
一开始我并没有太关注他们的 copyright, 后来对 pivotal 公司越来越佩服之后突然看到原来 RabbitMQ 也是他们家的, 突然信心倍增.
这就是影响力和口碑, 看看人家公司的 spring,springboot,spring cloud, 佩服的五体投地.
(RabbtiMQ 官网: http://www.rabbitmq.com/)
3.1. 安装 erlang & RabbitMQ
要想安装 RabbitMQ, 首先需要安装和配置好它的宿主环境 erlang. 去 erlang 官网下载好 erlang otp_src 源码包, 然后在本地执行源码安装.(erlang 官网: http://www.erlang.org/)
由于我本机已经下载好了 otp_src 源码包, 我是使用的 otp_src_19.1 版本. 下载好之后解压缩, 然后进入目录, 执行./configure --prefix=/usr/erlang/, 进行环境的检查和安装路径的选择.
如果你提示 "No curses library functions found" 错误, 是因为缺少 curses 库, yum install -y ncurses-devel. 安装后在进行 configure.
如果没有报错的话, 就说明安装成功了.
你还需要配置下环境变量:
- export PATH=$PATH:/usr/erlang/bin
- source /etc/profile
此时使用 erl 命令检查下 erlang 是否能正常工作了. 接下来安装 RabbitMQ, 去官网下载运行的包就行了. 同样要配置下环境变量, 这样你的命令才能被系统查找到. 然后运行 rabbitmq 实例.
这里有一个需要注意, 记得配置下 hosts, 在 127.0.0.1 里加上本机的名称. erlang 进程需要 host 来进行连接, 所以它会检查你的 hosts 配置.
还需要设置下防火墙, 三个端口要打开. 15672 是管理界面用的, 25672 是集群之间使用的端口, 4369 是 erlang 进程 epmd 用来做 node 连接的.
我配置了两个节点, 192.168.0.105,192.168.0.107, 现在已经全部就绪. 我们添加原始账号进入 rabbitmq 管理界面.
3.2. 配置 RabbitMQ cluster
先保证你的各个 rabbitmq 节点都是可以访问的, 且打开 rabbitmq_management plugin, 这样可以当出现某个节点挂掉之后可以切换到其他管理界面查看情况或者管理.
打开管理界面插件:
rabbitmq-plugins enable rabbitmq_management
添加账号:
rabbitmqctl add_user admin admin
添加 权限 tag
rabbitmqctl set_user_tags admin administrator
保证两个节点都是可以正常工作的. 下面我们就将这两个节点连接起来形成高可用的 cluster, 这样我们就可以让我们的 exchange,queue 在这两个节点之间复制, 形成高可用的 queue.
cd 到你的 home 目录下, 我是在 root 下, 里面有一个隐藏的. erlang.cookie 文件, 这就是我在前面介绍 erlang 时候提到的, 这个文件是 erlang 用来发现和互连的基础.
我们需要做的很简单, 将两个节点中的. erlang.cookie 设置成一样的. 这是 erlang 的约定, 一样的 cookie hash key 他认为是合法和正确的连接.
.erlang.cookie 默认是只读的, 你需要修改下写入权限, 然后复制粘贴下 cookie 字符串即可.
chmod u+w .erlang.cookie
配置好了之后接下来配置 hosts 文件, erlang 会使用 hosts 文件里的配置去发现节点.
- VIM /etc/hosts
- 192.168.0.107 rabbitmq_node2
- 192.168.0.105 rabbitmq_node1
保证同样的配置在所有的节点上都是相同的. 验证你配置的正确不正确你只需要在你的机器上 ping rabbitmq_node1, 试下请求的 ip 是不是你配置的即可.
按照 DNS 的请求原理, hosts 是最高优先权, 除非浏览器有缓存, 你直接用 ping 就不会有问题的.
选择一个节点 stop, 然后连接到另外节点.
- rabbitmqctl stop_app
- rabbitmqctl join_cluster rabbit@rabbitmq_node2
Clustering node rabbit@rabbitmq_node1 with rabbit@rabbitmq_node2 ...
rabbitmqctl start_app
节点已经连接成功.
默认情况下节点占用的 memory 是总内存的 40%, 可以根据自己的用途仔细研究 rabbitmq 的配置项.
为了提高性能, 不需要两个节点都是 disc 的节点, 所以我们需要启动一个节点为 RAM 模式.
rabbitmqctl change_cluster_node_type ram
改变 rabbitmq_node1 为内存节点模式.
4.Mirror queue policy 设置
节点是准备好了, 接下来我们需要设置 exchange,queue 高可用策略, 这样才能真的做到高可用.
现在是物理上的机器或者说虚拟机节点是高可用的, 但是里面的对象需要我们进行配置策略.
RabbitMQ 支持很好的策略模式, 需要管理员才能操作.
首先我们需要创建一个属于自己业务范围内的 vhost, 标示一个逻辑上的独立空间, 所有的账号, 策略, 队列都是强制在某个虚拟机里的. 我创建了一个 common vhost.
开始添加 policie.
最主要是 Apply to , 可以作用在 exchange 或者 queues 上, 当然也可以包含这两个.
策略选择还是比较丰富的, 最常用的是 HAmode, 还有 MessageTTL(消息的过期时间).
这些策略按照几个维度分组了, 有跟高可用相关的, 有 Federation(集群之间同步消息) 相关的 , 有 Queue 相关的, 还有 Exchange 相关的. 可以根据的业务场景进行调整.
我们定义了策略的匹配模式. order., 这样可以避免将所有的 exchange,queue 都镜像了.
我们新建了一个 ex.order.topic exchange, 它的 features 中应用了 exchange_queue_ha 策略 (相同的策略是无法叠加使用的).
其他的 exchange 并没有应用这个策略, 是因为我们的 pattern 限定了只匹配. order. 的名称.
创建一个 qu.order.crm queue, 注意看它的 node 属性里有一个 "Synchronised mirrors:rabbit@rabbitmq_node2" 镜像复制.
features 里也应用了 exchange_queue_ha 策略. 这个时候, 队列其实在两个节点里都是有的, 虽然我们创建的时候是在 rabbit@rabbitmq_node1 里的, 但是它会复制到集群里的其他节点.
在创建 HAmode 的时候可以提供 HA params 参数, 来限定复制节点的个数, 这通常用来提高性能和 HA 之间的平衡.
5. 两个不错的 RabbitMQ plugin 大型应用插件 (Sharding,Rederation)
在 rabbitmq-plugins 中有两个 plugin 还是可以试着研究研究的. rabbitmq-plugins list.
- rabbitmq-plugins list
- Configured: E = explicitly enabled; e = implicitly enabled
- | Status: * = running on rabbit@rabbitmq_node1
- |/
- [e*] amqp_client 3.6.5
- [ ] cowboy 1.0.3
- [ ] cowlib 1.0.1
- [e*] mochiweb 2.13.1
- [ ] rabbitmq_amqp1_0 3.6.5
- [ ] rabbitmq_auth_backend_ldap 3.6.5
- [ ] rabbitmq_auth_mechanism_ssl 3.6.5
- [ ] rabbitmq_consistent_hash_exchange 3.6.5
- [ ] rabbitmq_event_exchange 3.6.5
- [ ] rabbitmq_federation 3.6.5
- [ ] rabbitmq_federation_management 3.6.5
- [ ] rabbitmq_jms_topic_exchange 3.6.5
- [E*] rabbitmq_management 3.6.5
- [e*] rabbitmq_management_agent 3.6.5
- [ ] rabbitmq_management_visualiser 3.6.5
- [ ] rabbitmq_mqtt 3.6.5
- [ ] rabbitmq_recent_history_exchange 1.2.1
- [ ] rabbitmq_sharding 0.1.0
- [ ] rabbitmq_shovel 3.6.5
- [ ] rabbitmq_shovel_management 3.6.5
- [ ] rabbitmq_stomp 3.6.5
- [ ] rabbitmq_top 3.6.5
- [ ] rabbitmq_tracing 3.6.5
- [ ] rabbitmq_trust_store 3.6.5
- [e*] rabbitmq_web_dispatch 3.6.5
- [ ] rabbitmq_web_stomp 3.6.5
- [ ] rabbitmq_web_stomp_examples 3.6.5
- [ ] sockjs 0.3.4
- [e*] webmachine 1.10.3
rabbitmq_sharding,rabbitmq_federation,rabbitmq_sharding 的版本有点低了, GitHub 地址:
https://github.com/rabbitmq/rabbitmq-sharding
Rederation 可以用来进行跨 cluster 或者 node 之间同步消息.
http://www.rabbitmq.com/federated-exchanges.html
这个用来在不同的 domain 之间传递消息还是个不错的解决方案, 跨机房或者跨网络区域, 订阅别人的 rabbitmq 消息始终不太稳定, 可以用这种方式来传递消息.
6.Queue 镜像失败手动同步
有时候可能由于各种原因导致 queue mirror 失败, 这个时候可以手动进行同步, 而不是像其他分布式系统来重启节点或者重建数据.
这个还是比较方便的, 有时候总有那么几个小问题需要你手动处理的.
7. 各集群配置同步方式 (RabbitMQ export\import)
各个环境的集群配置同步也是个日常运维的问题, 还好 RabbitMQ 也提供了相关工具.
8. 客户端连接方式 (尽量采用 AMQP 组来动态链接)
由于 RabbitMQ 是 AMQP 协议的实现, 所以在进行远程连接的时候尽量采用 amqp 协议的方式连接.
关于集群的 vip 方案其实也是需要综合考虑的, 如果是统一的地址会面临三个问题, DNS,LoadBalance,VIP, 这三个点都有可能导致集群连接不上.
现在越来越多的方案倾向于在客户端做负载和故障转移, 这有很多好处, 消除了中间节点带来的故障概率. 如果这三个点加在一起出现的可用性指标肯定是比直接在客户端连接的低的多.
我们碰到最多就是 VIP 的问题, 这类系统的 VIP 不同于数据库, 数据库的 master\slave 大多都是要人工 check 后才切换, 不会随便自动的切换主从库.
而非数据库的 VIP 大多都是 Keepalived 自动检测切换, 这带来一些列问题, 包括连接重试, 心跳保持. 这只是 VIP 的出错场景之一.
还有 LoadBalance 带来的问题, DNS 出错的可能性也是很大. 所以我倾向于使用客户端来做这些.
有几个地方很重要, 第一个就是消息的 Persistent 持久化状态要带上, 第二个就是 ContentType, 这个属性很实用, 方便你查看消息的正文.
如果没设置, 默认是 null.
第三个就是 AutomaticRecoveryEnabled, 自动连接重试, 这致命重要. 当上面的 VIP 切换之后这个可以保命.
第四个就是 TopologyRecoveryEnabled, 重新恢复 Exchange,queue,binding. 在出现网络断开之后, 一旦恢复连接就会恢复这些设置以保证是最新的设置.
9.RabbitMQ 产线二次产品化封装 (消息补偿, 发送消息持久化, 异常处理, 监控页面, 重复消息剔除)
不管 rabbitmq 保证的多么强壮, 多么高可用, 记住一定要有备用方案.
一旦将发送和接受的消息持久化之后我们能做到事情就比较多了. 消息补偿是可以做的, 异常也不用担心.
但是在发送消息的时候一定要注意, 是先持久化消息在业务逻辑处理. 为了应对特殊活动的监控, 还可以开发一定的业务来监控消息的接受和处理的数量, 然后自动补偿.
在开发补偿程序的时候有一个逻辑挺饶人的, 当你对某一个消息进行补偿的时候会多出发送消息, 而接受的消息肯定是比你发送的少. 所以你在统计的时候记得 DISTINCT 下.
END
来源: http://www.tuicool.com/articles/ymeeeeq