概述
NameServer 是 RocketMQ 消息队列的状态服务器 (服务发现功能), 集群中的各个服务都需要通过 NameServer 来了解集群中各个服务的状态. 相当于 SpringCloud 中的 Eureka 的功能.
NameServer 中维护着 Producer 集群, Broker 集群, Consumer 集群的服务状态. 通过定时发送心跳数据包进行维护更新各个服务的状态.
当有新的 Producer 加入集群时, 通过上报自身的服务信息, 及获取各个 Broker Master 的信息 (Broker 地址, Topic,Queue 等信息), 这样就可以决定把对应的 Topic 消息存储到那个 Broker, 哪个 Queue 上. Consumer 同理.
NameServer 可以部署多个, 多个 NameServer 互相独立, 不会交换消息. Producer,Broker,Consumer 启动的时候都需要指定多个 NameServer, 各个服务的信息会同时注册到多个 NameServer 上, 从而能到达高可用.
NameServer 模块结构
可以看出 NameServer 中的类比较少, 8 个类. 分析起来也比较轻松.
NameServer 启动
org.apache.rocketmq.namesrv.NamesrvStartup 是 NameServer 的启动类.
通过 createNamesrvController 方法创建 NamesrvController .
NameServer 启动时首先判断是否传入了命令行参数.
命令行参数有两个,-p 和 -c
-c 可以指定 NameServer 的配置文件, 如果不指定, 则使用默认值.
-p 打印 NameServer 的配置参数信息. 打印完参数后退出进程.
下面是打印 NameServer 默认的配置参数信息.
如果想修改这些默认的参数, 则可以使用 -c 参数, 指定配置文件, 进行更改.
初始化 NamesrvController
1, 调用 NamesrvController.initialize() 初始化 NamesrvController, 然后调用 NamesrvController.start() 方法来开启 NameServer 服务.
2, 注册 ShutdownHookThread 服务. 在 JVM 退出之前, 调用 ShutdownHookThread 来进行关闭服务, 释放资源.
注意: 使用 kill -9 强制杀进程是不会执行 ShutdownHook 的.
- NamesrvController.initialize()
- public boolean initialize() {
- // 从 /namesrv/kvConfig.JSON 中加载 NameServer 的配置
- this.kvConfigManager.load();
- // 创建 Netty Server
- this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
- // 创建 Netty Server 执行的线程池
- this.remotingExecutor =
- Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
- // 注册 NameServer 服务接受请求的处理类
- this.registerProcessor();
- // 定时清理超时的 Broker
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- NamesrvController.this.routeInfoManager.scanNotActiveBroker();
- }
- }, 5, 10, TimeUnit.SECONDS);
- // 定时打印 NameServer 的配置信息
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- NamesrvController.this.kvConfigManager.printAllPeriodically();
- }
- }, 1, 10, TimeUnit.MINUTES);
- ...
- }
1,KVConfigManager 类默认是从 /namesrv/kvConfig.JSON 配置文件中加载 NameServer 的配置参数. 将配置参数加载保存到
HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable = new HashMap<String, HashMap<String, String>>();
变量中.
kvConfig.JSON 文件的默认路径为:
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
2, 创建并初始化 NettyRemotingServer .
NettyRomotingServer 是 NameServer 对外提供服务功能的.
3, 创建 Netty Server 执行使用的线程池.
4, 注册默认的处理类 DefaultRequestProcessor, 所有的请求均由该处理类的 processRequest 方法来处理.
5, 创建一个定时清理超时的 Broker 定时任务.
每隔 10 秒检查一遍所有 Broker 的状态的定时任务, 判断每一个 Broker 最近两分钟是否更新过. 如果没有更新则把该 Broker 的 channel 关闭 (关闭该 Broker
的长连接), 并清除相关数据.
6, 创建一个打印 NameServer 配置的定时任务.
每隔 10 分钟打印一次 NameServer 的配置参数. 即 KVConfigManager.configTable 变量的内容.
NamesrvController.registerProcessor()
注册接收请求的处理类.
- private void registerProcessor() {
- if (namesrvConfig.isClusterTest()) {
- this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
- this.remotingExecutor);
- } else {
- this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
- }
- }
默认注册的是 DefaultRequestProcessor 处理器.
如果设置了 NamesrvConfig.clusterTest = true, 则会注册 ClusterTestRequestProcessor 处理器.
ClusterTestRequestProcessor 继承 DefaultRequestProcessor.
ClusterTestRequestProcessor.getRouteInfoByTopic 方法
ClusterTestRequestProcessor 仅重写了 getRouteInfoByTopic() 方法.
判断如果获取不到 topicRouteData 数据, 则会去其它的 NameServer 上查找该数据并返回.
DefaultRequestProcessor
通过 processRequest 方法来处理客户端发过来的请求.
- @Override
- public RemotingCommand processRequest(ChannelHandlerContext ctx,
- RemotingCommand request) throws RemotingCommandException {
- if (ctx != null) {
- log.debug("receive request, {} {} {}",
- request.getCode(),
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
- request);
- }
- switch (request.getCode()) {
- case RequestCode.PUT_KV_CONFIG:
- return this.putKVConfig(ctx, request);
- case RequestCode.GET_KV_CONFIG:
- return this.getKVConfig(ctx, request);
- case RequestCode.DELETE_KV_CONFIG:
- return this.deleteKVConfig(ctx, request);
- case RequestCode.QUERY_DATA_VERSION:
- return queryBrokerTopicConfig(ctx, request);
- case RequestCode.REGISTER_BROKER:
- Version brokerVersion = MQVersion.value2Version(request.getVersion());
- if (brokerVersion.ordinal()>= MQVersion.Version.V3_0_11.ordinal()) {
- return this.registerBrokerWithFilterServer(ctx, request);
- } else {
- return this.registerBroker(ctx, request);
- }
- case RequestCode.UNREGISTER_BROKER:
- return this.unregisterBroker(ctx, request);
- case RequestCode.GET_ROUTEINTO_BY_TOPIC:
- return this.getRouteInfoByTopic(ctx, request);
- case RequestCode.GET_BROKER_CLUSTER_INFO:
- return this.getBrokerClusterInfo(ctx, request);
- case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
- return this.wipeWritePermOfBroker(ctx, request);
- case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
- return getAllTopicListFromNameserver(ctx, request);
- case RequestCode.DELETE_TOPIC_IN_NAMESRV:
- return deleteTopicInNamesrv(ctx, request);
- case RequestCode.GET_KVLIST_BY_NAMESPACE:
- return this.getKVListByNamespace(ctx, request);
- case RequestCode.GET_TOPICS_BY_CLUSTER:
- return this.getTopicsByCluster(ctx, request);
- case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
- return this.getSystemTopicListFromNs(ctx, request);
- case RequestCode.GET_UNIT_TOPIC_LIST:
- return this.getUnitTopicList(ctx, request);
- case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
- return this.getHasUnitSubTopicList(ctx, request);
- case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
- return this.getHasUnitSubUnUnitTopicList(ctx, request);
- case RequestCode.UPDATE_NAMESRV_CONFIG:
- return this.updateConfig(ctx, request);
- case RequestCode.GET_NAMESRV_CONFIG:
- return this.getConfig(ctx, request);
- default:
- break;
- }
- return null;
- }
所有请求的操作说明如下:
requectcode | 说明 |
---|---|
PUT_KV_CONFIG | 向 Namesrv 追加 KV 配置 |
GET_KV_CONFIG | 从 Namesrv 获取 KV 配置 |
DELETE_KV_CONFIG | 从 Namesrv 获取 KV 配置 |
QUERY_DATA_VERSION | 获取版本信息 |
REGISTER_BROKER | 注册一个 Broker,数据都是持久化的,如果存在则覆盖配置 |
UNREGISTER_BROKER | 卸载一个 Broker,数据都是持久化的 |
GET_ROUTEINTO_BY_TOPIC | 根据 Topic 获取 Broker Name、topic 配置信息 |
GET_BROKER_CLUSTER_INFO | 获取注册到 Name Server 的所有 Broker 集群信息 |
WIPE_WRITE_PERM_OF_BROKER | 去掉 BrokerName 的写权限 |
GET_ALL_TOPIC_LIST_FROM_NAMESERVER | 从 Name Server 获取完整 Topic 列表 |
DELETE_TOPIC_IN_NAMESRV | 从 Namesrv 删除 Topic 配置 |
GET_KVLIST_BY_NAMESPACE | 通过 NameSpace 获取所有的 KV List |
GET_TOPICS_BY_CLUSTER | 获取指定集群下的所有 topic |
GET_SYSTEM_TOPIC_LIST_FROM_NS | 获取所有系统内置 Topic 列表 |
GET_UNIT_TOPIC_LIST | 单元化相关 topic |
GET_HAS_UNIT_SUB_TOPIC_LIST | 获取含有单元化订阅组的 Topic 列表 |
GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST | 获取含有单元化订阅组的非单元化 |
UPDATE_NAMESRV_CONFIG | 更新 Name Server 配置 |
根据 processRequest 方法分析源码, 发现接收到的所有请求操作的数据都保存在 RouteInfoManager 类中, 所有的操作都是对 RouteInfoManager 类的操作.
- RouteInfoManager
- public class RouteInfoManager {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
- private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
- private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
- private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
- private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
- private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
- private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
- 1,topicQueueTable
- private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
Map 中 key 存储的是 Topic 的名称, value 存储的是 QueueData 的集合.
QueueData 的集合 size 等于 Topic 对应的 Broker Master 的个数.
QueueData 的数据结构如下:
- public class QueueData implements Comparable<QueueData> {
- private String brokerName; //broker 名字
- private int readQueueNums; // 可读 queue 数
- private int writeQueueNums; // 可写 queue 数
- private int perm; // 读写权限
- private int topicSynFlag; // 同步标识
- 2,brokerAddrTable
- private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
Map 中 key 存储的是 Broker Name, value 存储的是 BrokerData 数据 (Broker 的相关信息).
BrokerData 的数据结构如下:
- public class BrokerData implements Comparable<BrokerData> {
- private String cluster; // 集群名称
- private String brokerName; // Broker Name
- // 存储的是该 Broker Name 对应的多个 Broker 地址信息.
- private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
因为相同的名称的 BrokerName 可以多有个. 一个 Master 和多个 Slave. 所有使用 brokerAddrs 来存储相同 BrokerName 下所有的 Broker 信息 (判断 Master 和 Slave 的关系是通过 Master 和 Slave 名称是否相同, brokerId 为 0 的是 Master, 大于 0 的是 Slave).
- 3,clusterAddrTable
- private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
Map 中 key 存储的是 clusterName 的名称, value 存储的是 brokerName 的集合.
- 4,brokerLiveTable
- private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
Map 中 key 存储的是 brokerAddr 信息, value 存储的是 BrokerLiveInfo 信息, BrokerLiveInfo 中存储了 Broker 的实时状态.
- class BrokerLiveInfo {
- // 最后更新时间
- private long lastUpdateTimestamp;
- private DataVersion dataVersion;
- private Channel channel;
- private String haServerAddr;
上面介绍的 NamesrvController.initialize() 中有一个 schedule 定时任务, 每个 10 秒钟定时调用 scanNotActiveBroker() 方法进行扫描不活动的 Broker, 并把不活动的 Broker 删除掉, 就是判断的 这个 lastUpdateTimestamp 这个数据.
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
超过 2 分钟没有更新这个值, 就认为 Broker 不可用了.
- 5,filterServerTable
- private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
Map 中 key 存储的是 brokerAddr 信息, value 存储的是 Filter Server 信息.
Filter Server 是消息的过滤服务器, 一个 Broker 可以对应多个 Filter Server.
来源: http://www.jianshu.com/p/ec69c203020d