接着上一篇文章,继续分析和Watcher相关的类的源码。
ClientWatchManager
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type, String path);
该接口,只有一个方法,需要实现。该方法表示事件发生时,返回需要被通知的Watcher集合,可能为空集合。
ZKWatchManager
1、ZKWatchManager是ZooKeeper的内部类,实现了ClientWatchManager。
2、ZKWatchManager定义了三个Map键值对,键为节点路径,值为Watcher。分别对应数据变化的Watcher、节点是否存在的Watcher、子节点变化的Watcher。
static class ZKWatchManager implements ClientWatchManager {
//数据变化的watchers
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
//节点存在与否的watchers
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();
//子节点变化的watchers
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();
3、materialize方法
该方法在事件发生后,返回需要被通知的Watcher集合。在该方法中,首先会根据EventType类型确定相应的事件类型,然后根据事件类型的不同做出相应的操作,如针对None类型,即无任何事件,则首先会从三个键值对中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合;针对NodeDataChanged和NodeCreated事件而言,其会从dataWatches和existWatches中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合。
@Override
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath)
{
//返回结果集合
Set<Watcher> result = new HashSet<Watcher>();
switch (type) {//事件类型
case None://无类型
//添加默认watcher
result.add(defaultWatcher);
//根据disableAutoWatchReset和Zookeeper的状态是否为同步连接判断是否需要清空
boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnected;
//针对3个不同的watcherMap进行操作
synchronized(dataWatches) {
for(Set<Watcher> ws: dataWatches.values()) {
// 添加至结果集合
result.addAll(ws);
}
if (clear) { // 是否需要清空
dataWatches.clear();
}
}
synchronized(existWatches) {
for(Set<Watcher> ws: existWatches.values()) {
result.addAll(ws);
}
if (clear) {
existWatches.clear();
}
}
synchronized(childWatches) {
for(Set<Watcher> ws: childWatches.values()) {
result.addAll(ws);
}
if (clear) {
childWatches.clear();
}
}
return result;
case NodeDataChanged:// 节点数据变化
case NodeCreated:// 创建节点
synchronized (dataWatches) {
//移除clientPath对应的Watcher后全部添加至结果集合
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
//移除clientPath对应的Watcher后全部添加至结果集合
addTo(existWatches.remove(clientPath), result);
}
break;
case NodeChildrenChanged: // 节点子节点变化
synchronized (childWatches) {
// 移除clientPath对应的Watcher后全部添加至结果集合
addTo(childWatches.remove(clientPath), result);
}
break;
case NodeDeleted:// 删除节点
synchronized (dataWatches) {
// 移除clientPath对应的Watcher后全部添加至结果集合
addTo(dataWatches.remove(clientPath), result);
}
// XXX This shouldn‘t be needed, but just in case
synchronized (existWatches) {
Set<Watcher> list = existWatches.remove(clientPath);
if (list != null) {
addTo(list, result);
LOG.warn("We are triggering an exists watch for delete! Shouldn‘t happen!");
}
}
synchronized (childWatches) {
//移除clientPath对应的Watcher后全部添加至结果集合
addTo(childWatches.remove(clientPath), result);
}
break;
default:
String msg = "Unhandled watch event type " + type
+ " with state " + state + " on path " + clientPath;
LOG.error(msg);
throw new RuntimeException(msg);
}
return result;
}
}
原文地址:https://blog.51cto.com/janephp/2455948
时间: 2024-10-03 11:05:44