用于Leader选举,也可以用Shared Reentrant Lock来实现。
如果某项任务需要由集群中固定的一台机器来执行,就可以用此特性来实现;只有当这台Leader死去时,才会选举产生新的Leader。
1. 直接运行LLTest
package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import com.collonn.javaUtilMvn.zookeeper.curator.NodeCache.NLClientCreate;
import com.collonn.javaUtilMvn.zookeeper.curator.NodeCache.NLClientDelete;
import com.collonn.javaUtilMvn.zookeeper.curator.NodeCache.NLClientUpdate;

/**
 * Demo driver for the leader-election example: launches the four election
 * participants ({@link LeaderListener}, {@link LeaderListener2},
 * {@link LeaderListener3} and {@link LeaderListener4}) from a single JVM by
 * invoking their {@code main} methods one after another.
 */
public class LLTest {

    /**
     * Entry point of the demo.
     *
     * @param args command-line arguments; not used
     * @throws Exception declared for compatibility with the original signature
     */
    public static void main(String[] args) throws Exception {
        launchAllParticipants();
    }

    /** Invokes each participant's {@code main} in order, passing {@code null} arguments. */
    private static void launchAllParticipants() {
        final String[] noArguments = null;
        LeaderListener.main(noArguments);
        LeaderListener2.main(noArguments);
        LeaderListener3.main(noArguments);
        LeaderListener4.main(noArguments);
    }
}
package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.EnsurePath; import java.util.List; public class LeaderListener { public static final String C_PATH = "/TestLeader"; public static final String CHARSET = "UTF-8"; public static final String APP_NAME = "app1"; public static void main(String[] args) { try { new Thread(new Runnable() { @Override public void run() { try{ String zookeeperConnectionString = "127.0.0.1:2181"; RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy); client.start(); //ensure path of /test new EnsurePath(C_PATH).ensure(client.getZookeeperClient()); final LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework client) throws Exception { try { int timeMilliSeconds = 6000; System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds); //once you take the leader ship //and you want hold the leader ship during the whole life of APP1 //you should use Thread.sleep(Integer.MAX_VALUE) //once tha APP1 dead, the other APP (participants) will reElect an new 
leader for(int i = 0; i < 6; i++){ System.out.println("===" + APP_NAME + " sleep " + i); Thread.sleep(1000); } }catch (Exception e){ e.printStackTrace(); } } @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { } }); leaderSelector.start(); Thread.sleep(Integer.MAX_VALUE); client.close(); }catch (Exception e){ e.printStackTrace(); } } }).start(); }catch (Exception e){ e.printStackTrace(); } } }
package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.EnsurePath; public class LeaderListener2 { public static final String C_PATH = "/TestLeader"; public static final String CHARSET = "UTF-8"; public static final String APP_NAME = "app2"; public static void main(String[] args) { try { new Thread(new Runnable() { @Override public void run() { try{ String zookeeperConnectionString = "127.0.0.1:2181"; RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy); client.start(); //ensure path of /test new EnsurePath(C_PATH).ensure(client.getZookeeperClient()); final LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework client) throws Exception { try { int timeMilliSeconds = 6000; System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds); for(int i = 0; i < 6; i++){ System.out.println("===" + APP_NAME + " sleep " + i); Thread.sleep(1000); } }catch (Exception e){ e.printStackTrace(); } } @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { } }); leaderSelector.start(); Thread.sleep(Integer.MAX_VALUE); client.close(); }catch (Exception e){ e.printStackTrace(); } } }).start(); }catch (Exception e){ e.printStackTrace(); } } }
package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.EnsurePath; public class LeaderListener3 { public static final String C_PATH = "/TestLeader"; public static final String CHARSET = "UTF-8"; public static final String APP_NAME = "app3"; public static void main(String[] args) { try { new Thread(new Runnable() { @Override public void run() { try{ String zookeeperConnectionString = "127.0.0.1:2181"; RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy); client.start(); //ensure path of /test new EnsurePath(C_PATH).ensure(client.getZookeeperClient()); final LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework client) throws Exception { try { int timeMilliSeconds = 6000; System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds); for(int i = 0; i < 6; i++){ System.out.println("===" + APP_NAME + " sleep " + i); Thread.sleep(1000); } }catch (Exception e){ e.printStackTrace(); } } @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { } }); leaderSelector.start(); Thread.sleep(Integer.MAX_VALUE); client.close(); }catch (Exception e){ e.printStackTrace(); } } }).start(); }catch (Exception e){ e.printStackTrace(); } } }
package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.EnsurePath; public class LeaderListener4 { public static final String C_PATH = "/TestLeader"; public static final String CHARSET = "UTF-8"; public static final String APP_NAME = "app4"; public static void main(String[] args) { try { new Thread(new Runnable() { @Override public void run() { try{ String zookeeperConnectionString = "127.0.0.1:2181"; RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy); client.start(); //ensure path of /test new EnsurePath(C_PATH).ensure(client.getZookeeperClient()); final LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework client) throws Exception { try { int timeMilliSeconds = 6000; System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds); for(int i = 0; i < 6; i++){ System.out.println("===" + APP_NAME + " sleep " + i); Thread.sleep(1000); } }catch (Exception e){ e.printStackTrace(); } } @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { } }); leaderSelector.start(); Thread.sleep(Integer.MAX_VALUE); client.close(); }catch (Exception e){ e.printStackTrace(); } } }).start(); }catch (Exception e){ e.printStackTrace(); } } }
时间: 2024-10-05 10:53:25