基于zookeeper的分布式锁实现 【转载】

  • 工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现

    准备工作

    有几个帮助类,先把代码放上来

    ZKClient 对zk的操作做了一个简单的封装

    Java代码

    ZKUtil 针对zk路径的一个工具类

    Java代码

    NetworkUtil 获取本机IP的工具方法

    Java代码

    --------------------------- 正文开始  -----------------------------------

    这种实现非常简单,具体的流程如下


     对应的实现如下

    Java代码

    总结

    网上有很多文章,大家的方法大多数都是创建一个root根节点,每一个trylock的客户端都会在root下创建一个 EPHEMERAL_SEQUENTIAL 的子节点,同时设置root的child 变更watcher(为了避免羊群效应,可以只添加前一个节点的变更通知) .如果创建的节点的序号是最小,则获取到锁,否则继续等待root的child 变更

  1. package zk.lock;
  2. import zk.util.NetworkUtil;
  3. import zk.util.ZKUtil;
  4. /**
  5. * User: zhenghui
  6. * Date: 14-3-26
  7. * Time: 下午8:37
  8. * 分布式锁实现.
  9. *
  10. * 这种实现的原理是,创建某一个任务的节点,比如 /lock/tasckname 然后获取对应的值,如果是当前的Ip,那么获得锁,如果不是,则没获得
  11. * .如果该节点不存在,则创建该节点,并把改节点的值设置成当前的IP
  12. */
  13. public class DistributedLock01 {
  14. private ZKClient zkClient;
  15. public static final String LOCK_ROOT = "/lock";
  16. private String lockName;
  17. public DistributedLock01(String connectString, int sessionTimeout,String lockName) throws Exception {
  18. //先创建zk链接.
  19. this.createConnection(connectString,sessionTimeout);
  20. this.lockName = lockName;
  21. }
  22. public boolean tryLock(){
  23. String path = ZKUtil.contact(LOCK_ROOT,lockName);
  24. String localIp = NetworkUtil.getNetworkAddress();
  25. try {
  26. if(zkClient.exists(path)){
  27. String ownnerIp = zkClient.getData(path);
  28. if(localIp.equals(ownnerIp)){
  29. return true;
  30. }
  31. } else {
  32. zkClient.createPathIfAbsent(path,false);
  33. if(zkClient.exists(path)){
  34. String ownnerIp = zkClient.getData(path);
  35. if(localIp.equals(ownnerIp)){
  36. return true;
  37. }
  38. }
  39. }
  40. } catch (Exception e) {
  41. e.printStackTrace();
  42. }
  43. return false;
  44. }
  45. /**
  46. * 创建zk连接
  47. *
  48. */
  49. protected void createConnection(String connectString, int sessionTimeout) throws Exception {
  50. if(zkClient != null){
  51. releaseConnection();
  52. }
  53. zkClient = new ZKClient(connectString,sessionTimeout);
  54. zkClient.createPathIfAbsent(LOCK_ROOT,true);
  55. }
  56. /**
  57. * 关闭ZK连接
  58. */
  59. protected void releaseConnection() throws InterruptedException {
  60. if (zkClient != null) {
  61. zkClient.close();
  62. }
  63. }
  64. }
  1. package zk.util;
  2. import java.net.InetAddress;
  3. import java.net.NetworkInterface;
  4. import java.util.Enumeration;
  5. /**
  6. * User: zhenghui
  7. * Date: 14-4-1
  8. * Time: 下午4:47
  9. */
  10. public class NetworkUtil {
  11. static private final char COLON = ‘:‘;
  12. /**
  13. * 获取当前机器ip地址
  14. * 据说多网卡的时候会有问题.
  15. */
  16. public static String getNetworkAddress() {
  17. Enumeration<NetworkInterface> netInterfaces;
  18. try {
  19. netInterfaces = NetworkInterface.getNetworkInterfaces();
  20. InetAddress ip;
  21. while (netInterfaces.hasMoreElements()) {
  22. NetworkInterface ni = netInterfaces
  23. .nextElement();
  24. Enumeration<InetAddress> addresses=ni.getInetAddresses();
  25. while(addresses.hasMoreElements()){
  26. ip = addresses.nextElement();
  27. if (!ip.isLoopbackAddress()
  28. && ip.getHostAddress().indexOf(COLON) == -1) {
  29. return ip.getHostAddress();
  30. }
  31. }
  32. }
  33. return "";
  34. } catch (Exception e) {
  35. return "";
  36. }
  37. }
  38. }
  1. package zk.util;
  2. /**
  3. * User: zhenghui
  4. * Date: 14-3-26
  5. * Time: 下午9:56
  6. */
  7. public class ZKUtil {
  8. public static final String SEPARATOR = "/";
  9. /**
  10. * 转换path为zk的标准路径 以/开头,最后不带/
  11. */
  12. public static String normalize(String path) {
  13. String temp = path;
  14. if(!path.startsWith(SEPARATOR)) {
  15. temp = SEPARATOR + path;
  16. }
  17. if(path.endsWith(SEPARATOR)) {
  18. temp = temp.substring(0, temp.length()-1);
  19. return normalize(temp);
  20. }else {
  21. return temp;
  22. }
  23. }
  24. /**
  25. * 链接两个path,并转化为zk的标准路径
  26. */
  27. public static String contact(String path1,String path2){
  28. if(path2.startsWith(SEPARATOR)) {
  29. path2 = path2.substring(1);
  30. }
  31. if(path1.endsWith(SEPARATOR)) {
  32. return normalize(path1 + path2);
  33. } else {
  34. return normalize(path1 + SEPARATOR + path2);
  35. }
  36. }
  37. /**
  38. * 字符串转化成byte类型
  39. */
  40. public static byte[] toBytes(String data) {
  41. if(data == null || data.trim().equals("")) return null;
  42. return data.getBytes();
  43. }
  44. }
  1. package zk.lock;
  2. import org.apache.zookeeper.*;
  3. import org.apache.zookeeper.data.Stat;
  4. import zk.util.ZKUtil;
  5. import java.util.concurrent.CountDownLatch;
  6. import java.util.concurrent.TimeUnit;
  7. /**
  8. * User: zhenghui
  9. * Date: 14-3-26
  10. * Time: 下午8:50
  11. * 封装一个zookeeper实例.
  12. */
  13. public class ZKClient implements Watcher {
  14. private ZooKeeper zookeeper;
  15. private CountDownLatch connectedSemaphore = new CountDownLatch(1);
  16. public ZKClient(String connectString, int sessionTimeout) throws Exception {
  17. zookeeper = new ZooKeeper(connectString, sessionTimeout, this);
  18. System.out.println("connecting zk server");
  19. if (connectedSemaphore.await(10l, TimeUnit.SECONDS)) {
  20. System.out.println("connect zk server success");
  21. } else {
  22. System.out.println("connect zk server error.");
  23. throw new Exception("connect zk server error.");
  24. }
  25. }
  26. public void close() throws InterruptedException {
  27. if (zookeeper != null) {
  28. zookeeper.close();
  29. }
  30. }
  31. public void createPathIfAbsent(String path, boolean isPersistent) throws Exception {
  32. CreateMode createMode = isPersistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
  33. path = ZKUtil.normalize(path);
  34. if (!this.exists(path)) {
  35. zookeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
  36. }
  37. }
  38. public boolean exists(String path) throws Exception {
  39. path = ZKUtil.normalize(path);
  40. Stat stat = zookeeper.exists(path, null);
  41. return stat != null;
  42. }
  43. public String getData(String path) throws Exception {
  44. path = ZKUtil.normalize(path);
  45. try {
  46. byte[] data = zookeeper.getData(path, null, null);
  47. return new String(data);
  48. } catch (KeeperException e) {
  49. if (e instanceof KeeperException.NoNodeException) {
  50. throw new Exception("Node does not exist,path is [" + e.getPath() + "].", e);
  51. } else {
  52. throw new Exception(e);
  53. }
  54. } catch (InterruptedException e) {
  55. Thread.currentThread().interrupt();
  56. throw new Exception(e);
  57. }
  58. }
  59. @Override
  60. public void process(WatchedEvent event) {
  61. if (event == null) return;
  62. // 连接状态
  63. Watcher.Event.KeeperState keeperState = event.getState();
  64. // 事件类型
  65. Watcher.Event.EventType eventType = event.getType();
  66. // 受影响的path
  67. //        String path = event.getPath();
  68. if (Watcher.Event.KeeperState.SyncConnected == keeperState) {
  69. // 成功连接上ZK服务器
  70. if (Watcher.Event.EventType.None == eventType) {
  71. System.out.println("zookeeper connect success");
  72. connectedSemaphore.countDown();
  73. }
  74. }
  75. //下面可以做一些重连的工作.
  76. else if (Watcher.Event.KeeperState.Disconnected == keeperState) {
  77. System.out.println("zookeeper Disconnected");
  78. } else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {
  79. System.out.println("zookeeper AuthFailed");
  80. } else if (Watcher.Event.KeeperState.Expired == keeperState) {
  81. System.out.println("zookeeper Expired");
  82. }
  83. }
  84. }

核心技术:Maven,Springmvc mybatis shiro, Druid, Restful, Dubbo, ZooKeeper,Redis,FastDFS,ActiveMQ,Nginx 1.     项目核心代码结构截图   项目模块依赖特别提醒:开发人员在开发的时候可以将自己的业务REST服务化或者Dubbo服务化2.    项目依赖介绍   2.1 后台管理系统、Rest服务系统、Scheculer定时调度系统依赖如下图:        2.2 Dubbo独立服务项目依赖如下图: 3.  项目功能部分截图:      zookeeper、dubbo服务启动  dubbo管控台        REST服务平台

时间: 2024-10-10 05:28:29

基于zookeeper的分布式锁实现 【转载】的相关文章

基于zookeeper实现分布式锁

一.分布式锁介绍 分布式锁主要用于在分布式环境中保护跨进程.跨主机.跨网络的共享资源实现互斥访问,以达到保证数据的一致性. 线程锁:大家都不陌生,主要用来给方法.代码块加锁.当某个方法或者代码块使用锁时,那么在同一时刻至多仅有有一个线程在执行该段代码.当有多个线程访问同一对象的加锁方法/代码块时,同一时间只有一个线程在执行,其余线程必须要等待当前线程执行完之后才能执行该代码段.但是,其余线程是可以访问该对象中的非加锁代码块的. 进程锁:也是为了控制同一操作系统中多个进程访问一个共享资源,只是因为

基于ZooKeeper的分布式锁和队列

在分布式系统中,往往需要一些分布式同步原语来做一些协同工作,上一篇文章介绍了Zookeeper的基本原理,本文介绍下基于Zookeeper的Lock和Queue的实现,主要代码都来自Zookeeper的官方recipe. 锁(Lock) 完全分布式锁是全局同步的,这意味着在任何时刻没有两个客户端会同时认为它们都拥有相同的锁,使用 Zookeeper 可以实现分布式锁,需要首先定义一个锁节点(lock root node). 需要获得锁的客户端按照以下步骤来获取锁: 保证锁节点(lock root

spring boot 定时任务基于zookeeper的分布式锁实现

基于ZooKeeper分布式锁的流程 在zookeeper指定节点(locks)下创建临时顺序节点node_n 获取locks下所有子节点children 对子节点按节点自增序号从小到大排序 判断本节点是不是第一个子节点,若是,则获取锁:若不是,则监听比该节点小的那个节点的删除事件 若监听事件生效,则回到第二步重新进行判断,直到获取到锁 具体实现 添加Maven依赖: <?xml version="1.0" encoding="UTF-8"?> <

基于zookeeper的分布式锁实现 【转】

工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现 准备工作 有几个帮助类,先把代码放上来 ZKClient 对zk的操作做了一个简单的封装 Java代码 ZKUtil 针对zk路径的一个工具类 Java代码 NetworkUtil 获取本机IP的工具方法 Java代码 --------------------------- 正文开始  -------------------------

基于zookeeper的分布式锁实现【转】

工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现 准备工作 有几个帮助类,先把代码放上来 ZKClient 对zk的操作做了一个简单的封装 Java代码 ZKUtil 针对zk路径的一个工具类 Java代码 NetworkUtil 获取本机IP的工具方法 Java代码 --------------------------- 正文开始  -------------------------

基于Redis实现分布式锁(转载)

原文地址:http://blog.csdn.net/ugg/article/details/41894947 Redis命令介绍使用Redis实现分布式锁,有两个重要函数需要介绍 SETNX命令(SET if Not eXists)语法:SETNX key value功能:当且仅当 key 不存在,将 key 的值设为 value ,并返回1:若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0. GETSET命令语法:GETSET key value功能:将给定 key 的值设为

基于zookeeper的分布式锁实现

需要了解源码的朋友加我QQ:2137028325 框架简介: 本系统一款通用的SOA中间件平台,用来开发各类J2EE企业级应用,节省时间和人力成本.本系统采用MVC模式.AOP引擎.任务调度器.Ajax.拦截器.过滤器.缓存.日志监控.数据访问.表达式.国际化等技术. 框架/平台构成:Maven+Springmvc + Mybatis + Shiro(权限)+ Tiles(模板) +ActiveMQ(消息队列) + Rest(服务) + WebService(服务)+ EHcache(缓存) +

基于redis和zookeeper的分布式锁实现方式

先来说说什么是分布式锁,简单来说,分布式锁就是在分布式并发场景中,能够实现多节点的代码同步的一种机制.从实现角度来看,主要有两种方式:基于redis的方式和基于zookeeper的方式,下面分别简单介绍下这两种方式: 一.基于redis的分布式锁实现 1.获取锁 redis是一种key-value形式的NOSQL数据库,常用于作服务器的缓存.从redis v2.6.12开始,set命令开始变成如下格式: SET key value [EX seconds] [PX milliseconds] [

基于ZooKeeper的分布式Session实现(转)

1.   认识ZooKeeper ZooKeeper—— “动物园管理员”.动物园里当然有好多的动物,游客可以根据动物园提供的向导图到不同的场馆观赏各种类型的动物,而不是像走在原始丛林里,心惊胆颤的被动 物所观赏.为了让各种不同的动物呆在它们应该呆的地方,而不是相互串门,或是相互厮杀,就需要动物园管理员按照动物的各种习性加以分类和管理,这样我们才 能更加放心安全的观赏动物.回到我们企业级应用系统中,随着信息化水平的不断提高,我们的企业级系统变得越来越庞大臃肿,性能急剧下降,客户抱怨频频.拆 分系