故障迁移是集群非常重要的功能; 直白的说就是在集群中部分节点失效时, 能将失效节点负责的键值对迁移到其他节点上, 从而保证整个集群系统在部分节点失效后没有丢失数据, 仍能正常提供服务. 这里先抛开 Redis 实际的做法, 我们可以自己想下对于 Redis 集群应该怎么做故障迁移, 哪些关键点是必须要实现的. 然后再去看 Redis 源码中具体的实现, 是否覆盖了我们想到的关键点, 有哪些设计是我们没有想到的, 这样看代码的效果会比较好.
我在思考故障迁移这个功能时, 首先想到的是节点发生故障时要很快被集群中其他节点发现, 尽量缩短集群不可用的时间; 其次就是要选出失效节点上的数据可以被迁移到哪个节点上; 在选择迁移节点时最好能够考虑节点的负载, 避免迁移造成部分节点负载过高. 另外, 失效节点的数据在其失效前就应该实时的复制到其他节点上, 因为一般情况下节点失效有很大概率是机器不可用, 如果没有事先执行过数据复制, 节点数据就丢失了. 最后, 就是迁移的执行, 除了要将失效节点原有的键值对数据迁移到其他节点上, 还要将失效节点原来负责的槽也迁移到其他节点上, 而且槽和键值对应该同步迁移, 要避免槽被分配到节点 A 而槽所对应的键值对被分配到节点 B 的情况.
总结起来有实现集群故障迁移要实现下面关键点:
1. 节点失效事件能被集群系统很快的发现
2. 迁移时要能选择合适的节点
3. 节点数据需要实时复制, 在失效后可以直接使用复制的数据进行迁移
4. 迁移要注意将槽和键值对同步迁移
看过 Redis 源码后, 发现 Redis 的故障迁移也是以主备复制为基础的, 也就是说需要给每个集群主节点配置从节点, 这样主节点的数据天然就是实时复制的, 在主节点出现故障时, 直接在从节点中选择一个接替失效主节点, 将该从节点升级为主节点并通知到集群中所有其他节点即可, 这样就无需考虑上面提到的第三点和第四点. 如果集群中有节点没有配置从节点, 那么就不支持故障迁移.
故障检测
Redis 的集群是无中心的, 无法通过中心定时向各个节点发送心跳来判断节点是否故障. 在 Redis 源码中故障的检测分三步:
1. 节点互发 ping 消息, 将 Ping 超时的节点置为疑似下线节点
在这一步中, 每个节点都会向其他节点发送 Ping 消息, 来检测其他节点是否和自己的连接有异常. 但要注意的是即便检测到了其他节点 Ping 消息超时, 也不能简单的认为其他节点是失效的, 因为有可能是这个节点自己的网络异常, 无法和其他节点通信. 所以在这一步只是将检测到超时的节点置为疑似下线. 例如: 节点 A 向节点 B 发送 Ping 发现超时, 则 A 会将节点 B 的状态置为疑似下线并保存在自己记录的集群节点信息中, 存储的疑似下线信息就是之前提过的 clusterState.nodes 里对应的失效节点的 flags 状态值.
- // 默认节点超时时限
- #define REDIS_CLUSTER_DEFAULT_NODE_TIMEOUT 15000
2. 向其他节点共享疑似下线节点
在检测到某个节点为疑似下线之后, 会将这个节点的疑似下线情况分享给集群中其他的节点, 分享的方式也是通过互发 Ping 消息, 在 ping 消息中会带上集群中随机的三个节点的状态, 前面在分析集群初始化时, 曾介绍过利用 gossip 协议扩散集群节点状态给整个集群, 这里节点的疑似下线状态也是通过这种方式传播给其他节点的. 每条 ping 消息会带最多三个随机节点的状态信息
- void clusterSendPing(clusterLink *link, int type) { // 随机算去本节点所在集群中的任意两个其他 node 节点 (不包括 link 本节点和 link 对应的节点) 信息发送给 link 对应的节点
- unsigned char buf[sizeof(clusterMsg)];
- clusterMsg *hdr = (clusterMsg*) buf;
- int gossipcount = 0, totlen;
- /* freshnodes is the number of nodes we can still use to populate the
- * gossip section of the ping packet. Basically we start with the nodes
- * we have in memory minus two (ourself and the node we are sending the
- * message to). Every time we add a node we decrement the counter, so when
- * it will drop to <= zero we know there is no more gossip info we can
- * send. */
- int freshnodes = dictSize(server.cluster->nodes)-2; // 除去本节点和接收本 ping 信息的节点外, 整个集群中有多少其他节点
- // 如果发送的信息是 PING , 那么更新最后一次发送 PING 命令的时间戳
- if (link->node && type == CLUSTERMSG_TYPE_PING)
- link->node->ping_sent = mstime();
- // 将当前节点的信息 (比如名字, 地址, 端口号, 负责处理的槽) 记录到消息里面
- clusterBuildMessageHdr(hdr,type);
- /* Populate the gossip fields */
- // 从当前节点已知的节点中随机选出两个节点
- // 并通过这条消息捎带给目标节点, 从而实现 gossip 协议
- // 每个节点有 freshnodes 次发送 gossip 信息的机会
- // 每次向目标节点发送 3 个被选中节点的 gossip 信息(gossipcount 计数)
- while(freshnodes> 0 && gossipcount <3) {
- // 从 nodes 字典中随机选出一个节点(被选中节点)
- dictEntry *de = dictGetRandomKey(server.cluster->nodes);
- clusterNode *this = dictGetVal(de);
- clusterMsgDataGossip *gossip; ////ping pong meet 消息体部分用该结构
- int j;
- if (this == myself ||
- this->flags & (REDIS_NODE_HANDSHAKE|REDIS_NODE_NOADDR) ||
- (this->link == NULL && this->numslots == 0))
- {
- freshnodes--; /* otherwise we may loop forever. */
- continue;
- }
- /* Check if we already added this node */
- // 检查被选中节点是否已经在 hdr->data.ping.gossip 数组里面
- // 如果是的话说明这个节点之前已经被选中了
- // 不要再选中它(否则就会出现重复)
- for (j = 0; j <gossipcount; j++) { // 这里是避免前面随机选择 clusterNode 的时候重复选择相同的节点
- if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
- REDIS_CLUSTER_NAMELEN) == 0) break;
- }
- if (j != gossipcount) continue;
- /* Add it */
- // 这个被选中节点有效, 计数器减一
- freshnodes--;
- // 指向 gossip 信息结构
- gossip = &(hdr->data.ping.gossip[gossipcount]);
- // 将被选中节点的名字记录到 gossip 信息
- memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
- // 将被选中节点的 PING 命令发送时间戳记录到 gossip 信息
- gossip->ping_sent = htonl(this->ping_sent);
- // 将被选中节点的 PING 命令回复的时间戳记录到 gossip 信息
- gossip->pong_received = htonl(this->pong_received);
- // 将被选中节点的 IP 记录到 gossip 信息
- memcpy(gossip->ip,this->ip,sizeof(this->ip));
- // 将被选中节点的端口号记录到 gossip 信息
- gossip->port = htons(this->port);
- // 将被选中节点的标识值记录到 gossip 信息
- gossip->flags = htons(this->flags);
- // 这个被选中节点有效, 计数器增一
- gossipcount++;
- }
- // 计算信息长度
- totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
- totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
- // 将被选中节点的数量(gossip 信息中包含了多少个节点的信息)
- // 记录在 count 属性里面
- hdr->count = htons(gossipcount);
- // 将信息的长度记录到信息里面
- hdr->totlen = htonl(totlen);
- // 发送信息
- clusterSendMessage(link,buf,totlen);
- }
收到 ping 消息的节点, 如果发现 ping 消息中带的某个节点属于疑似下线状态, 则找到自身记录该节点的 ClusterNode 结构, 并向该结构的下线报告链表中插入一条上报记录, 上报源头为发出 Ping 的节点. 例如: 节点 A 向节点 C 发送了 ping 消息, ping 消息中带上 B 节点状态, 并且 B 节点状态为疑似下线, 那么 C 节点收到这个 Ping 消息之后, 就会查找自身记录节点 B 的 clusterNode, 向这个 clusterNode 的 fail_reports 链表中插入来自 A 的下线报告.
3. 收到集群中超过半数的节点认为某节点处于疑似下线状态, 则判定该节点下线, 并广播
判定的时机是在每次收到一条 ping 消息的时候, 当发现 ping 消息中带有某节点的疑似下线状态后, 除了加入该节点的下线报告以外, 还会调用 markNodeAsFailingIfNeeded 函数来尝试判断该节点是否已经被超过半数的节点判断为疑似下线, 如果是的话, 就将该节点状态置为下线, 并调用 clusterSendFail 函数将下线状态广播给所有已知节点. 这里广播不是通过订阅分发的方式, 而是遍历所有节点, 并给每个节点单独发送消息.
- void clusterSendFail(char *nodename) {
- // 如果超过一半的主节点认为该 nodename 节点下线了, 则需要把该节点下线信息同步到整个 cluster 集群
- unsigned char buf[sizeof(clusterMsg)];
- clusterMsg *hdr = (clusterMsg*) buf;
- // 创建下线消息
- clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
- // 记录命令
- memcpy(hdr->data.fail.about.nodename,nodename,REDIS_CLUSTER_NAMELEN);
- // 广播消息
- clusterBroadcastMessage(buf,ntohl(hdr->totlen));
- }
- void clusterBroadcastMessage(void *buf, size_t len) { //buf 里面的内容为 clusterMsg+clusterMsgData
- dictIterator *di;
- dictEntry *de;
- // 遍历所有已知节点
- di = dictGetSafeIterator(server.cluster->nodes);
- while((de = dictNext(di)) != NULL) {
- clusterNode *node = dictGetVal(de);
- // 不向未连接节点发送信息
- if (!node->link) continue;
- // 不向节点自身或者 HANDSHAKE 状态的节点发送信息
- if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE))
- continue;
- // 发送信息
- clusterSendMessage(node->link,buf,len);
- }
- dictReleaseIterator(di);
从节点判断自己所属的主节点下线, 则开始进入故障转移流程. 如果主节点下只有一个从节点, 那么很自然的可以直接进行切换, 但如果主节点下的从节点不只一个, 那么还需要选出一个新的主节点. 这里的选举过程使用了比较经典的分布式一致性算法 Raft, 下一篇会介绍 Redis 中选举新主节点的过程.
来源: https://www.cnblogs.com/gogoCome/p/9751490.html