zookeeper应用 - FIFO 队列 分布式队列

使用ZooKeeper实现的FIFO队列,这个队列是分布式的。

package fifo;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
 * 使用ZooKeeper实现的FIFO队列
 * @author lisg
 *
 */
public class ZKFIFO {
	private static final String HOSTS = "vm1";
	private ZooKeeper zk = null;
	private static final String PARENT_PATH = "/fifo";
	private static final String SEQ_PREFIX = "seq-";

	public ZKFIFO() {
		try {
			final CountDownLatch cdl = new CountDownLatch(1);
			zk = new ZooKeeper(HOSTS, 5000, new Watcher() {
				@Override
				public void process(WatchedEvent event) {
					if(KeeperState.SyncConnected.equals(event.getState())) {
						cdl.countDown();
					}
				}
			});

			cdl.await();

			//创建父节点
			Stat stat = zk.exists(PARENT_PATH, false);
			if(stat == null) {
				zk.create(PARENT_PATH, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
			}
		} catch (Exception e) {
			System.out.println("zookeeper集群连接失败!");
			e.printStackTrace();
		}
	}

	/**
	 * 在父节点下创建顺序子节点
	 * @param data
	 */
	public void push(String data) {
		if(data == null) {
			data = "";
		}

		try {
			zk.create(PARENT_PATH + "/" + SEQ_PREFIX,
					data.getBytes("UTF-8"),
					ZooDefs.Ids.OPEN_ACL_UNSAFE,
					CreateMode.PERSISTENT_SEQUENTIAL);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * 删除字第一个子节点,并返回它的值
	 * @return
	 */
	public String pop() {
		try {
			final List<String> children = zk.getChildren(PARENT_PATH, false);
			if(children.isEmpty()) {
				return null;
			}

			Collections.sort(children);

			String firstChildPath = PARENT_PATH + "/" + children.get(0);

			final byte[] data = zk.getData(firstChildPath, false, null);
			zk.delete(firstChildPath, -1);

			return new String(data, "UTF-8");
		} catch (Exception e) {
			e.printStackTrace();
		}

		return null;
	}

	public void close() {
		try {
			this.zk.close();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		final ZKFIFO fifo = new ZKFIFO();

		/*
		for(int i=0; i<10; i++) {
			new Thread() {
				public void run() {
					fifo.push("data-" + UUID.randomUUID().toString().replace("-", ""));
				};
			}.start();
		}
		*/

		System.out.println(fifo.pop());

		fifo.close();
	}
}

  

需要改进的地方:

1)zookeeper异常处理、重试

时间: 2024-12-21 04:24:07

zookeeper应用 - FIFO 队列 分布式队列的相关文章

Zookeeper分布式队列的实现

摘要:本文要通过zookeeper实现一个简单可靠的分布式队列 本文源码请在这里下载:https://github.com/appleappleapple/DistributeLearning 一.队列 Zookeeper可以处理两种类型的队列:(1)同步队列当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达.例如一个班去旅游,看是否所有人都到齐了,到齐了就发车.例如有个大任务分解为多个子任务,要所有子任务都完成了才能进入到下一流程.(2)先进先出队列按照FIFO方式进行入队和出

分布式队列ZooKeeper的实现

一.背景 有一些时候,多个团队需要共同完成一个任务,比如,A团队将Hadoop集群计算的结果交给B团队继续计算,B完成了自己任务再交给C团队继续做.这就有点像业务系统的工作流一样,一环一环地传下 去,直到最后一部分完成.在业务系统中,我们经常会用SOA的架构来解决这种问题,每个团队在ESB(企业服务股总线)服务器上部署自己的服务,然后通过消息中间件完成调度任务.对亍分步式的多个 Hadoop集群系统的协作,同样可以用这种架构来做只要把消息中间件引擎换成支持分步式的消息中间件的引擎就行了. 本文楼

ZooKeeper实现分布式队列Queue

ZooKeeper实现分布式队列Queue 让Hadoop跑在云端系列文章,介绍了如何整合虚拟化和Hadoop,让Hadoop集群跑在VPS虚拟主机上,通过云向用户提供存储和计算的服务. 现在硬件越来越便宜,一台非品牌服务器,2颗24核CPU,配48G内存,2T的硬盘,已经降到2万块人民币以下了.这种配置如果简单地放几个web应用,显然是奢侈的浪费.就算是用来实现单节点的hadoop,对计算资源浪费也是非常高的.对于这么高性能的计算机,如何有效利用计算资源,就成为成本控制的一项重要议题了. 通过

[转载] ZooKeeper实现分布式队列Queue

转载自http://blog.fens.me/zookeeper-queue/ 让Hadoop跑在云端系列文章,介绍了如何整合虚拟化和Hadoop,让Hadoop集群跑在VPS虚拟主机上,通过云向用户提供存储和计算的服务. 现在硬件越来越便宜,一台非品牌服务器,2颗24核CPU,配48G内存,2T的硬盘,已经降到2万块人民币以下了.这种配置如果简单地放几个web应用,显然是奢侈的浪费.就算是用来实现单节点的hadoop,对计算资源浪费也是非常高的.对于这么高性能的计算机,如何有效利用计算资源,就

一种基于zookeeper的分布式队列的设计与实现

package com.ysl.zkclient.queue; import com.ysl.zkclient.ZKClient; import com.ysl.zkclient.exception.ZKNoNodeException; import com.ysl.zkclient.utils.ExceptionUtil; import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.Logg

分布式队列编程:模型、实战

介绍 作为一种基础的抽象数据结构,队列被广泛应用在各类编程中.大数据时代对跨进程.跨机器的通讯提出了更高的要求,和以往相比,分布式队列编程的运用几乎已无处不在.但是,这种常见的基础性的事物往往容易被忽视,使用者往往会忽视两点: 使用分布式队列的时候,没有意识到它是队列. 有具体需求的时候,忘记了分布式队列的存在. 文章首先从最基础的需求出发,详细剖析分布式队列编程模型的需求来源.定义.结构以及其变化多样性.通过这一部分的讲解,作者期望能在两方面帮助读者:一方面,提供一个系统性的思考方法,使读者能

分布式队列 Celery

详情参见: 分布式队列神器 Celery 个人学习总结后续更新……

基于外部ZooKeeper的GlusterFS作为分布式文件系统的完全分布式HBase集群安装指南

(WJW)基于外部ZooKeeper的GlusterFS作为分布式文件系统的完全分布式HBase集群安装指南 [X] 前提条件 服务器列表: 192.168.1.84 hbase84 #hbase-master 192.168.1.85 hbase85 #hbase-regionserver,zookeeper 192.168.1.86 hbase86 #hbase-regionserver,zookeeper 192.168.1.87 hbase87 #hbase-regionserver,z

Zookeeper是如何实现分布式锁的

Zookeeper是如何实现分布式锁的 标签 : Zookeeper 分布式 实现分布式锁要考虑的重要问题 1. 三个核心要素 加锁, 解锁, 锁超时 2. 三个问题 要保证原子性操作, 加锁和锁超时的操作要一次性执行完毕 防止误删锁 在误删的基础上, 加一个守护线程, 为锁续命. 什么是临时顺序节点 Zookeeper的数据存储结构就像是一棵树, 这棵树由节点组成, 这种节点叫做Znode. Znode分为四种类型. 1. 持久节点(Persistent) 默认的节点类型, 创建节点的客户端和