Daemon Fault Tolerance
Storm 有一些不同的守护进程
Nimbus 负责调度 workers
supervisors 负责运行和杀死 workers
log views 负责访问日志
UI 负责显示集群的状态
What happens when a worker dies?
当一个 worker 死了以后, supervisor 将会重启它. 如果在启动过程中不断的失败, 并且不能发送心跳给 Nimbus, 那么 Nimbus 将重新调度这个 worker.
What happens when a node dies?
分配到这台机器上的任务会超时, 然后 Nimbus 将这些任务分给其它机器来做.
What happens when Nimbus or Supervisor daemons die?
Nimbus 和 Supervisor 守护进程被设计成快速失败的 (当遇到不期望发生的情况时进程会自杀) 并且是无状态的(所有状态都保持在 zookeeper 或者磁盘上).
Nimbus 和 Supervisor 必须运行在被监督的状态下(PS: 必须对它们进行监控). 因此, 如果 Nimbus 或者 Supervisor 守护进程死了以后, 它们会被立即重启, 就好像什么事都发生一样.
尤其是, Nimbus 或者 Supervisors 的死亡对于 worker 进程没有任何影响(PS: 如果它们死了, 没有 worker 会受到影响). 这跟 Hadoop 不一样, Hadoop 中如果 JobTracker 死了, 所有 job 都会丢失.
Is Nimbus a single point of failure?
如果你失去了 Nimbus 节点, worker 仍然会正常工作. 另外, 如果 worker 死了, supervisor 会重启它. 然而, 如果没有 Nimbus, 在某些情况下 wokers 不能被重新分配到其它机器上(比如: 运行 worker 的机器挂了).
自从 1.0.0 版本以后, Storm 的 Nimbus 是高可用的.
Highly Available Nimbus Design
Problem Statement:
目前 Storm master 又叫做 nimbus,nimbus 是一个运行在单个机器上的受监督的进程. 大多数情况下, nimbus 失败是短暂的, 并且它会被 supervisor 重启. 然而, 有时候当磁盘或者网络失败发生的时候, nimbus 就死了. 在这种情况下 topologies 会正常运行, 但是不能提交新的 topologies 了. 为了解决这些问题, 我们采用主备模式运行 nimbus 以此保证即使一个 nimbus 失败了备用的那个可以接替它.
Leader Election(选举):
nimbus 服务器用下面的接口:
public interface ILeaderElector {
/**
* queue up for leadership lock. The call returns immediately and the caller
* must check isLeader() to perform any leadership action.
*/
void addToLeaderLockQueue();
/**
* Removes the caller from the leader lock queue. If the caller is leader
* also releases the lock.
*/
void removeFromLeaderLockQueue();
/**
*
* @return true if the caller currently has the leader lock.
*/
boolean isLeader();
/**
*
* @return the current leader's address , throws exception if noone has has lock.
*/
InetSocketAddress getLeaderAddress();
/**
*
* @return list of current nimbus addresses, includes leader.
*/
List<InetSocketAddress> getAllNimbusAddresses();
}
在启动的时候, nimbus 检查它本地是否有所有激活的 topologies 的 code. 一旦它得到这个检查的状态之后, 它将调用 addToLeaderLockQueue() 方法. 当一个 nimbus 被通知成为一个 leader 的时候, 它会在假设自己是 leadership 角色之前再检查它是不是有所有的 code. 如果它缺少任何一个激活的 topology 的 code, 那么这个节点无法成为 leadership 角色, 于是它将释放这个 lock, 在它为了获取 leader lock 之前它必须等待直到它获得了所有的 code.
第一个实现是基于 zookeeper 的. 如果 zookeeper 连接丢失或者被重置, 造成的结果就是失去 lock, 这种实现关心的是 isLeader() 的状态变化. 如果一个不是 leader 的 nimbus 收到一个请求, 将抛异常.
下面的步骤描述了一个 nimbus 故障转移方案: 假设, 有 4 个 topologies 正在运行, 3 个 nimbus 结点, code-replication-factor = 2. 我们假设 "The leader nimbus has code for all topologies locally" 在开始之前一直是 true. 非 leader 结点 "nonleader-1" 和 "nonleader-2" 各有 2 个 topologies 的 code. 假设 Leader nimbus 死了, 硬盘坏了以至于没有恢复的可能. 这个时候 nonleader-1 收到了 zookeeper 的通知表示它现在是新的 leader, 于是在接受成为 leadership 角色之前它检查它手上是不是有 4 个 topologies(这些 topologies 在 / storm/storms / 目录下)的 code. 它意识到它只有 2 个 topologies 的 code 以至于它必须放弃 lock, 并且查看 / storm/code-distributor/topologyId 目录以找到从哪儿可以下载到它缺失的 topologies. 它发现从 leader nimbus 和 nonleader-2 那儿都可以. 它尝试从这两个地方下载. nonleader-2 也意识到它还缺 2 个 topologies, 并且按照之前相同的方法下载它所缺失的 topologies. 最终, 它们当中至少有一个会获得所有的 code, 于是那个 nimber 将接收 leadership 成为新的 leader.
下面的时序图描述的是 leader 选举和故障转移是如何进行的:
Nimbus state store:
目前, nimbus 存储 2 种数据, 一种是元数据 (比如 supervisor info,assignment info) 被存储在 zookeeper 上, 另一种是实际的 topology 配置和 jars 存储在 nimbus 所在的主机的本地磁盘上.
为了能够成功的故障转移从主切换到备, nimbus state/data 需要被复制到所有的 nimbus 主机上或者需要被存储到一个分布式的存储设备上. 正确的复制数据包含状态管理, 一致性检查, 并且即使不正确也很难测试出来. 然而, 许多 storm 用户不想额外的依赖像 HDFS 那种副本存储系统而且还想高可用. 最终, 我们想到用比特流协议来移动给定大小的代码分布, 而且也是为了当 supervisors 数量很高的时候能获得更好的伸缩性. 为了支持比特流和所有基于副本存储的文件系统, 我们建议用下面的接口:
/**
* Interface responsible to distribute code in the cluster.
*/
public interface ICodeDistributor {
/**
* Prepare this code distributor.
* @param conf
*/
void prepare(Map conf);
/**
* This API will perform the actual upload of the code to the distributed implementation.
* The API should return a Meta file which should have enough information for downloader
* so it can download the code e.g. for bittorrent it will be a torrent file, in case of something
* like HDFS or s3 it might have the actual directory or paths for files to be downloaded.
* @param dirPath local directory where all the code to be distributed exists.
* @param topologyId the topologyId for which the meta file needs to be created.
* @return metaFile
*/
File upload(Path dirPath, String topologyId);
/**
* Given the topologyId and metafile, download the actual code and return the downloaded file's list.
* @param topologyid
* @param metafile
* @param destDirPath the folder where all the files will be downloaded.
* @return
*/
List<File> download(Path destDirPath, String topologyid, File metafile);
/**
* Given the topologyId, returns number of hosts where the code has been replicated.
*/
int getReplicationCount(String topologyId);
/**
* Performs the cleanup.
* @param topologyid
*/
void cleanup(String topologyid);
/**
* Close this distributor.
* @param conf
*/
void close(Map conf);
}
为了支持复制, 我们允许用户指定一个代码复制因子, 这个复制因子表示在开始 topologies 之前代码必须被复制到多少个 nimbus 主机上. 我们把 zookeeper 上维护的激活的 topologies 的列表作为我们的权力, 表示这些 topologies 代码必须存在于 nimbus 主机上. 任何一个没有在 zookeeper 上标记为 active 的所有的 topologies 代码的 nimbus 必须放弃它的 lock, 以至于其它的 nimbus 能够成为 leader. 在所有的 nimbus 主机上都有一个后台线程不断的尝试从其它的主机那里同步代码, 所以只要还有一个种子主机上存在所有的 active 的 topologies, 那么最终至少有一个 nimbus 会变成 leadership.
下面的步骤描述了对于一个 topology 在 nimbus 之间的代码复制过程: 当客户端上传了一个 jar 文件, 传就传了, 什么也不会发生. 而当客户端提交了一个 topology 的时候, leader nimbus 调用 code distributor(代码分发器)的 upload 函数, 这将会在 leader nimbus 本地创建一个 metafile 文件. leader nimbus 将在 zookeeper 上的 / storm/code-distributor/topologyId 目录下写一个新的入口, 以此通知所有的非 leader 的 nimbus 它们应该下载这个新代码. 在用户配置的超时时间内, 客户端必须等待 leader nimbus 确保至少有 N 个非 leader nimbus 已经完成了代码复制. 当一个非 leader nimbus 接收到关于这个新代码的通知的时候, 它从 leader nimbus 那里下载这个 meta file, 并且通过调用代码分发器的 download 函数下载这个 metafile 所代表的真实的代码. 一旦非 leader nimbus 完成了代码下载, 这个非 leader nimubs 会向 zk 的 /storm/code-distributor/topologyId 目录下写一个新的入口以此表明这是一个可以下载代码的 metafile 的位置, 这样做是为了以防万一 leader nimbus 死了. 然后 leader nimbus 继续做它该做的事情.
下面这个时序图描述了在代码分发过程中各个组件之间的通信:
本节重点
守护进程容错
1, 如果 worker 死了, 那么 supervisor 会重启它, 如果还是失败, 则由 nimbus 重新指定机器运行它
2, 如果 worker 所在的机器挂了, 那么这台机器上的所有未完成的任务将分配给其它机器去执行
3, 如果 nimbus 或者 supervisor 死了, 它们会被快速重启, 就好像什么都没发生一样
4,nimbus 和 supervisor 必须有监控, 它们必须运行在监督之下
5,nimbus 或者 supervisor 死了对 worker 进程没有影响
高可用的 Nimbus 设计
1,Nimbus HA 采用的是主备模式, 主节点挂掉以后从节点会接替主节点
2,Nimbus 存储两种类型的数据
元数据, 包括 supervisor info, assignment info(任务分配的信息). 这些信息保存在 zookeeper 中.
实际的 topology 配置和 jars 存储在 nimbus 主机的本地磁盘上
3, 为了能够更好的故障转移, 这些状态以及数据必须被复制到所有的 nimbus 上或者存到一个分布式的存储上. Storm 内部使用的比特流协议来复制的.
4, 用户自定义副本因子来决定代码必须被复制到多少个 nimbus 上
5, 每个 nimbus 都有一个后台线程不断的尝试从其它主机那里同步代码
6, 复制的流程如下:
(1) 当 leader nimbus 收到一个客户端提交的 topology 时, 它调用代码分发器的 upload 方法, 这将在本地创建一个 metafile 来保存 topology 的元数据, 紧接着 zookeeper 的 / storm/code-distributor/topologyId 目录下写一个新的数据, 以此通知所有的 nonleader nimbus 它们应该下载这个新代码;
(2) 客户端在提交这个 topology 以后一直处于等待状态, 直到 leader nimbus 确保至少有 N 个 non leader nimbus 已经完成了代码复制, 或者超时返回;
(3) 当一个 non leader nimbus 收到这样一个通知以后, 首先从 leader nimbus 那里下载 metafile, 然后下载真实的代码, 这些都完成以后它会往 / storm/code-distributor/topologyId 再写一个入口以表明从它那里可以下载代码的 metafile
7,leader 选举是基于 zookeeper 实现的
8, 选举的过程如下:
(1)nimbus 在启动的时候检查自己本地是不是有所有的在 zookeeper 上标记为 active 状态的 topologies 的代码, 如果没有则不能入队, 有的话就调用 addToLeaderLockQueue() 函数以求获得 leadership lock;
(2) 当一个 non leader nimbus 被通知它可以成为新的 leader 的时候, 这个 nimbus 会再次检查它本地是不是有所有的 topologies 的代码, 如果是不是, 那么它必须放弃 lock, 为了再次入队获得 leadership lock 它必须等待直到它收集到所有的代码; 如果是的话, 那么它将成为 leader;
参考
http://storm.apache.org/releases/1.1.1/Daemon-Fault-Tolerance.html
http://storm.apache.org/releases/1.1.1/nimbus-ha-design.html
来源: https://www.cnblogs.com/cjsblog/p/8406401.html