1.rocketmq 图形化控制台安装
虽然 rocketmq 为用户提供了使用命令行管理主题, 消费组以及 broker 配置的功能, 但对于不够熟练的非运维人员来说, 命令行的管理界面还是较难使用的. 为此, 我们可以使用图形化的管理界面来简化管理操作.
rocketmq 官方推荐的图形化控制台目前还处在不成熟的孵化阶段. 仓库地址为( https://github.com/apache/rocketmq-externals ), 其中包含了 rocketmq 相关拓展的, 属于孵化期的各种项目. 下载源码之后, 找到 rocketmq-console 文件夹, 这就是 rocketmq 官方推荐的图形化控制台项目, 基于 springboot 和 AngularJS.
打开 application.properties, 能看到一些重要参数的配置, 例如端口, nameServer 地址, 登录权限控制等等. 对于启动参数的设置, 可以选择直接在配置文件中修改; 也可在启动项目时通过命令行指定.
为部署项目, 先执行 maven 的打包命令(mvn clean package), 生成 jar 包.
然后执行 java -jar rocketmq-console-ng-1.0.1.jar --rocketmq.config.namesrvAddr=localhost:9876(nameServer 地址) --server.port=8080(启动端口).
通过浏览器访问项目启动的 ip/port, 即可看到以下管理界面(右上角可以中英文切换).
至此, rocketmq 图形化控制台安装成功.
2.rocketmq 集群部署
rocketmq 的单机部署虽然简单方便, 却存在着单点故障的问题. 通过集群部署 nameServer 和 broker 可以实现 rocketmq 服务端的高可用.
下面介绍 rocketmq 的集群部署, 以在两台机器上搭建一个双主双从的高可用 rocketmq 集群为例子, 这两台机器 (Linux 环境) 的 IP 地址分别是 192.168.32.130, 和 192.168.32.131.
2.1 nameServer 集群部署
由于 nameServer 的集群节点之间互不通信, 所以不需要额外的配置.
在两台机器的 rocketmq 安装的根路径下分别执行 "sh bin/mqnamesrv", 各启动一个 nameServer, 使用其默认的端口 9876.
现在, 192.168.32.130:9876 和 192.168.32.131:9876 上都各运行着一个 nameServer 服务.
2.2 broker 集群部署
broker 作为 rocketmq 的核心, 其运行的稳定性至关重要. 前面提到的双主双从 (2-master-2-slave) 实际上指代的是 broker 的集群运行模式, 从 broker 作为主 broker 的备份, 负责和主 broker 保持数据同步, 可读不可写.
rocketmq 通过赋予 broker 名称来区别不同角色的 broker. 我们把当前两个角色的 broker 分别命名为 broker-a 和 broker-b.
在 192.168.32.130 部署 broker-a 的主和 broker-b 的从, 在 192.168.32.131 部署 broker-b 的主和 broker-a 的从, 同一角色的主从分别部署在不同的机器上. 这样, 即使任意一台机器挂掉, 由于从 broker 的存在, broker-a 和 broker-b 依然可以对外提供服务.
启动 broker 时, 可以通过指定配置文件的方式为 broker 设置一系列参数. 不同角色的 broker, 主从 broker 之间的配置文件参数内容各不相同.
broker 配置文件中参数介绍:
brokerName:broker 名称, 互为主从的 broker 名称保持一致.
namesrvAddr: 关联的 nameserver 地址, 多个用 ";" 隔开.
listenPort:broker 监听端口, 同一机器部署多个 broker 不能监听端口不能相同, 避免冲突
storePathRootDir:broker 存储数据的根目录
brokerClusterName:broker 集群名称, 相同集群的 master 能互相识别
brokerId:0 代表 master, 大于 0 代表不同的 slave-broker
deleteWhen: 删除过时消息的时间, 04 代表凌晨 4 点
fileReservedTime: 落盘数据文件保存的时长, 单位小时
brokerRole:brokerRole 有三种类型, SYNC_MASTER,ASYNC_MASTER 和 SLAVE,SYNC_M 和 ASYNC_M 都代表主 broker, 区别在于主从之间进行数据同步的方式不同. SYNC 代表主从数据同步完成, 才向客户端返回消息发送成功结果; 反之 ASYNC 代表主库收到消息后立即返回发送消息成功结果.
可以看到, ASYNC_MASTER 的效率更高, 但是当 MASTER 出现故障时, 可能出现消息丢失的问题. 需要用户进行效率与可靠性之间的取舍.
flushDiskType:flushDiskType 有两种类型, SYNC_FLUSH(同步刷盘)和 ASYNC_FLUSH(异步刷盘), 用于指定 broker 在接收到消息后, 返回消息发送结果和数据落盘处理的策略. 当选择同步刷盘时, 只有当消息数据真正的写入磁盘持久化时, 才返回消息发送成功. 选择异步刷盘时, 消息数据写入本地虚拟内存映射后, 就直接返回.
broker 的本地落盘策略和主从同步策略的选择类似, 都需要在效率与可靠性, 一致性之间进行取舍.
比较推荐的一种配置方案是主从同步策略选择 SYNC_MASTER 而本地落盘策略选择 SYNC_FLUSH. 从可靠性的角度来看, 只要主从 broker 没有同时挂掉(避免了单点故障), 消息将不会丢失; 从效率的角度来看, 由于主从 broker 都是异步落盘, 执行效率也有一定的保障, 是一个优秀的折中方案.
broker 配置文件详情:
broker-a 主 broker 配置文件(broker-a.properties):
- brokerName=broker-a
- namesrvAddr=192.168.32.130:9876;192.168.32.131:9876
- listenPort=10911
- storePathRootDir=/root/rocketmq/data/store/store-a
- brokerClusterName=DefaultCluster
- brokerId=0
- deleteWhen=04
- fileReservedTime=48
- brokerRole=SYNC_MASTER
- flushDiskType=ASYNC_FLUSH
broker-b 主 broker 配置文件(broker-b.properties):
- brokerName=broker-b
- namesrvAddr=192.168.32.130:9876;192.168.32.131:9876
- listenPort=10911
- storePathRootDir=/root/rocketmq/data/store/store-b
- brokerClusterName=DefaultCluster
- brokerId=0
- deleteWhen=04
- fileReservedTime=48
- brokerRole=SYNC_MASTER
- flushDiskType=ASYNC_FLUSH
broker-a 从 broker 配置文件(broker-a-s.properties):
- brokerName=broker-a
- namesrvAddr=192.168.32.130:9876;192.168.32.131:9876
- listenPort=11011
- storePathRootDir=/root/rocketmq/data/store/store-a
- brokerClusterName=DefaultCluster
- brokerId=1
- deleteWhen=04
- fileReservedTime=48
- brokerRole=SLAVE
- flushDiskType=ASYNC_FLUSH
broker-b 从 broker 配置文件(broker-b-s.properties):
- brokerName=broker-b
- namesrvAddr=192.168.32.130:9876;192.168.32.131:9876
- listenPort=11011
- storePathRootDir=/root/rocketmq/data/store/store-b
- brokerClusterName=DefaultCluster
- brokerId=1
- deleteWhen=04
- fileReservedTime=48
- brokerRole=SLAVE
- flushDiskType=ASYNC_FLUSH
在对应机器依次执行以下命令, 依次启动 broker(先启动主 broker, 后启动从 broker).
192.168.32.130 执行: sh bin/mqbroker -c [配置文件路径 eg: rocketmq/data/conf/broker-a.properties]
192.168.32.131 执行: sh bin/mqbroker -c [配置文件路径 eg: rocketmq/data/conf/broker-b.properties]
192.168.32.131 执行: sh bin/mqbroker -c [配置文件路径 eg: rocketmq/data/conf/broker-a-s.properties]
192.168.32.130 执行: sh bin/mqbroker -c [配置文件路径 eg: rocketmq/data/conf/broker-b-s.properties]
此时, rocketmq 双主双从的 broker 集群已经搭建完毕. 启动图形控制台, 指定命令行参数 namesrvAddr=192.168.32.130:9876;192.168.32.131:9876, 可看到以下信息:
3. 使用 java 客户端收发消息
前面我们通过命令行的方式进行了 rocketmq 收发消息的测试. 但在实际使用过程中, 还是需要 sdk 客户端来进行收发消息. 这里, 我们使用 rocketmq 提供的 java sdk 来进行 rocketmq 的消息收发实验.
先通过图形控制台创建一个主题用于测试, 主题名称为 "TopicTest"(随便取的).
接着启动一个 java 项目, 加入 rocketmq-client 的依赖.
maven 坐标:
- <!-- 原生 rocketmq client -->
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>4.4.0</version>
- </dependency>
生产者 producer 示例代码:
- public class Producer {
- public static void main(String[] args) throws Exception {
- final DefaultMQProducer producer = new DefaultMQProducer("test_producer_group");
- // 设置 nameServer 地址
- producer.setNamesrvAddr("192.168.32.130:9876;192.168.32.131:9876");
- producer.start();
- for (int i = 0; i <1000; i++) {
- try {
- // 构造消息对象, topic=TopicTest,tag=TagA
- Message msg = new Message("TopicTest", "TagA" ,
- ("Hello RocketMQ TopicTest" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
- // 发送消息
- SendResult sendResult = producer.send(msg);
- System.out.printf("%s%n", sendResult);
- } catch (Exception e) {
- e.printStackTrace();
- Thread.sleep(1000);
- }
- }
- producer.shutdown();
- }
- }
消费者 consumer 示例代码:
- public class Consumer {
- public static void main(String[] args) throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group");
- // 设置 nameServer 地址
- consumer.setNamesrvAddr("192.168.32.130:9876;192.168.32.131:9876");
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- // 订阅主题 tag="*" 代表订阅 TopicTest 主题下所有子主题消息
- consumer.subscribe("TopicTest", "*");
- // 注册消息监听回调函数
- consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context) -> {
- for(MessageExt messageExt : msgs){
- String strMsg = new String(messageExt.getBody());
- System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), strMsg);
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- });
- consumer.start();
- System.out.printf("Consumer Started.%n");
- }
- }
生产者和消费者都是简单的 main 方法启动, 先启动 producer 发送消息, 然后启动 consumer 接受消息, 控制台上将会看到接受时消息的日志. 可以试试主动关闭一个 master-broker, 看看 broker 集群的消息收发是否正常.
至此, 通过 java 客户端使用 rocketmq 的测试告一段落.
总结
本篇博客介绍了 rocketmq 的集群部署, 图形化界面的安装以及如何使用 java 客户端与 rocketmq 进行交互. rocketmq 还有着许多好用, 强大的功能, 后续的博客将结合着 rocketmq 的源码来介绍它们.
去阅读并理解源码, 可以在解决问题时能看得更深, 更远. 通过阅读 rocketmq 的源码, 除了更好地掌握 rocketmq 外, 也能够从源码中学习到许多架构设计和编程实践相关的知识.
如有理解不到位的地方, 请多多指教.
posted on 2019-09-27 00:38 小熊餐馆 阅读(...) 评论(...) 编辑 收藏
来源: https://www.cnblogs.com/xiaoxiongcanguan/p/11579300.html