第三章 zookeeper客户端-curator详解

一、简介

  Curator是Netflix公司开源的一套zookeeper客户端框架。

  Curator包含了几个包:

  curator-framework:对zookeeper的底层api的一些封装

  curator-client:提供一些客户端的操作,例如重试策略等

  curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。

  我们需要根据ZK的版本来选择对应的curator版本,否则会出现兼容性问题

二、环境

  jdk 1.8

  zk 3.4.12

  curator-recipes 2.8.0

三、常用api介绍

  1)客户端创建

  CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, 5000,3000,new RetryNTimes(10, 5000));  1、ZK_ADDRESS 连接zk的地址 格式host1:port1,host2:port2,。。。  2、sessionTimeoutMs 会话超时时间,单位是毫秒,可不填测此参数,默认值为60000ms  3、connectionTimeoutMs   4、retryPolicy 重试策略,内建有四种重试策略,也可以自行实现RetryPolicy接口。

  2)客户端创建

  client.start()

  3)创建数据节点

   四种节点
   PERSISTENT:持久化
   PERSISTENT_SEQUENTIAL:持久化并且带序列号
   EPHEMERAL:临时
   EPHEMERAL_SEQUENTIAL:临时并且带序列号

  1、默认持久化节点
  client.create().forPath("/data1");
  2、创建持久化节点并赋值
  client.create().forPath("/data2", "this is data2".getBytes());
  3、创建临时空节点
  client.create().withMode(CreateMode.EPHEMERAL).forPath("/data3");
  4、创建临时节点并赋值
  client.create().withMode(CreateMode.EPHEMERAL).forPath("/data4", "this is data4".getBytes());
  5、创建临时有序节点并赋值并递归创建父节点
  client.create().creatingParentsIfNeeded().withMode(CreateMode.EPH_SEQ).forPath("/patent/data", "children".getBytes());

   4)删除数据节点

  流式风格,拼接顺序可以调整。
  1、只能删除叶子节点,否则抛出异常
  client.delete().forPath("/data1");
  2、删除一个节点并递归删除其所有的子节点
  client.delete().deletingChildrenIfNeeded().forPath("/patent/data");
  3、保证强制删除一个节点,只要客户端会话有效就会持续进行删除直到删除成功。
  client.delete().guaranteed().forPath("/data2");

   5)查询数据节点

  1、读节点的数据内容,返回byte数组
  byte[] data=client.getData().forPath("/data2");
  2、读取一个节点的数据内容,同时获取到该节点的stat
  Stat stat = new Stat();
  client.getData().storingStatIn(stat).forPath("data4");
  3、获取一个路径下所有的子节点
  List<String> childrens = client.getChildren().forPath("/");
  4、检查节点是否存在,返回一个stat
  stat= client.checkExists().forPath("/data3");

  6)数据节点更新

  1、更新一个节点的数据内容,返回一个stat
  Stat stat = client.setData().forPath("/data1","update data1 data".getBytes());

  7)事务

    CuratorFramework的实例包含inTransaction()接口方法,调用此方法开启一个ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交。  

    client.inTransaction()
    .and().create().withMode(CreateMode.PERSISTENT).forPath("/data5", "this is data5".getBytes())
    .and().setData().forPath("/data5", "update data5".getBytes())
    .and().commit();

    8)监听watch

   Zookeeper原生支持通过注册Watcher来进行事件监听,但是开发者需要反复注册(Watcher只能单次注册单次使用)。Cache是Curator中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。Curator提供了三种Watcher(Cache)来监听结点的变化。

    1、Path Cache

     Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态,而状态的更变将通过PathChildrenCacheListener通知。

   

package com.vi.test;

import com.vi.util.CuratorClientUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.zookeeper.CreateMode;

import java.util.List;

public class CuratorWatchTest {
    private static final String PATH = "/path/cache";

    public static void main(String[] args) {
        //创建客户端
        CuratorFramework client = CuratorClientUtil.getClient();
        client.start();
        try {
            PathChildrenCache cache = pathCacheTest(client);
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/data1", "first data".getBytes());
            Thread.sleep(500);
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/data2", "second data".getBytes());
            Thread.sleep(500);
            //获取所有子节点
            List<ChildData> datas =cache.getCurrentData();
            for(ChildData childData : datas){
                System.out.println("节点路径:"+childData.getPath() + ",节点值:" + new String(childData.getData()));
            }
            client.setData().forPath(PATH + "/data1", "update data".getBytes());
            Thread.sleep(500);
            client.delete().deletingChildrenIfNeeded().forPath("/path");
            Thread.sleep(500);
            cache.close();
            client.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static PathChildrenCache pathCacheTest(CuratorFramework client) throws Exception {
        //第三个参数不为true时,不会缓存data数据
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        /*
            三种启动方式
            NORMAL:正常初始化。
            BUILD_INITIAL_CACHE:在调用start()之前会调用rebuild()。
            POST_INITIALIZED_EVENT: 当Cache初始化数据后发送一个
            cache.start(PathChildrenCache.StartMode.NORMAL);
         */
        cache.start();
        //增加监听事件
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                System.out.println("事件为:" + event.getType() + ",数据为:" + new String(event.getData().getData()));
            }
        });
        return cache;
    }

}

  2)Node Cache

    Node Cache与Path Cache类似,Node Cache只能监听某一个特定的节点。

  

package com.vi.test;

import com.vi.util.CuratorClientUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.zookeeper.CreateMode;

public class CuratorNodeCacheTest {
    private static final String PATH = "/path/cache";

    public static void main(String[] args) {
        //创建客户端
        CuratorFramework client = CuratorClientUtil.getClient();
        client.start();
        try {
            NodeCache cache = nodeCacheTest(client);
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH, "first data".getBytes());
            Thread.sleep(500);
            client.setData().forPath(PATH, "update data".getBytes());
            Thread.sleep(500);
            client.delete().deletingChildrenIfNeeded().forPath("/path");
            Thread.sleep(500);
            cache.close();
            client.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static NodeCache nodeCacheTest(CuratorFramework client) throws Exception {
        //只能监控一个节点
        NodeCache cache = new NodeCache(client, PATH);
        cache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                ChildData childData = cache.getCurrentData();
                if (childData == null) {
                    System.out.println("节点删除!");
                } else {
                    System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
                }
            }
        });
        cache.start();
        return cache;
    }
}

   3)Tree Cache

   Tree Cache可以监控整个树上的所有节点,类似于PathCache和NodeCache的组合。

package com.vi.test;

import com.vi.util.CuratorClientUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.zookeeper.CreateMode;

public class CuratorTreeCacheTest {
    private static final String PATH = "/path/cache";

    public static void main(String[] args) {
        //创建客户端
        CuratorFramework client = CuratorClientUtil.getClient();
        client.start();
        try {
            TreeCache cache = treeCacheTest(client);
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/data1", "first data".getBytes());
            Thread.sleep(500);
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/data2", "second data".getBytes());
            Thread.sleep(500);
            client.setData().forPath(PATH , "update path data".getBytes());
            Thread.sleep(500);
            client.setData().forPath(PATH + "/data1", "update data".getBytes());
            Thread.sleep(500);
            client.delete().deletingChildrenIfNeeded().forPath("/path");
            Thread.sleep(1000);
            cache.close();
            client.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static TreeCache treeCacheTest(CuratorFramework client) throws Exception {
        //只能监控一个节点
        TreeCache cache = new TreeCache(client, PATH);
        cache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                System.out.println("事件为:" + event.getType() + ",节点路径为:" + event.getData().getPath() + ",数据为:" +  new String(event.getData().getData()));
            }
        });
        cache.start();
        return cache;
    }
}

 四、总结

 curator-recipes中有一些高级特性可以使用,在后面的章节会具体介绍。

 源码地址:  https://github.com/binary-vi/binary.github.io/tree/master/zk-curator

原文地址:https://www.cnblogs.com/vi-2525/p/9014091.html

时间: 2024-10-25 19:21:25

第三章 zookeeper客户端-curator详解的相关文章

zookeeper客户端命令详解

今天同事突然向看一下zookeeper中都创建了哪些节点,而我本人对zookeeper的客服端命令了解的很少,有些操作竟然不知道怎么用,于是乎就索性整理一下zookeeper客服端命令的使用,并再此记录一下. 想要用zkClient链接zookeeper,首先执行如下命令,连接到zookeeper server ./zkCli.sh -server localhost:2181 help命令 help命令用于查询客服端所支持的所用的命令,执行help,输入如下: ZooKeeper -serve

WebSocket安卓客户端实现详解(三)–服务端主动通知

WebSocket安卓客户端实现详解(三)–服务端主动通知 本篇依旧是接着上一篇继续扩展,还没看过之前博客的小伙伴,这里附上前几篇地址 WebSocket安卓客户端实现详解(一)–连接建立与重连 WebSocket安卓客户端实现详解(二)–客户端发送请求 终于是最后一篇啦,有点激动\ ( ≧▽≦ ) /啦啦啦, 服务端主动通知 热身完毕,我们先回顾下第一篇中讲到的服务端主动通知的流程 根据notify中事件类型找到对应的处理类,处理对应逻辑. 然后用eventbus通知对应的ui界面更新. 如果

LVS负载均衡群集(三种工作模式原理详解)

LVS负载均衡群集(三种工作模式原理详解) 一.前言 ? 在互联网应用中,随着站点对硬件性能.响应速度.服务稳定性.数据可靠性等要求越来越高,单台服务器力不从心.所以我们需要通过一些方法来解决这样的瓶颈. ? 最简单的方法就是使用价格昂贵的大.小型的主机:但这样在大多数企业中显然是不可取或者说不现实的.那么我们就需要通过多个普通服务器构建服务器群集. 二.相关概念概述 2.1何为LVS? ? LVS--Linux Virtual Server,即Linux虚拟服务器(虚拟主机.共享主机),虚拟主

Linux下ORACLE客户端安装详解

1.首先去oracle官网下载以下安装包(http://www.oracle.com/technetwork/topics/linuxsoft-082809.html) instantclient-basic-linux.x64-11.2.0.3.0.zip instantclient-odbc-linux-11.2.0.3.0.zip instantclient-sdk-linux.x64-11.2.0.3.0.zip instantclient-sqlplus-linux.x64-11.2.

SaltStack 入门到精通第三篇:Salt-Minion配置文件详解

SaltStack 入门到精通第三篇:Salt-Minion配置文件详解 作者:ArlenJ  发布日期:2014-06-09 17:52:16 ##### 主要配置设置 ##### 配置 默认值 说明 例子 default_include minion.d/*.conf master可以从其他文件读取配置,默认情况下master将自动的将master.d/*.conf中的配置读取出来并应用,其中master.d目录是相对存在于主配置文件所在的目录 default_include: minion

zabbix专题:第二章 zabbix3.0安装详解

zabbix3.0安装详解 本节目录大纲 安装配置mariadb 安装服务器端 zabbix web配置 web页面初始化 更改为中文 中文乱码问题 zabbix专题:第二章 zabbix3.2安装详解 zabbix专题:第二章 zabbix3.2安装详解 官方文档地址: https://www.zabbix.com/documentation/3.2/manual/installation/install_from_packages 我安装zabbix用的rpm包,可以从官网的源里面去下载,需

WebSocket安卓客户端实现详解(一)–连接建立与重连

http://blog.csdn.net/zly921112/article/details/72973054 前言 这里特别说明下因为WebSocket服务端是公司线上项目所以这里url和具体协议我全部抹去了,但我会尽力给大家讲明白并且demo我都是测试过,还望各位看官见谅 我们先粗犷的讲下流程,掌握个大概的方向,然后在深入讲解细节的实现.这里先解答一个疑惑,为啥我们这要用WebSocket而不是Socket呢,因为WebSocket是一个应用层协议很多东西都规定好了我们直接按他的规定来用就好

Java进阶(三十二) HttpClient使用详解

Java进阶(三十二) HttpClient使用详解 Http协议的重要性相信不用我多说了,HttpClient相比传统JDK自带的URLConnection,增加了易用性和灵活性(具体区别,日后我们再讨论),它不仅是客户端发送Http请求变得容易,而且也方便了开发人员测试接口(基于Http协议的),即提高了开发的效率,也方便提高代码的健壮性.因此熟练掌握HttpClient是很重要的必修内容,掌握HttpClient后,相信对于Http协议的了解会更加深入. 一.简介 HttpClient是A

boost asio异步读写网络聊天程序客户端 实例详解

// // chat_client.cpp // ~~~~~~~~~~~~~~~ // // Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://ww