zookeeper应用 - 配置服务

一端不停的更新配置,另一端监听这个配置的变化。

需要注意的是:监听端不一定读取到所有的变化。在zk服务器发送通知到客户端,客户端读取数据注册监听之间可能发生了多次数据变化,这些数据变化是得不到通知的。但可以保证的是每次通知得到的数据都是比之前的数据要新的。

ZKUtils.java

package config;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class ZKUtils {
	/**
	 * 构建zookeeper客户端对象
	 * @param hosts
	 * @return
	 * @throws Exception
	 */
	public static ZooKeeper open(String hosts) throws Exception {
		final CountDownLatch singal = new CountDownLatch(1);

		ZooKeeper zk = new ZooKeeper(hosts, 2000, new Watcher() {
			@Override
			public void process(WatchedEvent event) {
				singal.countDown();
			}
		});

		singal.await();

		return zk;
	}
}

  

ConfigUpdater.java

package config;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
/**
 * 不停的更新/test节点上的数据, 模拟配置更新
 *
 */
public class ConfigUpdater {
	public static final String HOSTS = "hadoop1:2181";
	public static final String PATH = "/test";

	public static void main(String[] args) throws Exception {
		ZooKeeper zk = ZKUtils.open(HOSTS);

		while(true) {
			String data = UUID.randomUUID().toString();

			Stat stat = zk.exists(PATH, false);
			if(stat == null) {
				zk.create(PATH, data.getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
			} else {
				zk.setData(PATH, data.getBytes("UTF-8"), -1);
			}

			TimeUnit.SECONDS.sleep(5);
		}
	}
}

  

ConfigUpdateWatcher .java

package config;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;
/**
 * 注册监听/test节点上的数据变化
 *
 */
public class ConfigUpdateWatcher implements Watcher {
	private ZooKeeper zk = null;

	public ConfigUpdateWatcher() {
		try {
			zk = ZKUtils.open(ConfigUpdater.HOSTS);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Override
	public void process(WatchedEvent event) {
		System.out.println(event);

		if(event.getType().equals(EventType.NodeDataChanged)) {
			try {
				//读取事件后, 再次注册数据监听事件
				byte[] data = zk.getData(ConfigUpdater.PATH, this, null);
				System.out.printf("接收到了事件%s, 新的数据是:%s", EventType.NodeDataChanged, new String(data, "UTF-8"));
				System.out.println();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	private void run() {
		try {
			//注册数据变化监听
			zk.getData(ConfigUpdater.PATH, this, null);
		} catch (Exception e) {
			e.printStackTrace();
		}

		try {
			TimeUnit.SECONDS.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		new ConfigUpdateWatcher().run();
	}
}

  

完善

在操作zookeeper的时候,如果是幂等操作(多次操作不影响结果),在失败时可以多次重试以增加可靠性,比如ConfigUpdater的写操作可以进行多次重试,修改后的代码如下:

package config;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
/**
 * 不停的更新/test节点上的数据, 模拟配置更新
 *
 */
public class ConfigUpdater {
	public static final String HOSTS = "hadoop1:2181";
	public static final String PATH = "/test";
	public static final int RETRIES = 3; //重试次数
	public static final int RETRY_PERIOD = 500; //重试间隔

	public static void main(String[] args) throws Exception {
		ZooKeeper zk = ZKUtils.open(HOSTS);

		//不停的模拟数据更新操作
		while(true) {

			String data = UUID.randomUUID().toString();

			/*
			 * 多次重试,增加可靠性
			 */
			int retied = 0;
			while(true) {
				try {
					Stat stat = zk.exists(PATH, false);
					if(stat == null) {
						zk.create(PATH, data.getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
					} else {
						zk.setData(PATH, data.getBytes("UTF-8"), -1);
					}

					//执行成功则跳出循环,否则继续(重试)
					break;
				} catch (KeeperException e) {
					retied++;

					//如果会话过期了则重新创建一个ZooKeeper客户端对象
					if(e.code().equals(KeeperException.Code.SESSIONEXPIRED)) {
						zk = ZKUtils.open(HOSTS);
					} else {
						//其他KeeperException Code的处理

						//KeeperException.Code.CONNECTIONLOSS异常可以不用处理:ZooKeeper客户端对象会自动进行重新连接
					}

					//可以重试3次,每次间隔500毫秒
					if(retied == RETRIES) {
						throw e;
					} else {
						TimeUnit.MICROSECONDS.sleep(RETRY_PERIOD);
					}
				}
			}

			//模拟其他操作占用的时间
			TimeUnit.SECONDS.sleep(5);
		}
	}
}

  

时间: 2024-10-21 08:04:09

zookeeper应用 - 配置服务的相关文章

Hadoop系列之zookeeper(分布式协调服务)安装配置

Hadoop系列之zookeeper(分布式协调服务)安装配置 1.安装cd /root/softtar zxvf zookeeper-3.4.6.tar.gzmv zookeeper-3.4.6 /usr/local/hadoop/zookeeper2.修改配置文件mkdir -p /data/zookeeper/data //3个节点都需要创建此目录cd /usr/local/hadoop/zookeeper/confcp zoo_sample.cfg zoo.cfg#vim zoo.cfg

使用zookeeper实现配置同步

前言 应用项目中都会有一些配置信息,这些配置信息数据量少,一般会保存到内存.文件或者数据库,有时候需要动态更新.当需要在多个应用服务器中修改这些配置文件时,需要做到快速.简单.不停止应用服务器的方式修改并同步配置信息到所有应用中去.本篇文章就是介绍如何使用ZooKeeper来实现配置的动态同步. ZooKeeper 在<hive Driver类运行过程>一文中可以看到hive为了支持并发访问引入了ZooKeeper来实现分布式锁.参考<ZooKeeper典型应用场景一览>一文,Zo

ZooKeeper学习第二期--ZooKeeper安装配置

一.Zookeeper的搭建方式 Zookeeper安装方式有三种,单机模式和集群模式以及伪集群模式. ■ 单机模式:Zookeeper只运行在一台服务器上,适合测试环境:■ 伪集群模式:就是在一台物理机上运行多个Zookeeper 实例:■ 集群模式:Zookeeper运行于一个集群上,适合生产环境,这个计算机集群被称为一个"集合体"(ensemble) Zookeeper通过复制来实现高可用性,只要集合体中半数以上的机器处于可用状态,它就能够保证服务继续.为什么一定要超过半数呢?这

ZooKeeper安装配置

一.Zookeeper的搭建方式 Zookeeper安装方式有三种,单机模式和集群模式以及伪集群模式. ■ 单机模式:Zookeeper只运行在一台服务器上,适合测试环境:■ 伪集群模式:就是在一台物理机上运行多个Zookeeper 实例:■ 集群模式:Zookeeper运行于一个集群上,适合生产环境,这个计算机集群被称为一个“集合体”(ensemble) Zookeeper通过复制来实现高可用性,只要集合体中半数以上的机器处于可用状态,它就能够保证服务继续.为什么一定要超过半数呢?这跟Zook

ZooKeeper系列1:ZooKeeper的配置

问题导读:1.zookeeper有哪些配置文件?2.zookeeper最低配置需要哪些配置项?3.zookeeper高级配置需要配置哪些项? ZooKeeper 的功能特性通过 ZooKeeper 配置文件来进行控制管理( zoo.cfg 配置文件). ZooKeeper 这样的设计其实是有它自身的原因的.通过前面对 ZooKeeper 的配置可以看出,对 ZooKeeper 集群进行配置的时候,它的配置文档是完全相同的(对于集群伪分布模式来说,只有很少的部分是不同的).这样的配置方使得在部署Z

ZooKeeper安装配置(转载)

转载自:http://www.cnblogs.com/sunddenly/p/4018459.html 一.Zookeeper的搭建方式 Zookeeper安装方式有三种,单机模式和集群模式以及伪集群模式. ■ 单机模式:Zookeeper只运行在一台服务器上,适合测试环境:■ 伪集群模式:就是在一台物理机上运行多个Zookeeper 实例:■ 集群模式:Zookeeper运行于一个集群上,适合生产环境,这个计算机集群被称为一个"集合体"(ensemble) Zookeeper通过复制

[转载] zookeeper 分布式锁服务

转载自http://www.cnblogs.com/shanyou/archive/2012/09/22/2697818.html 分布式锁服务在大家的项目中或许用的不多,因为大家都把排他放在数据库那一层来挡.当大量的行锁.表锁.事务充斥着数据库的时候.一般web应用很多的瓶颈都在数据库上,这里给大家介绍的是减轻数据库锁负担的一种方案,使用zookeeper分布式锁服务. zookeeper是hadoop下面的一个子项目, 用来协调跟hadoop相关的一些分布式的框架, 如hadoop, hiv

dubbo+zookeeper+springboot构建服务

本次和大家分享的是dubbo框架应用的初略配置和zookeeper注册中心的使用:说到注册中心现在我使用过的只有两种:zookeeper和Eureka,zk我结合dubbo来使用,而Eureka结合springcloud使用,因此后面将和大家分享一些关于微服务的一些篇章,希望对你有好的帮助. 安装注册中心zookeeper dubbo框架之provider和consumer dubbo-admin部署 安装注册中心zookeeper 首先,我们需要在网上搜索下zookeeper下载地址,我这里是

ZooKeeper: 简介, 配置及运维指南

1. 概览 ZooKeeper是一个供其它分布式应用程序使用的软件, 它为其它分布式应用程序提供所谓的协调服务. 所谓的协调服务, 是指ZooKeeper的如下能力 naming 命名 configuration management 配置管理 synchronization 同步 group service 分组服务 上面四个功能可能现在不太好说清, 但大致上目前你需要明白ZooKeeper就是为其它分布式应用程序提供一些基础功能的程序就好了. 我们以其中的配置管理为例. 假设你在写一个可横向