zookeeper - java操作

ZKUtils.java

package test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
public class ZKUtils {
	public static ZooKeeper openZk() throws IOException, InterruptedException {
		//连接zookeeper成功的标志
		final CountDownLatch connectedSignal = new CountDownLatch(1);

		ZooKeeper zk = new ZooKeeper(Test.connectString, Test.sessionTimeout, new Watcher() {
			@Override
			public void process(WatchedEvent event) {
				if(KeeperState.SyncConnected.equals(event.getState())) {
					//连接成功则打开当前进程
					connectedSignal.countDown();
				}
			}
		});

		//对CountDownLatch对象调用await()方法后,当前线程会堵塞等待,直到对象的计数器为0(调用对象的countDown()方法减1)
		//堵塞当前进程
		connectedSignal.await();
		return zk;
	}
}

  

Test.java

package test;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
public class Test {
	public static final String connectString = "hadoop1:2181";
	public static final int sessionTimeout = 5000;
	public static void main(String[] args) throws Exception {
		//创建path
//		createZnode("/b");

		//列出子znode
//		listChildren("/");

//		delete("/b");

//		deleteAsynchronous("/b");
//		watch();

		//强制客户端连接的服务区跟领导者进行同步,以更新指定path的状态,只能是异步调用
		sync();

		/*
		 * 验证
		 */
//		auth1();
//		auth2();
	}
	/**
	 * 创建znode
	 * @param path
	 * @throws IOException
	 * @throws InterruptedException
	 * @throws KeeperException
	 */
	private static void createZnode(String path) throws IOException, InterruptedException,
			KeeperException {
		ZooKeeper zk = ZKUtils.openZk();
		String createdPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		System.out.println("创建" + createdPath + "成功");

		// 创建znode, 数据是hello, ACL(访问控制列表)是完全放开的列表, 短暂类型的znode(session断开后,znode将被zookeeper服务器删除)
//		String createdPath = zk.create(path, "hello".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
		// 这里打印的是
//		System.out.println(createdPath);
		// 顺序的短暂znode
//		createdPath = zk.create(path, "hello".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
		// 这里打印的是path000000000N, 路径名+10位的序列号
//		System.out.println(createdPath);
	}

	/**
	 * 列出子znode
	 * @param parent
	 * @throws IOException
	 * @throws InterruptedException
	 * @throws KeeperException
	 */
	private static void listChildren(String parent) throws IOException, InterruptedException,
			KeeperException {
		ZooKeeper zk = ZKUtils.openZk();

		Stat state = zk.exists(parent, false);
		if(state == null) {
			return;
		}

		List<String> children = zk.getChildren(parent, false);
		for (String child : children) {
			System.out.println(child);
		}
	}

	/**
	 * 删除znode及其子znode
	 * @param path
	 * @throws IOException
	 * @throws InterruptedException
	 * @throws KeeperException
	 */
	private static void delete(String path) throws IOException, InterruptedException, KeeperException {
		ZooKeeper zk = ZKUtils.openZk();

		Stat state = zk.exists(path, false);
		if(state == null) {
			return;
		}

		System.out.println(state.getVersion());

		List<String> children = zk.getChildren(path, false);
		for (String child : children) {
			delete(path + "/" + child);
		}

		//需要指定path和version, version为-1则取消版本号验证
		zk.delete(path, -1);
	}

	/**
	 * 异步操作
	 * zookeeper同时提供同步、异步两个版本的API,业务对读取效率没影响的情况下选择哪个方式都可以.
	 * @param path
	 * @throws IOException
	 * @throws InterruptedException
	 * @throws KeeperException
	 */
	private static void deleteAsynchronous(String path) throws IOException, InterruptedException, KeeperException {
		ZooKeeper zk = ZKUtils.openZk();

		Stat state = zk.exists(path, false);
		if(state == null) {
			return;
		}

		List<String> children = zk.getChildren(path, false);
		for (String child : children) {
			delete(path + "/" + child);
		}

		//需要指定path和version, version为-1则取消版本号验证
//		zk.delete(path, -1);
		zk.delete(path, -1, new VoidCallback() {
			@Override
			public void processResult(int rc, String path, Object ctx) {
				System.out.println("异步删除操作执行完毕 , rc: " + rc + ", path: " + path);
			}
		}, null);

		//等待一会, 否则主线程直接结束了就看不到异步线程的输出结果了
		Thread.sleep(2000);
	}

	/**
	 * 监听事件
	 * @throws IOException
	 * @throws InterruptedException
	 * @throws KeeperException
	 */
	private static void watch() throws IOException, InterruptedException, KeeperException {
		final CountDownLatch singal = new CountDownLatch(1);
		ZooKeeper zk = new ZooKeeper("hadoop1:2181", 5000, new Watcher() {
			@Override
			public void process(WatchedEvent event) {
				System.out.println("接受到了一个事件:" + event);
				if(Watcher.Event.KeeperState.SyncConnected.equals(event.getState())) {
					singal.countDown();
				}
			}
		});
		singal.await();
		//exists、getData、getChildren操作可以设置监控

		//判断"/b"是否存在并对其进行监控, 使用创建zookeeper时的watcher处理
		zk.exists("/b", true);

		//使用指定的watcher处理
//		zk.exists("/b", new Watcher());

		Thread.sleep(Long.MAX_VALUE);

	}

	/**
	 * 强制客户端连接的服务器跟领导者进行同步,以更新指定znode的状态
	 * 只能异步调用
	 * @throws IOException
	 * @throws InterruptedException
	 * @throws KeeperException
	 */
	private static void sync() throws IOException, InterruptedException, KeeperException {
		ZooKeeper zk = ZKUtils.openZk();
		zk.sync("/a", new VoidCallback(){
			@Override
			public void processResult(int rc, String path, Object ctx) {
				try {
					Thread.sleep(3000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("同步完毕");
			}

		}, null);

		System.out.println("here");

		Stat stat = zk.exists("/a", false);
		byte[] data = zk.getData("/a", false, stat);
		System.out.println("data: " + new String(data));

		Thread.sleep(5000);
	}

	/**
	 * 使用自定义ACL创建znode
	 * @throws IOException
	 * @throws InterruptedException
	 * @throws KeeperException
	 * @throws NoSuchAlgorithmException
	 */
	private static void auth1() throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {

		List<ACL> acls = new ArrayList<ACL>();
		//用户名密码验证方式
		Id id = new Id("digest", DigestAuthenticationProvider.generateDigest("lisg:123456"));
		acls.add(new ACL(Perms.READ, id));

		ZooKeeper zk = ZKUtils.openZk();
		zk.create("/c", "test".getBytes(), acls, CreateMode.PERSISTENT);
	}

	/**
	 * ACL验证
	 * @throws IOException
	 * @throws InterruptedException
	 * @throws KeeperException
	 */
	private static void auth2() throws IOException, InterruptedException, KeeperException {
		ZooKeeper zk = ZKUtils.openZk();

		Stat stat = zk.exists("/c", false);
		if(stat == null) {
			return;
		}

		List<ACL> cacls = zk.getACL("/c", stat);
		System.out.println("/c的ACL列表是:" + cacls);

		//KeeperErrorCode = NoAuth for /c 异常
		zk.addAuthInfo("digest", "lisg:123456".getBytes());
		byte[] data = zk.getData("/c", false, stat);
		System.out.println("data: " + new String(data));
	}
}

  

时间: 2025-01-01 03:19:05

zookeeper - java操作的相关文章

java操作hbase例子

hbase安装方法请参考:hbase-0.94安装方法详解 hbase常用的shell命令请参考:hbase常用的shell命令例子 java操作hbase,在eclipse中创建一个java项目,将hbase安装文件根目录的jar包和lib目录下jar包导入项目,然后就可以编写java代码操作hbase了.下面代码给出来一个简单的示例 /** * @date 2015-07-23 21:28:10 * @author sgl */ package com.songguoliang.hbase;

Hadoop之——Java操作HBase

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/46463617 不多说,直接上代码,大家都懂得 package hbase; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbas

Java操作hbase总结

用过以后,总得写个总结,不然,就忘喽. 一.寻找操作的jar包. java操作hbase,首先要考虑到使用hbase的jar包. 因为咱装的是CDH5,比较方便,使用SecureCRT工具,远程连接到你安装的那台服务器上. jar包的存放位置在/opt/cloudera/parcels/CDH/lib/hbase,找到,下载下来. 在当前路径下,有一个lib包,里面是支持hbase的hadoop的jar包,根据需求,可以下载下来. 二.找一个API文档当成手册,哪里不会查哪里 百度分享,http

java操作Hbase实例

所用HBase版本为1.1.2,hadoop版本为2.4 /* * 创建一个students表,并进行相关操作 */ import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apach

java操作hbase样例

hbase安装方法请參考:hbase-0.94安装方法具体解释 hbase经常使用的shell命令请參考:hbase经常使用的shell命令样例 java操作hbase,在eclipse中创建一个java项目.将hbase安装文件根文件夹的jar包和lib文件夹下jar包导入项目,然后就能够编写java代码操作hbase了. 以下代码给出来一个简单的演示样例 /** * @date 2015-07-23 21:28:10 * @author sgl */ package com.songguol

Zookeeper——JAVA Client

一.zookeeper简介     一款管理分布式应用程序的协调服务系统 二.zookeeper应用场景          网上也有很多介绍,可以参见    http://blog.csdn.net/xinguan1267/article/details/38422149 本文主要介绍基于java的客户端开发 三.基于JAVA客户端实战 3.1Client      // 创建一个与服务器的连接 需要(服务端的 ip+端口号)(session过期时间)(Watcher监听注册)   ZooKee

新浪微博数据解析与java操作Hbase实例

之前发过一篇开发新浪微博的文章,对于大家比较感兴趣的内容之一便是如何解析新浪微博的JSON. 其实一开始的时候,也遇过一些挫折,比如直接用JsonArray和JsonObject去解析JSON内容的话,是解析不了的. 因为JSON的格式比较固定,像新浪微博返回的JSON内容则是多了一个中括号及statues标签,如下: { "statuses": [ { "created_at": "Tue May 31 17:46:55 +0800 2011"

hbase-0.98整合hadoop-2.6,附java操作代码

cd /opt/hbase-0.98.13-hadoop2/conf vi hbase-env.sh export JAVA_HOME=/opt/jdk1.7.0_75 vi hbase-site.xml <!--设置hbase根目录,master为机器的hostname--><property><name>hbase.rootdir</name><value>hdfs://master:9000/hbase</value></

java 操作 Excel,java导出excel

WritableWorkbook out = null; try { response.getServletResponse().reset(); ((HttpServletResponse) response.getServletResponse()).setHeader("Content-Disposition", "attachment;filename=export.xls"); response.getServletResponse().setConten