ZooKeeper与Curator注册和监控

Curator提供了对zookeeper客户端的封装,并监控连接状态和会话session,特别是会话session过期后,curator能够重新连接zookeeper,并且创建一个新的session。

对于zk的使用者来说,session的概念至关重要,如果想了解更多session的说明,请访问:http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html

zk客户端和zk服务器间主要可能存在下面几种异常情况:

1.     短暂失去连接:此时客户端检测到与服务端的连接已经断开,但是服务端维护的客户端session尚未过期,之后客户端和服务端重新建立了连接;当客户端重新连接后,由于session没有过期,zookeeper能够保证连接恢复后保持正常服务。

2.     失去连接时间很长:此时服务器相对于客户端的session已经过期了,与先前session相关的watcher和ephemeral的路径和数据都会消失;当Curator重新创建了与zk的连接后,会获取到session expired异常,Curator会销毁先前的session,并且会创建一个新的session,需要注意的是,与之前session相关的watcher和ephemeral类型的路径和数据在新的session中也不会存在,需要开发者在CuratorFramework.getConnectionStateListenable().addListener()中添加状态监听事件,对ConnectionState.LOST事件进行监听,当session过期后,使得之前的session状态得以恢复。对于ephemeral类型,在客户端应该保持数据的状态,以便及时恢复。

3.     客户端重新启动:不论先前的zk session是否已经过期,都需要重新创建临时节点、添加数据和watch事件,先前的session也会在稍后的一段时间内过期。

4.     Zk服务器重新启动:由于zk将session信息存放到了硬盘上,因此重启后,先前未过期的session仍然存在,在zk服务器启动后,客户端与zk服务器创建新的连接,并使用先前的session,与1相同。

5.     需要注意的是,当session过期了,在session过期期间另外的客户端修改了zk的值,那么这个修改在客户端重新连接到zk上时,zk客户端不会接收到这个修改的watch事件(尽管添加了watch),如果需要严格的watch逻辑,就需要在curator的状态监控中添加逻辑。

特别提示:watcher仅仅是一次性的,zookeeper通知了watcher事件后,就会将这个watcher从session中删除,因此,如果想继续监控,就要添加新的watcher。

下面提供了对persistent和ephemeral两种类型节点的监控方法,其中get方法说明了persistent节点如何监控,而register方法说明了ephemeral类型的节点如何监控。

public class CuratorTest {

private CuratorFramework zkTools;

private ConcurrentSkipListSet watchers = newConcurrentSkipListSet();

private static Charset charset = Charset.forName("utf-8");

public CuratorTest() {

zkTools = CuratorFrameworkFactory

.builder()

.connectString("10.11.21.78:12306")

.namespace("zk/test")

.retryPolicy(new RetryNTimes(2000,20000))

.build();

zkTools.start();

}

public void addReconnectionWatcher(final String path,final ZookeeperWatcherType watcherType,final CuratorWatcher watcher){

synchronized (this) {

if(!watchers.contains(watcher.toString()))//不要添加重复的监听事件

{

watchers.add(watcher.toString());

System.out.println("add new watcher " + watcher);

zkTools.getConnectionStateListenable().addListener(newConnectionStateListener() {

@Override

public void stateChanged(CuratorFramework client, ConnectionState newState) {

System.out.println(newState);

if(newState == ConnectionState.LOST){//处理session过期

try{

if(watcherType == ZookeeperWatcherType.EXITS){

zkTools.checkExists().usingWatcher(watcher).forPath(path);

}else if(watcherType == ZookeeperWatcherType.GET_CHILDREN){

zkTools.getChildren().usingWatcher(watcher).forPath(path);

}else if(watcherType == ZookeeperWatcherType.GET_DATA){

zkTools.getData().usingWatcher(watcher).forPath(path);

}else if(watcherType == ZookeeperWatcherType.CREATE_ON_NO_EXITS){

//ephemeral类型的节点session过期了,需要重新创建节点,并且注册监听事件,之后监听事件中,

//会处理create事件,将路径值恢复到先前状态

Stat stat =zkTools.checkExists().usingWatcher(watcher).forPath(path);

if(stat == null){

System.err.println("to create");

zkTools.create()

.creatingParentsIfNeeded()

.withMode(CreateMode.EPHEMERAL)

.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)

.forPath(path);

}

}

}catch (Exception e) {

e.printStackTrace();

}

}

}

});

}

}

}

public void create() throws Exception{

zkTools.create()//创建一个路径

.creatingParentsIfNeeded()//如果指定的节点的父节点不存在,递归创建父节点

.withMode(CreateMode.PERSISTENT)//存储类型(临时的还是持久的)

.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//访问权限

.forPath("zk/test");//创建的路径

}

public void put() throws Exception{

zkTools.//对路径节点赋值

setData().

forPath("zk/test","hello world".getBytes(Charset.forName("utf-8")));

}

public void get() throws Exception{

String path = "zk/test";

ZKWatch watch = new ZKWatch(path);

byte[] buffer = zkTools.

getData().

usingWatcher(watch).forPath(path);

System.out.println(new String(buffer,charset));

//添加session过期的监控

addReconnectionWatcher(path, ZookeeperWatcherType.GET_DATA, watch);

}

public void register() throws Exception{

String ip = InetAddress.getLocalHost().getHostAddress();

String registeNode = "zk/register/"+ip;//节点路径

byte[] data = "disable".getBytes(charset);//节点值

CuratorWatcher watcher = new ZKWatchRegister(registeNode,data);    //创建一个register watcher

Stat stat = zkTools.checkExists().forPath(registeNode);

if(stat != null){

zkTools.delete().forPath(registeNode);

}

zkTools.create()

.creatingParentsIfNeeded()          .withMode(CreateMode.EPHEMERAL)

.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)

.forPath(registeNode,data);//创建的路径和值

//添加到session过期监控事件中

addReconnectionWatcher(registeNode, ZookeeperWatcherType.CREATE_ON_NO_EXITS,watcher);

data = zkTools.getData().usingWatcher(watcher).forPath(registeNode);

System.out.println("get path form zk : "+registeNode+":"+new String(data,charset));

}

public static void main(String[] args) throws Exception {

CuratorTest test = new CuratorTest();

test.get();

test.register();

Thread.sleep(10000000000L);

}

public class ZKWatch implements CuratorWatcher{

private final String path;

public String getPath() {

return path;

}

public ZKWatch(String path) {

this.path = path;

}

@Override

public void process(WatchedEvent event) throws Exception {

System.out.println(event.getType());

if(event.getType() == EventType.NodeDataChanged){

byte[] data = zkTools.

getData().

usingWatcher(this).forPath(path);

System.out.println(path+":"+new String(data,Charset.forName("utf-8")));

}

}

}

public class ZKWatchRegister implements CuratorWatcher{

private final String path;

private byte[] value;

public String getPath() {

return path;

}

public ZKWatchRegister(String path,byte[] value) {

this.path = path;

this.value = value;

}

@Override

public void process(WatchedEvent event) throws Exception {

System.out.println(event.getType());

if(event.getType() == EventType.NodeDataChanged){

//节点数据改变了,需要记录下来,以便session过期后,能够恢复到先前的数据状态

byte[] data = zkTools.

getData().

usingWatcher(this).forPath(path);

value = data;

System.out.println(path+":"+new String(data,charset));

}else if(event.getType() == EventType.NodeDeleted){

//节点被删除了,需要创建新的节点

System.out.println(path + ":" + path +" has been deleted.");

Stat stat = zkTools.checkExists().usingWatcher(this).forPath(path);

if(stat == null){

zkTools.create()

.creatingParentsIfNeeded()

.withMode(CreateMode.EPHEMERAL)

.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)

.forPath(path);

}

}else if(event.getType() == EventType.NodeCreated){

//节点被创建时,需要添加监听事件(创建可能是由于session过期后,curator的状态监听部分触发的)

System.out.println(path + ":" +" has been created!" + "the current data is " +new String(value));

zkTools.setData().forPath(path, value);

zkTools.getData().usingWatcher(this).forPath(path);

}

}

}

public enum ZookeeperWatcherType{

GET_DATA,GET_CHILDREN,EXITS,CREATE_ON_NO_EXITS

}

}

时间: 2024-11-03 17:42:09

ZooKeeper与Curator注册和监控的相关文章

Zookeeper实现分布式集群监控

Zookeeepr实现分布式集群监控 Zookeeper中节点有两种:临时节点和永久节点 从类型上看节点又可以分为四种节点类型:PERSIST,PERSIST_SEQUENTIAL,EPHEMERAL,EPHEMERAL_SEQUENTIAL 临时节点有一个特点:当创建临时节点的程序停掉之后,这个临时节点就会消失. 监视器的特点:可以给zk中的节点注册监视器,见识这个节点的变化情况. 监视器注册一次,只能使用一次,多次使用就要多次注册. 我们利用这个Zookeeper的临时节点特性+监视器(Wa

【转帖】基于Zookeeper的服务注册与发现

http://www.techweb.com.cn/network/hardware/2015-12-25/2246973.shtml 背景 大多数系统都是从一个单一系统开始起步的,随着公司业务的快速发展,这个单一系统变得越来越庞大,带来几个问题: 1. 随着访问量的不断攀升,纯粹通过提升机器的性能来已经不能解决问题,系统无法进行有效的水平扩展 2. 维护这个单一系统,变得越来越复杂 3. 同时,随着业务场景的不同以及大研发的招兵买马带来了不同技术背景的工程师,在原有达达Python技术栈的基础

ZeroMQ接口函数之 :zmq_socket_monitor - 注册一个监控回调函数

ZeroMQ 官方地址 :http://api.zeromq.org/4-2:zmq-socket-monitor zmq_socket_monitor(3) ØMQ Manual - ØMQ/4.1.0 Name zmq_socket_monitor - 注册一个监控回调函数 Synopsis int zmq_socket_monitor (void *socket, char * *addr, int events); Description zmq_socket_monitor() 函数会

window7环境下ZooKeeper的安装运行及监控查看

原文:http://www.cnblogs.com/RainAndWind/p/4668427.html ZooKeeper是一个分布式开源框架,供了协调分布式应用的基本服务.这些天在使用DUBBO,由于开发环境是在windows环境下,需要能够先运行ZooKeeper,然后在此基础上实现服务的地址分配. 在下面,先搭建一个单机模式的的ZooKeeper环境. 首先从开源中国社会里把代码包下载下来.在这里是3.4.6(zookeeper-3.3.6.tar.gz) 再将包解压进入文件目录,可以看

关于Ring3层的注册表监控

最近一直想做远程操作的注册表,将客户端的注册表发送到主控端,遇到两个问题: 1.不能每次点击TreeControl都是一次请求的发送,太浪费资源. 2.在客户端的注册表监控效果也不是很好.(驱动不稳定,只想用Ring3层) 第一个问题比较好解决,在主控端加一个缓存结构就Ok,但是第二个问题还有一些问题. 常用的注册表监控一般都会使用钩子,Hook有关注册表操作的函数.但是这种方法是针对进程 而言,如果要监控全局,就要对每个进程Inject,这基本不现实. 一个使用DETOUR库的RegQuery

Zookeeper实现服务上下线监控服务列表

package com.billstudy.zookeeper; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import or

网站集群架构实战(LVS负载均衡、Nginx代理缓存、Nginx动静分离、Rsync+Inotify全网备份、Zabbix自动注册全网监控)--技术流ken

前言 最近做了一个不大不小的项目,现就删繁就简单独拿出来web集群这一块写一篇博客.数据库集群请参考<MySQL集群架构篇:MHA+MySQL-PROXY+LVS实现MySQL集群架构高可用/高性能-技术流ken>下面是项目的一些简单介绍. WEB集群项目简介 随着网站访问量的激增,势必会导致网站的负载增加,现需求搭载一套高性能,高负载,高可用的网站集群架构以保障网站的持续.高效.安全.稳定的运行. 针对以上需求,我们采用了如下的技术: 使用负载均衡技术来实现网站请求的调度分发,减小后端服务器

Zookeeper之Curator(1)客户端对节点的一些监控事件的api使用

<一>节点改变事件的监听 1 public class CauratorClientTest { 2 3 //链接地址 4 private static String zkhost="172.19.27.246:2181"; 5 //sessionTimeoutMs会话超时时间,单位为毫秒.默认是60000ms 6 private static int sessionTimeoutMs=5000; 7 //connectionTimeoutMs连接创建超时时间,单位毫秒,默

第三章 zookeeper客户端-curator详解

一.简介 Curator是Netflix公司开源的一套zookeeper客户端框架. Curator包含了几个包: curator-framework:对zookeeper的底层api的一些封装 curator-client:提供一些客户端的操作,例如重试策略等 curator-recipes:封装了一些高级特性,如:Cache事件监听.选举.分布式锁.分布式计数器.分布式Barrier等. 我们需要根据ZK的版本来选择对应的curator版本,否则会出现兼容性问题 二.环境 jdk 1.8 z