07.Curator Barrier

分布式Barrier是这样一个类: 它会阻塞所有节点上的等待进程,知道某一个被满足, 然后所有的节点继续进行。

比如赛马比赛中, 等赛马陆续来到起跑线前。 一声令下,所有的赛马都飞奔而出。

1.栅栏Barrier

1.DistributedBarrier类说明

DistributedBarrier类实现了栅栏的功能。它的构造函数如下:

  1. /**
  2. * @param client client
  3. * @param barrierPath path to use as the barrier
  4. */
  5. public DistributedBarrier(CuratorFramework client, String barrierPath)

DistributedBarrier构造函数中barrierPath参数用来确定一个栅栏,只要barrierPath参数相同(路径相同)就是同一个栅栏。通常情况下栅栏的使用如下:

1.主导client设置一个栅栏

2.其他客户端就会调用waitOnBarrier()等待栅栏移除,程序处理线程阻塞

3.主导client移除栅栏,其他客户端的处理程序就会同时继续运行。

DistributedBarrier类的主要方法如下:

  • setBarrier() - 设置栅栏
  • waitOnBarrier() - 等待栅栏移除
  • removeBarrier() - 移除栅栏

异常处理:DistributedBarrier会监控连接状态,当连接断掉时waitOnBarrier()方法会抛出异常。

2.编写示例程序

  1. public class DistributedBarrierExample
  2. {
  3. private static final int QTY = 5;
  4. private static final String PATH = "/examples/barrier";
  5. public static void main(String[] args) throws Exception
  6. {
  7. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
  8. client.start();
  9. ExecutorService service = Executors.newFixedThreadPool(QTY);
  10. DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
  11. controlBarrier.setBarrier();
  12. for (int i = 0; i < QTY; ++i)
  13. {
  14. final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
  15. final int index = i;
  16. Callable<Void> task = new Callable<Void>()
  17. {
  18. @Override
  19. public Void call() throws Exception
  20. {
  21. Thread.sleep((long) (3 * Math.random()));
  22. System.out.println("Client #" + index + " 等待");
  23. barrier.waitOnBarrier();
  24. System.out.println("Client #" + index + " 开始");
  25. return null;
  26. }
  27. };
  28. service.submit(task);
  29. }
  30. Thread.sleep(1000 * 3);
  31. System.out.println("所有的Client都在等待");
  32. controlBarrier.removeBarrier();
  33. service.shutdown();
  34. service.awaitTermination(10, TimeUnit.MINUTES);
  35. client.close();
  36. System.out.println("OK!");
  37. }
  38. }

这个例子创建了controlBarrier来设置栅栏和移除栅栏。我们创建了5个线程,在此Barrier上等待。最后移除栅栏后所有的线程才继续执行。

如果你开始不设置栅栏,所有的线程就不会阻塞住。

3.示例程序运行结果

运行结果控制台:

  1. Client #1 等待
  2. Client #2 等待
  3. Client #0 等待
  4. Client #4 等待
  5. Client #3 等待
  6. 所有的Client都在等待
  7. Client #4 开始
  8. Client #2 开始
  9. Client #0 开始
  10. Client #3 开始
  11. Client #1 开始
  12. OK!

运行时查看Zookeeper节点信息如下:

2.双栅栏Double Barrier

双栅栏允许客户端在计算的开始和结束时同步。当足够的进程加入到双栅栏时,进程开始计算,当计算完成时,离开栅栏。双栅栏类是DistributedDoubleBarrier

1. DistributedDoubleBarrier类说明

DistributedDoubleBarrier类实现了双栅栏的功能。它的构造函数如下:

  1. // client - the client
  2. // barrierPath - path to use
  3. // memberQty - the number of members in the barrier
  4. public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty)

memberQty是成员数量,当enter方法被调用时,成员被阻塞,直到所有的成员都调用了enter。当leave方法被调用时,它也阻塞调用线程,知道所有的成员都调用了leave。

就像百米赛跑比赛, 发令枪响, 所有的运动员开始跑,等所有的运动员跑过终点线,比赛才结束。

注意:参数memberQty的值只是一个阈值,而不是一个限制值。当等待栅栏的数量大于或等于这个值栅栏就会打开!

与栅栏(DistributedBarrier)一样,双栅栏的barrierPath参数也是用来确定是否是同一个栅栏的,双栅栏的使用情况如下:

1.从多个客户端在同一个路径上创建双栅栏(DistributedDoubleBarrier),然后调用enter()方法,等待栅栏数量达到memberQty时就可以进入栅栏。

2.栅栏数量达到memberQty,多个客户端同时停止阻塞继续运行,直到执行leave()方法,等待memberQty个数量的栅栏同时阻塞到leave()方法中。

3.memberQty个数量的栅栏同时阻塞到leave()方法中,多个客户端的leave()方法停止阻塞,继续运行。

DistributedDoubleBarrier类的主要方法如下:

  • enter()、enter(long maxWait, TimeUnit unit) - 等待同时进入栅栏
  • leave()、leave(long maxWait, TimeUnit unit) - 等待同时离开栅栏

异常处理:DistributedDoubleBarrier会监控连接状态,当连接断掉时enter()和leave方法会抛出异常。

2.编写示例程序

  1. public class DistributedBarrierDoubleExample
  2. {
  3. private static final int QTY = 5;
  4. private static final String PATH = "/examples/barrier";
  5. public static void main(String[] args) throws Exception
  6. {
  7. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
  8. client.start();
  9. ExecutorService service = Executors.newFixedThreadPool(QTY);
  10. for (int i = 0; i < (QTY + 2); ++i)
  11. {
  12. final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
  13. final int index = i;
  14. Callable<Void> task = new Callable<Void>()
  15. {
  16. @Override
  17. public Void call() throws Exception
  18. {
  19. Thread.sleep((long) (3 * Math.random()));
  20. System.out.println("Client #" + index + " 等待");
  21. if(false == barrier.enter(5, TimeUnit.SECONDS))
  22. {
  23. System.out.println("Client #" + index + " 等待超时!");
  24. return null;
  25. }
  26. System.out.println("Client #" + index + " 进入");
  27. Thread.sleep((long) (3000 * Math.random()));
  28. barrier.leave();
  29. System.out.println("Client #" + index + " 结束");
  30. return null;
  31. }
  32. };
  33. service.submit(task);
  34. }
  35. service.shutdown();
  36. service.awaitTermination(10, TimeUnit.MINUTES);
  37. client.close();
  38. System.out.println("OK!");
  39. }
  40. }

注意:创建双栅栏的数量为:(QTY + 2),而创建双栅栏的参数为:new DistributedDoubleBarrier(client, PATH, QTY),当等待栅栏的数量大于或等于这个值(QTY)栅栏就会打开!

3.示例程序运行结果

运行结果控制台:

  1. Client #0 等待
  2. Client #2 等待
  3. Client #3 等待
  4. Client #4 等待
  5. Client #1 等待
  6. Client #4 进入
  7. Client #2 进入
  8. Client #0 进入
  9. Client #1 进入
  10. Client #3 进入
  11. Client #4 结束
  12. Client #5 等待
  13. Client #2 结束
  14. Client #3 结束
  15. Client #6 等待
  16. Client #0 结束
  17. Client #1 结束
  18. Client #5 等待超时!
  19. Client #6 等待超时!
  20. OK!

运行时查看Zookeeper节点信息如下:

-------------------------------------------------------------------------------------------------------------------------------

来自为知笔记(Wiz)

时间: 2024-10-29 10:46:14

07.Curator Barrier的相关文章

斐讯 FIR151M 频繁掉线(OpenWRT解决方案)

0. 现象与前言 在使用斐讯 FIR151M 路由器连接网络时,传输数据时频繁掉线. 官方固件刷了两个版本,问题未解决. 建议高级用户看本教程,要做好不能使用 Web 管理界面的心理准备. 1. 准备 没有打开telnet和ssh服务,因此考虑直接在硬件上下手,在板子上找到了串口. 上排针.电烙铁.焊锡,引出了四个点,方便使用模块进行通讯,见下图. 之后拿PL2303模块连上计算机,使用超级终端通讯.(PuTTY也是可以串口通讯的) 2. 初步探究 a. 启动 打开超级终端,连上了串口,重开路由

零基础openwrt固件编译

零基础OpenWRT固件编译 戴维营教育<物联网/智能家居开发>课程系列 1.OpenWRT项目简介 OpenWRT是一个专用于无线路由器设备的Linux发行版.支持众多的芯片方案的路由器设备,由x86到ARM,MIPS等等. 目前稳定版本是: BARRIER BREAKER (14.07) 目前开发版本是: Chaos Calmer (trunk) 详细信息参看官方网址: https://openwrt.org 2.采用OpenWRT的智能路由器 极路由 小米路由 优酷路由 迅雷路由 360

Mac OS X中编译WRTNode固件

1.Disk Image CreationMac OS X系统默认的磁盘文件系统是非大小写敏感的,而Openwrt编译环境需要大小写敏感支持,故我们创建一个磁盘映像文件来新建大小写敏感的文件系统. 下面开始用MacOSX系统中自带的命令hdiutil来创建一个新磁盘镜像并挂载到系统中. Hackintosh:~ Diveinedu$ cd $HOMEHackintosh:~ Diveinedu$ hdiutil create -size 20g -fs "Case-sensitive HFS+&

CuratorBarrier

一.DistributedDoubleBarrier 同时开始,同时结束 1 package bjsxt.curator.barrier; 2 3 import java.util.Random; 4 5 import org.apache.curator.RetryPolicy; 6 import org.apache.curator.framework.CuratorFramework; 7 import org.apache.curator.framework.CuratorFramewo

百度房间卡是否可骄傲是快乐积分拉斯科

http://www.ebay.com/cln/ycn6646/-/167568259015/2015.02.07 http://www.ebay.com/cln/gon-n31/-/167197496017/2015.02.07 http://www.ebay.com/cln/hu_d027/-/167453250013/2015.02.07 http://www.ebay.com/cln/ywa2962/-/167301832012/2015.02.07 http://www.ebay.co

六、curator recipes之屏障barrier

简介 当两个进程在执行任务的时候,A调用了B,A需要等待B完成以后的通知,我们可以使用curator的屏障功能来实现. 官方文档:http://curator.apache.org/curator-recipes/barrier.html JavaDoc:http://curator.apache.org/apidocs/org/apache/curator/framework/recipes/barriers/DistributedBarrier.html 代码示例 import org.ap

03.Curator深入使用

1.Apache Curator简介 Curator提供了一套Java类库,可以更容易的使用ZooKeeper.ZooKeeper本身提供了Java Client的访问类,但是API太底层,不宜使用,易出错.Curator提供了三个组件.Curator client用来替代ZOoKeeper提供的类,它封装了底层的管理并提供了一些有用的工具.Curator framework提供了高级的API来简化ZooKeeper的使用.它增加了很多基于ZooKeeper的特性,帮助管理ZooKeeper的连

Apache Curator入门实战

版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] Apache Curator入门实战 Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量. 1.Zookeeper安装部署 Zookeeper的部署很简单,如果已经有Java运行环境的话,下载tarball解压后即可运行. [root@vm Temp]$ wget http://mirror.bi

Zookeeper开源客户端框架Curator简介

Curator是Netflix开源的一套ZooKeeper客户端框架. Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层, 应用方在使用的时候需要自己处理很多事情, 于是在它的基础上包装了一下, 提供了一套更好用的客户端框架. Netflix在用ZooKeeper的过程中遇到的问题, 我们也遇到了, 所以开始研究一下, 首先从他在github上的源码, wiki文档以及Netflix的技术blog入手. 看完官方的文档之后, 发现Curator主要解决了三类