session会话机制
client请求和服务端建立连接,服务端会保留和标记当前client的session,包含session过期时间,sessionId,然后服务端开始在session过期时间的基础上倒计时,在这段时间内,client需要向server发送心跳包,目的是然server重置session过期时间
使用quit命令,退出可端,但是server端的session不会立即消失,使用ls / 依然可以看到创建的临时节点
节点的类型:
- 持久节点,不加任何参数,默认创建的是持久节点
- 临时节点: 当客户端断开连接时,这个节点会消失
- 持久顺序节点: 父节点为他的第一级子节点维护一份时序,标记节点的创建顺序,这个标记其实就是一个数字后缀,作为新节点的名字,数字的范围就是整形的最大值(1024*1024)
- 临时顺序节点,同上. (临时节点不能再创建的节点)
创建节点时,可以指定每个节点的data信息,zookeeper默认每个节点的数量的最大上限是1M
常用命令
创建节点:
语法 : create /path data
[zk: localhost:2181(CONNECTED) 2] create /changwu 1
Created /changwu
[zk: localhost:2181(CONNECTED) 3] ls /
[changwu, zookeeper]
创建顺序节点
语法create -s /path data
[zk: localhost:2181(CONNECTED) 9] create -s /seq1 1
Created /seq10000000001
[zk: localhost:2181(CONNECTED) 10] ls /
[changwu, zookeeper, seq10000000001]
创建临时节点 (extra)
语法: create -e /path data
[zk: localhost:2181(CONNECTED) 16] create -e /extra1 1
Created /extra1
[zk: localhost:2181(CONNECTED) 17] ls /
[changwu, extra1, zookeeper, seq10000000001]
当客户端断开连接时,临时节点被删除
获取节点和节点指定的元数据
语法: get /path
[zk: localhost:2181(CONNECTED) 22] get /changwu
1 # data 源数据
cZxid = 0x200000008
ctime = Tue Sep 17 12:06:40 CST 2019
mZxid = 0x200000008
mtime = Tue Sep 17 12:06:40 CST 2019
pZxid = 0x200000008
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 1
numChildren = 0
访问临时节点
注意点,访问顺序节点,需要带上 节点的全称
[zk: localhost:2181(CONNECTED) 37] get /seq10000000001
1
cZxid = 0x200000009
ctime = Tue Sep 17 12:08:16 CST 2019
mZxid = 0x200000009
mtime = Tue Sep 17 12:08:16 CST 2019
pZxid = 0x200000009
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 1
numChildren = 0
设置元数据
语法: set /path data
[zk: localhost:2181(CONNECTED) 51] ls /
[changwu, extra1, zookeeper, seq10000000001]
[zk: localhost:2181(CONNECTED) 52] set /changwu 2
# 创建znode的事务id
cZxid = 0x200000008
# znode的创建时间
ctime = Tue Sep 17 12:06:40 CST 2019
# 最后修改的znode的事务id
mZxid = 0x20000000b
# znode的最近修改的时间
mtime = Tue Sep 17 12:15:35 CST 2019
# 最后修改/删除/添加znode的事务id
pZxid = 0x200000008
# 对当前znode 子节点 的修改次数
cversion = 0
# 对当前znode节点data的修改的次数
dataVersion = 1
# 对znode的acl修改的次数
aclVersion = 0
# 如果znode是(ephemeral短暂)类型节点,这就是znode所有者的sessionId
# 如果不是(ephemeral短暂)类型,设置为0
ephemeralOwner = 0x0
# znode数据字段的长度
dataLength = 1
# znode子节点数量
numChildren = 0
创建子节点
注意点,创建子节点时添加上父节点的路径
语法: create / 父节点/子节点 子节点data
[zk: localhost:2181(CONNECTED) 67] create /changwu/child1 firstchild
Created /changwu/child1
列出子节点
语法: ls /父节点
[zk: localhost:2181(CONNECTED) 70] ls /changwu
[child1]
查看znode的状态
语法: stat /path
[zk: localhost:2181(CONNECTED) 73] stat /changwu
# 创建znode节点的事务id
cZxid = 0x200000008
# znode节点的创建时间
ctime = Tue Sep 17 12:06:40 CST 2019
# 最后修改znode的事务id
mZxid = 0x20000000b
# znode的最后修改时间
mtime = Tue Sep 17 12:15:35 CST 2019
# 最后修还,添加.删除 znode节点的事务id
pZxid = 0x20000000c
# 对当前znode子节点更改的次数
cversion = 1
# 对znode data更改的次数
dataVersion = 1
# 对znode acl 更改的次数
aclVersion = 0
# 如果znode是epemeral类型的节点,则当前值就是znode所有者的sessionid
# 如果znode 不是 ephemeral 类型的,当前值=0
ephemeralOwner = 0x0
# znode data长度
dataLength = 1
# 子节点的数量
numChildren = 1
移除节点1
语法: delete /path
如果当前节点存在子节点,delete 是删除不掉它的
移除节点2
语法: rmr /path
[zk: localhost:2181(CONNECTED) 87] rmr /
changwu extra1 zookeeper seq10000000001
[zk: localhost:2181(CONNECTED) 87] rmr /extra1
[zk: localhost:2181(CONNECTED) 88] ls /
[changwu, zookeeper, seq10000000001]
什么是ACL(access control list 访问控制列表)
zookeeper在分布式系统中承担中间件的作用,它管理的每一个节点上可能都存储这重要的信息,因为应用可以读取到任意节点,这就可能造成安全问题,ACL的作用就是帮助zookeeper实现权限控制
- zookeeper的权限控制基于节点,每个znode可以有不同的权限
- 子节点不会继承父节点的权限, 访问不了不节点,不代表访问不到子节点
写法: schema??permission
Schema: 鉴权的策略
- world: 只有一个用户 anyone: 所有人:权限
- ip: 使用ip地址认证
- auth: 使用已经添加的认证用户认证
- disgest: 使用"用户名:密码"方式认证
授权对象 ID
权限Permission
- create / c 可以创建子节点
- delete / d 仅可以删除子节点
- read / r 读取当前节点数据,及子节点列表
- write / w 设置节点数据
- admin / a 设置节点的访问控制列表
查看ACL
语法getAcl /parentpath
[zk: localhost:2181(CONNECTED) 94] getAcl /changwu
'world,'anyone
: cdrwa
设置ACL
[zk: localhost:2181(CONNECTED) 102] setAcl /changwu world:anyone:wa
cZxid = 0x200000008
ctime = Tue Sep 17 12:06:40 CST 2019
mZxid = 0x20000000b
mtime = Tue Sep 17 12:15:35 CST 2019
pZxid = 0x20000000c
cversion = 1
dataVersion = 1
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 1
numChildren = 1
再次尝试获取,就没有读权限了,还掉了线
[zk: localhost:2181(CONNECTED) 115] get /changwu
Authentication is not valid : /changwu
添加用户,并登录
[zk: localhost:2181(CONNECTED) 0] addauth digest lisi:123456
使用第四种授权策略
[zk: localhost:2181(CONNECTED) 2] get[zk: localhost:2181(CONNECTED) 0] addauth digest lisi:123456
将这个授权添加到本次会话后就可以读了
[zk: localhost:2181(CONNECTED) 4] get /changwu
2
cZxid = 0x200000008
ctime = Tue Sep 17 12:06:40 CST 2019
mZxid = 0x20000000b
mtime = Tue Sep 17 12:15:35 CST 2019
pZxid = 0x20000000c
cversion = 1
dataVersion = 1
aclVersion = 2
ephemeralOwner = 0x0
dataLength = 1
numChildren = 1
zookeeper的客户端
原生客户端 ZooKeeper
Zookeeper 存在一个问题,session有存在丢弃的可能性
client--网络中断-->server client和server通过网络发送心跳保持沟通,如果因为网络的原因,client发送的心跳包都没有到达server端, server端的session倒计时机制就会把 session 和 watch全部清除
- 原生zookeeper客户端
public static void main(String[] args) throws Exception {
ZooKeeper client = new ZooKeeper("localhost", 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.err.println("连接,触发");
}
});
Stat stat = new Stat();
- 创建客户端
/**
* 1. path
* 2. data
* 3. acl 安全模式
* 4. 持久化/临时 策略
*/
client.create("/node2","value of node2".getBytes(), (List<ACL>) ZooDefs.Ids.ANYONE_ID_UNSAFE,CreateMode.PERSISTENT);
- 获取节点数据
client.getData("/node1",true,stat);
- 监听回调
String content = new String( client.getData("/node1", new Watcher() {
@Override
public void process(WatchedEvent event) {
// todo 任何连接上这个节点的客户端修改了这个节点的 data数据,都会引起process函数的回调
// todo 特点1: watch只能使用1次
if (event.getType().equals(Event.EventType.NodeDataChanged)){
System.err.println("当前节点数据发生了改变");
}
}
}, new Stat()));
- 非阻塞,直接回调
client.getData("/node1", false, new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
System.out.println("123123");
}
},null);
Curator
- 提供了多种zookeeper集群leader的选举机制
- 封装了原生的zookeeper client 和 zookeeper server
- Fluent Api 流式api
- java API创建客户端
public static void main(String[] args) throws Exception {
// 初始化客户端
// 总共重试三次,每次一秒
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",new RetryNTimes(3,100));
client.start(); // todo instance must be started before calling this method
// 创建一个节点
//client.create().withMode(CreateMode.PERSISTENT).forPath("/node3","value of node3".getBytes());
// 订阅
String path = "/node1";
/**
* todo NodeCache就是对这个path节点内存的缓存
* 第一次执行会触发, 而且,以后每次node被改变都会执行
*/
NodeCache cache = new NodeCache(client,path);
// cache.start(true); 启动时会同步数据到缓存中, 既然数据一致了,启动时,下面的回调就不会触发
// cache.start(false); 默认是false, 不会触发
cache.start();
cache.getListenable().addListener(new NodeCacheListener(){
// todo 监听节点值的改变
@Override
public void nodeChanged() {
System.err.println("nodeChanged 执行了");
}
});
System.in.read();
}
也可以使用仅回调一次的api
// 和原生的客户端一样,仅仅触发一次
client.getData().usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent event) {
System.err.println("process...");
}
}).forPath(path);
Curator解决session会话失效
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",new RetryNTimes(3,100));
client.start(); // todo instance must be started before calling this method
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
if(connectionState.equals(ConnectionState.LOST)){
// todo 重新连接
}
}
});
// todo dosomething
}
原文地址:https://www.cnblogs.com/ZhuChangwu/p/11537460.html