06.Curator分布式锁

锁:分布式的锁全局同步,这意味着任何一个时间点不会有两个客户端都拥有相同的锁。

1.可重入锁Shared Reentrant Lock

首先我们先看一个全局可重入的锁(可以多次获取,不会被阻塞)。Shared意味着锁是全局可见的,客户端都可以请求锁。Reentrant和JDK的ReentrantLock类似,意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。

1.可重入锁相关类介绍

它是由类InterProcessMutex来实现。它的主要方法:

  1. // 构造方法
  2. public InterProcessMutex(CuratorFramework client, String path)
  3. public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)
  4. // 通过acquire获得锁,并提供超时机制:
  5. public void acquire() throws Exception
  6. public boolean acquire(long time, TimeUnit unit) throws Exception
  7. // 撤销锁
  8. public void makeRevocable(RevocationListener<InterProcessMutex> listener)
  9. public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor)

错误处理:还是强烈推荐你使用ConnectionStateListener处理连接状态的改变。当连接LOST时你不再拥有锁。

2.编写示例程序

首先让我们创建一个模拟的共享资源, 这个资源期望只能单线程的访问,否则会有并发问题。

  1. public class FakeLimitedResource
  2. {
  3. private final AtomicBoolean inUse = new AtomicBoolean(false);
  4. // 模拟只能单线程操作的资源
  5. public void use() throws InterruptedException
  6. {
  7. if (!inUse.compareAndSet(false, true))
  8. {
  9. // 在正确使用锁的情况下,此异常不可能抛出
  10. throw new IllegalStateException("Needs to be used by one client at a time");
  11. }
  12. try
  13. {
  14. Thread.sleep((long) (3 * Math.random()));
  15. }
  16. finally
  17. {
  18. inUse.set(false);
  19. }
  20. }
  21. }

然后创建一个ExampleClientThatLocks类,它负责请求锁,使用资源,释放锁这样一个完整的访问过程。

  1. public class ExampleClientThatLocks
  2. {
  3. private final InterProcessMutex lock;
  4. private final FakeLimitedResource resource;
  5. private final String clientName;
  6. public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName)
  7. {
  8. this.resource = resource;
  9. this.clientName = clientName;
  10. lock = new InterProcessMutex(client, lockPath);
  11. }
  12. public void doWork(long time, TimeUnit unit) throws Exception
  13. {
  14. if (!lock.acquire(time, unit))
  15. {
  16. throw new IllegalStateException(clientName + " 不能得到互斥锁");
  17. }
  18. try
  19. {
  20. System.out.println(clientName + " 已获取到互斥锁");
  21. resource.use(); // 使用资源
  22. Thread.sleep(1000 * 1);
  23. }
  24. finally
  25. {
  26. System.out.println(clientName + " 释放互斥锁");
  27. lock.release(); // 总是在finally中释放
  28. }
  29. }
  30. }

最后创建主程序来测试:

  1. public class InterProcessMutexExample
  2. {
  3. private static final int QTY = 5;
  4. private static final int REPETITIONS = QTY * 10;
  5. private static final String PATH = "/examples/locks";
  6. public static void main(String[] args) throws Exception
  7. {
  8. final FakeLimitedResource resource = new FakeLimitedResource();
  9. final List<CuratorFramework> clientList = new ArrayList<CuratorFramework>();
  10. for (int i = 0; i < QTY; i++)
  11. {
  12. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
  13. client.start();
  14. clientList.add(client);
  15. }
  16. System.out.println("连接初始化完成!");
  17. ExecutorService service = Executors.newFixedThreadPool(QTY);
  18. for (int i = 0; i < QTY; ++i)
  19. {
  20. final int index = i;
  21. Callable<Void> task = new Callable<Void>()
  22. {
  23. @Override
  24. public Void call() throws Exception
  25. {
  26. try
  27. {
  28. final ExampleClientThatLocks example = new ExampleClientThatLocks(clientList.get(index), PATH, resource, "Client " + index);
  29. for (int j = 0; j < REPETITIONS; ++j)
  30. {
  31. example.doWork(10, TimeUnit.SECONDS);
  32. }
  33. }
  34. catch (Throwable e)
  35. {
  36. e.printStackTrace();
  37. }
  38. finally
  39. {
  40. CloseableUtils.closeQuietly(clientList.get(index));
  41. }
  42. return null;
  43. }
  44. };
  45. service.submit(task);
  46. }
  47. service.shutdown();
  48. service.awaitTermination(10, TimeUnit.MINUTES);
  49. System.out.println("OK!");
  50. }
  51. }

代码也很简单,生成5个client,每个client重复执行10次 请求锁--访问资源--释放锁的过程。每个client都在独立的线程中。

结果可以看到,锁是随机的被每个实例排他性的使用。

既然是可重入锁,你可以在一个线程中多次调用acquire,在线程拥有锁时它总是返回true。

注意:你不应该在多个线程中用同一个InterProcessMutex, 你可以在每个线程中都生成一个InterProcessMutex实例,它们的path都一样,这样它们可以共享同一个锁。

3.示例运行结果

运行结果控制台如下:

  1. 连接初始化完成!
  2. Client 4 已获取到互斥锁
  3. Client 4 释放互斥锁
  4. Client 3 已获取到互斥锁
  5. Client 3 释放互斥锁
  6. ......
  7. Client 2 已获取到互斥锁
  8. Client 2 释放互斥锁
  9. OK!

运行时查看Zookeeper节点信息如下:

2.不可重入锁Shared Lock

这个锁和上面的相比,就是少了Reentrant的功能,也就意味着它不能在同一个线程中重入。这个类是InterProcessSemaphoreMutex使用方法和上面的类类似

首先我们将上面的例子修改一下,测试一下它的重入。修改ExampleClientThatLocks.doWork,连续两次acquire:

  1. public void doWork(long time, TimeUnit unit) throws Exception
  2. {
  3. if (!lock.acquire(time, unit))
  4. {
  5. throw new IllegalStateException(clientName + " 不能得到互斥锁");
  6. }
  7. System.out.println(clientName + " 已获取到互斥锁");
  8. if (!lock.acquire(time, unit))
  9. {
  10. throw new IllegalStateException(clientName + " 不能得到互斥锁");
  11. }
  12. System.out.println(clientName + " 再次获取到互斥锁");
  13. try
  14. {
  15. resource.use(); // 使用资源
  16. Thread.sleep(1000 * 1);
  17. }
  18. finally
  19. {
  20. System.out.println(clientName + " 释放互斥锁");
  21. lock.release(); // 总是在finally中释放
  22. lock.release(); // 获取锁几次 释放锁也要几次
  23. }
  24. }

注意:我们也需要调用release两次。这和JDK的ReentrantLock用法一致。如果少调用一次release,则此线程依然拥有锁。

上面的代码没有问题,我们可以多次调用acquire,后续的acquire也不会阻塞。

但是将上面的InterProcessMutex换成不可重入锁InterProcessSemaphoreMutex,如果再运行上面的代码,结果就会发现线程被阻塞在第二个acquire上,直到超时。也就是此锁不是可重入的。

3.可重入读写锁Shared Reentrant Read Write Lock

类似JDK的ReentrantReadWriteLock。一个读写锁管理一对相关的锁。一个负责读操作,另外一个负责写操作。读操作在写锁没被使用时可同时由多个进程使用,而写锁在使用时不允许读(阻塞)。

此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。这也意味着写锁可以降级成读锁, 比如请求写锁 --->读锁 ---->释放写锁。从读锁升级成写锁是不行的。

1.可重入读写锁相关类介绍

可重入读写锁主要由两个类实现:InterProcessReadWriteLock、InterProcessMutex。使用时首先创建一个InterProcessReadWriteLock实例,然后再根据你的需求得到读锁或者写锁,读写锁的类型是InterProcessMutex。

2.编写示例程序

示例程序仍使用上面的FakeLimitedResource、InterProcessMutexExample类

  1. public class ExampleClientReadWriteLocks
  2. {
  3. private final InterProcessReadWriteLock lock;
  4. private final InterProcessMutex readLock;
  5. private final InterProcessMutex writeLock;
  6. private final FakeLimitedResource resource;
  7. private final String clientName;
  8. public ExampleClientReadWriteLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName)
  9. {
  10. this.resource = resource;
  11. this.clientName = clientName;
  12. lock = new InterProcessReadWriteLock(client, lockPath);
  13. readLock = lock.readLock();
  14. writeLock = lock.writeLock();
  15. }
  16. public void doWork(long time, TimeUnit unit) throws Exception
  17. {
  18. // 注意只能先得到写锁再得到读锁,不能反过来!!!
  19. if (!writeLock.acquire(time, unit))
  20. {
  21. throw new IllegalStateException(clientName + " 不能得到写锁");
  22. }
  23. System.out.println(clientName + " 已得到写锁");
  24. if (!readLock.acquire(time, unit))
  25. {
  26. throw new IllegalStateException(clientName + " 不能得到读锁");
  27. }
  28. System.out.println(clientName + " 已得到读锁");
  29. try
  30. {
  31. resource.use(); // 使用资源
  32. Thread.sleep(1000 * 1);
  33. }
  34. finally
  35. {
  36. System.out.println(clientName + " 释放读写锁");
  37. readLock.release();
  38. writeLock.release();
  39. }
  40. }
  41. }

在这个类中我们首先请求了一个写锁,然后降级成读锁。执行业务处理,然后释放读写锁。修改InterProcessMutexExample类中的ExampleClientThatLocks为ExampleClientReadWriteLocks然后运行示例。

3.示例运行结果

运行结果控制台:

  1. 连接初始化完成!
  2. Client 1 已得到写锁
  3. Client 1 已得到读锁
  4. Client 1 释放读写锁
  5. ......
  6. Client 3 已得到写锁
  7. Client 3 已得到读锁
  8. Client 3 释放读写锁
  9. OK!

此时查看Zookeeper数据节点如下:

4.信号量Shared Semaphore

一个计数的信号量类似JDK的Semaphore。JDK中Semaphore维护的一组许可(permits),而Cubator中称之为租约(Lease)

有两种方式可以决定semaphore的最大租约数。第一种方式是有用户给定的path决定。第二种方式使用SharedCountReader类。

如果不使用SharedCountReader,没有内部代码检查进程是否假定有10个租约而进程B假定有20个租约。 所以所有的实例必须使用相同的numberOfLeases值.

1.信号量实现类说明

主要类有:

  • InterProcessSemaphoreV2 - 信号量实现类
  • Lease - 租约(单个信号)
  • SharedCountReader - 计数器,用于计算最大租约数量

这次调用acquire会返回一个租约对象。客户端必须在finally中close这些租约对象,否则这些租约会丢失掉。但是,如果客户端session由于某种原因比如crash丢掉,那么这些客户端持有的租约会自动close,这样其它客户端可以继续使用这些租约。

租约还可以通过下面的方式返还:

  1. public void returnLease(Lease lease)
  2. public void returnAll(Collection<Lease> leases)

注意一次你可以请求多个租约,如果Semaphore当前的租约不够,则请求线程会被阻塞。同时还提供了超时的重载方法。

  1. public Lease acquire() throws Exception
  2. public Collection<Lease> acquire(int qty) throws Exception
  3. public Lease acquire(long time, TimeUnit unit) throws Exception
  4. public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception

2.编写示例程序

  1. public class InterProcessSemaphoreExample
  2. {
  3. private static final int MAX_LEASE = 10;
  4. private static final String PATH = "/examples/locks";
  5. public static void main(String[] args) throws Exception
  6. {
  7. FakeLimitedResource resource = new FakeLimitedResource();
  8. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
  9. client.start();
  10. InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
  11. Collection<Lease> leases = semaphore.acquire(5);
  12. System.out.println("获取租约数量:" + leases.size());
  13. Lease lease = semaphore.acquire();
  14. System.out.println("获取单个租约");
  15. resource.use();
  16. Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
  17. System.out.println("获取租约,如果为空则超时: " + leases2);
  18. System.out.println("释放租约");
  19. semaphore.returnLease(lease);
  20. System.out.println("释放集合中的所有租约");
  21. semaphore.returnAll(leases);
  22. client.close();
  23. System.out.println("OK!");
  24. }
  25. }

首先我们先获得了5个租约,接着请求了一个租约,因为semaphore还有5个租约,所以请求可以满足,返回一个租约,还剩4个租约。

然后再请求一个租约,因为租约不够,阻塞到超时,还是没能满足,返回结果为null。

3.示例运行结果

运行结果控制台如下:

  1. 获取租约数量:5
  2. 获取单个租约
  3. 获取租约,如果为空则超时: null
  4. 释放租约
  5. 释放集合中的所有租约
  6. OK!

此时查看Zookeeper数据节点如下:

注意:上面所讲的4种锁都是公平锁(fair)。从ZooKeeper的角度看,每个客户端都按照请求的顺序获得锁。相当公平。

5.多锁对象 Multi Shared Lock

Multi Shared Lock是一个锁的容器。当调用acquire,所有的锁都会被acquire,如果请求失败,所有的锁都会被release。同样调用release时所有的锁都被release(失败被忽略)。基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。

1.主要类说明

主要涉及两个类:

  • InterProcessMultiLock - 对所对象实现类
  • InterProcessLock - 分布式锁接口类

它的构造函数需要包含的锁的集合,或者一组ZooKeeper的path。用法和Shared Lock相同。

  1. public InterProcessMultiLock(CuratorFramework client, List<String> paths)
  2. public InterProcessMultiLock(List<InterProcessLock> locks)

2.编写示例程序

  1. public class InterProcessMultiLockExample
  2. {
  3. private static final String PATH1 = "/examples/locks1";
  4. private static final String PATH2 = "/examples/locks2";
  5. public static void main(String[] args) throws Exception
  6. {
  7. FakeLimitedResource resource = new FakeLimitedResource();
  8. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
  9. client.start();
  10. InterProcessLock lock1 = new InterProcessMutex(client, PATH1); // 可重入锁
  11. InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2); // 不可重入锁
  12. InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));
  13. if (!lock.acquire(10, TimeUnit.SECONDS))
  14. {
  15. throw new IllegalStateException("不能获取多锁");
  16. }
  17. System.out.println("已获取多锁");
  18. System.out.println("是否有第一个锁: " + lock1.isAcquiredInThisProcess());
  19. System.out.println("是否有第二个锁: " + lock2.isAcquiredInThisProcess());
  20. try
  21. {
  22. resource.use(); // 资源操作
  23. }
  24. finally
  25. {
  26. System.out.println("释放多个锁");
  27. lock.release(); // 释放多锁
  28. }
  29. System.out.println("是否有第一个锁: " + lock1.isAcquiredInThisProcess());
  30. System.out.println("是否有第二个锁: " + lock2.isAcquiredInThisProcess());
  31. client.close();
  32. System.out.println("OK!");
  33. }
  34. }

新建一个InterProcessMultiLock,包含一个重入锁和一个非重入锁。调用acquire后可以看到线程同时拥有了这两个锁。调用release看到这两个锁都被释放了。

注意:再重申一遍,强烈推荐使用ConnectionStateListener监控连接的状态。

3.示例运行结果

运行结果控制台如下:

  1. 已获取多锁
  2. 是否有第一个锁: true
  3. 是否有第二个锁: true
  4. 释放多个锁
  5. 是否有第一个锁: false
  6. 是否有第二个锁: false
  7. OK!

此时查看Zookeeper数据节点如下:

-------------------------------------------------------------------------------------------------------------------------------

来自为知笔记(Wiz)

时间: 2024-08-05 07:05:03

06.Curator分布式锁的相关文章

curator 分布式锁InterProcessMutex

写这篇文章的目的主要是为了记录下自己在zookeeper 锁上踩过的坑,以及踩坑之后自己的一点认识; 从zk分布式锁原理说起,原理很简单,大家也应该都知道,简单的说就是zookeeper实现分布式锁是通过在zk集群上的路径实现的,在获取分布式锁的时候在zk服务器集群节点上创建临时顺序节点,释放锁的时候删除该临时节点. 多么简单的一句话,但是当你实现起来,想去做点优化的时候往往会变得很难,难的我们后续说; 再从需求说起,需求就是加锁,但是由于原来吞吐量不是很大,只是配置了一个固定的锁路径,但是却不

SpringBoot电商项目实战 — Zookeeper的分布式锁实现

上一篇演示了基于Redis的Redisson分布式锁实现,那今天我要再来说说基于Zookeeper的分布式现实. Zookeeper分布式锁实现 要用Zookeeper实现分布式锁,我就不得不说说zookeeper的数据存储.首先zookeeper的核心保存结构是一个DataTree数据结构,其实内部是一个Map<String, DataNode> nodes的数据结构,其中key是path,DataNode才是真正保存数据的核心数据结构,DataNode核心字段包括byte data[]用于

【分布式锁】06-Zookeeper实现分布式锁:可重入锁源码分析

前言 前面已经讲解了Redis的客户端Redission是怎么实现分布式锁的,大多都深入到源码级别. 在分布式系统中,常见的分布式锁实现方案还有Zookeeper,接下来会深入研究Zookeeper是如何来实现分布式锁的. Zookeeper初识 文件系统 Zookeeper维护一个类似文件系统的数据结构 image.png 每个子目录项如NameService都被称为znoed,和文件系统一样,我们能够自由的增加.删除znode,在znode下增加.删除子znode,唯一不同的在于znode是

【zookeeper】Apache curator的使用及zk分布式锁实现

上篇,本篇主要讲Apache开源的curator的使用,有了curator,利用Java对zookeeper的操作变得极度便捷. 其实在学之前我也有个疑虑,我为啥要学curator,撇开涨薪这些外在的东西,就单技术层面来讲,学curator能帮我做些什么?这就不得不从zookeeper说起,上篇我已经大篇幅讲了zk是做什么的了,但真正要靠zk去实现多服务器自动拉取更新的配置文件等功能是非常难的,如果没有curator,直接去写的话基本上能把你累哭,就好比连Mybatis或者jpa都没有,让你用原

分布式锁-curator实现

理论篇: Curator是Netflix开源的一套ZooKeeper客户端框架. Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层, 应用方在使用的时候需要自己处理很多事情, 于是在它的基础上包装了一下, 提供了一套更好用的客户端框架. Netflix在用ZooKeeper的过程中遇到的问题, 我们也遇到了, 所以开始研究一下, 首先从他在github上的源码, wiki文档以及Netflix的技术blog入手. 看完官方的文档之后, 发现Curator主要

五、Curator使用:分布式锁

分布式锁介绍 分布式执行一些不需要同时执行的复杂任务,curator利用zk的特质,实现了这个选举过程.其实就是利用了多个zk客户端在同一个位置建节点,只会有一个客户端建立成功这个特性.来实现同一时间,只会选择一个客户端执行任务 代码 //分布式锁 InterProcessMutex lock = new InterProcessMutex(cc,"/lock_path"); CountDownLatch down = new CountDownLatch(1); for (int i

Curator实现zookeeper分布式锁的基本原理

一.写在前面 之前写过一篇文章(<拜托,面试请不要再问我Redis分布式锁的实现原理>),给大家说了一下Redisson这个开源框架是如何实现Redis分布式锁原理的,这篇文章再给大家聊一下ZooKeeper实现分布式锁的原理. 同理,我是直接基于比较常用的Curator这个开源框架,聊一下这个框架对ZooKeeper(以下简称zk)分布式锁的实现. 一般除了大公司是自行封装分布式锁框架之外,建议大家用这些开源框架封装好的分布式锁实现,这是一个比较快捷省事儿的方式. 二.ZooKeeper分布

spring整合curator实现分布式锁

为什么要有分布式锁? 比如说,我们要下单,分为两个操作,下单成功(订单服务),扣减库存(商品服务).如果没有锁的话,同时两个请求进来.先检查有没有库存,一看都有,然后下订单,减库存.这时候肯定会出现错误.我们想要的结果是.只有一个请求可以进来.当完成这个操作之后,下一个请求再进来.这才不会出现库存卖超的现象.这时候,就需要我们使用分布式锁来实现. 实现分布式锁的方法有很多种.redis,zk都可以.但是还是推荐zk. 先说下大体思路: 首先,我们在下单的时候,先获取锁.如果获取成功,就进行我们下

高级java必会系列一:zookeeper分布式锁

方案1: 算法思路:利用名称唯一性,加锁操作时,只需要所有客户端一起创建/test/Lock节点,只有一个创建成功,成功者获得锁.解锁时,只需删除/test/Lock节点,其余客户端再次进入竞争创建节点,直到所有客户端都获得锁.特点:这种方案的正确性和可靠性是ZooKeeper机制保证的,实现简单.缺点是会产生"惊群"效应,假如许多客户端在等待一把锁,当锁释放时候所有客户端都被唤醒,仅仅有一个客户端得到锁.方案2:算法思路:临时顺序节点实现共享锁        客户端调用create(