package com.carelink.rpc.registry.client.util; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.GetChildrenBuilder; import org.apache.curator.framework.api.GetDataBuilder; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import com.carelink.rpc.registry.client.ZkClient; import com.carelink.rpc.registry.client.ZkConfig; import com.carelink.rpc.registry.client.factory.ZkClientServiceFactory; import com.google.common.base.Charsets; import com.google.common.base.Objects; import com.google.common.base.Strings; import com.google.common.collect.Maps; public class RpcRegisteryService { public static final int PROCESS = Runtime.getRuntime().availableProcessors(); private RpcRegisteryService() { } private static class SingletonHolder { static final RpcRegisteryService instance = new RpcRegisteryService(); } public static RpcRegisteryService instance() { return SingletonHolder.instance; } private CuratorFramework zkclient = null; public CuratorFramework getZkclient() { return zkclient; } private void setZkclient(CuratorFramework zkclient) { this.zkclient = zkclient; } /** * 连接ZK 创建初始 * * @param address * 地址 * @param timeout * 超时时间 * @param namespace * 命名空间 * @param group * 组 * @param groupVal * 组节点值 * @param node * 节点 * 
@param nodeVal * 节点值 */ public void connectZookeeper( String address, int timeout, String namespace, String group, String groupVal, String node, String nodeVal) { if (getZkclient() != null) { return; } CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); builder.connectString(address).connectionTimeoutMs(timeout).sessionTimeoutMs(timeout) .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 2000)); if (!Strings.isNullOrEmpty(namespace)) { builder.namespace(namespace); } setZkclient(builder.build()); RpcConnectionStateListener listener = new RpcConnectionStateListener(group, groupVal, node, nodeVal); getZkclient().getConnectionStateListenable().addListener(listener); getZkclient().start(); // 注入 startRegisterServer(group, groupVal, node, nodeVal); } private void startRegisterServer(String group, String groupVal, String node, String nodeVal) { registerGroup(group, groupVal); registerNode(group, node, nodeVal); } public boolean registerGroup(String group, String groupVal) { return createNode("/" + group, groupVal, CreateMode.PERSISTENT);// 创建持久的 } public boolean registerNode(String group, String node, String nodeVal) { return createNode("/" + group + "/" + node, nodeVal, CreateMode.EPHEMERAL_SEQUENTIAL); // 创建临时的 } // 设置路径更改监听 public void listenerPathChildren(String group,PathChildrenCacheListener listener) throws Exception { ExecutorService pool = Executors.newFixedThreadPool(PROCESS * 2); @SuppressWarnings("resource") PathChildrenCache childrenCache = new PathChildrenCache(getZkclient(), "/" + group, true); childrenCache.start(StartMode.POST_INITIALIZED_EVENT); childrenCache.getListenable().addListener(listener, pool); } /** * 创建节点 * * @param nodeName * @param value * @param createMode * @return * @throws Exception */ public boolean createNode(String nodeName, String value, CreateMode createMode) { boolean suc = false; if (getZkclient() == null) { return suc; } try { Stat stat = getZkclient().checkExists().forPath(nodeName); if (stat == null) { 
String opResult = null; // 节点判断值为不为空 if (Strings.isNullOrEmpty(value)) { opResult = getZkclient().create().creatingParentsIfNeeded().withMode(createMode).forPath(nodeName); } else { opResult = getZkclient().create().creatingParentsIfNeeded().withMode(createMode).forPath(nodeName, value.getBytes(Charsets.UTF_8)); } suc = Objects.equal(nodeName, opResult); } } catch (Exception e) { System.out.println("create node fail! path: " + nodeName + " value: " + value + " CreateMode: " + createMode.name()); e.printStackTrace(); return suc; } return suc; } public void destory() { if(getZkclient()==null){ return; } getZkclient().close(); } /** * 删除节点 * * @param node * @return */ public boolean deleteNode(String node) { if (getZkclient() == null) { return false; } try { Stat stat = getZkclient().checkExists().forPath(node); if (stat != null) { getZkclient().delete().deletingChildrenIfNeeded().forPath(node); } return true; } catch (Exception e) { System.out.println("delete node fail! path: " + node); return false; } } /** * 获取指定节点下的子节点路径和值 * @param node * @return */ public Map<String, String> getChildrenDetail(String node) { if (getZkclient() == null) { return null; } Map<String, String> map = Maps.newHashMap(); try { GetChildrenBuilder childrenBuilder = getZkclient().getChildren(); List<String> children = childrenBuilder.forPath(node); GetDataBuilder dataBuilder = getZkclient().getData(); if (children != null) { for (String child : children) { String propPath = ZKPaths.makePath(node, child); map.put(child, new String(dataBuilder.forPath(propPath), Charsets.UTF_8)); } } } catch (Exception e) { System.out.println("get node chilren list fail! 
path: " + node); return null; } return map; } class RpcConnectionStateListener implements ConnectionStateListener{ private String group; @SuppressWarnings("unused") private String groupVal; private String node; private String nodeVal; public RpcConnectionStateListener(String group, String groupVal,String node,String nodeVal) { this.groupVal = groupVal; this.group = group; this.node = node; this.nodeVal = nodeVal; } @Override public void stateChanged(CuratorFramework cf, ConnectionState state) { if(state == ConnectionState.LOST){ //重新注册 while(true){ //只需要注册节点,组已经是持久的// if(registerNode(group, node, nodeVal)){ break; } } } } } //获取本机IP public static String getLocalHost(String type){ InetAddress addr = null; try { addr = InetAddress.getLocalHost(); if("address".equals(type)){ return addr.getHostName().toString();//获得本机名称 } return addr.getHostAddress().toString();//获得本机IP } catch (UnknownHostException e) { e.printStackTrace(); return ""; } } public static void main(String[] args) throws Exception { String address = "192.168.200.34:2181,192.168.200.44:2181,192.168.200.64:2181"; System.out.println(ZkConfig.forView()); System.out.println(getLocalHost("ip")); ZkClient.startZkRegistery(address,5000); int i = 0; while(true){ try { System.out.println( ZkClientServiceFactory.getRpcClientServcie(0).getServersByGroup(ZkConfig.getZkServerGroup()).size()); System.out.println( ZkClientServiceFactory.getRpcClientServcie(0).getServersByGroup(ZkConfig.getZkServerGroup())); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } i++; if(i>10000){ break; } } } }
package com.carelink.rpc.registry.server.util; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.List; import java.util.Map; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.GetChildrenBuilder; import org.apache.curator.framework.api.GetDataBuilder; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import com.carelink.rpc.registry.server.ZkConfig; import com.carelink.rpc.registry.server.ZkService; import com.google.common.base.Charsets; import com.google.common.base.Objects; import com.google.common.base.Strings; import com.google.common.collect.Maps; public class RpcRegisteryService { private RpcRegisteryService() { } private static class SingletonHolder { static final RpcRegisteryService instance = new RpcRegisteryService(); } public static RpcRegisteryService instance() { return SingletonHolder.instance; } private CuratorFramework zkclient = null; private CuratorFramework getZkclient() { return zkclient; } private void setZkclient(CuratorFramework zkclient) { this.zkclient = zkclient; } /** * 连接ZK 创建初始 * * @param address * 地址 * @param timeout * 超时时间 * @param namespace * 命名空间 * @param group * 组 * @param groupVal * 组节点值 * @param node * 节点 * @param nodeVal * 节点值 */ public void connectZookeeper( String address, int timeout, String namespace, String group, String groupVal, String node, String nodeVal) { if (getZkclient() != null) { return; } CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); // ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加. 
// RetryNTimes:指定最大重试次数的重试策略 // RetryOneTime:仅重试一次 // RetryUntilElapsed:一直重试直到达到规定的时 builder.connectString(address).connectionTimeoutMs(timeout).sessionTimeoutMs(timeout) .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 2000)); if (!Strings.isNullOrEmpty(namespace)) { builder.namespace(namespace); } setZkclient(builder.build()); RpcConnectionStateListener listener = new RpcConnectionStateListener(group, groupVal, node, nodeVal); getZkclient().getConnectionStateListenable().addListener(listener); getZkclient().start(); // 注入服务 startRegisterServer(group, groupVal, node, nodeVal); } private void startRegisterServer(String group, String groupVal, String node, String nodeVal) { registerGroup(group, groupVal); registerNode(group, node, nodeVal); } public boolean registerGroup(String group, String groupVal) { return createNode("/" + group, groupVal, CreateMode.PERSISTENT);// 创建持久的 } public boolean registerNode(String group, String node, String nodeVal) { return createNode("/" + group + "/" + node, nodeVal, CreateMode.EPHEMERAL_SEQUENTIAL); // 创建临时的 } /** * 创建节点 * * @param nodeName * @param value * @param createMode * @return * @throws Exception */ public boolean createNode(String nodeName, String value, CreateMode createMode) { boolean suc = false; if (getZkclient() == null) { return suc; } try { Stat stat = getZkclient().checkExists().forPath(nodeName); if (stat == null) { String opResult = null; // 节点判断值为不为空 if (Strings.isNullOrEmpty(value)) { opResult = getZkclient().create().creatingParentsIfNeeded().withMode(createMode).forPath(nodeName); } else { opResult = getZkclient().create().creatingParentsIfNeeded().withMode(createMode).forPath(nodeName, value.getBytes(Charsets.UTF_8)); } suc = Objects.equal(nodeName, opResult); } } catch (Exception e) { System.out.println("create node fail! 
path: " + nodeName + " value: " + value + " CreateMode: " + createMode.name()); e.printStackTrace(); return suc; } return suc; } public void destory() { if(getZkclient()==null){ return; } getZkclient().close(); } /** * 删除节点 * * @param node * @return */ public boolean deleteNode(String node) { if (getZkclient() == null) { return false; } try { Stat stat = getZkclient().checkExists().forPath(node); if (stat != null) { getZkclient().delete().deletingChildrenIfNeeded().forPath(node); } return true; } catch (Exception e) { System.out.println("delete node fail! path: " + node); return false; } } /** * 获取指定节点下的子节点路径和值 * @param node * @return */ public Map<String, String> getChildrenDetail(String node) { if (getZkclient() == null) { return null; } Map<String, String> map = Maps.newHashMap(); try { GetChildrenBuilder childrenBuilder = getZkclient().getChildren(); List<String> children = childrenBuilder.forPath(node); GetDataBuilder dataBuilder = getZkclient().getData(); if (children != null) { for (String child : children) { String propPath = ZKPaths.makePath(node, child); map.put(child, new String(dataBuilder.forPath(propPath), Charsets.UTF_8)); } } } catch (Exception e) { System.out.println("get node chilren list fail! 
path: " + node); return null; } return map; } class RpcConnectionStateListener implements ConnectionStateListener{ private String group; @SuppressWarnings("unused") private String groupVal; private String node; private String nodeVal; public RpcConnectionStateListener(String group, String groupVal,String node,String nodeVal) { this.groupVal = groupVal; this.group = group; this.node = node; this.nodeVal = nodeVal; } @Override public void stateChanged(CuratorFramework cf, ConnectionState state) { if(state == ConnectionState.LOST){ //重新注册服务 while(true){ //只需要注册节点,组已经是持久的 if(registerNode(group, node, nodeVal)){ break; } } } } } public static String getLocalHost(String type){ InetAddress addr = null; try { addr = InetAddress.getLocalHost(); if("address".equals(type)){ return addr.getHostName().toString();//获得本机名称 } return addr.getHostAddress().toString();//获得本机IP } catch (UnknownHostException e) { e.printStackTrace(); return ""; } } public static void main(String[] args) { ZkConfig.readConfig(); System.out.println(ZkConfig.forView()); System.out.println(getLocalHost("ip")); ZkService.startZkRegistery(); int i = 0; while(true){ try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } i++; if(i>10){ break; } } ZkService.stopZkRegistery(); } }
// Time: 2024-10-22 10:40:21