跟着实例学习ZooKeeper的用法: Leader选举

http://colobu.com/2014/12/12/zookeeper-recipes-by-example-1/

ZooKeeper官方给出了使用zookeeper的几种用途

  • Leader Election
  • Barriers
  • Queues
  • Locks
  • Two-phased Commit
  • 其它应用如Name Service, Configuration, Group Membership

在实际使用ZooKeeper开发中,我们最常用的是Apache Curator。 它由Netflix公司贡献给Apache,目前版本2.7.0。

相信你在使用ZK API开发时会遇到让人头疼的几个问题,ZK连接管理、SESSION失效等一些异常问题的处理,Curator替我们解决了这些问题,通过对ZK连接状态的监控来做出相应的重连等操作,并触发事件。

更好的地方是Curator对ZK的一些应用场景提供了非常好的实现,而且有很多扩充,这些都符合ZK使用规范。

它的主要组件为:

  • Recipes, ZooKeeper的系列recipe实现, 基于 Curator Framework.
  • Framework, 封装了大量ZooKeeper常用API操作,降低了使用难度, 基于Zookeeper增加了一些新特性,对ZooKeeper链接的管理,对链接丢失自动重新链接。
  • Utilities,一些ZooKeeper操作的工具类包括ZK的集群测试工具路径生成等非常有用,在Curator-Client包下org.apache.curator.utils。
  • Client,ZooKeeper的客户端API封装,替代官方 ZooKeeper class,解决了一些繁琐低级的处理,提供一些工具类。
  • Errors,异常处理, 连接异常等
  • Extensions,对curator-recipes的扩展实现,拆分为curator-:stuck_out_tongue_closed_eyes:iscovery和curator-:stuck_out_tongue_closed_eyes:iscovery-server提供基于RESTful的Recipes WEB服务.

Recipe 词典的意思是食谱,配方,美食菜谱,烹饪法, 延伸用法:某项计划或步骤来取得预先给定的结果。 在计算机领域没有合适的汉语对应,如果把ZooKeeper看成菜的话,recipe就相当于菜谱, 比如麻婆豆腐, 宫保鸡丁。

由于内容较多, 将会分成多篇文章进行介绍。具体的章节如上面所示。

除了ZK 的”Two-phased Commit”的recipe外, Curator提供了全部的ZK的recipe, 而且分类更详细。 这篇文章将会以实例的方式介绍这些Recipe。 一旦你领会了这些Recipe,就可以在项目中很好的使用ZooKeeper的强大威力。

leader选举

在分布式计算中, leader election是很重要的一个功能, 这个选举过程是这样子的: 指派一个进程作为组织者,将任务分发给各节点。 在任务开始前, 哪个节点都不知道谁是leader或者coordinator. 当选举算法开始执行后, 每个节点最终会得到一个唯一的节点作为任务leader.

除此之外, 选举还经常会发生在leader意外宕机的情况下,新的leader要被选举出来。

Curator 有两种选举recipe, 你可以根据你的需求选择合适的。

Leader latch

首先我们看一个使用LeaderLatch类来选举的例子。

它的构造函数如下:

1

2
public LeaderLatch(CuratorFramework client, String latchPath)

public LeaderLatch(CuratorFramework client, String latchPath,  String id)

必须启动LeaderLatch: leaderLatch.start();

一旦启动, LeaderLatch会和其它使用相同latch path的其它LeaderLatch交涉,然后随机的选择其中一个作为leader。 你可以随时查看一个给定的实例是否是leader:

1
public boolean hasLeadership()

类似JDK的CountDownLatch, LeaderLatch在请求成为leadership时有block方法:

1

2

3

4

5

6

7

8

9
public void await()

throws InterruptedException,

EOFException

Causes the current thread to wait until this instance acquires leadership

unless the thread is interrupted or closed.

public boolean await(long timeout,

TimeUnit unit)

throws InterruptedException

一旦不使用LeaderLatch了,必须调用close方法。 如果它是leader,会释放leadership, 其它的参与者将会选举一个leader。

异常处理

LeaderLatch实例可以增加ConnectionStateListener来监听网络连接问题。 当 SUSPENDED 或 LOST 时, leader不再认为自己还是leader.当LOST 连接重连后 RECONNECTED,LeaderLatch会删除先前的ZNode然后重新创建一个.

LeaderLatch用户必须考虑导致leadershi丢失的连接问题。 强烈推荐你使用ConnectionStateListener。

下面看例子:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68
package com.colobu.zkrecipe.leaderelection;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.util.List;

import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.framework.recipes.leader.LeaderLatch;

import org.apache.curator.retry.ExponentialBackoffRetry;

import org.apache.curator.test.TestingServer;

import org.apache.curator.utils.CloseableUtils;

import com.google.common.collect.Lists;

public class LeaderLatchExample {

private static final int CLIENT_QTY = 10;

private static final String PATH = "/examples/leader";

public static void main(String[] args) throws Exception {

List<CuratorFramework> clients = Lists.newArrayList();

List<LeaderLatch> examples = Lists.newArrayList();

TestingServer server = new TestingServer();

try {

for (int i = 0; i < CLIENT_QTY; ++i) {

CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));

clients.add(client);

LeaderLatch example = new LeaderLatch(client, PATH, "Client #" + i);

examples.add(example);

client.start();

example.start();

}

Thread.sleep(20000);

LeaderLatch currentLeader = null;

for (int i = 0; i < CLIENT_QTY; ++i) {

LeaderLatch example = examples.get(i);

if (example.hasLeadership())

currentLeader = example;

}

System.out.println("current leader is " + currentLeader.getId());

System.out.println("release the leader " + currentLeader.getId());

currentLeader.close();

examples.get(0).await(2, TimeUnit.SECONDS);

System.out.println("Client #0 maybe is elected as the leader or not although it want to be");

System.out.println("the new leader is " + examples.get(0).getLeader().getId());

System.out.println("Press enter/return to quit\n");

new BufferedReader(new InputStreamReader(System.in)).readLine();

} catch (Exception e) {

e.printStackTrace();

} finally {

System.out.println("Shutting down...");

for (LeaderLatch exampleClient : examples) {

CloseableUtils.closeQuietly(exampleClient);

}

for (CuratorFramework client : clients) {

CloseableUtils.closeQuietly(client);

}

CloseableUtils.closeQuietly(server);

}

}

}

首先我们创建了10个LeaderLatch,启动后它们中的一个会被选举为leader。 因为选举会花费一些时间,start后并不能马上就得到leader。

通过hasLeadership查看自己是否是leader, 如果是的话返回true。

可以通过.getLeader().getId()可以得到当前的leader的ID。

只能通过close释放当前的领导权。

await是一个阻塞方法, 尝试获取leader地位,但是未必能上位。

Leader Election

Curator还提供了另外一种选举方法。

注意涉及以下四个类:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

重要的是LeaderSelector类,它的构造函数为:

1

2
public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)

public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

类似LeaderLatch,必须start: leaderSelector.start();

一旦启动,当实例取得领导权时你的listener的takeLeadership()方法被调用. 而takeLeadership()方法只有领导权被释放时才返回。

当你不再使用LeaderSelector实例时,应该调用它的close方法。

异常处理

LeaderSelectorListener类继承ConnectionStateListener.LeaderSelector必须小心连接状态的改变. 如果实例成为leader, 它应该相应SUSPENDED 或 LOST. 当 SUSPENDED 状态出现时, 实例必须假定在重新连接成功之前它可能不再是leader了。 如果LOST状态出现, 实例不再是leader, takeLeadership方法返回.

重要: 推荐处理方式是当收到SUSPENDED 或 LOST时抛出CancelLeadershipException异常. 这会导致LeaderSelector实例中断并取消执行takeLeadership方法的异常. 这非常重要, 你必须考虑扩展LeaderSelectorListenerAdapter. LeaderSelectorListenerAdapter提供了推荐的处理逻辑。

这个例子摘自官方

首先创建一个ExampleClient类, 它继承LeaderSelectorListenerAdapter, 它实现了takeLeadership方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46
package com.colobu.zkrecipe.leaderelection;

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;

import org.apache.curator.framework.recipes.leader.LeaderSelector;

import java.io.Closeable;

import java.io.IOException;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;

public class ExampleClient extends LeaderSelectorListenerAdapter implements Closeable {

private final String name;

private final LeaderSelector leaderSelector;

private final AtomicInteger leaderCount = new AtomicInteger();

public ExampleClient(CuratorFramework client, String path, String name) {

this.name = name;

leaderSelector = new LeaderSelector(client, path, this);

leaderSelector.autoRequeue();

}

public void start() throws IOException {

leaderSelector.start();

}

@Override

public void close() throws IOException {

leaderSelector.close();

}

@Override

public void takeLeadership(CuratorFramework client) throws Exception {

final int waitSeconds = (int) (5 * Math.random()) + 1;

System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");

System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");

try {

Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));

} catch (InterruptedException e) {

System.err.println(name + " was interrupted.");

Thread.currentThread().interrupt();

} finally {

System.out.println(name + " relinquishing leadership.\n");

}

}

}

你可以在takeLeadership进行任务的分配等等,并且不要返回,如果你想要要此实例一直是leader的话可以加一个死循环。

leaderSelector.autoRequeue();保证在此实例释放领导权之后还可能获得领导权。

在这里我们使用AtomicInteger来记录此client获得领导权的次数, 它是”fair”, 每个client有平等的机会获得领导权。

测试代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47
package com.colobu.zkrecipe.leaderelection;

import java.io.BufferedReader;

import java.io.InputStreamReader;

import java.util.List;

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.framework.recipes.leader.LeaderSelector;

import org.apache.curator.retry.ExponentialBackoffRetry;

import org.apache.curator.test.TestingServer;

import org.apache.curator.utils.CloseableUtils;

import com.google.common.collect.Lists;

public class LeaderSelectorExample {

private static final int CLIENT_QTY = 10;

private static final String PATH = "/examples/leader";

public static void main(String[] args) throws Exception {

List<CuratorFramework> clients = Lists.newArrayList();

List<ExampleClient> examples = Lists.newArrayList();

TestingServer server = new TestingServer();

try {

for (int i = 0; i < CLIENT_QTY; ++i) {

CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));

clients.add(client);

ExampleClient example = new ExampleClient(client, PATH, "Client #" + i);

examples.add(example);

client.start();

example.start();

}

System.out.println("Press enter/return to quit\n");

new BufferedReader(new InputStreamReader(System.in)).readLine();

} finally {

System.out.println("Shutting down...");

for (ExampleClient exampleClient : examples) {

CloseableUtils.closeQuietly(exampleClient);

}

for (CuratorFramework client : clients) {

CloseableUtils.closeQuietly(client);

}

CloseableUtils.closeQuietly(server);

}

}

}

与LeaderLatch, 通过LeaderSelectorListener可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。 而LeaderLatch一根筋到死, 除非调用close方法,否则它不会释放领导权。

时间: 2024-08-27 02:59:47

跟着实例学习ZooKeeper的用法: Leader选举的相关文章

[转载] 跟着实例学习zookeeper 的用法

原文: http://ifeve.com/zookeeper-curato-framework/ zookeeper 的原生客户端库过于底层, 用户为了使用 zookeeper需要编写大量的代码, 为此Curator框架对 zookeeper 进行了高层次的语义封装, 简化使用 zookeeper 的成本. 只可惜 curator 框架仅仅支持 java 语言, 期待 c++版本出现(或者我们自己尝试实现一个) 跟着实例学习ZooKeeper的用法: Curator框架应用 前面的几篇文章介绍了

跟着实例学习ZooKeeper的用法: Curator扩展库

还记得Curator提供哪几个组件吗? 我们不妨回顾一下: Recipes Framework Utilities Client Errors Extensions 前面的例子其实前五个组件都涉及到了, 比如Utilities例子的TestServer, Client里的CuratorZookeeperClient, Errors里的ConnectionStateListener等. 还有最后一个组件我们还没有介绍,那就是Curator扩展组件. Recipes组件包含了丰富的Curator应用

ZooKeeper集群Leader选举 —— paxos算法

原理简单来说,就是要选举leader,会生成一个zxid,然后分发给所有的server(所以这里一台server可以接受多台server给他发送要选举leader的请求),然后各个server根据发送给自己的zxid,选择一个值最大的,然后将这个选择返回给发送这个zxid的server,只要这个server收到的答复大于等于2/n+1个(也就是超过半数的同意票),则表明自己当选为leader,然后会向所有server广播自己已经成为leader. 原文地址:https://www.cnblogs

【分布式】Zookeeper的Leader选举

一.前言 前面学习了Zookeeper服务端的相关细节,其中对于集群启动而言,很重要的一部分就是Leader选举,接着就开始深入学习Leader选举. 二.Leader选举 2.1 Leader选举概述 Leader选举是保证分布式数据一致性的关键所在.当Zookeeper集群中的一台服务器出现以下两种情况之一时,需要进入Leader选举. (1) 服务器初始化启动. (2) 服务器运行期间无法和Leader保持连接. 下面就两种情况进行分析讲解. 1. 服务器启动时期的Leader选举 若进行

zookeeper应用 - leader选举 锁

模拟leader选举: 1.zookeeper服务器上有一个/leader节点 2.在/leader节点下创建短暂顺序节点/leader/lock-xxxxxxx 3.获取/leader的所有子节点并注册监听 4.拿自己的顺序号跟其他子节点的顺序号比较,如果自己的是最小的则获得leader 5.监听到/leader子节点发生变化则执行步骤3. 4尝试获取leader Client .java package leader; import java.util.List; import java.u

zookeeper leader选举机制

最近看了下zookeeper的源码,先整理下leader选举机制 先看几个关键数据结构和函数 服务可能处于的状态,从名字应该很好理解 public enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING; } 选票参数,还有Notification,参数也都差不多 static public class ToSend { long leader; //leader id long zxid; //选票的zxid long electio

zookeeper curator学习(配合spring boot模拟leader选举)

基础知识:http://www.cnblogs.com/LiZhiW/p/4930486.html 项目路径:https://gitee.com/zhangjunqing/spring-boot  查找下面四个项目就可以了 zookeeper版本为zookeeper-3.4.9(需要查找合适的curator版本) 三个spring bootweb项目: 官方案例leader: 思路:新建三个spring boot web项目,在这三个web项目中定义一个过滤器来在初始化时抢夺leader权限,如

Zookeeper详解-工作流和leader选举(三)

一.工作流 一旦ZooKeeper集合启动,它将等待客户端连接.客户端将连接到ZooKeeper集合中的一个节点.它可以是leader或follower节点.一旦客户端被连接,节点将向特定客户端分配会话ID并向该客户端发送确认.如果客户端没有收到确认,它将尝试连接ZooKeeper集合中的另一个节点. 一旦连接到节点,客户端将以有规律的间隔向节点发送心跳,以确保连接不会丢失. 如果客户端想要读取特定的znode,它将会向具有znode路径的节点发送读取请求,并且节点通过从其自己的数据库获取来返回

Zookeeper leader选举

Zookeeper leader选举 由 xpproen 创建,youj 最后一次修改 2016-12-27 让我们分析如何在ZooKeeper集合中选举leader节点.考虑一个集群中有N个节点.leader选举的过程如下: 所有节点创建具有相同路径 /app/leader_election/guid_ 的顺序.临时节点. ZooKeeper集合将附加10位序列号到路径,创建的znode将是 /app/leader_election/guid_0000000001,/app/leader_el