上一篇文章这里已经列出了Curator的一个使用的例子,这篇文章将详细分析其初始化和启动部分。
测试程序分析
1 初始化和启动
(1) newClient方法返回CuratorFramework接口对象:
public staticCuratorFramework newClient(String connectString, int sessionTimeoutMs,int connectionTimeoutMs,RetryPolicy retryPolicy) { return builder(). connectString(connectString). sessionTimeoutMs(sessionTimeoutMs). connectionTimeoutMs(connectionTimeoutMs). retryPolicy(retryPolicy). build(); }
看看builder()方法:
//返回用于构建CuratorFramework的新的builder对象 publicstaticBuilder builder() { return new Builder(); }
可以看到这个方法返回一个构建CuratorFramework的Builder。Builder类就在CuratorFrameworkFactory.java文件中。
//设置连接到的ZooKeeper集群的地址列表 public BuilderconnectString(String connectString) { ensembleProvider =newFixedEnsembleProvider(connectString); return this; }
前面方法都是设置当前对象的属性,然后将当前Builder对象返回,设置的属性可以看到包括这些:
private EnsembleProvider ensembleProvider; private int sessionTimeoutMs =DEFAULT_SESSION_TIMEOUT_MS; private int connectionTimeoutMs =DEFAULT_CONNECTION_TIMEOUT_MS; private int maxCloseWaitMs =DEFAULT_CLOSE_WAIT_MS; private RetryPolicy retryPolicy; private ThreadFactory threadFactory =null; private String namespace; private String authScheme =null; private byte[] authValue =null; private byte[] defaultData =LOCAL_ADDRESS; private CompressionProvidercompressionProvider=DEFAULT_COMPRESSION_PROVIDER; private ZookeeperFactory zookeeperFactory =DEFAULT_ZOOKEEPER_FACTORY; private ACLProvider aclProvider =DEFAULT_ACL_PROVIDER; private boolean canBeReadOnly =false;
主要看最后一个build()方法:
//使用当前的Builder对象构建一个CuratorFramework接口对象 public CuratorFramework build() { return new CuratorFrameworkImpl(this); }
可以看到创建一个CuratorFrameworkImpl实例,将当前Builder对象传递进去。
CuratorFrameworkImpl类是CuratorFramework接口的实现类,看看其构造函数:
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) { ZookeeperFactory localZookeeperFactory= makeZookeeperFactory(builder.getZookeeperFactory()); this.client =new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(),builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(),new Watcher() { @Override public void process(WatchedEvent watchedEvent) { CuratorEvent event = newCuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED,watchedEvent.getState().getIntValue(),unfixForNamespace(watchedEvent.getPath()),null,null,null,null,null, watchedEvent,null); processEvent(event); } }, builder.getRetryPolicy(),builder.canBeReadOnly()); listeners = new ListenerContainer<CuratorListener>(); unhandledErrorListeners =newListenerContainer<UnhandledErrorListener>(); backgroundOperations = newDelayQueue<OperationAndData<?>>(); namespace = new NamespaceImpl(this, builder.getNamespace()); threadFactory = getThreadFactory(builder); maxCloseWaitMs = builder.getMaxCloseWaitMs(); connectionStateManager =new ConnectionStateManager(this,builder.getThreadFactory()); compressionProvider =builder.getCompressionProvider(); aclProvider = builder.getAclProvider(); state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT); byte[] builderDefaultData =builder.getDefaultData(); defaultData = (builderDefaultData !=null) ? Arrays.copyOf(builderDefaultData,builderDefaultData.length):newbyte[0]; if ( builder.getAuthScheme() !=null ) { authInfo.set(new AuthInfo(builder.getAuthScheme(),builder.getAuthValue())); } failedDeleteManager = new FailedDeleteManager(this); namespaceFacadeCache =new NamespaceFacadeCache(this); }
CuratorFrameworkImpl主要是对CuratorZookeeperClient的封装,所以我们主要看构造函数中第二句代码是如何构建CuratorZookeeperClient对象的。构造函数中除了参数1和参数5,其他参数都是来自builder对象,参数1是localZookeeperFactory,通过下面方法构造:
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
下面是makeZookeeperFactory()方法的实现代码:
private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory) { return new ZookeeperFactory() { @Override public ZooKeeper newZooKeeper(StringconnectString,intsessionTimeout, Watcher watcher,boolean canBeReadOnly) throws Exception { ZooKeeper zooKeeper =actualZookeeperFactory.newZooKeeper(connectString, sessionTimeout, watcher,canBeReadOnly); AuthInfo auth = authInfo.get(); if ( auth !=null ) { zooKeeper.addAuthInfo(auth.scheme, auth.auth); } return zooKeeper; } }; }
传递进来的是builder中定义的ZookeeperFactory对象,实际上就是Curator提供的DefaultZookeeperFactory类,定义如下:
public classDefaultZookeeperFactory implements ZookeeperFactory { @Override publicZooKeeper newZooKeeper(String connectString,int sessionTimeout, Watcher watcher,boolean canBeReadOnly) throws Exception { return new ZooKeeper(connectString, sessionTimeout,watcher, canBeReadOnly); } }
仅仅是简单地new出一个原生的ZooKeeper对象,所以传递到CuratorZookeeperClient构造函数中的ZookeeperFactory类的newZooKeeper返回的是原生的ZooKeeper对象。
参数5是一个Watcher对象,其中事件响应函数process()又调用了processEvent()方法:
private void processEvent(finalCuratorEvent curatorEvent) { if ( curatorEvent.getType() ==CuratorEventType.WATCHED ) { validateConnection(curatorEvent.getWatchedEvent().getState()); } listeners.forEach(new Function<CuratorListener, Void>() { @Override public Void apply(CuratorListenerlistener) { try { TimeTrace trace = client.startTracer("EventListener"); listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent); trace.commit(); } catch ( Exception e ) { logError("Event listener threw exception", e); } returnnull; } }); }
这个watcher什么时候被调用?作用是什么?这个问题稍后再解答。
接着深入进去看的CuratorZookeeperClient类的构造函数:
publicCuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProviderensembleProvider,int sessionTimeoutMs,int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy,boolean canBeReadOnly) { this.connectionTimeoutMs = connectionTimeoutMs; state = new ConnectionState(zookeeperFactory,ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher,tracer, canBeReadOnly); setRetryPolicy(retryPolicy); }
主要在构造函数的最后两行初始化下面两个成员变量:
private finalConnectionState state;
private finalAtomicReference<RetryPolicy> retryPolicy =newAtomicReference<RetryPolicy>();
继续看ConnectionState的构造函数:
ConnectionState(ZookeeperFactoryzookeeperFactory, EnsembleProvider ensembleProvider,int sessionTimeoutMs,int connectionTimeoutMs,Watcher parentWatcher, AtomicReference<TracerDriver> tracer,boolean canBeReadOnly) { this.ensembleProvider = ensembleProvider; this.sessionTimeoutMs = sessionTimeoutMs; this.connectionTimeoutMs = connectionTimeoutMs; this.tracer = tracer; if ( parentWatcher !=null ) { parentWatchers.offer(parentWatcher); } zooKeeper = new HandleHolder(zookeeperFactory,this, ensembleProvider,sessionTimeoutMs, canBeReadOnly); }
做了两件事情,一个是把传递进来的watcher对象放入parentWatchers容器中,一个是new出HandleHolder对象,注意这里讲当前ConnectionState对象作为watcher参数传递到HandleHolder构造函数。继续看HandleHolder构造函数:
HandleHolder(ZookeeperFactoryzookeeperFactory, Watcher watcher, EnsembleProvider ensembleProvider,int sessionTimeout,boolean canBeReadOnly) { this.zookeeperFactory = zookeeperFactory; this.watcher = watcher; this.ensembleProvider = ensembleProvider; this.sessionTimeout = sessionTimeout; this.canBeReadOnly = canBeReadOnly; }
简单字段赋值,这个类是ZooKeeper对象的持有者,其中包含两个关键函数:
ZooKeeper getZooKeeper() throws Exception { return (helper !=null) ?helper.getZooKeeper() : null; } void closeAndReset()throws Exception { internalClose(); // first helper is synchronized when getZooKeeper is called. Subsequentcalls // are not synchronized. helper = new Helper() { private volatile ZooKeeper zooKeeperHandle =null; private volatile String connectionString =null; @Override public ZooKeeper getZooKeeper()throws Exception { synchronized(this) { if (zooKeeperHandle ==null) { connectionString =ensembleProvider.getConnectionString(); zooKeeperHandle =zookeeperFactory.newZooKeeper(connectionString,sessionTimeout,watcher,canBeReadOnly); } helper = newHelper() { @Override public ZooKeepergetZooKeeper()throwsException { returnzooKeeperHandle; } @Override public StringgetConnectionString() { returnconnectionString; } }; returnzooKeeperHandle; } } @Override public String getConnectionString() { returnconnectionString; } }; }
可以看到这个类提供了一个getZooKeeper()方法,返回ZooKeeper对象,closeAndReset()方法是对helper对象的初始化,Helper中的getZooKeeper()方法返回的是ZooKeeper对象的单例,保障一个HandleHolder只会持有一个ZooKeeper对象。
总结一下,通过CuratorFrameworkFactory类的newClient()方法将会返回一个实现了CuratorFramework接口的实现类CuratorFrameworkImpl的对象,这个对象中包含一个CuratorZookeeperClient对象,里面又包含一个ConnectionState对象,再里面又包含一个HandleHolder对象,这个对象通过从最外层逐层传递进来的DefaultZookeeperFactory对象获取原生ZooKeeper对象,并以单例进行维护,每一层都有一个getZooKeeper()方法,在外面调用会最终到HandleHolder这里来取得一个ZooKeeper对象。
这里面HandleHolder是ZooKeeper对象的持有者,外层封装的ConnectionState类是核心,管理ZooKeeper的连接状态,响应ZooKeeper的watch回调事件。这个回调函数是创建HandleHolder对象时将自己传递进去注册的。
初始化这一步骤注册的真正的原生ZooKeeper对象的watcher响应事件是ConnectionState类中的process()函数,我们看看这个函数:
@Override publicvoidprocess(WatchedEvent event) { //逐个调用parentWatchers容器中的Watcher的process函数 for ( Watcher parentWatcher :parentWatchers ) { TimeTrace timeTrace = new TimeTrace("connection-state-parent-process",tracer.get()); parentWatcher.process(event); timeTrace.commit(); } boolean wasConnected =isConnected.get(); boolean newIsConnected = wasConnected; if ( event.getType() ==Watcher.Event.EventType.None ) { newIsConnected =checkState(event.getState(), wasConnected); } //若当前连接状态不为false,则真正设置isConnected = true if ( newIsConnected != wasConnected ) { isConnected.set(newIsConnected); connectionStartMs = System.currentTimeMillis(); } }
这个watch事件响应函数主要做两件事:
(1)将parentWatchers容器中的所有Watcher都调用一次;
(2)检查并更新ConnectionState类中维护的ZooKeeper的连接状态isConnected。
那么parentWatchers容器中有哪些Watcher呢,目前只有CuratorFrameworkImpl构造函数中初始化CuratorZookeeperClient对象时传递进去的Watcher,如下所示:
this.client=newCuratorZookeeperClient(localZookeeperFactory,builder.getEnsembleProvider(), builder.getSessionTimeoutMs(),builder.getConnectionTimeoutMs(),new Watcher() { @Override public void process(WatchedEvent watchedEvent) { CuratorEvent event = newCuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED,watchedEvent.getState().getIntValue(),unfixForNamespace(watchedEvent.getPath()),null,null,null,null,null, watchedEvent,null); processEvent(event); } }, builder.getRetryPolicy(),builder.canBeReadOnly());
其中的processEvent()函数实际上是将CuratorListener列表中的所有事件响应函数全部调用一次,这个和异步执行ZooKeeper操作相关,具体不介绍了,详细可以参考这个例子:
public staticvoid setDataAsync(CuratorFramework client,String path,byte[]payload)throwsException { // this is one method of getting event/async notifications CuratorListener listener =newCuratorListener() { @Override public void eventReceived(CuratorFramework client,CuratorEvent event) throws Exception { // examine event for details } }; client.getCuratorListenable().addListener(listener); // set data for the given node asynchronously. The completion notification // is done via the CuratorListener. client.setData().inBackground().forPath(path, payload); }
(2)CuratorFramework 的start()方法启动:
CuratorFramework的start方法会调用CuratorZookeeperClient对象的start方法,内部又调用ConnectionState的start方法,最后ConnectionState的start方法调用一个reset方法:
private synchronizedvoidreset() throwsException { isConnected.set(false); connectionStartMs = System.currentTimeMillis(); zooKeeper.closeAndReset(); zooKeeper.getZooKeeper(); // initiateconnection }
主要是最后两句代码,调用HandleHolder类对象zooKeeper的closeAndReset方法是为了实例化获取ZooKeeper对象的Helper对象,调用一次getZooKeeper方法是为了先第一次实例化好ZooKeeper对象,提高之后调用访问接口时的性能。