分布式队列ZooKeeper的实现

一、背景

  有一些时候,多个团队需要共同完成一个任务,比如,A团队将Hadoop集群计算的结果交给B团队继续计算,B完成了自己任务再交给C团队继续做。这就有点像业务系统的工作流一样,一环一环地传下

去,直到最后一部分完成。在业务系统中,我们经常会用SOA的架构来解决这种问题,每个团队在ESB(企业服务股总线)服务器上部署自己的服务,然后通过消息中间件完成调度任务。对亍分步式的多个

Hadoop集群系统的协作,同样可以用这种架构来做只要把消息中间件引擎换成支持分步式的消息中间件的引擎就行了。

  本文楼主将使用zookeeper做为分步式消息中间件构造一个大型超市的部分数据计算模型来完成各个区域利润计算的业务需求。

  由于采购和销售分别是由不同厂商进行的软件开发和维护,而且业务往来也在不同的城市和地区。 所以在每月底结算时,工作量都特别大。 比如,计算利润表: 当月利润 = 当月销售金额 - 当月采购

额 - 当月其他支出(楼主只是粗略计算)。如果采购系统是单独的系统,销售是另外单独的系统,及以其他几十个大大小小的系统, 如何能让多个系统,配合起来完成该需求?

二、系统构思

  楼主基于zookeeper来构建一个分步式队列的应用,来解决上面的功能需求。排除了ESB的部分,只保留zookeeper进行实现。

  1.   采购数据:海量数据,基于Hadoop存储和分析(楼主环境有限,只使用了很少的数据)
  2.   销售数据:海量数据,基于Hadoop存储和分析(楼主环境有限,只使用了很少的数据)
  3.   其他费用支出:为少量数据,基于文件或数据库存储和分析

  我们设计一个同步队列,这个队列有3个条件节点,分别对应采购(purchase),销售 (sell),其他费用(other)3个部分。当3个节点都被创建后,程序会自动触发计算利润, 幵创建利润(profit)节点。上面3个节点的创建,无顺序要求。每个节点只能被创建一次 。

  

  Hadoop mapreduce1,Hadoop mapreduce2 是2个独立的Hadoop集群应用。 Java App 是2个独立的Java应用 。ZooKeeper集群的有3个节点 。

  • /queue,是znode的队列目录,假设队列长度为3
  • /queue/purchase,是znode队列中,1号排对者,由Hadoop mapreduce1提交,用于统计采购金额
  • /queue/sell,是znode队列中,2号排对者,由Hadoop mapreduce2提交,用于统计销售金额
  • /queue/other,是znode队列中,3号排对者,由Java App提交,用于统计其他费用支出金额
  • /queue/profit,当znode队列中满了,触发创建利润节点。

  当/qeueu/profit被创建后,利润java app被启动,所有zookeeper的连接通知同步程序(红色线),队列已完成,所有程序结束。

三、环境准备

  1)hadoop集群。楼主用的6个节点的hadoop2.7.3集群,各位同学可以根据自己的实际情况进行搭建,但至少需要1台伪分布式的。(参考http://www.cnblogs.com/qq503665965/p/6790580.html

  2)zookeeper集群。至少三个节点。安装参考楼主这篇文章(http://www.cnblogs.com/qq503665965/p/6790580.html

  3)java开发环境。

四、mapreduce及java app程序

  计算采购金额:

  1 package zkqueue;
  2 import java.io.IOException;
  3 import java.util.HashMap;
  4 import java.util.Map;
  5 import java.util.regex.Pattern;
  6
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.IntWritable;
  9 import org.apache.hadoop.io.LongWritable;
 10 import org.apache.hadoop.io.Text;
 11 import org.apache.hadoop.mapred.JobConf;
 12 import org.apache.hadoop.mapreduce.Job;
 13 import org.apache.hadoop.mapreduce.Mapper;
 14 import org.apache.hadoop.mapreduce.Reducer;
 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 19
 20
 21 /**
 22  * 采购金额计算
 23  * @author Jon_China
 24  *
 25  */
 26 public class Purchase {
 27
 28     public static final String HDFS = "hdfs://192.168.8.101:9000";
 29     public static final Pattern DELIMITER = Pattern.compile("[\t,]");
 30
 31     public static class PurchaseMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
 32
 33         private String month = "2017-01";
 34         private Text k = new Text(month);
 35         private IntWritable v = new IntWritable();
 36         private int money = 0;
 37
 38         public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
 39             System.out.println(values.toString());
 40             String[] tokens = DELIMITER.split(values.toString());//拆分源数据
 41             if (tokens[3].startsWith(month)) {// 过滤1月份数据
 42                 money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//计算
 43                 v.set(money);
 44                 context.write(k, v);
 45             }
 46         }
 47     }
 48
 49     public static class PurchaseReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
 50         private IntWritable v = new IntWritable();
 51         private int money = 0;
 52
 53         @Override
 54         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
 55             for (IntWritable line : values) {
 56                 money += line.get();
 57             }
 58             v.set(money);
 59             context.write(null, v);
 60             System.out.println("Output:" + key + "," + money);
 61         }
 62
 63     }
 64
 65     public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
 66         JobConf conf = config();
 67         String local_data = path.get("purchase");
 68         String input = path.get("input");
 69         String output = path.get("output");
 70
 71
 72         HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
 73         hdfs.rmr(input);
 74         hdfs.mkdirs(input);
 75         hdfs.copyFile(local_data, input);
 76
 77         Job job = Job.getInstance(conf);
 78         job.setJarByClass(Purchase.class);
 79
 80         job.setOutputKeyClass(Text.class);
 81         job.setOutputValueClass(IntWritable.class);
 82
 83         job.setMapperClass(PurchaseMapper.class);
 84         job.setReducerClass(PurchaseReducer.class);
 85
 86         job.setInputFormatClass(TextInputFormat.class);
 87         job.setOutputFormatClass(TextOutputFormat.class);
 88
 89         FileInputFormat.setInputPaths(job, new Path(input));
 90         FileOutputFormat.setOutputPath(job, new Path(output));
 91
 92         job.waitForCompletion(true);
 93     }
 94
 95     public static JobConf config() {
 96         JobConf conf = new JobConf(Purchase.class);
 97         conf.setJobName("purchase");
 98         conf.addResource("classpath:/hadoop/core-site.xml");
 99         conf.addResource("classpath:/hadoop/hdfs-site.xml");
100         conf.addResource("classpath:/hadoop/mapred-site.xml");
101         conf.addResource("classpath:/hadoop/yarn-site.xml");
102         return conf;
103     }
104
105     public static Map<String,String> path(){
106         Map<String, String> path = new HashMap<String, String>();
107         path.put("purchase", Purchase.class.getClassLoader().getResource("logfile/biz/purchase.csv").getPath());// 源文件数据
108         path.put("input", HDFS + "/user/hdfs/biz/purchase");//hdfs存储路径
109         path.put("output", HDFS + "/user/hdfs/biz/purchase/output"); //hdfs输出路径
110         return path;
111     }
112
113     public static void main(String[] args) throws Exception {
114         run(path());
115     }
116
117 }

  销售数据计算:

  1 package zkqueue;
  2
  3 import java.io.IOException;
  4 import java.util.HashMap;
  5 import java.util.Map;
  6 import java.util.regex.Pattern;
  7
  8 import org.apache.hadoop.fs.Path;
  9 import org.apache.hadoop.io.IntWritable;
 10 import org.apache.hadoop.io.LongWritable;
 11 import org.apache.hadoop.io.Text;
 12 import org.apache.hadoop.mapred.JobConf;
 13 import org.apache.hadoop.mapreduce.Job;
 14 import org.apache.hadoop.mapreduce.Mapper;
 15 import org.apache.hadoop.mapreduce.Reducer;
 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 17 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 19 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 20
 21 /**
 22  * 销售数据计算
 23  * @author Jon_China
 24  *
 25  */
 26 public class Sell {
 27
 28     public static final String HDFS = "hdfs://192.168.8.101:9000";
 29     public static final Pattern DELIMITER = Pattern.compile("[\t,]");
 30
 31     public static class SellMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
 32
 33         private String month = "2013-01";
 34         private Text k = new Text(month);
 35         private IntWritable v = new IntWritable();
 36         private int money = 0;
 37
 38         public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
 39             System.out.println(values.toString());
 40             String[] tokens = DELIMITER.split(values.toString());
 41             if (tokens[3].startsWith(month)) {// 1月的数据
 42                 money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//单价*数量
 43                 v.set(money);
 44                 context.write(k, v);
 45             }
 46         }
 47     }
 48
 49     public static class SellReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
 50         private IntWritable v = new IntWritable();
 51         private int money = 0;
 52
 53         @Override
 54         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
 55             for (IntWritable line : values) {
 56                 money += line.get();
 57             }
 58             v.set(money);
 59             context.write(null, v);
 60             System.out.println("Output:" + key + "," + money);
 61         }
 62
 63     }
 64
 65     public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
 66         JobConf conf = config();
 67         String local_data = path.get("sell");
 68         String input = path.get("input");
 69         String output = path.get("output");
 70
 71         // 初始化sell
 72         HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
 73         hdfs.rmr(input);
 74         hdfs.mkdirs(input);
 75         hdfs.copyFile(local_data, input);
 76
 77         Job job = Job.getInstance(conf);
 78         job.setJarByClass(Sell.class);
 79
 80         job.setOutputKeyClass(Text.class);
 81         job.setOutputValueClass(IntWritable.class);
 82
 83         job.setMapperClass(SellMapper.class);
 84         job.setReducerClass(SellReducer.class);
 85
 86         job.setInputFormatClass(TextInputFormat.class);
 87         job.setOutputFormatClass(TextOutputFormat.class);
 88
 89         FileInputFormat.setInputPaths(job, new Path(input));
 90         FileOutputFormat.setOutputPath(job, new Path(output));
 91
 92         job.waitForCompletion(true);
 93     }
 94
 95     public static JobConf config() {// Hadoop集群的远程配置信息
 96         JobConf conf = new JobConf(Purchase.class);
 97         conf.setJobName("purchase");
 98         conf.addResource("classpath:/hadoop/core-site.xml");
 99         conf.addResource("classpath:/hadoop/hdfs-site.xml");
100         conf.addResource("classpath:/hadoop/mapred-site.xml");
101         conf.addResource("classpath:/hadoop/yarn-site.xml");
102         return conf;
103     }
104
105     public static Map<String,String> path(){
106         Map<String, String> path = new HashMap<String, String>();
107         path.put("sell", Sell.class.getClassLoader().getResource("logfile/biz/sell.csv").getPath());// 本地的数据文件
108         path.put("input", HDFS + "/user/hdfs/biz/sell");// HDFS的目录
109         path.put("output", HDFS + "/user/hdfs/biz/sell/output"); // 输出目录
110         return path;
111     }
112
113     public static void main(String[] args) throws Exception {
114         run(path());
115     }
116
117 }

  其他金额计算:

 1 package zkqueue;
 2
 3 import java.io.BufferedReader;
 4 import java.io.File;
 5 import java.io.FileReader;
 6 import java.io.IOException;
 7 import java.util.regex.Pattern;
 8
 9 public class Other {
10
11     public static String file = "/logfile/biz/other.csv";
12     public static final Pattern DELIMITER = Pattern.compile("[\t,]");
13     private static String month = "2017-01";
14
15     public static void main(String[] args) throws IOException {
16         calcOther(file);
17     }
18
19     public static int calcOther(String file) throws IOException {
20         int money = 0;
21         BufferedReader br = new BufferedReader(new FileReader(new File(file)));
22
23         String s = null;
24         while ((s = br.readLine()) != null) {
25             String[] tokens = DELIMITER.split(s);
26             if (tokens[0].startsWith(month)) {// 1月的数据
27                 money += Integer.parseInt(tokens[1]);
28             }
29         }
30         br.close();
31         System.out.println("Output:" + month + "," + money);
32         return money;
33     }
34 }

  计算利润:

  

 1 package zkqueue;
 2
 3 import java.io.IOException;
 4
 5
 6 /**
 7  * 利润计算
 8  * @author Jon_China
 9  *
10  */
11 public class Profit {
12
13     public static void main(String[] args) throws Exception {
14         profit();
15     }
16
17     public static void profit() throws Exception {
18         int sell = getSell();
19         int purchase = getPurchase();
20         int other = getOther();
21         int profit = sell - purchase - other;
22         System.out.printf("profit = sell - purchase - other = %d - %d - %d = %d\n", sell, purchase, other, profit);
23     }
24
25     public static int getPurchase() throws Exception {
26         HdfsDAO hdfs = new HdfsDAO(Purchase.HDFS, Purchase.config());
27         return Integer.parseInt(hdfs.cat(Purchase.path().get("output") + "/part-r-00000").trim());
28     }
29
30     public static int getSell() throws Exception {
31         HdfsDAO hdfs = new HdfsDAO(Sell.HDFS, Sell.config());
32         return Integer.parseInt(hdfs.cat(Sell.path().get("output") + "/part-r-00000").trim());
33     }
34
35     public static int getOther() throws IOException {
36         return Other.calcOther(Other.file);
37     }
38
39 }

  zookeeper任务调度:

  1 package zkqueue;
  2
  3 import java.io.IOException;
  4 import java.util.List;
  5
  6 import org.apache.zookeeper.CreateMode;
  7 import org.apache.zookeeper.KeeperException;
  8 import org.apache.zookeeper.WatchedEvent;
  9 import org.apache.zookeeper.Watcher;
 10 import org.apache.zookeeper.ZooDefs.Ids;
 11 import org.apache.zookeeper.ZooKeeper;
 12 /**
 13  * 分布式队列zookeeper调度
 14  * @author Jon_China
 15  *
 16  */
 17 public class QueueZookeeper {
 18     //设置队列目录树
 19     final public static String QUEUE = "/queue";
 20     final public static String PROFIT = "/queue/profit";
 21     final public static String PURCHASE = "/queue/purchase";
 22     final public static String SELL = "/queue/sell";
 23     final public static String OTHER = "/queue/other";
 24
 25     public static void main(String[] args) throws Exception {
 26         if (args.length == 0) {
 27             System.out.println("Please start a task:");
 28         } else {
 29             doAction(Integer.parseInt(args[0]));
 30         }
 31     }
 32     public static void doAction(int client) throws Exception {
 33         //zookeeper地址
 34         String host1 = "192.168.8.104:2181";
 35         String host2 = "192.168.8.105:2181";
 36         String host3 = "192.168.8.106:2181";
 37
 38         ZooKeeper zk = null;
 39         switch (client) {//1,2,3分别将不同任务加入队列
 40         case 1:
 41             zk = connection(host1);
 42             initQueue(zk);
 43             doPurchase(zk);
 44             break;
 45         case 2:
 46             zk = connection(host2);
 47             initQueue(zk);
 48             doSell(zk);
 49             break;
 50         case 3:
 51             zk = connection(host3);
 52             initQueue(zk);
 53             doOther(zk);
 54             break;
 55         }
 56     }
 57
 58     // 创建一个与服务器的连接
 59     public static ZooKeeper connection(String host) throws IOException {
 60         ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
 61             // 监控所有被触发的事件
 62             public void process(WatchedEvent event) {
 63                 if (event.getType() == Event.EventType.NodeCreated && event.getPath().equals(PROFIT)) {
 64                     System.out.println("Queue has Completed!!!");
 65                 }
 66             }
 67         });
 68         return zk;
 69     }
 70     /**
 71      * 初始化队列
 72      * @param zk
 73      * @throws KeeperException
 74      * @throws InterruptedException
 75      */
 76     public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
 77         System.out.println("WATCH => " + PROFIT);
 78         zk.exists(PROFIT, true);
 79
 80         if (zk.exists(QUEUE, false) == null) {
 81             System.out.println("create " + QUEUE);
 82             zk.create(QUEUE, QUEUE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 83         } else {
 84             System.out.println(QUEUE + " is exist!");
 85         }
 86     }
 87     /**
 88      * 采购任务
 89      * @param zk
 90      * @throws Exception
 91      */
 92     public static void doPurchase(ZooKeeper zk) throws Exception {
 93         if (zk.exists(PURCHASE, false) == null) {
 94
 95             Purchase.run(Purchase.path());
 96
 97             System.out.println("create " + PURCHASE);
 98             zk.create(PURCHASE, PURCHASE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 99         } else {
100             System.out.println(PURCHASE + " is exist!");
101         }
102         isCompleted(zk);
103     }
104     /**
105      * 销售任务
106      * @param zk
107      * @throws Exception
108      */
109     public static void doSell(ZooKeeper zk) throws Exception {
110         if (zk.exists(SELL, false) == null) {
111
112             Sell.run(Sell.path());
113
114             System.out.println("create " + SELL);
115             zk.create(SELL, SELL.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
116         } else {
117             System.out.println(SELL + " is exist!");
118         }
119         isCompleted(zk);
120     }
121     /**
122      * 其他计算任务
123      * @param zk
124      * @throws Exception
125      */
126     public static void doOther(ZooKeeper zk) throws Exception {
127         if (zk.exists(OTHER, false) == null) {
128
129             Other.calcOther(Other.file);
130
131             System.out.println("create " + OTHER);
132             zk.create(OTHER, OTHER.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
133         } else {
134             System.out.println(OTHER + " is exist!");
135         }
136         isCompleted(zk);
137     }
138     /**
139      * 检测完成情况
140      * @param zk
141      * @throws Exception
142      */
143     public static void isCompleted(ZooKeeper zk) throws Exception {
144         int size = 3;
145         List<String> children = zk.getChildren(QUEUE, true);
146         int length = children.size();
147
148         System.out.println("Queue Complete:" + length + "/" + size);
149         if (length >= size) {
150             System.out.println("create " + PROFIT);
151             Profit.profit();
152             zk.create(PROFIT, PROFIT.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
153
154             for (String child : children) {// 清空节点
155                 zk.delete(QUEUE + "/" + child, -1);
156             }
157
158         }
159     }
160 }

四、运行结果

  在最后一步,统计其他费用数据程序运行后,从日志中看到3个条件节点都已满足要求 。然后,通过同步的分步式队列自动启动了计算利润的程序,幵在日志中打印了2017 年1月的利润为-6693765。

  示例代码地址:https://github.com/LJunChina/hadoop/tree/master/distributed_mq

时间: 2024-07-30 13:51:03

分布式队列ZooKeeper的实现的相关文章

Zookeeper分布式队列的实现

摘要:本文要通过zookeeper实现一个简单可靠的分布式队列 本文源码请在这里下载:https://github.com/appleappleapple/DistributeLearning 一.队列 Zookeeper可以处理两种类型的队列:(1)同步队列当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达.例如一个班去旅游,看是否所有人都到齐了,到齐了就发车.例如有个大任务分解为多个子任务,要所有子任务都完成了才能进入到下一流程.(2)先进先出队列按照FIFO方式进行入队和出

ZooKeeper实现分布式队列Queue

ZooKeeper实现分布式队列Queue 让Hadoop跑在云端系列文章,介绍了如何整合虚拟化和Hadoop,让Hadoop集群跑在VPS虚拟主机上,通过云向用户提供存储和计算的服务. 现在硬件越来越便宜,一台非品牌服务器,2颗24核CPU,配48G内存,2T的硬盘,已经降到2万块人民币以下了.这种配置如果简单地放几个web应用,显然是奢侈的浪费.就算是用来实现单节点的hadoop,对计算资源浪费也是非常高的.对于这么高性能的计算机,如何有效利用计算资源,就成为成本控制的一项重要议题了. 通过

[转载] ZooKeeper实现分布式队列Queue

转载自http://blog.fens.me/zookeeper-queue/ 让Hadoop跑在云端系列文章,介绍了如何整合虚拟化和Hadoop,让Hadoop集群跑在VPS虚拟主机上,通过云向用户提供存储和计算的服务. 现在硬件越来越便宜,一台非品牌服务器,2颗24核CPU,配48G内存,2T的硬盘,已经降到2万块人民币以下了.这种配置如果简单地放几个web应用,显然是奢侈的浪费.就算是用来实现单节点的hadoop,对计算资源浪费也是非常高的.对于这么高性能的计算机,如何有效利用计算资源,就

一种基于zookeeper的分布式队列的设计与实现

package com.ysl.zkclient.queue; import com.ysl.zkclient.ZKClient; import com.ysl.zkclient.exception.ZKNoNodeException; import com.ysl.zkclient.utils.ExceptionUtil; import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.Logg

分布式队列编程:模型、实战

介绍 作为一种基础的抽象数据结构,队列被广泛应用在各类编程中.大数据时代对跨进程.跨机器的通讯提出了更高的要求,和以往相比,分布式队列编程的运用几乎已无处不在.但是,这种常见的基础性的事物往往容易被忽视,使用者往往会忽视两点: 使用分布式队列的时候,没有意识到它是队列. 有具体需求的时候,忘记了分布式队列的存在. 文章首先从最基础的需求出发,详细剖析分布式队列编程模型的需求来源.定义.结构以及其变化多样性.通过这一部分的讲解,作者期望能在两方面帮助读者:一方面,提供一个系统性的思考方法,使读者能

(九)分布式服务----Zookeeper注册中心

==>>点击查看本系列文章目录 首先看一下几种注册中心: 最老的就是Zookeeper了, 比较新的有Eureka,Consul 都可以做注册中心.可以自行搜索对比三者的优缺点. Zookeeper 最开始就是hadoop大家族中的一员,用于做协调的框架,后来已经是apache的子项目了. 几年前大数据很火的时候,只要学hadoop必学zookeeper,当然还有其他成员. 大数据简单说就是分布式,比如分布式文件存储hdfs,分布式数据库hbase,分布式协调zookeeper,还有kafka

分布式队列 Celery

详情参见: 分布式队列神器 Celery 个人学习总结后续更新……

【分布式】Zookeeper的Leader选举-选举过程介绍

[分布式]Zookeeper的Leader选举-选举过程介绍 选举开始,服务器会各自为自己投票,在投票完成后,会将投票信息发送给集群中的所有服务器(观察者服务器不参与选举). 选票由两部分组成:服务器唯一标识myid和事务编号zxid,即(myid,xzid). zxid越大说明数据越新,在选择算法中的权重越大.myid越大,在选择算法中的权重越大. 比较选票时会先比较zxid,zxid大的获胜,zxid相同时比较myid,myid大的获胜,胜利方选票不变,失败方选票将变成与胜利方一样,并再次将

zookeeper应用 - FIFO 队列 分布式队列

使用ZooKeeper实现的FIFO队列,这个队列是分布式的. package fifo; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeep