08.Curator计数器

这一篇文章我们将学习使用Curator来实现计数器。顾名思义,计数器是用来计数的,利用ZooKeeper可以实现一个集群共享的计数器。只要使用相同的path就可以得到最新的计数器值,这是由ZooKeeper的一致性保证的。Curator有两个计数器,一个是用int来计数,一个用long来计数。

1.SharedCount

1.SharedCount计数器介绍

这个类使用int类型来计数。 主要涉及三个类。

  • SharedCount - 管理一个共享的整数。所有看同样的路径客户端将有共享的整数(考虑ZK的正常一致性保证)的最高最新的值。
  • SharedCountReader - 一个共享的整数接口,并允许监听改变它的值。
  • SharedCountListener - 用于监听共享整数发生变化的监听器。

SharedCount类代表计数器,可以为它增加一个SharedCountListener,当计数器改变时此Listener可以监听到改变的事件,而SharedCountReader可以读取到最新的值,包括字面值和带版本信息的值VersionedValue。

注意:使用SharedCount之前需要调用start(),使用完成之后需要调用stop()

2.编写示例程序

  1. public class SharedCounterExample implements SharedCountListener
  2. {
  3. private static final int QTY = 5;
  4. private static final String PATH = "/examples/counter";
  5. public static void main(String[] args) throws IOException, Exception
  6. {
  7. final Random rand = new Random();
  8. SharedCounterExample example = new SharedCounterExample();
  9. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
  10. client.start();
  11. SharedCount baseCount = new SharedCount(client, PATH, 0);
  12. baseCount.addListener(example);
  13. baseCount.start();
  14. List<SharedCount> examples = Lists.newArrayList();
  15. ExecutorService service = Executors.newFixedThreadPool(QTY);
  16. for (int i = 0; i < QTY; ++i)
  17. {
  18. final SharedCount count = new SharedCount(client, PATH, 0);
  19. examples.add(count);
  20. Callable<Void> task = new Callable<Void>()
  21. {
  22. @Override
  23. public Void call() throws Exception
  24. {
  25. count.start();
  26. Thread.sleep(rand.nextInt(10000));
  27. count.setCount(rand.nextInt(10000));
  28. System.out.println("计数器当前值:" + count.getVersionedValue().getValue());
  29. System.out.println("计数器当前版本:" + count.getVersionedValue().getVersion());
  30. System.out.println("trySetCount:" + count.trySetCount(count.getVersionedValue(), 123));
  31. return null;
  32. }
  33. };
  34. service.submit(task);
  35. }
  36. service.shutdown();
  37. service.awaitTermination(10, TimeUnit.MINUTES);
  38. for (int i = 0; i < QTY; ++i)
  39. {
  40. examples.get(i).close();
  41. }
  42. baseCount.close();
  43. client.close();
  44. System.out.println("OK!");
  45. }
  46. @Override
  47. public void stateChanged(CuratorFramework client, ConnectionState newState)
  48. {
  49. System.out.println("连接状态: " + newState.toString());
  50. }
  51. @Override
  52. public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
  53. {
  54. System.out.println("计数器值改变:" + newCount);
  55. }
  56. }

在这个例子中,我们使用baseCount来监听计数值(addListener方法)。任意的SharedCount,只要使用相同的PATH,都可以得到这个计数值。然后我们使用5个线程为计数值增加一个10以内的随机数。

这里我们使用trySetCount去设置计数器。第一个参数提供当前的VersionedValue,如果期间其它client更新了此计数值,你的更新可能不成功,但是这时你的client更新了最新的值,所以失败了你可以尝试再更新一次。而setCount是强制更新计数器的值。

注意:计数器必须start,使用完之后必须调用close关闭它。

在这里再重复一遍前面讲到的, 强烈推荐你监控ConnectionStateListener, 尽管我们的有些例子没有监控它。 在本例中SharedCountListener扩展了ConnectionStateListener。 这一条针对所有的Curator recipes都适用,后面的文章中就不专门提示了。

3.示例程序运行结果

运行结果控制台:

  1. 连接状态: CONNECTED
  2. 计数器当前值:1684
  3. 计数器当前版本:11
  4. trySetCount:true
  5. 计数器值改变:123
  6. 计数器当前值:8425
  7. 计数器当前版本:13
  8. trySetCount:true
  9. 计数器值改变:123
  10. 计数器当前值:9369
  11. 计数器当前版本:15
  12. trySetCount:true
  13. 计数器值改变:123
  14. 计数器当前值:4075
  15. 计数器当前版本:17
  16. trySetCount:true
  17. 计数器值改变:123
  18. 计数器当前值:9221
  19. 计数器当前版本:19
  20. trySetCount:true
  21. OK!

Zookeeper节点信息如下:

2.DistributedAtomicLong

再看一个Long类型的计数器。除了计数的范围比SharedCount大了之外,它首先尝试使用乐观锁的方式设置计数器,如果不成功(比如期间计数器已经被其它client更新了),它使用InterProcessMutex方式来更新计数值。还记得InterProcessMutex是什么吗?它是我们前面讲的分布式可重入锁。这和上面的计数器的实现有显著的不同。

1.DistributedAtomicLong计数器介绍

DistributedAtomicLong计数器和上面的计数器的实现有显著的不同,可以从它的内部实现DistributedAtomicValue.trySet中看出端倪。

  1. public class DistributedAtomicLong implements DistributedAtomicNumber<Long>
  2. {
  3. private final DistributedAtomicValue value;
  4. ......
  5. }
  6. public class DistributedAtomicValue
  7. {
  8. ......
  9. AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception
  10. {
  11. MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
  12. tryOptimistic(result, makeValue);
  13. if ( !result.succeeded() && (mutex != null) )
  14. {
  15. tryWithMutex(result, makeValue);
  16. }
  17. return result;
  18. }
  19. ......
  20. }

此计数器有一系列的操作:

  • get(): 获取当前值
  • increment(): 加一
  • decrement(): 减一
  • add(): 增加特定的值
  • subtract(): 减去特定的值
  • trySet(): 尝试设置计数值
  • forceSet(): 强制设置计数值

你必须检查返回结果的succeeded(),它代表此操作是否成功。如果操作成功,preValue()代表操作前的值,postValue()代表操作后的值。

2.编写示例程序

我们下面的例子中使用5个线程对计数器进行加一操作,如果成功,将操作前后的值打印出来。

  1. public class DistributedAtomicLongExample
  2. {
  3. private static final int QTY = 5;
  4. private static final String PATH = "/examples/counter";
  5. public static void main(String[] args) throws IOException, Exception
  6. {
  7. final Random rand = new Random();
  8. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
  9. client.start();
  10. List<DistributedAtomicLong> examples = Lists.newArrayList();
  11. ExecutorService service = Executors.newFixedThreadPool(QTY);
  12. for (int i = 0; i < QTY; ++i)
  13. {
  14. final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));
  15. examples.add(count);
  16. Callable<Void> task = new Callable<Void>()
  17. {
  18. @Override
  19. public Void call() throws Exception
  20. {
  21. try
  22. {
  23. Thread.sleep(1000 + rand.nextInt(10000));
  24. AtomicValue<Long> value = count.increment();
  25. System.out.println("修改成功: " + value.succeeded());
  26. if (value.succeeded())
  27. {
  28. System.out.println("修改之前的值:" + value.preValue() + " | 修改之后的值:" + value.postValue());
  29. }
  30. }
  31. catch (Exception e)
  32. {
  33. e.printStackTrace();
  34. }
  35. return null;
  36. }
  37. };
  38. service.submit(task);
  39. }
  40. service.shutdown();
  41. service.awaitTermination(10, TimeUnit.MINUTES);
  42. client.close();
  43. System.out.println("OK!");
  44. }
  45. }

注意:你必须检查返回结果的succeeded(),它代表此操作是否成功。如果操作成功,preValue()代表操作前的值,postValue()代表操作后的值。

3.示例程序运行结果

运行结果控制台:

  1. 修改成功: true
  2. 修改之前的值:0 | 修改之后的值:1
  3. 修改成功: true
  4. 修改之前的值:1 | 修改之后的值:2
  5. 修改成功: true
  6. 修改之前的值:2 | 修改之后的值:3
  7. 修改成功: true
  8. 修改之前的值:3 | 修改之后的值:4
  9. 修改成功: true
  10. 修改之前的值:4 | 修改之后的值:5
  11. OK!

Zookeeper节点信息如下:

-------------------------------------------------------------------------------------------------------------------------------

来自为知笔记(Wiz)

时间: 2024-08-22 17:34:02

08.Curator计数器的相关文章

大数据精英实战项目班-Hadoop-Spark-真实企业项目

2018最新最全大数据技术视频,项目视频.整套视频,非那种杂七杂八自己拼凑的,内容如下,需要的联系QQ:3164282908(加Q注明大数据) 更有海量大数据技术视频.大数据项目视频,机器学习深度学习技术视频.项目视频.Python编程视频.Oracle数据库视频.Java培训视频高级架构师视频等等等. ├----------01-大数据Java基础------------- │├java第01天 ││├java第01天-01.类型转换.avi ││├java第01天-02.归档分析与实现.av

百度哈斯发卡号是减肥哈卡斯加分了卡斯

http://www.ebay.com/cln/ta_ya20/-/167521224015/2015.02.08 http://www.ebay.com/cln/p-m6466/-/167398283011/2015.02.08 http://www.ebay.com/cln/ta_ya20/-/167521242015/2015.02.08 http://www.ebay.com/cln/p-m6466/-/167398294011/2015.02.08 http://www.ebay.co

克同极用后管期果要议向中如极示听适VybVfesyhpR

社保划到税务征收,将大大提升社保费的征管效率.税务的征管能力是目前而言最强的,以后税务征收社保不是代收,属于本职了. 之前税局要把社保信息和交个税的工资比对起来有困难!现在好了,个税是自己的,社保也是自己的,比对困难?不存在的! 这一变革,会给那些不给员工上社保.不全额上社保的企业致命一击! 最新案例 前段时间的发改委关于限制特定严重失信人乘坐民航的一则意见--发改财金[2018]385号,其中还有税务总局的联合署名. http://weibo.com/20180408PP/2309279811

03.Curator深入使用

1.Apache Curator简介 Curator提供了一套Java类库,可以更容易的使用ZooKeeper.ZooKeeper本身提供了Java Client的访问类,但是API太底层,不宜使用,易出错.Curator提供了三个组件.Curator client用来替代ZOoKeeper提供的类,它封装了底层的管理并提供了一些有用的工具.Curator framework提供了高级的API来简化ZooKeeper的使用.它增加了很多基于ZooKeeper的特性,帮助管理ZooKeeper的连

Apache Curator入门实战

版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] Apache Curator入门实战 Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量. 1.Zookeeper安装部署 Zookeeper的部署很简单,如果已经有Java运行环境的话,下载tarball解压后即可运行. [root@vm Temp]$ wget http://mirror.bi

Zookeeper开源客户端框架Curator简介

Curator是Netflix开源的一套ZooKeeper客户端框架. Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层, 应用方在使用的时候需要自己处理很多事情, 于是在它的基础上包装了一下, 提供了一套更好用的客户端框架. Netflix在用ZooKeeper的过程中遇到的问题, 我们也遇到了, 所以开始研究一下, 首先从他在github上的源码, wiki文档以及Netflix的技术blog入手. 看完官方的文档之后, 发现Curator主要解决了三类

Hadoop学习笔记—7.计数器与自定义计数器

一.Hadoop中的计数器 计数器:计数器是用来记录job的执行进度和状态的.它的作用可以理解为日志.我们通常可以在程序的某个位置插入计数器,用来记录数据或者进度的变化情况,它比日志更便利进行分析. 例如,我们有一个文件,其中包含如下内容: hello you hello me 它被WordCount程序执行后显示如下日志: 在上图所示中,计数器有19个,分为四个组:File Output Format Counters.FileSystemCounters.File Input Format

大数据【四】MapReduce(单词计数;二次排序;计数器;join;分布式缓存)

   前言: 根据前面的几篇博客学习,现在可以进行MapReduce学习了.本篇博客首先阐述了MapReduce的概念及使用原理,其次直接从五个实验中实践学习(单词计数,二次排序,计数器,join,分布式缓存). 一 概述 定义 MapReduce是一种计算模型,简单的说就是将大批量的工作(数据)分解(MAP)执行,然后再将结果合并成最终结果(REDUCE).这样做的好处是可以在任务被分解后,可以通过大量机器进行并行计算,减少整个操作的时间. 适用范围:数据量大,但是数据种类小可以放入内存. 基

Curator的使用

Curator 为了更好的实现Java操作zookeeper服务器,后来出现了Curator框架,非常的强大,目前已经是Apache的顶级项目,里面提供了更多丰富的操作,例如session超时重连.主从选举.分布式计数器.分布式锁等等适用于各种复杂的zookeeper场景的API封装 1 Curator框架使用(一) Curator框架中使用链式编程风格,易读性更强,使用工厂方法创建连接对象. 1.使用CuratorFrameworkFactory的两个静态工厂方法(参数不同)来实现 1.1 c