ZooKeeper典型应用(二) 生产者与消费者

In this tutorial, we show simple implementations of barriers and producer-consumer queues using ZooKeeper. We call the respective classes Barrier and Queue. These examples assume that you have at least one ZooKeeper server running.

本文将告诉你如何使用 Zookeeper 实现两种常用的分布式数据结构,屏障(barriers) 和队列(queues),我们为此还分别实现了两个类:Barrier and Queue. 本文中的例子假设你已经成功运行了Zookeeper服务器。

Both primitives use the following common excerpt of code:

static ZooKeeper zk = null;
    static Integer mutex;

    String root;

    SyncPrimitive(String address) {
        if(zk == null){
            try {
                System.out.println("Starting ZK:");
                zk = new ZooKeeper(address, 3000, this);
                mutex = new Integer(-1);
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }
        }
    }

    synchronized public void process(WatchedEvent event) {
        synchronized (mutex) {
            mutex.notify();
        }
    }

Both classes extend SyncPrimitive. In this way, we execute steps that are common to all primitives in the constructor of SyncPrimitive. To keep the examples simple, we create a ZooKeeper object the first time we instantiate either a barrier object or a queue object, and we declare a static variable that is a reference to this object. The subsequent instances of Barrier and Queue check whether a ZooKeeper object exists. Alternatively, we could have the application creating a ZooKeeper object and passing it to the constructor of Barrier and Queue.

We use the process() method to process notifications triggered due to watches. In the following discussion, we present code that sets watches. A watch is internal structure that enables ZooKeeper to notify a client of a change to a node. For example, if a client is waiting for other clients to leave a barrier, then it can set a watch and wait for modifications to a particular node, which can indicate that it is the end of the wait. This point becomes clear once we go over the examples.

Barriers

A barrier is a primitive that enables a group of processes to synchronize the beginning and the end of a computation. The general idea of this implementation is to have a barrier node that serves the purpose of being a parent for individual process nodes. Suppose that we call the barrier node "/b1". Each process "p" then creates a node "/b1/p". Once enough processes have created their corresponding nodes, joined processes can start the computation.

In this example, each process instantiates a Barrier object, and its constructor takes as parameters:

  • the address of a ZooKeeper server (e.g., "zoo1.foo.com:2181")
  • the path of the barrier node on ZooKeeper (e.g., "/b1")
  • the size of the group of processes

The constructor of Barrier passes the address of the Zookeeper server to the constructor of the parent class. The parent class creates a ZooKeeper instance if one does not exist. The constructor of Barrier then creates a barrier node on ZooKeeper, which is the parent node of all process nodes, and we call root (Note: This is not the ZooKeeper root "/").

/**
         * Barrier constructor
         *
         * @param address
         * @param root
         * @param size
         */
        Barrier(String address, String root, int size) {
            super(address);
            this.root = root;
            this.size = size;
            // Create barrier node
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }
            // My node name
            try {
                name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
            } catch (UnknownHostException e) {
                System.out.println(e.toString());
            }
        }

To enter the barrier, a process calls enter(). The process creates a node under the root to represent it, using its host name to form. the node name. It then wait until enough processes have entered the barrier. A process does it by checking the number of children the root node has with "getChildren()", and waiting for notifications in the case it does not have enough. To receive a notification when there is a change to the root node, a process has to set a watch, and does it through the call to "getChildren()". In the code, we have that "getChildren()" has two parameters. The first one states the node to read from, and the second is a boolean flag that enables the process to set a watch. In the code the flag is true.

 /**
         * Join barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */
        boolean enter() throws KeeperException, InterruptedException{
            zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            while (true) {
                synchronized (mutex) {
                    List list = zk.getChildren(root, true);
                    if (list.size() < size) {
                        mutex.wait();
                    } else {
                        return true;
                    }
                }
            }
        }

Note that enter() throws both KeeperException and InterruptedException, so it is the reponsability of the application to catch and handle such exceptions.

Once the computation is finished, a process calls leave() to leave the barrier. First it deletes its corresponding node, and then it gets the children of the root node. If there is at least one child, then it waits for a notification (obs: note that the second parameter of the call to getChildren() is true, meaning that ZooKeeper has to set a watch on the the root node). Upon reception of a notification, it checks once more whether the root node has any child.

 /**
         * Wait until all reach barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */
        boolean leave() throws KeeperException, InterruptedException{
            zk.delete(root + "/" + name, 0);
            while (true) {
                synchronized (mutex) {
                    List list = zk.getChildren(root, true);
                        if (list.size() > 0) {
                            mutex.wait();
                        } else {
                            return true;
                        }
                    }
                }
        }
    }

Producer-Consumer Queues

A producer-consumer queue is a distributed data estructure thata group of processes use to generate and consume items. Producer processes create new elements and add them to the queue. Consumer processes remove elements from the list, and process them. In this implementation, the elements are simple integers. The queue is represented by a root node, and to add an element to the queue, a producer process creates a new node, a child of the root node.

The following excerpt of code corresponds to the constructor of the object. As with Barrier objects, it first calls the constructor of the parent class, SyncPrimitive, that creates a ZooKeeper object if one doesn‘t exist. It then verifies if the root node of the queue exists, and creates if it doesn‘t.

/**
         * Constructor of producer-consumer queue
         *
         * @param address
         * @param name
         */
        Queue(String address, String name) {
            super(address);
            this.root = name;
            // Create ZK node name
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }
        }

A producer process calls "produce()" to add an element to the queue, and passes an integer as an argument. To add an element to the queue, the method creates a new node using "create()", and uses the SEQUENCE flag to instruct ZooKeeper to append the value of the sequencer counter associated to the root node. In this way, we impose a total order on the elements of the queue, thus guaranteeing that the oldest element of the queue is the next one consumed.

/**
         * Add element to the queue.
         *
         * @param i
         * @return
         */
        boolean produce(int i) throws KeeperException, InterruptedException{
            ByteBuffer b = ByteBuffer.allocate(4);
            byte[] value;
            // Add child with value i
            b.putInt(i);
            value = b.array();
            zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT_SEQUENTIAL);
            return true;
        }

To consume an element, a consumer process obtains the children of the root node, reads the node with smallest counter value, and returns the element. Note that if there is a conflict, then one of the two contending processes won‘t be able to delete the node and the delete operation will throw an exception.

A call to getChildren() returns the list of children in lexicographic order. As lexicographic order does not necessary follow the numerical order of the counter values, we need to decide which element is the smallest. To decide which one has the smallest counter value, we traverse the list, and remove the prefix "element" from each one.

 /**
         * Remove first element from the queue.
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */
        int consume() throws KeeperException, InterruptedException{
            int retvalue = -1;
            Stat stat = null;
            // Get the first element available
            while (true) {
                synchronized (mutex) {
                    List list = zk.getChildren(root, true);
                    if (list.size() == 0) {
                        System.out.println("Going to wait");
                        mutex.wait();
                    } else {
                        Integer min = new Integer(list.get(0).substring(7));
                        for(String s : list){
                            Integer tempValue = new Integer(s.substring(7));
                            //System.out.println("Temporary value: " + tempValue);
                            if(tempValue < min) min = tempValue;
                        }
                        System.out.println("Temporary value: " + root + "/element" + min);
                        byte[] b = zk.getData(root + "/element" + min,
                                    false, stat);
                        zk.delete(root + "/element" + min, 0);
                        ByteBuffer buffer = ByteBuffer.wrap(b);
                        retvalue = buffer.getInt();
                        return retvalue;
                    }
                }
            }
        }
    }

完整代码如下:

SyncPrimitive.Java
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
public class SyncPrimitive implements Watcher {
    static ZooKeeper zk = null;
    static Integer mutex;
    String root;
    SyncPrimitive(String address) {
        if(zk == null){
            try {
                System.out.println("Starting ZK:");
                zk = new ZooKeeper(address, 3000, this);
                mutex = new Integer(-1);
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }
        }
        //else mutex = new Integer(-1);
    }
    synchronized public void process(WatchedEvent event) {
        synchronized (mutex) {
            //System.out.println("Process: " + event.getType());
            mutex.notify();
        }
    }
    /**
     * Barrier
     */
    static public class Barrier extends SyncPrimitive {
        int size;
        String name;
        /**
         * Barrier constructor
         *
         * @param address
         * @param root
         * @param size
         */
        Barrier(String address, String root, int size) {
            super(address);
            this.root = root;
            this.size = size;
            // Create barrier node
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }
            // My node name
            try {
                name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
            } catch (UnknownHostException e) {
                System.out.println(e.toString());
            }
        }
        /**
         * Join barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */
        boolean enter() throws KeeperException, InterruptedException{
            zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            while (true) {
                synchronized (mutex) {
                    List list = zk.getChildren(root, true);
                    if (list.size() < size) {
                        mutex.wait();
                    } else {
                        return true;
                    }
                }
            }
        }
        /**
         * Wait until all reach barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */
        boolean leave() throws KeeperException, InterruptedException{
            zk.delete(root + "/" + name, 0);
            while (true) {
                synchronized (mutex) {
                    List list = zk.getChildren(root, true);
                        if (list.size() > 0) {
                            mutex.wait();
                        } else {
                            return true;
                        }
                    }
                }
        }
    }
    /**
     * Producer-Consumer queue
     */
    static public class Queue extends SyncPrimitive {
        /**
         * Constructor of producer-consumer queue
         *
         * @param address
         * @param name
         */
        Queue(String address, String name) {
            super(address);
            this.root = name;
            // Create ZK node name
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }
        }
        /**
         * Add element to the queue.
         *
         * @param i
         * @return
         */
        boolean produce(int i) throws KeeperException, InterruptedException{
            ByteBuffer b = ByteBuffer.allocate(4);
            byte[] value;
            // Add child with value i
            b.putInt(i);
            value = b.array();
            zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT_SEQUENTIAL);
            return true;
        }
        /**
         * Remove first element from the queue.
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */
        int consume() throws KeeperException, InterruptedException{
            int retvalue = -1;
            Stat stat = null;
            // Get the first element available
            while (true) {
                synchronized (mutex) {
                    List list = zk.getChildren(root, true);
                    if (list.size() == 0) {
                        System.out.println("Going to wait");
                        mutex.wait();
                    } else {
                        Integer min = new Integer(list.get(0).substring(7));
                        for(String s : list){
                            Integer tempValue = new Integer(s.substring(7));
                            //System.out.println("Temporary value: " + tempValue);
                            if(tempValue < min) min = tempValue;
                        }
                        System.out.println("Temporary value: " + root + "/element" + min);
                        byte[] b = zk.getData(root + "/element" + min,
                                    false, stat);
                        zk.delete(root + "/element" + min, 0);
                        ByteBuffer buffer = ByteBuffer.wrap(b);
                        retvalue = buffer.getInt();
                        return retvalue;
                    }
                }
            }
        }
    }
    public static void main(String args[]) {
        if (args[0].equals("qTest"))
            queueTest(args);
        else
            barrierTest(args);
    }
    public static void queueTest(String args[]) {
        Queue q = new Queue(args[1], "/app1");
        System.out.println("Input: " + args[1]);
        int i;
        Integer max = new Integer(args[2]);
        if (args[3].equals("p")) {
            System.out.println("Producer");
            for (i = 0; i < max; i++)
                try{
                    q.produce(10 + i);
                } catch (KeeperException e){
                } catch (InterruptedException e){
                }
        } else {
            System.out.println("Consumer");
            for (i = 0; i < max; i++) {
                try{
                    int r = q.consume();
                    System.out.println("Item: " + r);
                } catch (KeeperException e){
                    i--;
                } catch (InterruptedException e){
                }
            }
        }
    }
    public static void barrierTest(String args[]) {
        Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));
        try{
            boolean flag = b.enter();
            System.out.println("Entered barrier: " + args[2]);
            if(!flag) System.out.println("Error when entering the barrier");
        } catch (KeeperException e){
        } catch (InterruptedException e){
        }
        // Generate random integer
        Random rand = new Random();
        int r = rand.nextInt(100);
        // Loop for rand iterations
        for (int i = 0; i < r; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
        }
        try{
            b.leave();
        } catch (KeeperException e){
        } catch (InterruptedException e){
        }
        System.out.println("Left barrier");
    }
}
时间: 2024-12-10 14:15:49

ZooKeeper典型应用(二) 生产者与消费者的相关文章

深入理解并发(二)--生产者及消费者

生产者及消费者问题,是线程操作中的一个经典案列.但由于线程运行的不确定性,生产者及消费者可能会产生一些问题: 试想,如果生产者线程向存储数据空间添加了部分信息,但没有添加全部,这时就切换到消费者线程,这时消费者线程将会把已经添加了的部分信息,后上一次的信息混淆了,导致出错. 或者,若生产者放数据,与消费者取数据的速度不匹配,也会出现问题:即可能会出现,生产者放了多条数据,消费者才取了一条,导致数据丢失:或生产者只放了一条数据,但消费者已经取了多条,这会导致重复取出数据. 举例说明: class

zookeeper客户端命令行查看dubbo服务的生产者和消费者

假设zookeeper安装在192.168.5.130这台服务器上,现在我们通过命令行查看dubbo在zookeeper注册服务的生产者和消费者信息 首先通过命令切换到/usr/zookeeper-3.4.10/bin目录,然后输入 ./zkCli.sh -server 192.168.5.130:2888 (2888为zookeeper在服务器上提供服务的端口)会看到如下截图: 然后在命令行再输入: ls / 查看目录信息,就能看到注册的dubbo服务,截图如下: 3 在命令行依次输入: ls

并发无锁队列学习之二【单生产者单消费者】

1.前言 最近工作比较忙,加班较多,每天晚上回到家10点多了.我不知道自己还能坚持多久,既然选择了就要做到最好.写博客的少了.总觉得少了点什么,需要继续学习.今天继续上个开篇写,介绍单生产者单消费者模型的队列.根据写入队列的内容是定长还是变长,分为单生产者单消费者定长队列和单生产者单消费者变长队列两种.单生产者单消费者模型的队列操作过程是不需要进行加锁的.生产者通过写索引控制入队操作,消费者通过读索引控制出队列操作.二者相互之间对索引是独享,不存在竞争关系.如下图所示: 2.单生产者单消费者定长

JAVA基础再回首(二十五)——Lock锁的使用、死锁问题、多线程生产者和消费者、线程池、匿名内部类使用多线程、定时器、面试题

JAVA基础再回首(二十五)--Lock锁的使用.死锁问题.多线程生产者和消费者.线程池.匿名内部类使用多线程.定时器.面试题 版权声明:转载必须注明本文转自程序员杜鹏程的博客:http://blog.csdn.net/m366917 我们来继续学习多线程 Lock锁的使用 虽然我们可以理解同步代码块和同步方法的锁对象问题,但是我们并没有直接看到在哪里加上了锁,在哪里释放了锁,为了更清晰的表达如何加锁和释放锁,JDK5以后提供了一个新的锁对象Lock Lock void lock():获取锁 v

JAVA学习笔记(四十二)-生产者消费者模型

wait().notify() /* * wait().notify() * * 1.两个方法都只能在synchronized代码块中执行,因为要对持有锁的线程操作,只有同步中才有锁 * 2.两个方法在操作同步中的线程时,必须要标识所操作线程持有的对象锁 * 3.等待和唤醒必须是同一个对象锁 */ public class Test05 { public static void main(String[] args) { MyThread3 mt=new MyThread3(); Thread

搞懂分布式技术6:Zookeeper典型应用场景及实践

搞懂分布式技术6:Zookeeper典型应用场景及实践 一.ZooKeeper典型应用场景实践 ZooKeeper是一个高可用的分布式数据管理与系统协调框架.基于对Paxos算法的实现,使该框架保证了分布式环境中数据的强一致性,也正是基于这样的特性,使得ZooKeeper解决很多分布式问题.网上对ZK的应用场景也有不少介绍,本文将介绍比较常用的项目例子,系统地对ZK的应用场景进行一个分门归类的介绍. 值得注意的是,ZK并非天生就是为这些应用场景设计的,都是后来众多开发者根据其框架的特性,利用其提

Parallel Programming-实现并行操作的流水线(生产者、消费者)

本文介绍如何使用C#实现并行执行的流水线(生产者消费者): 流水线示意图 实现并行流水线 一.流水线示意图 上图演示了流水线,action1接收input,然后产生结果保存在buffer1中,action2读取buffer1中由action1产生的数据,以此类推指导action4完成产生Output. 以上也是典型的生产者消费者模式. 上面的模式如果使用普通常规的串行执行是很简单的,按部就班按照流程图一步一步执行即可.如果为了提高效率,想使用并行执行,也就是说生产者和消费者同时并行执行,该怎么办

ZooKeeper典型应用场景(转)

ZooKeeper是一个高可用的分布式数据管理与系统协调框架.基于对Paxos算法的实现,使该框架保证了分布式环境中数据的强一致性,也正是基于这样的特性,使得ZooKeeper解决很多分布式问题.网上对ZK的应用场景也有不少介绍,本文将结合作者身边的项目例子,系统地对ZK的应用场景进行一个分门归类的介绍. 值得注意的是,ZK并非天生就是为这些应用场景设计的,都是后来众多开发者根据其框架的特性,利用其提供的一系列API接口(或者称为原语集),摸索出来的典型使用方法.因此,也非常欢迎读者分享你在ZK

zooKeeper 典型应用场景一览

http://nileader.blog.51cto.com/1381108/1040007 转载请用注明 @ni掌柜  [email protected] ZooKeeper是一个高可用的分布式数据管理与系统协调框架.基于对Paxos算法的实现,使该框架保证了分布式环境中数据的强一致性,也正是基于这样的特性,使得ZooKeeper解决很多分布式问题.网上对ZK的应用场景也有不少介绍,本文将结合作者身边的项目例子,系统地对ZK的应用场景进行一个分门归类的介绍. 值得注意的是,ZK并非天生就是为这