最近公司做了一个备份数据库(用于保存zk的配置节点数据):当zk发生宕机或出现错误导致数据丢失时,能够把mysql中存储的配置数据重新同步到zk中。相关实现逻辑如下:
首先是zk的一个初始化工具类,封装了操作zk的常用方法:
/**
 * Corearchi.com Inc.
 * Copyright (c) 2017-2018 All Rights Reserved.
 */
package com.corearchi.incat.server.common.zkconfig;

import com.corearchi.incat.server.api.SystemInfo.SystemInfoMsg;
import com.corearchi.incat.server.common.request.PageResult;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * ZooKeeper utility class.
 *
 * <p>Every helper opens its own short-lived session through {@link #getInstance()} and
 * closes it before returning. Callers of {@link #getInstance()} own the returned client
 * and must close it themselves.
 *
 * @author DonnieGao
 * @version Id: ZooKeeperUtils.java, v 0.1 2018/2/28 6:28 PM DonnieGao Exp $$
 */
@Slf4j
public class ZooKeeperUtils {

    public final static String CONNECT_IP = "10.211.55.10:2181";

    public static int sessionTimeout = 3000;

    public static final String ROOT_PATH = "/corearchi/config/incat-client";

    /** Full path of the znode holding the global switch ("true"/"false"). */
    private static final String SWITCH_PATH = ROOT_PATH + "/" + "switchOpen";

    /** Upper bound on how long {@link #getInstance()} waits for the session to connect. */
    private static final long CONNECT_TIMEOUT_MS = 10000L;

    /**
     * Creates a ZooKeeper client and waits until it reports {@code SyncConnected}.
     *
     * <p>The wait is bounded: if the session is not connected within
     * {@value #CONNECT_TIMEOUT_MS} ms the half-open client is closed and an
     * {@link IOException} is thrown, instead of blocking the caller indefinitely.
     *
     * @return a connected ZooKeeper client; the caller is responsible for closing it
     * @throws IOException          if the client cannot be created or does not connect in time
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static ZooKeeper getInstance() throws IOException, InterruptedException {

        final CountDownLatch connectedSignal = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper(CONNECT_IP, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connectedSignal.countDown();
                }
            }
        });

        boolean connected = false;
        try {
            connected = connectedSignal.await(CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } finally {
            // Covers both the timeout and the interrupted case: never leak the client.
            if (!connected) {
                zk.close();
            }
        }
        if (!connected) {
            throw new IOException("Timed out after " + CONNECT_TIMEOUT_MS
                    + " ms connecting to ZooKeeper at " + CONNECT_IP);
        }
        return zk;
    }

    /**
     * Reports whether the global switch is open.
     *
     * <p>Reads the {@code switchOpen} znode under {@link #ROOT_PATH} and parses its content
     * with {@link Boolean#parseBoolean(String)}. If the node cannot be read, or it carries
     * no data, the failure is logged and {@code false} is returned.
     *
     * @return {@code true} only if the node was read and its content is "true" (case-insensitive)
     */
    public static boolean isSwitchOpen() {
        ZooKeeper zk = null;
        try {
            zk = ZooKeeperUtils.getInstance();
            // No watch is registered: the session is closed as soon as the value is read.
            byte[] data = zk.getData(SWITCH_PATH, false, new Stat());
            if (data == null) {
                return false;
            }
            return Boolean.parseBoolean(new String(data, StandardCharsets.UTF_8));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("获取开关失败", e);
            return false;
        } catch (IOException | KeeperException e) {
            log.error("获取开关失败", e);
            return false;
        } finally {
            closeQuietly(zk);
        }
    }

    /**
     * Turns the global switch off when the reported host is under heavy load.
     *
     * <p>The host counts as overloaded when memory usage, rounded to a whole percentage,
     * is at least 80%, or when {@code cpuUsed} is at least 80. If any of the four metrics
     * is {@code null} nothing is done. A non-positive {@code memTotal} makes the memory
     * ratio undefined, so only the CPU figure is considered in that case.
     *
     * @param systemInfoMsg metrics of the host to evaluate
     * @return {@code true} if the host was overloaded, the switch was open, and it has now
     *         been set to "false"; {@code false} otherwise
     * @throws IOException          if the ZooKeeper session cannot be established
     * @throws InterruptedException if the calling thread is interrupted
     * @throws KeeperException      if ZooKeeper rejects the update
     */
    public static boolean detectUsed(SystemInfoMsg systemInfoMsg) throws IOException,
            InterruptedException, KeeperException {

        Integer memTotal = systemInfoMsg.getMemTotal();
        Integer memUsed = systemInfoMsg.getMemUsed();
        Integer cpuCore = systemInfoMsg.getCpuCore();
        Integer cpuUsed = systemInfoMsg.getCpuUsed();

        if (memUsed == null || memTotal == null || cpuCore == null || cpuUsed == null) {
            return false;
        }

        // Rounding to a whole percent gives the same >= 80% decision as rounding the ratio
        // to two decimals, without a locale-dependent format/parse round trip.
        boolean memoryHigh = memTotal > 0 && Math.round(memUsed * 100.0 / memTotal) >= 80;
        boolean cpuHigh = cpuUsed >= 80;
        if (!memoryHigh && !cpuHigh) {
            return false;
        }

        if (!isSwitchOpen()) {
            return false;
        }

        ZooKeeper zk = getInstance();
        try {
            // Version -1 matches any node version.
            zk.setData(SWITCH_PATH, "false".getBytes(StandardCharsets.UTF_8), -1);
        } finally {
            zk.close();
        }
        return true;
    }

    /**
     * Prints every child of {@link #ROOT_PATH} together with its data.
     *
     * @param args unused
     * @throws IOException          if the ZooKeeper session cannot be established
     * @throws KeeperException      if a node cannot be read
     * @throws InterruptedException if the calling thread is interrupted
     */
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        ZooKeeper zk = ZooKeeperUtils.getInstance();
        try {
            Stat s = new Stat();
            List<String> znodes = zk.getChildren(ZooKeeperUtils.ROOT_PATH, false);
            for (String znode : znodes) {
                byte[] data = zk.getData(ZooKeeperUtils.ROOT_PATH + "/" + znode, false, s);
                System.out.println("获取节点:" + znode + " 的数据是:"
                        + (data == null ? "null" : new String(data, StandardCharsets.UTF_8)));
            }
        } finally {
            zk.close();
        }
    }

    /** Closes the client if it is non-null, restoring the interrupt flag if interrupted. */
    private static void closeQuietly(ZooKeeper zk) {
        if (zk == null) {
            return;
        }
        try {
            zk.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
此外,当数据库中的配置发生改变时,需要保证zk节点数据与数据库数据保持一致,下面是逻辑实现:
1 @Transactional(readOnly = false, isolation = Isolation.DEFAULT, propagation = Propagation.REQUIRED, rollbackFor = Exception.class) 2 public void synZooKeeper() throws Exception { 3 ZooKeeper zk = ZooKeeperUtils.getInstance(); 4 List<ZkConfig> zkConfigs = zkConfigMapper.selectAllZkConfig(); 5 if (null == zk.exists(ZooKeeperUtils.ROOT_PATH, true)) { 6 // 父节点不存在创建父节点 7 String paths = ZooKeeperUtils.ROOT_PATH; 8 String[] split = paths.split("/"); 9 StringBuffer sb = new StringBuffer(); 10 int i = 0; 11 for (String s : split) { 12 i++; 13 if (i == 1) { 14 continue; 15 } 16 sb.append("/").append(s); 17 zk.create(sb.toString(), "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 18 } 19 } 20 21 // 创建子节点,并设置值 22 for (ZkConfig zkConfig : zkConfigs) { 23 String nodePath = ZooKeeperUtils.ROOT_PATH + "/" + zkConfig.getDataKey(); 24 // 判断节点是否存在 25 if (zk.exists(nodePath, true) != null) { 26 zk.setData(nodePath, zkConfig.getDataValue().getBytes(), -1); 27 } else { 28 // 节点不存在 29 zk.create(nodePath, zkConfig.getDataValue().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 30 } 31 } 32 33 zk.close();
当数据库配置数据发生变化时,只需调用该方法,即可使zk节点数据与mysql配置数据保持同步。
原文地址:https://www.cnblogs.com/codegeekgao/p/8531257.html
时间: 2024-11-17 14:12:54