一种基于zookeeper的分布式队列的设计与实现

package com.ysl.zkclient.queue;

import com.ysl.zkclient.ZKClient;
import com.ysl.zkclient.exception.ZKNoNodeException;
import com.ysl.zkclient.utils.ExceptionUtil;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.List;

/**
 * 一种分布式队列的实现
 * @param <T>
 */
public class ZKDistributedQueue<T extends Serializable> {

    private static final Logger LOG = LoggerFactory.getLogger(ZKDistributedQueue.class);

    private static final String ELEMENT_NAME = "node";

    private ZKClient client;
    private String rootPath;

    /**
     * 创建分布式队列
     * @param client zk客户端
     * @param rootPath 队列的跟路径
     */
    public ZKDistributedQueue(ZKClient client, String rootPath) {
        this.client = client;
        this.rootPath = rootPath;
        if(!client.exists(rootPath)){
            throw new ZKNoNodeException("the root path is not exists, please create path first ["+rootPath+"]");
        }
    }

    /**
     * 添加一个元素
     * @param node
     * @return
     */
    public boolean offer(T node){
        try{
            client.create(rootPath+"/"+ELEMENT_NAME + "-",node, CreateMode.PERSISTENT_SEQUENTIAL);
        }catch (Exception e){
            throw ExceptionUtil.convertToRuntimeException(e);
        }
        return true;
    }

    /**
     * 删除并返回顶部元素
     * @return
     */
    public T pool(){
        while(true){
            Node node = getFirstNode();
            if(node == null){
                return null;
            }

            try{
                boolean flag = client.delete(node.getName());
                if(flag){
                    return (T)node.getData();
                }else{
                    //删除失败,说明数据已经被其他的线程获取,重新获取底部元素
                }
            }catch (Exception e){
                throw ExceptionUtil.convertToRuntimeException(e);
            }
        }
    }

    /**
     * 获取队列顶部元素
     * @return
     */
    private Node<T> getFirstNode() {
        try{
            while(true){
                List<String> children = client.getChild(rootPath,true);
                if(children == null || children.isEmpty()){
                    return null;
                }

                String nodeName = getNodeName(children);
                try{
                    return new Node<T>(rootPath+"/"+nodeName,(T)client.getData(rootPath+"/"+nodeName));
                }catch (ZKNoNodeException e){
                    //如果抛出此异常,证明该节点已被其他线程获取
                }
            }
        }catch (Exception e){
            throw ExceptionUtil.convertToRuntimeException(e);
        }
    }

    /**
     * 获取编号最小的节点
     * @param children
     * @return
     */
    private String getNodeName(List<String> children) {
        String child= children.get(0);
        for(String path : children){
            if(path.compareTo(child) < 0){
                child = path;
            }
        }
        return child;
    }

    public boolean isEmpty(){
        return client.getChild(rootPath,true).size() == 0;
    }

    public T peek(){
        Node<T> node = getFirstNode();
        if(node == null){
            return null;
        }
        return node.getData();
    }

    private class Node<T>{

        private String name;
        private T data;

        public Node(String name, T data) {
            this.name = name;
            this.data = data;
        }

        public String getName() {
            return name;
        }

        public T getData() {
            return data;
        }
    }
}

测试代码

/**
     * 测试分布式队列
     * @throws Exception
     * @return void
     */
    @Test
    public void testDistributedQueue() throws Exception{
        final String rootPath = "/zk/queue";
        //创建rootPath
        zkClient.createRecursive(rootPath, null, CreateMode.PERSISTENT);

        final List<String> list1 = new ArrayList<String>();
        final List<String> list2 = new ArrayList<String>();
        for(int i=0;i<21;i++){
            Thread thread1 = new Thread(new Runnable() {
                public void run() {
                    ZKDistributedQueue<String> queue = new ZKDistributedQueue(zkClient, rootPath);
                    queue.offer(Thread.currentThread().getName());
                    list1.add(Thread.currentThread().getName());
                }
            });
            thread1.start();
        }

        //等待事件到达
        int size1 = TestUtil.waitUntil(21, new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return list1.size();
            }

        }, TimeUnit.SECONDS, 100);
        System.out.println(zkClient.getChildren(rootPath));

        for(int i=0;i<20;i++){
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    ZKDistributedQueue<String> queue = new ZKDistributedQueue(zkClient, rootPath);
                    list2.add(queue.poll());
                }
            });
            thread.start();
        }
        //等待事件到达
        int size2 = TestUtil.waitUntil(20, new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return list2.size();
            }

        }, TimeUnit.SECONDS, 100);
        assertThat(size2).isEqualTo(20);
        boolean flag = true;
        for(int i =0;i<20;i++){
           if(!list1.get(i).equals(list2.get(i))){
               flag = false;
               break;
           }
        }
        assertThat(flag).isTrue();

        ZKDistributedQueue<String> queue = new ZKDistributedQueue(zkClient, rootPath);
        assertThat(queue.peek()).isEqualTo(queue.poll());
    }
时间: 2024-10-11 06:10:37

一种基于zookeeper的分布式队列的设计与实现的相关文章

ZooKeeper实现分布式队列Queue

ZooKeeper实现分布式队列Queue 让Hadoop跑在云端系列文章,介绍了如何整合虚拟化和Hadoop,让Hadoop集群跑在VPS虚拟主机上,通过云向用户提供存储和计算的服务. 现在硬件越来越便宜,一台非品牌服务器,2颗24核CPU,配48G内存,2T的硬盘,已经降到2万块人民币以下了.这种配置如果简单地放几个web应用,显然是奢侈的浪费.就算是用来实现单节点的hadoop,对计算资源浪费也是非常高的.对于这么高性能的计算机,如何有效利用计算资源,就成为成本控制的一项重要议题了. 通过

基于ZooKeeper的分布式Session实现(转)

1.   认识ZooKeeper ZooKeeper—— “动物园管理员”.动物园里当然有好多的动物,游客可以根据动物园提供的向导图到不同的场馆观赏各种类型的动物,而不是像走在原始丛林里,心惊胆颤的被动 物所观赏.为了让各种不同的动物呆在它们应该呆的地方,而不是相互串门,或是相互厮杀,就需要动物园管理员按照动物的各种习性加以分类和管理,这样我们才 能更加放心安全的观赏动物.回到我们企业级应用系统中,随着信息化水平的不断提高,我们的企业级系统变得越来越庞大臃肿,性能急剧下降,客户抱怨频频.拆 分系

基于ZooKeeper的分布式Session实现

基于ZooKeeper的分布式Session实现 1.   认识ZooKeeper ZooKeeper—— “动物园管理员”.动物园里当然有好多的动物,游客可以根据动物园提供的向导图到不同的场馆观赏各种类型的动物,而不是像走在原始丛林里,心惊胆颤的被动 物所观赏.为了让各种不同的动物呆在它们应该呆的地方,而不是相互串门,或是相互厮杀,就需要动物园管理员按照动物的各种习性加以分类和管理,这样我们才 能更加放心安全的观赏动物.回到我们企业级应用系统中,随着信息化水平的不断提高,我们的企业级系统变得越

Java Web学习总结(20)——基于ZooKeeper的分布式session实现

1.   认识ZooKeeper ZooKeeper-- "动物园管理员".动物园里当然有好多的动物,游客可以根据动物园提供的向导图到不同的场馆观赏各种类型的动物,而不是像走在原始丛林里,心惊胆颤的被动 物所观赏.为了让各种不同的动物呆在它们应该呆的地方,而不是相互串门,或是相互厮杀,就需要动物园管理员按照动物的各种习性加以分类和管理,这样我们才 能更加放心安全的观赏动物.回到我们企业级应用系统中,随着信息化水平的不断提高,我们的企业级系统变得越来越庞大臃肿,性能急剧下降,客户抱怨频频

基于ZooKeeper的分布式锁和队列

在分布式系统中,往往需要一些分布式同步原语来做一些协同工作,上一篇文章介绍了Zookeeper的基本原理,本文介绍下基于Zookeeper的Lock和Queue的实现,主要代码都来自Zookeeper的官方recipe. 锁(Lock) 完全分布式锁是全局同步的,这意味着在任何时刻没有两个客户端会同时认为它们都拥有相同的锁,使用 Zookeeper 可以实现分布式锁,需要首先定义一个锁节点(lock root node). 需要获得锁的客户端按照以下步骤来获取锁: 保证锁节点(lock root

[转载] ZooKeeper实现分布式队列Queue

转载自http://blog.fens.me/zookeeper-queue/ 让Hadoop跑在云端系列文章,介绍了如何整合虚拟化和Hadoop,让Hadoop集群跑在VPS虚拟主机上,通过云向用户提供存储和计算的服务. 现在硬件越来越便宜,一台非品牌服务器,2颗24核CPU,配48G内存,2T的硬盘,已经降到2万块人民币以下了.这种配置如果简单地放几个web应用,显然是奢侈的浪费.就算是用来实现单节点的hadoop,对计算资源浪费也是非常高的.对于这么高性能的计算机,如何有效利用计算资源,就

基于zookeeper的分布式配置中心(一)

最近在学习zookeeper,发现zk真的是一个优秀的中间件.在分布式环境下,可以高效解决数据管理问题.在学习的过程中,要深入zk的工作原理,并根据其特性做一些简单的分布式环境下数据管理工具.本文首先对zk的工作原理和相关概念做一下介绍,然后带大家做一个简单的分布式配置中心. zookeeper介绍 zookeeper是一个分布式协调框架,主要是解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务.状态同步服务.集群管理.分布式应用配置项的管理.分布式锁等. zookeeper使用 查看

一种基于Orleans的分布式Id生成方案

基于Orleans的分布式Id生成方案,因Orleans的单实例.单线程模型,让这种实现变的简单,贴出一种实现,欢迎大家提出意见 public interface ISequenceNoGenerator : Orleans.IGrainWithIntegerKey { Task<Immutable<string>> GetNext(); } public class SequenceNoGenerator : Orleans.Grain, ISequenceNoGenerator

spring boot 定时任务基于zookeeper的分布式锁实现

基于ZooKeeper分布式锁的流程 在zookeeper指定节点(locks)下创建临时顺序节点node_n 获取locks下所有子节点children 对子节点按节点自增序号从小到大排序 判断本节点是不是第一个子节点,若是,则获取锁:若不是,则监听比该节点小的那个节点的删除事件 若监听事件生效,则回到第二步重新进行判断,直到获取到锁 具体实现 添加Maven依赖: <?xml version="1.0" encoding="UTF-8"?> <