05.Curator Leader选举

在分布式计算中,leader election是很重要的一个功能,这个选举过程是这样子的:指派一个进程作为组织者,将任务分发给各节点。在任务开始前,哪个节点都不知道谁是leader或者coordinator。当选举算法开始执行后,每个节点最终会得到一个唯一的节点作为任务leader。除此之外,选举还经常会发生在leader意外宕机的情况下,新的leader要被选举出来。

Curator有两种选举recipe,你可以根据你的需求选择合适的。

1.Leader Latch

1.LeaderLatch的简单介绍

首先我们看一个使用LeaderLatch类来选举的例子。它的常用方法如下:

  1. // 构造方法
  2. public LeaderLatch(CuratorFramework client, String latchPath)
  3. public LeaderLatch(CuratorFramework client, String latchPath, String id)
  4. public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode)
  5. // 查看当前LeaderLatch实例是否是leader
  6. public boolean hasLeadership()
  7. // 尝试让当前LeaderLatch实例称为leader
  8. public void await() throws InterruptedException, EOFException
  9. public boolean await(long timeout, TimeUnit unit) throws InterruptedException

必须启动LeaderLatch: leaderLatch.start();一旦启动,LeaderLatch会和其它使用相同latch path的其它LeaderLatch交涉,然后随机的选择其中一个作为leader。

一旦不使用LeaderLatch了,必须调用close方法。如果它是leader,会释放leadership,其它的参与者将会选举一个leader。

2.异常处理

LeaderLatch实例可以增加ConnectionStateListener来监听网络连接问题。当 SUSPENDED 或 LOST 时,leader不再认为自己还是leader.当LOST 连接重连后 RECONNECTED,LeaderLatch会删除先前的ZNode然后重新创建一个.

LeaderLatch用户必须考虑导致leadershi丢失的连接问题。强烈推荐你使用ConnectionStateListener。

3.示例程序

  1. public class LeaderLatchExample
  2. {
  3. private static final int CLIENT_QTY = 10;
  4. private static final String PATH = "/examples/leader";
  5. public static void main(String[] args) throws Exception
  6. {
  7. List<CuratorFramework> clients = Lists.newArrayList();
  8. List<LeaderLatch> examples = Lists.newArrayList();
  9. try
  10. {
  11. for (int i = 0; i < CLIENT_QTY; ++i)
  12. {
  13. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
  14. clients.add(client);
  15. client.start();
  16. LeaderLatch example = new LeaderLatch(client, PATH, "Client #" + i);
  17. examples.add(example);
  18. example.start();
  19. }
  20. System.out.println("LeaderLatch初始化完成!");
  21. Thread.sleep(10 * 1000);// 等待Leader选举完成
  22. LeaderLatch currentLeader = null;
  23. for (int i = 0; i < CLIENT_QTY; ++i)
  24. {
  25. LeaderLatch example = examples.get(i);
  26. if (example.hasLeadership())
  27. {
  28. currentLeader = example;
  29. }
  30. }
  31. System.out.println("当前leader:" + currentLeader.getId());
  32. currentLeader.close();
  33. examples.get(0).await(10, TimeUnit.SECONDS);
  34. System.out.println("当前leader:" + examples.get(0).getLeader());
  35. System.out.println("输入回车退出");
  36. new BufferedReader(new InputStreamReader(System.in)).readLine();
  37. }
  38. catch (Exception e)
  39. {
  40. e.printStackTrace();
  41. }
  42. finally
  43. {
  44. for (LeaderLatch exampleClient : examples)
  45. {
  46. System.out.println("当前leader:" + exampleClient.getLeader());
  47. try
  48. {
  49. CloseableUtils.closeQuietly(exampleClient);
  50. }
  51. catch (Exception e)
  52. {
  53. System.out.println(exampleClient.getId() + " -- " + e.getMessage());
  54. }
  55. }
  56. for (CuratorFramework client : clients)
  57. {
  58. CloseableUtils.closeQuietly(client);
  59. }
  60. }
  61. System.out.println("OK!");
  62. }
  63. }

首先我们创建了10个LeaderLatch,启动后它们中的一个会被选举为leader。因为选举会花费一些时间,start后并不能马上就得到leader。

通过hasLeadership查看自己是否是leader,如果是的话返回true。

可以通过.getLeader().getId()可以得到当前的leader的ID。

只能通过close释放当前的领导权。

await是一个阻塞方法, 尝试获取leader地位,但是未必能上位。

注意:LeaderLatch类不能close()多次,LeaderLatch.hasLeadership()与LeaderLatch.getLeader()得到的结果不一定一致,需要通过LeaderLatch.getLeader().isLeader()来判断。

4.运行结果及其分析

上面测试程序运行结果如下:

  1. LeaderLatch初始化完成!
  2. 当前leader:Client #1
  3. 当前leader:Participant{id=‘Client #8‘, isLeader=true}
  4. 输入回车退出
  5. 当前leader:Participant{id=‘Client #8‘, isLeader=true}
  6. 当前leader:Participant{id=‘Client #8‘, isLeader=true}
  7. Client #1 -- Already closed or has not been started
  8. 当前leader:Participant{id=‘Client #8‘, isLeader=true}
  9. 当前leader:Participant{id=‘Client #8‘, isLeader=true}
  10. 当前leader:Participant{id=‘Client #8‘, isLeader=true}
  11. 当前leader:Participant{id=‘Client #8‘, isLeader=true}
  12. 当前leader:Participant{id=‘Client #8‘, isLeader=true}
  13. 当前leader:Participant{id=‘Client #8‘, isLeader=true}
  14. 当前leader:Participant{id=‘Client #8‘, isLeader=true}
  15. 当前leader:Participant{id=‘Client #9‘, isLeader=true}
  16. OK!

使用ZooInspector工具查看Zookeeper数据如下图:

每创建一个LeaderLatch实例并调用其start()方法就会在其Path下创建一个节点,当调用close()方法时就会删除节点。

2.Leader Election

Curator还提供了另外一种选举方法。与Leader latch不同的是这种方法可以对领导权进行控制,在适当的时候释放领导权,这样每个节点都有可能获得领导权。主要涉及以下四个类:

  • LeaderSelector - 选举Leader的角色。
  • LeaderSelectorListener - 选举Leader时的事件监听。
  • LeaderSelectorListenerAdapter - 选举Leader时的事件监听,官方提供的适配器,用于用户扩展。
  • CancelLeadershipException - 取消Leader权异常

1.主要类介绍

重要的是LeaderSelector类,它的构造函数为:

  1. public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener)
  2. public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener)
  3. public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener)

类似LeaderLatch,必须start: leaderSelector.start();一旦启动,当实例取得领导权时LeaderSelectorListener的takeLeadership()方法被调用。而takeLeadership()方法执行完毕时领导权会自动释放重新选举。当你不再使用LeaderSelector实例时,应该调用它的close方法。LeaderSelector类中也有hasLeadership()、getLeader()方法。

2.异常处理

LeaderSelectorListener类继承ConnectionStateListener。LeaderSelector必须小心连接状态的改变。如果实例成为leader,它应该相应SUSPENDED 或 LOST。当 SUSPENDED 状态出现时,实例必须假定在重新连接成功之前它可能不再是leader了。如果LOST状态出现,实例不再是leader,takeLeadership方法返回.

注意: 推荐处理方式是当收到SUSPENDED 或 LOST时抛出CancelLeadershipException异常. 这会导致LeaderSelector实例中断并取消执行takeLeadership方法的异常。这非常重要,你必须考虑扩展LeaderSelectorListenerAdapter。LeaderSelectorListenerAdapter提供了推荐的处理逻辑。

3.示例程序

首先创建一个ExampleClient类,它继承LeaderSelectorListenerAdapter,它实现了takeLeadership方法:

  1. public class ExampleClient extends LeaderSelectorListenerAdapter implements Closeable
  2. {
  3. private final String name;
  4. private final LeaderSelector leaderSelector;
  5. private final AtomicInteger leaderCount = new AtomicInteger();
  6. public ExampleClient(CuratorFramework client, String path, String name)
  7. {
  8. this.name = name;
  9. leaderSelector = new LeaderSelector(client, path, this);
  10. leaderSelector.autoRequeue();
  11. }
  12. public void start() throws IOException
  13. {
  14. leaderSelector.start();
  15. }
  16. @Override
  17. public void close() throws IOException
  18. {
  19. leaderSelector.close();
  20. }
  21. @Override
  22. public void takeLeadership(CuratorFramework client) throws Exception
  23. {
  24. final int waitSeconds = 1;
  25. System.out.println(name + " 是当前的leader(" + leaderSelector.hasLeadership() + ") 等待" + waitSeconds + "秒...");
  26. System.out.println(name + " 之前成为leader的次数:" + leaderCount.getAndIncrement() + "次");
  27. try
  28. {
  29. Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
  30. }
  31. catch (InterruptedException e)
  32. {
  33. System.err.println(name + " 已被中断");
  34. Thread.currentThread().interrupt();
  35. }
  36. finally
  37. {
  38. System.out.println(name + " 放弃leader\n");
  39. }
  40. }
  41. }

你可以在takeLeadership方法中进行任务的分配等业务处理,并且不要返回(一返回就会释放Leader权),如果你想要要此实例一直是leader的话可以加一个死循环。leaderSelector.autoRequeue();保证在此实例释放领导权之后还可能获得领导权。在这里我们使用AtomicInteger来记录此client获得领导权的次数,每个client有平等的机会获得领导权。

测试代码:

  1. public class LeaderSelectorExample
  2. {
  3. private static final int CLIENT_QTY = 10;
  4. private static final String PATH = "/examples/leader";
  5. public static void main(String[] args) throws Exception
  6. {
  7. List<CuratorFramework> clients = Lists.newArrayList();
  8. List<ExampleClient> examples = Lists.newArrayList();
  9. try
  10. {
  11. for (int i = 0; i < CLIENT_QTY; ++i)
  12. {
  13. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
  14. clients.add(client);
  15. client.start();
  16. ExampleClient example = new ExampleClient(client, PATH, "Client #" + i);
  17. examples.add(example);
  18. example.start();
  19. }
  20. System.out.println("输入回车退出:");
  21. new BufferedReader(new InputStreamReader(System.in)).readLine();
  22. }
  23. finally
  24. {
  25. for (ExampleClient exampleClient : examples)
  26. {
  27. CloseableUtils.closeQuietly(exampleClient);
  28. }
  29. for (CuratorFramework client : clients)
  30. {
  31. CloseableUtils.closeQuietly(client);
  32. }
  33. }
  34. System.out.println("OK!");
  35. }
  36. }

4.示例运行结果及分析

运行结果控制台:

  1. 输入回车退出:
  2. Client #4 是当前的leader(true) 等待1秒...
  3. Client #4 之前成为leader的次数:0次
  4. Client #4 放弃leader
  5. Client #5 是当前的leader(true) 等待1秒...
  6. Client #5 之前成为leader的次数:0次
  7. Client #5 已被中断
  8. Client #5 放弃leader
  9. OK!

可以看出:LeaderSelector与LeaderLatch的区别,通过LeaderSelectorListener可以对领导权进行控制,在适当的时候释放领导权,这样每个节点都有可能获得领导权。而LeaderLatch一根筋到死,除非调用close方法,否则它不会释放领导权。

-------------------------------------------------------------------------------------------------------------------------------

来自为知笔记(Wiz)

时间: 2024-10-20 05:20:41

05.Curator Leader选举的相关文章

zookeeper curator学习(配合spring boot模拟leader选举)

基础知识:http://www.cnblogs.com/LiZhiW/p/4930486.html 项目路径:https://gitee.com/zhangjunqing/spring-boot  查找下面四个项目就可以了 zookeeper版本为zookeeper-3.4.9(需要查找合适的curator版本) 三个spring bootweb项目: 官方案例leader: 思路:新建三个spring boot web项目,在这三个web项目中定义一个过滤器来在初始化时抢夺leader权限,如

CentOS环境利用mariadb(mysql)数据库使用golang实现分布式系统的Leader选举

一.准备工作 1.下载安装vmware,步骤省略. 2.下载CentOS系统ios包:http://isoredirect.centos.org/centos/7/isos/x86_64/CentOS-7-x86_64-Everything-1611.iso 3.下载安装Xshell5,步骤省略. 4.下载安装git,步骤省略. 5.mariadb用于golang的api:https://github.com/go-sql-driver/mysql 6.vmware中依次点击"创建新的虚拟机&q

Apache Curator Leader Election

用于Leader选举,也可以用Shared Reentrant Lock来实现. 如果需要集群中的固定的一台机器去做的事,就可以用此特性来实现,直到这台Leader死去,会产生新的Leader. 1.直接运行LLtest package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection; import com.collonn.javaUtilMvn.zookeeper.curator.NodeCache.NLClientCreate

跟着实例学习ZooKeeper的用法: Leader选举

http://colobu.com/2014/12/12/zookeeper-recipes-by-example-1/ ZooKeeper官方给出了使用zookeeper的几种用途. Leader Election Barriers Queues Locks Two-phased Commit 其它应用如Name Service, Configuration, Group Membership 在实际使用ZooKeeper开发中,我们最常用的是Apache Curator. 它由Netflix

【分布式】Zookeeper的Leader选举

一.前言 前面学习了Zookeeper服务端的相关细节,其中对于集群启动而言,很重要的一部分就是Leader选举,接着就开始深入学习Leader选举. 二.Leader选举 2.1 Leader选举概述 Leader选举是保证分布式数据一致性的关键所在.当Zookeeper集群中的一台服务器出现以下两种情况之一时,需要进入Leader选举. (1) 服务器初始化启动. (2) 服务器运行期间无法和Leader保持连接. 下面就两种情况进行分析讲解. 1. 服务器启动时期的Leader选举 若进行

ZOOKEEPER3.3.3源码分析(四)对LEADER选举过程分析的纠正

很抱歉,之前分析的zookeeper leader选举算法有误,特此更正说明. 那里面最大的错误在于,leader选举其实不是在大多数节点通过就能选举上的,这一点与传统的paxos算法不同,因为如果这样的话,就会出现数据丢失的情况.比如某台服务器其实有最多的数据量,按照规则而言应该是leader,但是由于启动晚了,最后只能把leader让给其它的服务器.这里面存在明显的时序问题,也就是说leader服务器启动的早晚会影响整个过程. 实际上并不是这样,leader选举算法只有在收到所有参与服务器的

Raft 为什么是更易理解的分布式一致性算法——(1)Leader在时,由Leader向Follower同步日志 (2)Leader挂掉了,选一个新Leader,Leader选举算法。

转自:http://www.cnblogs.com/mindwind/p/5231986.html Raft 协议的易理解性描述 虽然 Raft 的论文比 Paxos 简单版论文还容易读了,但论文依然发散的比较多,相对冗长.读完后掩卷沉思觉得还是整理一下才会更牢靠,变成真正属于自己的.这里我就借助前面黑白棋落子里第一种极简思维来描述和概念验证下 Raft 协议的工作方式. 在一个由 Raft 协议组织的集群中有三类角色: Leader(领袖) Follower(群众) Candidate(候选人

第四章 Leader选举算法分析

Leader选举 学习leader选举算法,主要是从选举概述,算法分析与源码分析(后续章节写)三个方面进行. Leader选举概述 服务器启动时期的Leader选举 选举的隐式条件便是ZooKeeper的集群规模至少是2台机器,以3台机器组成的服务器集群为例.在服务器集群初始化阶段,当有一台服务器(myid为1,称为Server1)启动的时候,无法完成Leader选举.第二台机器(myid为2,称其为Server2)也启动后,此时这两台机器已经能够进行互相通信,每台机器都试图找到一个Leader

zookeeper应用 - leader选举 锁

模拟leader选举: 1.zookeeper服务器上有一个/leader节点 2.在/leader节点下创建短暂顺序节点/leader/lock-xxxxxxx 3.获取/leader的所有子节点并注册监听 4.拿自己的顺序号跟其他子节点的顺序号比较,如果自己的是最小的则获得leader 5.监听到/leader子节点发生变化则执行步骤3. 4尝试获取leader Client .java package leader; import java.util.List; import java.u