Apache Curator Leader Election

用于Leader选举,也可以用Shared Reentrant Lock来实现。

如果需要集群中的固定的一台机器去做的事,就可以用此特性来实现,直到这台Leader死去,会产生新的Leader。

1.直接运行LLtest

package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import com.collonn.javaUtilMvn.zookeeper.curator.NodeCache.NLClientCreate;
import com.collonn.javaUtilMvn.zookeeper.curator.NodeCache.NLClientDelete;
import com.collonn.javaUtilMvn.zookeeper.curator.NodeCache.NLClientUpdate;

public class LLTest {
    public static void main(String[] args) throws Exception {
        LeaderListener.main(null);
        LeaderListener2.main(null);
        LeaderListener3.main(null);
        LeaderListener4.main(null);
    }
}
package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;

import java.util.List;

public class LeaderListener {
    public static final String C_PATH = "/TestLeader";
    public static final String CHARSET = "UTF-8";
    public static final String APP_NAME = "app1";

    public static void main(String[] args) {
        try {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try{
                        String zookeeperConnectionString = "127.0.0.1:2181";
                        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                        CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
                        client.start();

                        //ensure path of /test
                        new EnsurePath(C_PATH).ensure(client.getZookeeperClient());

                        final LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new LeaderSelectorListener() {
                            @Override
                            public void takeLeadership(CuratorFramework client) throws Exception {
                                try {
                                    int timeMilliSeconds = 6000;
                                    System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds);

                                    //once you take the leader ship
                                    //and you want hold the leader ship during the whole life of APP1
                                    //you should use Thread.sleep(Integer.MAX_VALUE)
                                    //once tha APP1 dead, the other APP (participants) will reElect an new leader
                                    for(int i = 0; i < 6; i++){
                                        System.out.println("===" + APP_NAME + " sleep " + i);
                                        Thread.sleep(1000);
                                    }
                                }catch (Exception e){
                                    e.printStackTrace();
                                }
                            }

                            @Override
                            public void stateChanged(CuratorFramework client, ConnectionState newState) {

                            }
                        });

                        leaderSelector.start();

                        Thread.sleep(Integer.MAX_VALUE);
                        client.close();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }).start();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;

public class LeaderListener2 {
    public static final String C_PATH = "/TestLeader";
    public static final String CHARSET = "UTF-8";
    public static final String APP_NAME = "app2";

    public static void main(String[] args) {
        try {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try{
                        String zookeeperConnectionString = "127.0.0.1:2181";
                        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                        CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
                        client.start();

                        //ensure path of /test
                        new EnsurePath(C_PATH).ensure(client.getZookeeperClient());

                        final LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new LeaderSelectorListener() {
                            @Override
                            public void takeLeadership(CuratorFramework client) throws Exception {
                                try {
                                    int timeMilliSeconds = 6000;
                                    System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds);

                                    for(int i = 0; i < 6; i++){
                                        System.out.println("===" + APP_NAME + " sleep " + i);
                                        Thread.sleep(1000);
                                    }
                                }catch (Exception e){
                                    e.printStackTrace();
                                }
                            }

                            @Override
                            public void stateChanged(CuratorFramework client, ConnectionState newState) {

                            }
                        });

                        leaderSelector.start();

                        Thread.sleep(Integer.MAX_VALUE);
                        client.close();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }).start();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;

public class LeaderListener3 {
    public static final String C_PATH = "/TestLeader";
    public static final String CHARSET = "UTF-8";
    public static final String APP_NAME = "app3";

    public static void main(String[] args) {
        try {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try{
                        String zookeeperConnectionString = "127.0.0.1:2181";
                        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                        CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
                        client.start();

                        //ensure path of /test
                        new EnsurePath(C_PATH).ensure(client.getZookeeperClient());

                        final LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new LeaderSelectorListener() {
                            @Override
                            public void takeLeadership(CuratorFramework client) throws Exception {
                                try {
                                    int timeMilliSeconds = 6000;
                                    System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds);

                                    for(int i = 0; i < 6; i++){
                                        System.out.println("===" + APP_NAME + " sleep " + i);
                                        Thread.sleep(1000);
                                    }
                                }catch (Exception e){
                                    e.printStackTrace();
                                }
                            }

                            @Override
                            public void stateChanged(CuratorFramework client, ConnectionState newState) {

                            }
                        });

                        leaderSelector.start();

                        Thread.sleep(Integer.MAX_VALUE);
                        client.close();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }).start();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;

public class LeaderListener4 {
    public static final String C_PATH = "/TestLeader";
    public static final String CHARSET = "UTF-8";
    public static final String APP_NAME = "app4";

    public static void main(String[] args) {
        try {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try{
                        String zookeeperConnectionString = "127.0.0.1:2181";
                        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                        CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
                        client.start();

                        //ensure path of /test
                        new EnsurePath(C_PATH).ensure(client.getZookeeperClient());

                        final LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new LeaderSelectorListener() {
                            @Override
                            public void takeLeadership(CuratorFramework client) throws Exception {
                                try {
                                    int timeMilliSeconds = 6000;
                                    System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds);

                                    for(int i = 0; i < 6; i++){
                                        System.out.println("===" + APP_NAME + " sleep " + i);
                                        Thread.sleep(1000);
                                    }
                                }catch (Exception e){
                                    e.printStackTrace();
                                }
                            }

                            @Override
                            public void stateChanged(CuratorFramework client, ConnectionState newState) {

                            }
                        });

                        leaderSelector.start();

                        Thread.sleep(Integer.MAX_VALUE);
                        client.close();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }).start();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
时间: 2024-10-05 10:53:25

Apache Curator Leader Election的相关文章

Apache Curator入门实战

版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] Apache Curator入门实战 Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量. 1.Zookeeper安装部署 Zookeeper的部署很简单,如果已经有Java运行环境的话,下载tarball解压后即可运行. [root@vm Temp]$ wget http://mirror.bi

05.Curator Leader选举

在分布式计算中,leader election是很重要的一个功能,这个选举过程是这样子的:指派一个进程作为组织者,将任务分发给各节点.在任务开始前,哪个节点都不知道谁是leader或者coordinator.当选举算法开始执行后,每个节点最终会得到一个唯一的节点作为任务leader.除此之外,选举还经常会发生在leader意外宕机的情况下,新的leader要被选举出来. Curator有两种选举recipe,你可以根据你的需求选择合适的. 1.Leader Latch 1.LeaderLatch

15. 使用Apache Curator管理ZooKeeper

Apache ZooKeeper是为了帮助解决复杂问题的软件工具,它可以帮助用户从复杂的实现中解救出来. 然而,ZooKeeper只暴露了原语,这取决于用户如何使用这些原语来解决应用程序中的协调问题. 社区已经在ZooKeeper数据模型及其API之上开发了高级框架. Apache Curator是一个高级的包装类库和框架,使得ZooKeeper非常简单易用. Tips Curator最初由Netflix开发,现在是一个Apache项目. 项目页面位于http://curator.apache.

使用Apache Curator监控Zookeeper的Node和Path的状态

1.Zookeeper经常被我们用来做配置管理,配置的管理在分布式应用环境中很常见,例如同一个应用系统需要多台 PC Server 运行,但是它们运行的应用系统的某些配置项是相同的,如果要修改这些相同的配置项,那么就必须同时修改每台运行这个应用系统的 PC Server,这样非常麻烦而且容易出错.像这样的配置信息完全可以交给 Zookeeper 来管理,将配置信息保存在 Zookeeper 的某个目录节点中,然后将所有需要修改的应用机器监控配置信息的状态,一旦配置信息发生变化,每台应用机器就会收

Zookeeper 学习笔记之 Leader Election

ZooKeeper四种节点类型: Persist Persist_Sequential Ephemeral Ephemeral_Sequential 在节点上可注册的Watch,客户端先得到通知再得到数据,Watch被fire后,不会再Watch到后续的变化. 基于ZooKeeper做Leader Election 非公平模式 - 客户端会在Persist父节点下创建Ephemeral的Leader节点,只不过是大家抢占式注册,先到先得.即使第一次排在前面,对第二次竞选也不会有影响,所以称为非公

Apache Curator Node Cache Watcher

只能监听某一路径本身的add,delete,update 1.run NodeListener 2.run NLTest package com.collonn.javaUtilMvn.zookeeper.curator.NodeCache; public class NLTest { public static void main(String[] args) throws Exception { NLClientCreate.main(null); Thread.sleep(1000 * 2

Apache Curator Path Cache Watcher

可以监控某一路径的直接子结点(一级子结点)变化,add,update,delete. 利用此特性可以很方便的监控集群中的所有结点,当然也就很方便的可以实现简单的key.hashCode()%serverCount式的分布式计算,还可以实现简单的定制规则的负载均衡. 1.run ChildrenListener 2.run CLTest package com.collonn.javaUtilMvn.zookeeper.curator.PathCache; public class CLTest

Apache Curator Tree Cache Watcher

可以监控某一路径下的子结点(所有子结节,不管有多少层子结点)变化. 比NodeCache方便的是,可以监听一群结点,而不用一个节点一个节点的去设置监听 1.run TreeListener 2.run TLTest package com.collonn.javaUtilMvn.zookeeper.curator.TreeCache; public class TLTest { public static void main(String[] args) throws Exception { T

Apache Curator操作zookeeper的API使用

curator简介与客户端之间的异同点 常用的zookeeper java客户端: zookeeper原生Java API zkclient Apache curator ZooKeeper原生Java API的不足之处: 在连接zk超时的时候,不支持自动重连,需要手动操作 Watch注册一次就会失效,需要反复注册 不支持递归创建节点 Apache curator: Apache 的开源项目 解决Watch注册一次就会失效的问题 提供的 API 更加简单易用 提供更多解决方案并且实现简单,例如: