在分布式环境中, 一个应用通常都会部署在多个服务器节点上如果这些应用节点的运行模式是一主多从或者多主多从, 这时就需要用到 Leader 选举策略, 从多个节点中选举出 Master 节点另外, 当某个 Master 节点意外宕机, 这时也需要用到 Leader 选举策略从它的多个 Slave 节点中选举出新的 Master 节点
对于 Leader 选举策略, Apache Curator 框架提供了两种策略, 开发者可以根据实际需求具体选择
(1)添加依赖:
首先需要在 pom.xml 中添加如下依赖:
- <!-- Apache Curator -->
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>4.0.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>4.0.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-x-discovery</artifactId>
- <version>4.0.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-test</artifactId>
- <version>4.0.0</version>
- <scope>test</scope>
- </dependency>
- (2)Leader Latch:
Apache Curator 框架提供的第一种 Leader 选举策略是 Leader Latch 这种选举策略, 其核心思想是初始化多个 LeaderLatch, 然后在等待几秒钟后, Curator 会自动从中选举出 Leader 示例代码如下:
- package cn.zifangsky.kafkademo.zookeeper;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.TimeUnit;
- import org.apache.curator.RetryPolicy;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.framework.recipes.leader.LeaderLatch;
- import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.apache.curator.utils.CloseableUtils;
- import org.junit.After;
- import org.junit.Before;
- import org.junit.Test;
- /**
- * 测试 Apache Curator 框架的两种选举方案
- * @author zifangsky
- *
- */
- public class TestLeaderLatch {
- // 会话超时时间
- private final int SESSION_TIMEOUT = 30 * 1000;
- // 连接超时时间
- private final int CONNECTION_TIMEOUT = 3 * 1000;
- // 客户端数量
- private final int CLIENT_NUMBER = 10;
- //ZooKeeper 服务地址
- private static final String SERVER = "192.168.1.159:2100,192.168.1.159:2101,192.168.1.159:2102";
- private final String PATH = "/curator/latchPath";
- // 创建连接实例
- private CuratorFramework client = null;
- /**
- * baseSleepTimeMs: 初始的重试等待时间
- * maxRetries: 最多重试次数
- */
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- //LeaderLatch 实例集合
- List<LeaderLatch> leaderLatchList = new ArrayList<LeaderLatch>(CLIENT_NUMBER);
- /**
- * 初始化
- * @throws Exception
- */
- @Before
- public void init() throws Exception{
- // 创建 CuratorFrameworkImpl 实例
- client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);
- client.start();
- for(int i=0;i<CLIENT_NUMBER;i++){
- // 创建 LeaderLatch 实例
- LeaderLatch leaderLatch = new LeaderLatch(client, PATH, "Client #" + i);
- leaderLatchList.add(leaderLatch);
- leaderLatch.start();
- }
- // 等待 Leader 选举完成
- TimeUnit.SECONDS.sleep(5);
- System.out.println("**********LeaderLatch 初始化完成 **********");
- }
- /**
- * 测试获取当前选举出来的 leader, 以及手动尝试获取领导权
- * @throws Exception
- */
- @Test
- public void testCheckLeader() throws Exception{
- LeaderLatch currentLeader = null;
- for(LeaderLatch tmp : leaderLatchList){
- if(tmp.hasLeadership()){ // 判断是否是 leader
- currentLeader = tmp;
- break;
- }
- }
- System.out.println("当前 leader 是:" + currentLeader.getId());
- // System.out.println("当前 leader 是:" + leaderLatchList.get(0).getLeader().getId());
- /**
- * 从 List 中移除当前主节点, 并从剩下的节点中继续选举 leader
- */
- currentLeader.close(); // 关闭当前主节点
- leaderLatchList.remove(currentLeader); // 从 List 中移除
- TimeUnit.SECONDS.sleep(5); // 等待再次选举
- // 再次获取当前 leader
- for(LeaderLatch tmp : leaderLatchList){
- if(tmp.hasLeadership()){
- currentLeader = tmp;
- break;
- }
- }
- System.out.println("新 leader 是:" + currentLeader.getId());
- currentLeader.close(); // 关闭当前主节点
- leaderLatchList.remove(currentLeader); // 从 List 中移除
- LeaderLatch firstNode = leaderLatchList.get(0); // 获取此时第一个节点
- System.out.println("删除 leader 后, 当前第一个节点:" + firstNode.getId());
- firstNode.await(10, TimeUnit.SECONDS); // 阻塞并尝试获取领导权, 可能失败
- // 再次获取当前 leader
- for(LeaderLatch tmp : leaderLatchList){
- if(tmp.hasLeadership()){
- currentLeader = tmp;
- break;
- }
- }
- System.out.println("最终实际 leader 是:" + currentLeader.getId());
- }
- /**
- * 测试完毕关闭连接
- */
- @After
- public void close(){
- for(LeaderLatch tmp : leaderLatchList){
- CloseableUtils.closeQuietly(tmp);
- }
- CloseableUtils.closeQuietly(client);
- }
- }
关于上述代码, 有以下几点需要简单说明:
i)在初始化 LeaderLatch 的时候, 因为这里只是简单测试, 因此直接在一个 for 循环里完成了整个初始化过程然而在实际的分布式环境中, 每个 LeaderLatch 的初始化过程应该在每个应用节点内部完成(PS: 可以使用多个单元测试模拟)
ii)LeaderLatch 的 await() 方法的含义是阻塞当前线程, 直到当前 LeaderLatch 实例获取领导权当然有可能当前 LeaderLatch 实例直到等待时间结束也没有获取领导权, 原因可能是: 其他线程在某一时刻中断此线程当前 LeaderLatch 实例在某一时刻被关闭其他某个 LeaderLatch 实例一直没有释放领导权等等
iii)Leader Latch 选举的本质是连接 ZooKeeper, 然后在 / curator/latchPath 路径为每个 LeaderLatch 创建临时有序节点:
在创建临时节点时, org.apache.curator.framework.recipes.leader.LeaderLatch 的 checkLeadership(List<String> children) 方法会将选举路径 (/curator/latchPath) 下面的所有节点按照序列号排序, 如果当前节点的序列号最小, 则将该节点设置为 leader 反之则监听比当前节点序列号小一位的节点的状态 (PS: 因为每次都会选择序列号最小的节点为 leader, 所以在比当前节点序列号小一位的节点未被删除前, 当前节点是不可能变成 leader 的) 如果监听的节点被删除, 则会触发重新选举方法 reset()
注: 上图使用的工具是 ZK UI, 具体可以参考我之前的这篇文章: www.zifangsky.cn/1126.html
上面示例代码输出如下:
**********LeaderLatch 初始化完成 **********
当前 leader 是: Client #0
新 leader 是: Client #8
删除 leader 后, 当前第一个节点: Client #1
最终实际 leader 是: Client #9
上面的输出结果很好理解, Client #0 的序列号最小(0000000120), 其次是 Client #8(0000000121)Client #9(0000000122)
思考: 如果上面测试代码的 init()方法是如下示例, 最终选举出来的 leader 顺序是什么样的, 为什么?
- /**
- * 初始化
- * @throws Exception
- */
- @Before
- public void init() throws Exception{
- // 创建 CuratorFrameworkImpl 实例
- client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);
- client.start();
- for(int i=0;i<CLIENT_NUMBER;i++){
- // 创建 LeaderLatch 实例
- LeaderLatch leaderLatch = new LeaderLatch(client, PATH, "Client #" + i);
- leaderLatchList.add(leaderLatch);
- leaderLatch.start();
- TimeUnit.SECONDS.sleep(5);
- }
- // 等待 Leader 选举完成
- TimeUnit.SECONDS.sleep(5);
- System.out.println("**********LeaderLatch 初始化完成 **********");
- }
iv)Leader Latch 选举策略在选举出 leader 后, 该 LeaderLatch 实例会一直占有领导权, 直到调用 close() 方法关闭当前主节点, 然后其他 LeaderLatch 实例才会再次选举 leader 这种策略适合主备应用, 当主节点意外宕机之后, 多个从节点会自动选举其中一个为新的主节点(Master 节点)
(3)Leader Election:
Apache Curator 框架提供的另一种 Leader 选举策略是 Leader Election 这种选举策略跟 Leader Latch 选举策略不同之处在于每个实例都能公平获取领导权, 而且当获取领导权的实例在释放领导权之后, 该实例还有机会再次获取领导权另外, 选举出来的 leader 不会一直占有领导权, 当 takeLeadership(CuratorFramework client) 方法执行结束之后会自动释放领导权示例代码如下:
i)继承 LeaderSelectorListenerAdapter, 用于定义获取领导权后的业务逻辑:
- package cn.zifangsky.kafkademo.zookeeper;
- import java.io.Closeable;
- import java.io.IOException;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicInteger;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.recipes.leader.LeaderSelector;
- import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
- public class CustomLeaderSelectorListenerAdapter extends
- LeaderSelectorListenerAdapter implements Closeable {
- private String name;
- private LeaderSelector leaderSelector;
- public AtomicInteger leaderCount = new AtomicInteger();
- public CustomLeaderSelectorListenerAdapter(CuratorFramework client,String path,String name
- ) {
- this.name = name;
- this.leaderSelector = new LeaderSelector(client, path, this);
- /**
- * 自动重新排队
- * 该方法的调用可以确保此实例在释放领导权后还可能获得领导权
- */
- leaderSelector.autoRequeue();
- }
- public void start() throws IOException {
- leaderSelector.start();
- }
- @Override
- public void close() throws IOException {
- leaderSelector.close();
- }
- /**
- * 获取领导权
- */
- @Override
- public void takeLeadership(CuratorFramework client) throws Exception {
- final int waitSeconds = 2;
- System.out.println(name + "成为当前 leader");
- System.out.println(name + "之前成为 leader 的次数:" + leaderCount.getAndIncrement() + "次");
- //TODO 其他业务代码
- try{
- // 等待 2 秒后放弃领导权(模拟业务执行过程)
- Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
- }catch ( InterruptedException e ){
- System.err.println(name + "已被中断");
- Thread.currentThread().interrupt();
- }finally{
- System.out.println(name + "放弃领导权 \ n");
- }
- }
- }
ii)测试:
- package cn.zifangsky.kafkademo.zookeeper;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.TimeUnit;
- import org.apache.curator.RetryPolicy;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.apache.curator.utils.CloseableUtils;
- import org.junit.After;
- import org.junit.Test;
- /**
- * 测试 Apache Curator 框架的两种选举方案
- * @author zifangsky
- *
- */
- public class TestLeaderElection {
- // 会话超时时间
- private final int SESSION_TIMEOUT = 30 * 1000;
- // 连接超时时间
- private final int CONNECTION_TIMEOUT = 3 * 1000;
- // 客户端数量
- private final int CLIENT_NUMBER = 10;
- //ZooKeeper 服务地址
- private static final String SERVER = "192.168.1.159:2100,192.168.1.159:2101,192.168.1.159:2102";
- private final String PATH = "/curator/latchPath";
- // 创建连接实例
- private CuratorFramework client = null;
- /**
- * baseSleepTimeMs: 初始的重试等待时间
- * maxRetries: 最多重试次数
- */
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- // 自定义 LeaderSelectorListenerAdapter 实例集合
- List<CustomLeaderSelectorListenerAdapter> leaderSelectorListenerList
- = new ArrayList<CustomLeaderSelectorListenerAdapter>();
- @Test
- public void test() throws Exception{
- // 创建 CuratorFrameworkImpl 实例
- client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);
- client.start();
- for(int i=0;i<CLIENT_NUMBER;i++){
- // 创建 LeaderSelectorListenerAdapter 实例
- CustomLeaderSelectorListenerAdapter leaderSelectorListener =
- new CustomLeaderSelectorListenerAdapter(client, PATH, "Client #" + i);
- leaderSelectorListener.start();
- leaderSelectorListenerList.add(leaderSelectorListener);
- }
- // 暂停当前线程, 防止单元测试结束, 可以让 leader 选举过程持续进行
- TimeUnit.SECONDS.sleep(600);
- }
- /**
- * 测试完毕关闭连接
- */
- @After
- public void close(){
- for(CustomLeaderSelectorListenerAdapter tmp : leaderSelectorListenerList){
- CloseableUtils.closeQuietly(tmp);
- }
- CloseableUtils.closeQuietly(client);
- }
- }
以上代码需要注意的是:
上面只是简单测试, 为了使模拟过程更加真实, 可以在多个单元测试中实例化并测试选举过程
每个实例在获取领导权后, 如果 takeLeadership(CuratorFramework client) 方法执行结束, 将会释放其领导权
上面输出示例如下:
- Client #1 成为当前 leader
- Client #1 之前成为 leader 的次数: 0 次
- Client #1 放弃领导权
- Client #4 成为当前 leader
- Client #4 之前成为 leader 的次数: 0 次
- Client #4 放弃领导权
- Client #6 成为当前 leader
- Client #6 之前成为 leader 的次数: 0 次
- Client #6 放弃领导权
- Client #7 成为当前 leader
- Client #7 之前成为 leader 的次数: 0 次
- Client #7 放弃领导权
- Client #5 成为当前 leader
- Client #5 之前成为 leader 的次数: 0 次
- Client #5 放弃领导权
- Client #9 成为当前 leader
- Client #9 之前成为 leader 的次数: 0 次
- Client #9 放弃领导权
- Client #3 成为当前 leader
- Client #3 之前成为 leader 的次数: 0 次
- Client #3 放弃领导权
- Client #2 成为当前 leader
- Client #2 之前成为 leader 的次数: 0 次
- Client #2 放弃领导权
- Client #8 成为当前 leader
- Client #8 之前成为 leader 的次数: 0 次
- Client #8 放弃领导权
- Client #0 成为当前 leader
- Client #0 之前成为 leader 的次数: 2 次
- Client #0 放弃领导权
- Client #1 成为当前 leader
- Client #1 之前成为 leader 的次数: 1 次
- Client #1 放弃领导权
- Client #4 成为当前 leader
- Client #4 之前成为 leader 的次数: 1 次
- Client #4 放弃领导权
- Client #6 成为当前 leader
- Client #6 之前成为 leader 的次数: 1 次
- Client #6 放弃领导权
- Client #7 成为当前 leader
- Client #7 之前成为 leader 的次数: 1 次
- Client #7 放弃领导权
- Client #5 成为当前 leader
- Client #5 之前成为 leader 的次数: 1 次
- Client #5 放弃领导权
- Client #9 成为当前 leader
- Client #9 之前成为 leader 的次数: 1 次
- Client #9 放弃领导权
- Client #3 成为当前 leader
- Client #3 之前成为 leader 的次数: 1 次
- Client #3 放弃领导权
- Client #2 成为当前 leader
- Client #2 之前成为 leader 的次数: 1 次
- Client #2 放弃领导权
- Client #8 成为当前 leader
- Client #8 之前成为 leader 的次数: 1 次
- Client #8 放弃领导权
- Client #0 成为当前 leader
- Client #0 之前成为 leader 的次数: 3 次
- Client #0 放弃领导权
参考:
github.com/apache/cura
来源: https://juejin.im/entry/5aa14d92f265da238f1220d2