ZK基础类及服务的注册与发现:
package top.letsgogo.util; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.SerializableSerializer; import org.apache.zookeeper.CreateMode; import java.util.List; import java.util.Map; /** * @author panteng * @description * @date 17-6-9. */ public class ZkManager { private static String ZKServers = "10.38.164.80:2181,10.38.164.80:2182,10.38.164.80:2183"; private static ZkClient zkClient = new ZkClient(ZKServers, 10000, 10000, new SerializableSerializer()); /** * 遍历所有节点 * * @param currentPath * @param nodes */ public static void getAllNodesAndVlue(String currentPath, Map<String, Object> nodes) { try { List<String> stringList = zkClient.getChildren(currentPath); for (String childPath : stringList) { if ("/".equals(currentPath)) { childPath = currentPath + childPath; } else { childPath = currentPath + "/" + childPath; } try { if (childPath.indexOf("zookeeper") > -1) { continue; } Object nodeVlue = zkClient.readData(childPath); nodes.put(childPath, nodeVlue); } catch (Exception e) { System.out.println("node路径:" + childPath); e.printStackTrace(); } getAllNodesAndVlue(childPath, nodes); } } catch (Exception e) { if (e.getMessage().indexOf("KeeperErrorCode = NoNode for") > -1) { return; } } } /** * 增加不存在的节点,如果节点已经存在,返回"" * * @param path * @param value * @param mode * @return 返回"" 表示增加失败 */ public static String addNode(String path, Object value, CreateMode mode) { try { if (zkClient.exists(path)) { return ""; } return zkClient.create(path, value, mode); } catch (Exception e) { e.printStackTrace(); } return ""; } public static void main2(String[] arges) { ZkManager.addNode("/dao", "data operation", CreateMode.PERSISTENT); ZkManager.addNode("/service", "service provider", CreateMode.PERSISTENT); ZkManager.addNode("/controller", "work control", CreateMode.PERSISTENT); ZkManager.addNode("/dao/pool", "machine list", CreateMode.PERSISTENT); ZkManager.addNode("/service/pool", "machine list", CreateMode.PERSISTENT); ZkManager.addNode("/controller/pool", "machine list", CreateMode.PERSISTENT); ZkManager.addNode("/dao/configration", "machine list", CreateMode.PERSISTENT); ZkManager.addNode("/service/configration", "machine list", CreateMode.PERSISTENT); ZkManager.addNode("/controller/configration", "machine list", CreateMode.PERSISTENT); /*ZkManager.addNode("/controller/api1", "api1", CreateMode.EPHEMERAL); Map<String, Object> map = new HashMap<>(); ZkManager.getAllNodesAndVlue("/", map); for (Map.Entry entry : map.entrySet()) { System.out.println("path=" + entry.getKey() + " value=" + entry.getValue()); } try { Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); }*/ } }
ZkManager
package top.letsgogo.auto; import com.google.common.base.Strings; import org.apache.zookeeper.CreateMode; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import top.letsgogo.util.ZkManager; import java.net.Inet4Address; import java.net.InetAddress; import java.net.NetworkInterface; import java.util.Enumeration; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @author panteng * @description * @date 17-6-9. */ @Component public class ServiceRegisterDiscover implements CommandLineRunner { @Value("${server.port}") private String serverPort; private static String serviceNamePrefix = "dao-api-"; private static String path = "/dao/pool/" + serviceNamePrefix; private static Map<String, List<String>> nextServiceInfo = new HashMap<String, List<String>>(); @Override public void run(String... strings) throws Exception { try { //首先注册向管理中心注册自己的服务 String getPath = ZkManager.addNode(path + getIpAddress() + ":" + serverPort, "config", CreateMode.EPHEMERAL); if (!Strings.isNullOrEmpty(getPath)) { System.out.println(getPath + "服务注册成功"); } //去管理中心发现需要调用的服务 } catch (Exception e) { e.printStackTrace(); } } /** * 获取本机IP * * @return */ public static String getIpAddress() { try { Enumeration<NetworkInterface> allNetInterfaces = NetworkInterface.getNetworkInterfaces(); InetAddress ip = null; while (allNetInterfaces.hasMoreElements()) { NetworkInterface netInterface = (NetworkInterface) allNetInterfaces.nextElement(); if (netInterface.isLoopback() || netInterface.isVirtual() || !netInterface.isUp()) { continue; } else { Enumeration<InetAddress> addresses = netInterface.getInetAddresses(); while (addresses.hasMoreElements()) { ip = addresses.nextElement(); if (ip != null && ip instanceof Inet4Address) { return ip.getHostAddress(); } } } } } catch (Exception e) { e.printStackTrace(); } return ""; } }
ServiceRegisterDiscover
/dao/pool/dao-api-10.38.164.80:8080服务注册成功
[zk: localhost:2181(CONNECTED) 0] ls /dao/pool [dao-api-10.38.164.80:8080]
服务查看:
[zk: localhost:2181(CONNECTED) 1] ls /dao/pool [dao-api-10.38.164.80:8081, dao-api-10.38.164.80:8082, dao-api-10.38.164.80:8080] [zk: localhost:2181(CONNECTED) 0] ls / [service, controller, dao, zookeeper]
Service
package top.letsgogo.auto; import com.google.common.base.Strings; import org.apache.zookeeper.CreateMode; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import top.letsgogo.util.ZkManager; import java.net.Inet4Address; import java.net.InetAddress; import java.net.NetworkInterface; import java.util.*; /** * @author panteng * @description * @date 17-6-9. */ @Component public class ServiceRegisterDiscover implements CommandLineRunner { @Value("${server.port}") private String serverPort; private final static String serviceNamePrefix = "service-api-"; private final static String path = "/service/pool/" + serviceNamePrefix; /** * 被调用的服务名 */ private final static String[] nextServiceName = new String[]{"dao-api-"}; /** * 被调用的服务所在根路径,应该与nextServiceName中的一一对应 */ private final static String[] nextServiceRootPath = new String[]{"/dao/pool"}; private static Map<String, List<String>> nextServiceInfo = new HashMap<String, List<String>>(); @Override public void run(String... strings) throws Exception { try { //首先注册向管理中心注册自己的服务 String getPath = ZkManager.addNode(path + getIpAddress() + ":" + serverPort, "config", CreateMode.EPHEMERAL); if (!Strings.isNullOrEmpty(getPath)) { System.out.println(getPath + "服务注册成功"); } discoverNextServiceInfo(); } catch (Exception e) { e.printStackTrace(); } } /** * 发现服务,并监听变化 */ public static void discoverNextServiceInfo() { //去管理中心发现需要调用的服务 Map<String, Object> map = new HashMap<>(); ZkManager.getAllNodesAndVlue("/", map); for (Map.Entry entry : map.entrySet()) {//遍历所有服务 for (int i = 0; i < nextServiceName.length; i++) { String servicePath = entry.getKey().toString(); if (servicePath.indexOf(nextServiceName[i]) > -1) { List<String> serviceList = nextServiceInfo.get(nextServiceName[i]); if (serviceList == null) { serviceList = new ArrayList<String>(); } serviceList.add(servicePath); nextServiceInfo.put(nextServiceName[i], serviceList); } } } printNextServiceInfo(); //监听节点变化 for (int i = 0; i < nextServiceRootPath.length; i++) { ZkManager.subscribeChildChanges(nextServiceRootPath[i], new ServiceListener(nextServiceName[i])); } } public static void printNextServiceInfo() { for (Map.Entry entry : nextServiceInfo.entrySet()) { System.out.print("发现服务名称:" + entry.getKey() + " 服务实例:"); for (String str : (List<String>) entry.getValue()) { System.out.print(str + ", "); } System.out.println(); } } /** * 获取本机IP * * @return */ public static String getIpAddress() { try { Enumeration<NetworkInterface> allNetInterfaces = NetworkInterface.getNetworkInterfaces(); InetAddress ip = null; while (allNetInterfaces.hasMoreElements()) { NetworkInterface netInterface = (NetworkInterface) allNetInterfaces.nextElement(); if (netInterface.isLoopback() || netInterface.isVirtual() || !netInterface.isUp()) { continue; } else { Enumeration<InetAddress> addresses = netInterface.getInetAddresses(); while (addresses.hasMoreElements()) { ip = addresses.nextElement(); if (ip != null && ip instanceof Inet4Address) { return ip.getHostAddress(); } } } } } catch (Exception e) { e.printStackTrace(); } return ""; } public static Map<String, List<String>> getNextServiceInfo() { return nextServiceInfo; } public static void setNextServiceInfo(Map<String, List<String>> nextServiceInfo) { ServiceRegisterDiscover.nextServiceInfo = nextServiceInfo; } }
ServiceRegisterDiscover
package top.letsgogo.auto; import org.I0Itec.zkclient.IZkChildListener; import java.util.List; /** * @author panteng * @description * @date 17-6-10. */ public class ServiceListener implements IZkChildListener { String serviceName; public ServiceListener(String serviceName) { this.serviceName = serviceName; } @Override public void handleChildChange(String s, List<String> list) throws Exception { System.out.println("服务" + serviceName + "发生了变化"); ServiceRegisterDiscover.getNextServiceInfo().put(serviceName, list); ServiceRegisterDiscover.printNextServiceInfo(); } public String getServiceName() { return serviceName; } public void setServiceName(String serviceName) { this.serviceName = serviceName; } }
ServiceListener
package top.letsgogo.util; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.SerializableSerializer; import org.apache.zookeeper.CreateMode; import java.util.List; import java.util.Map; /** * @author panteng * @description * @date 17-6-9. */ public class ZkManager { private static String ZKServers = "10.38.164.80:2181,10.38.164.80:2182,10.38.164.80:2183"; private static ZkClient zkClient = new ZkClient(ZKServers, 10000, 10000, new SerializableSerializer()); /** * 遍历所有节点 * * @param currentPath * @param nodes */ public static void getAllNodesAndVlue(String currentPath, Map<String, Object> nodes) { try { List<String> stringList = zkClient.getChildren(currentPath); for (String childPath : stringList) { if ("/".equals(currentPath)) { childPath = currentPath + childPath; } else { childPath = currentPath + "/" + childPath; } try { if (childPath.indexOf("zookeeper") > -1) { continue; } Object nodeVlue = zkClient.readData(childPath); nodes.put(childPath, nodeVlue); } catch (Exception e) { System.out.println("node路径:" + childPath); e.printStackTrace(); } getAllNodesAndVlue(childPath, nodes); } } catch (Exception e) { if (e.getMessage().indexOf("KeeperErrorCode = NoNode for") > -1) { return; } } } /** * 增加不存在的节点,如果节点已经存在,返回"" * * @param path * @param value * @param mode * @return 返回"" 表示增加失败 */ public static String addNode(String path, Object value, CreateMode mode) { try { if (zkClient.exists(path)) { return ""; } return zkClient.create(path, value, mode); } catch (Exception e) { e.printStackTrace(); } return ""; } public static void subscribeChildChanges(String nodePath, IZkChildListener listener) { if (zkClient.exists(nodePath)) { zkClient.subscribeChildChanges(nodePath, listener); } } public static void main2(String[] arges) { ZkManager.addNode("/dao", "data operation", CreateMode.PERSISTENT); ZkManager.addNode("/service", "service provider", CreateMode.PERSISTENT); ZkManager.addNode("/controller", "work control", CreateMode.PERSISTENT); ZkManager.addNode("/dao/pool", "machine list", CreateMode.PERSISTENT); ZkManager.addNode("/service/pool", "machine list", CreateMode.PERSISTENT); ZkManager.addNode("/controller/pool", "machine list", CreateMode.PERSISTENT); ZkManager.addNode("/dao/configration", "machine list", CreateMode.PERSISTENT); ZkManager.addNode("/service/configration", "machine list", CreateMode.PERSISTENT); ZkManager.addNode("/controller/configration", "machine list", CreateMode.PERSISTENT); /*ZkManager.addNode("/controller/api1", "api1", CreateMode.EPHEMERAL); Map<String, Object> map = new HashMap<>(); ZkManager.getAllNodesAndVlue("/", map); for (Map.Entry entry : map.entrySet()) { System.out.println("path=" + entry.getKey() + " value=" + entry.getValue()); } try { Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); }*/ } }
ZkManager
/service/pool/service-api-10.232.36.21:8083服务注册成功
发现服务名称:dao-api- 服务实例:/dao/pool/dao-api-10.38.164.80:8081, /dao/pool/dao-api-10.38.164.80:8080, /dao/pool/dao-api-10.38.164.80:8082,
服务dao-api-发生了变化
发现服务名称:dao-api- 服务实例:dao-api-10.38.164.80:8081, dao-api-10.38.164.80:8082,
代码仓库:https://github.com/luckyPT/ZkManager
时间: 2024-10-17 07:39:32