- 工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现
准备工作
有几个帮助类,先把代码放上来
ZKClient 对zk的操作做了一个简单的封装
Java代码
ZKUtil 针对zk路径的一个工具类
Java代码
NetworkUtil 获取本机IP的工具方法
Java代码
--------------------------- 正文开始 -----------------------------------
这种实现非常简单,具体的流程如下
对应的实现如下Java代码
总结
网上有很多文章,大家的方法大多数都是创建一个root根节点,每一个trylock的客户端都会在root下创建一个 EPHEMERAL_SEQUENTIAL 的子节点,同时设置root的child 变更watcher(为了避免羊群效应,可以只添加前一个节点的变更通知) .如果创建的节点的序号是最小,则获取到锁,否则继续等待root的child 变更
- package zk.lock;
- import zk.util.NetworkUtil;
- import zk.util.ZKUtil;
- /**
- * User: zhenghui
- * Date: 14-3-26
- * Time: 下午8:37
- * 分布式锁实现.
- *
- * 这种实现的原理是,创建某一个任务的节点,比如 /lock/tasckname 然后获取对应的值,如果是当前的Ip,那么获得锁,如果不是,则没获得
- * .如果该节点不存在,则创建该节点,并把改节点的值设置成当前的IP
- */
- public class DistributedLock01 {
- private ZKClient zkClient;
- public static final String LOCK_ROOT = "/lock";
- private String lockName;
- public DistributedLock01(String connectString, int sessionTimeout,String lockName) throws Exception {
- //先创建zk链接.
- this.createConnection(connectString,sessionTimeout);
- this.lockName = lockName;
- }
- public boolean tryLock(){
- String path = ZKUtil.contact(LOCK_ROOT,lockName);
- String localIp = NetworkUtil.getNetworkAddress();
- try {
- if(zkClient.exists(path)){
- String ownnerIp = zkClient.getData(path);
- if(localIp.equals(ownnerIp)){
- return true;
- }
- } else {
- zkClient.createPathIfAbsent(path,false);
- if(zkClient.exists(path)){
- String ownnerIp = zkClient.getData(path);
- if(localIp.equals(ownnerIp)){
- return true;
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- return false;
- }
- /**
- * 创建zk连接
- *
- */
- protected void createConnection(String connectString, int sessionTimeout) throws Exception {
- if(zkClient != null){
- releaseConnection();
- }
- zkClient = new ZKClient(connectString,sessionTimeout);
- zkClient.createPathIfAbsent(LOCK_ROOT,true);
- }
- /**
- * 关闭ZK连接
- */
- protected void releaseConnection() throws InterruptedException {
- if (zkClient != null) {
- zkClient.close();
- }
- }
- }
- package zk.util;
- import java.net.InetAddress;
- import java.net.NetworkInterface;
- import java.util.Enumeration;
- /**
- * User: zhenghui
- * Date: 14-4-1
- * Time: 下午4:47
- */
- public class NetworkUtil {
- static private final char COLON = ‘:‘;
- /**
- * 获取当前机器ip地址
- * 据说多网卡的时候会有问题.
- */
- public static String getNetworkAddress() {
- Enumeration<NetworkInterface> netInterfaces;
- try {
- netInterfaces = NetworkInterface.getNetworkInterfaces();
- InetAddress ip;
- while (netInterfaces.hasMoreElements()) {
- NetworkInterface ni = netInterfaces
- .nextElement();
- Enumeration<InetAddress> addresses=ni.getInetAddresses();
- while(addresses.hasMoreElements()){
- ip = addresses.nextElement();
- if (!ip.isLoopbackAddress()
- && ip.getHostAddress().indexOf(COLON) == -1) {
- return ip.getHostAddress();
- }
- }
- }
- return "";
- } catch (Exception e) {
- return "";
- }
- }
- }
- package zk.util;
- /**
- * User: zhenghui
- * Date: 14-3-26
- * Time: 下午9:56
- */
- public class ZKUtil {
- public static final String SEPARATOR = "/";
- /**
- * 转换path为zk的标准路径 以/开头,最后不带/
- */
- public static String normalize(String path) {
- String temp = path;
- if(!path.startsWith(SEPARATOR)) {
- temp = SEPARATOR + path;
- }
- if(path.endsWith(SEPARATOR)) {
- temp = temp.substring(0, temp.length()-1);
- return normalize(temp);
- }else {
- return temp;
- }
- }
- /**
- * 链接两个path,并转化为zk的标准路径
- */
- public static String contact(String path1,String path2){
- if(path2.startsWith(SEPARATOR)) {
- path2 = path2.substring(1);
- }
- if(path1.endsWith(SEPARATOR)) {
- return normalize(path1 + path2);
- } else {
- return normalize(path1 + SEPARATOR + path2);
- }
- }
- /**
- * 字符串转化成byte类型
- */
- public static byte[] toBytes(String data) {
- if(data == null || data.trim().equals("")) return null;
- return data.getBytes();
- }
- }
- package zk.lock;
- import org.apache.zookeeper.*;
- import org.apache.zookeeper.data.Stat;
- import zk.util.ZKUtil;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.TimeUnit;
- /**
- * User: zhenghui
- * Date: 14-3-26
- * Time: 下午8:50
- * 封装一个zookeeper实例.
- */
- public class ZKClient implements Watcher {
- private ZooKeeper zookeeper;
- private CountDownLatch connectedSemaphore = new CountDownLatch(1);
- public ZKClient(String connectString, int sessionTimeout) throws Exception {
- zookeeper = new ZooKeeper(connectString, sessionTimeout, this);
- System.out.println("connecting zk server");
- if (connectedSemaphore.await(10l, TimeUnit.SECONDS)) {
- System.out.println("connect zk server success");
- } else {
- System.out.println("connect zk server error.");
- throw new Exception("connect zk server error.");
- }
- }
- public void close() throws InterruptedException {
- if (zookeeper != null) {
- zookeeper.close();
- }
- }
- public void createPathIfAbsent(String path, boolean isPersistent) throws Exception {
- CreateMode createMode = isPersistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
- path = ZKUtil.normalize(path);
- if (!this.exists(path)) {
- zookeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
- }
- }
- public boolean exists(String path) throws Exception {
- path = ZKUtil.normalize(path);
- Stat stat = zookeeper.exists(path, null);
- return stat != null;
- }
- public String getData(String path) throws Exception {
- path = ZKUtil.normalize(path);
- try {
- byte[] data = zookeeper.getData(path, null, null);
- return new String(data);
- } catch (KeeperException e) {
- if (e instanceof KeeperException.NoNodeException) {
- throw new Exception("Node does not exist,path is [" + e.getPath() + "].", e);
- } else {
- throw new Exception(e);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new Exception(e);
- }
- }
- @Override
- public void process(WatchedEvent event) {
- if (event == null) return;
- // 连接状态
- Watcher.Event.KeeperState keeperState = event.getState();
- // 事件类型
- Watcher.Event.EventType eventType = event.getType();
- // 受影响的path
- // String path = event.getPath();
- if (Watcher.Event.KeeperState.SyncConnected == keeperState) {
- // 成功连接上ZK服务器
- if (Watcher.Event.EventType.None == eventType) {
- System.out.println("zookeeper connect success");
- connectedSemaphore.countDown();
- }
- }
- //下面可以做一些重连的工作.
- else if (Watcher.Event.KeeperState.Disconnected == keeperState) {
- System.out.println("zookeeper Disconnected");
- } else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {
- System.out.println("zookeeper AuthFailed");
- } else if (Watcher.Event.KeeperState.Expired == keeperState) {
- System.out.println("zookeeper Expired");
- }
- }
- }
核心技术:Maven,Springmvc mybatis shiro, Druid, Restful, Dubbo, ZooKeeper,Redis,FastDFS,ActiveMQ,Nginx 1. 项目核心代码结构截图 项目模块依赖特别提醒:开发人员在开发的时候可以将自己的业务REST服务化或者Dubbo服务化2. 项目依赖介绍 2.1 后台管理系统、Rest服务系统、Scheculer定时调度系统依赖如下图: 2.2 Dubbo独立服务项目依赖如下图: 3. 项目功能部分截图: zookeeper、dubbo服务启动 dubbo管控台 REST服务平台
时间: 2024-10-10 05:28:29