Curator源码解析(二)初始化和启动分析

上一篇文章这里已经列出了Curator的一个使用的例子,这篇文章将详细分析其初始化和启动部分。

测试程序分析

1      初始化和启动

(1) newClient方法返回CuratorFramework接口对象:

public staticCuratorFramework newClient(String connectString, int sessionTimeoutMs,int connectionTimeoutMs,RetryPolicy retryPolicy)
   {
        return builder().
            connectString(connectString).
            sessionTimeoutMs(sessionTimeoutMs).
           connectionTimeoutMs(connectionTimeoutMs).
            retryPolicy(retryPolicy).
            build();
    }

看看builder()方法:

   //返回用于构建CuratorFramework的新的builder对象
   publicstaticBuilder builder()
   {
        return new Builder();
}

可以看到这个方法返回一个构建CuratorFramework的Builder。Builder类就在CuratorFrameworkFactory.java文件中。

        //设置连接到的ZooKeeper集群的地址列表
        public BuilderconnectString(String connectString)
        {
            ensembleProvider =newFixedEnsembleProvider(connectString);
            return this;
        }

前面方法都是设置当前对象的属性,然后将当前Builder对象返回,设置的属性可以看到包括这些:

private EnsembleProvider   ensembleProvider;
        private int                 sessionTimeoutMs =DEFAULT_SESSION_TIMEOUT_MS;
        private int                 connectionTimeoutMs =DEFAULT_CONNECTION_TIMEOUT_MS;
        private int                 maxCloseWaitMs =DEFAULT_CLOSE_WAIT_MS;
        private RetryPolicy        retryPolicy;
        private ThreadFactory      threadFactory =null;
        private String             namespace;
        private String             authScheme =null;
        private byte[]              authValue =null;
        private byte[]              defaultData =LOCAL_ADDRESS;
        private CompressionProvidercompressionProvider=DEFAULT_COMPRESSION_PROVIDER;
        private ZookeeperFactory   zookeeperFactory =DEFAULT_ZOOKEEPER_FACTORY;
        private ACLProvider        aclProvider =DEFAULT_ACL_PROVIDER;
        private boolean             canBeReadOnly =false;

主要看最后一个build()方法:

//使用当前的Builder对象构建一个CuratorFramework接口对象
        public CuratorFramework build()
        {
            return new CuratorFrameworkImpl(this);
    }

可以看到创建一个CuratorFrameworkImpl实例,将当前Builder对象传递进去。

CuratorFrameworkImpl类是CuratorFramework接口的实现类,看看其构造函数:

public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
   {
        ZookeeperFactory localZookeeperFactory= makeZookeeperFactory(builder.getZookeeperFactory());
        this.client =new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(),builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(),new Watcher()
        {
            @Override
            public void process(WatchedEvent watchedEvent)
            {
                CuratorEvent event = newCuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED,watchedEvent.getState().getIntValue(),unfixForNamespace(watchedEvent.getPath()),null,null,null,null,null, watchedEvent,null);
               processEvent(event);
            }
        }, builder.getRetryPolicy(),builder.canBeReadOnly());

        listeners = new ListenerContainer<CuratorListener>();
        unhandledErrorListeners =newListenerContainer<UnhandledErrorListener>();
       backgroundOperations = newDelayQueue<OperationAndData<?>>();
        namespace = new NamespaceImpl(this, builder.getNamespace());
        threadFactory = getThreadFactory(builder);
        maxCloseWaitMs = builder.getMaxCloseWaitMs();
        connectionStateManager =new ConnectionStateManager(this,builder.getThreadFactory());
        compressionProvider =builder.getCompressionProvider();
        aclProvider = builder.getAclProvider();
        state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);

        byte[] builderDefaultData =builder.getDefaultData();
        defaultData = (builderDefaultData !=null) ? Arrays.copyOf(builderDefaultData,builderDefaultData.length):newbyte[0];

        if ( builder.getAuthScheme() !=null )
        {
            authInfo.set(new AuthInfo(builder.getAuthScheme(),builder.getAuthValue()));
        }

        failedDeleteManager = new FailedDeleteManager(this);
        namespaceFacadeCache =new NamespaceFacadeCache(this);
}

CuratorFrameworkImpl主要是对CuratorZookeeperClient的封装,所以我们主要看构造函数中第二句代码是如何构建CuratorZookeeperClient对象的。构造函数中除了参数1和参数5,其他参数都是来自builder对象,参数1是localZookeeperFactory,通过下面方法构造:

ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());

下面是makeZookeeperFactory()方法的实现代码:

private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
   {
        return new ZookeeperFactory()
        {
            @Override
            public ZooKeeper newZooKeeper(StringconnectString,intsessionTimeout, Watcher watcher,boolean canBeReadOnly) throws Exception
            {
                ZooKeeper zooKeeper =actualZookeeperFactory.newZooKeeper(connectString, sessionTimeout, watcher,canBeReadOnly);
                AuthInfo auth = authInfo.get();
                if ( auth !=null )
                {
                    zooKeeper.addAuthInfo(auth.scheme, auth.auth);
                }

                return zooKeeper;
            }
        };
}

传递进来的是builder中定义的ZookeeperFactory对象,实际上就是Curator提供的DefaultZookeeperFactory类,定义如下:

public classDefaultZookeeperFactory implements ZookeeperFactory
{
   @Override
   publicZooKeeper newZooKeeper(String connectString,int sessionTimeout, Watcher watcher,boolean canBeReadOnly) throws Exception
   {
        return new ZooKeeper(connectString, sessionTimeout,watcher, canBeReadOnly);
   }
}

仅仅是简单地new出一个原生的ZooKeeper对象,所以传递到CuratorZookeeperClient构造函数中的ZookeeperFactory类的newZooKeeper返回的是原生的ZooKeeper对象。

参数5是一个Watcher对象,其中事件响应函数process()又调用了processEvent()方法:

private void processEvent(finalCuratorEvent curatorEvent)
   {
        if ( curatorEvent.getType() ==CuratorEventType.WATCHED )
        {
           validateConnection(curatorEvent.getWatchedEvent().getState());
        }

        listeners.forEach(new Function<CuratorListener, Void>()
        {
            @Override
            public Void apply(CuratorListenerlistener)
            {
                try
                {
                    TimeTrace trace = client.startTracer("EventListener");
                   listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
                    trace.commit();
                }
                catch ( Exception e )
                {
                    logError("Event listener threw exception", e);
                }
                returnnull;
            }
       });
}

这个watcher什么时候被调用?作用是什么?这个问题稍后再解答。

接着深入进去看的CuratorZookeeperClient类的构造函数:

   publicCuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProviderensembleProvider,int sessionTimeoutMs,int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy,boolean canBeReadOnly)
   {
        this.connectionTimeoutMs = connectionTimeoutMs;
        state = new ConnectionState(zookeeperFactory,ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher,tracer, canBeReadOnly);
        setRetryPolicy(retryPolicy);
    }

主要在构造函数的最后两行初始化下面两个成员变量:

private finalConnectionState state;

private finalAtomicReference<RetryPolicy> retryPolicy =newAtomicReference<RetryPolicy>();

继续看ConnectionState的构造函数:

ConnectionState(ZookeeperFactoryzookeeperFactory, EnsembleProvider ensembleProvider,int sessionTimeoutMs,int connectionTimeoutMs,Watcher parentWatcher, AtomicReference<TracerDriver> tracer,boolean canBeReadOnly)
   {
        this.ensembleProvider = ensembleProvider;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.tracer = tracer;
        if ( parentWatcher !=null )
        {
            parentWatchers.offer(parentWatcher);
        }

        zooKeeper = new HandleHolder(zookeeperFactory,this, ensembleProvider,sessionTimeoutMs, canBeReadOnly);
}

做了两件事情,一个是把传递进来的watcher对象放入parentWatchers容器中,一个是new出HandleHolder对象,注意这里讲当前ConnectionState对象作为watcher参数传递到HandleHolder构造函数。继续看HandleHolder构造函数:

HandleHolder(ZookeeperFactoryzookeeperFactory, Watcher watcher, EnsembleProvider ensembleProvider,int sessionTimeout,boolean canBeReadOnly)
   {
        this.zookeeperFactory = zookeeperFactory;
        this.watcher = watcher;
        this.ensembleProvider = ensembleProvider;
        this.sessionTimeout = sessionTimeout;
        this.canBeReadOnly = canBeReadOnly;
}

简单字段赋值,这个类是ZooKeeper对象的持有者,其中包含两个关键函数:

ZooKeeper getZooKeeper() throws Exception
   {
        return (helper !=null) ?helper.getZooKeeper() : null;
}

void closeAndReset()throws Exception
   {
        internalClose();

        // first helper is synchronized when getZooKeeper is called. Subsequentcalls
        // are not synchronized.
        helper = new Helper()
        {
            private volatile ZooKeeper zooKeeperHandle =null;
            private volatile String connectionString =null;

            @Override
            public ZooKeeper getZooKeeper()throws Exception
            {
                synchronized(this)
                {
                    if (zooKeeperHandle ==null)
                    {
                        connectionString =ensembleProvider.getConnectionString();
                        zooKeeperHandle =zookeeperFactory.newZooKeeper(connectionString,sessionTimeout,watcher,canBeReadOnly);
                    }

                    helper = newHelper()
                    {
                        @Override
                        public ZooKeepergetZooKeeper()throwsException
                        {
                            returnzooKeeperHandle;
                        }

                        @Override
                        public StringgetConnectionString()
                        {
                            returnconnectionString;
                        }
                    };

                    returnzooKeeperHandle;
                }
            }

            @Override
            public String getConnectionString()
            {
                returnconnectionString;
            }
        };
}

可以看到这个类提供了一个getZooKeeper()方法,返回ZooKeeper对象,closeAndReset()方法是对helper对象的初始化,Helper中的getZooKeeper()方法返回的是ZooKeeper对象的单例,保障一个HandleHolder只会持有一个ZooKeeper对象。

总结一下,通过CuratorFrameworkFactory类的newClient()方法将会返回一个实现了CuratorFramework接口的实现类CuratorFrameworkImpl的对象,这个对象中包含一个CuratorZookeeperClient对象,里面又包含一个ConnectionState对象,再里面又包含一个HandleHolder对象,这个对象通过从最外层逐层传递进来的DefaultZookeeperFactory对象获取原生ZooKeeper对象,并以单例进行维护,每一层都有一个getZooKeeper()方法,在外面调用会最终到HandleHolder这里来取得一个ZooKeeper对象。

这里面HandleHolder是ZooKeeper对象的持有者,外层封装的ConnectionState类是核心,管理ZooKeeper的连接状态,响应ZooKeeper的watch回调事件。这个回调函数是创建HandleHolder对象时将自己传递进去注册的。

初始化这一步骤注册的真正的原生ZooKeeper对象的watcher响应事件是ConnectionState类中的process()函数,我们看看这个函数:

@Override
   publicvoidprocess(WatchedEvent event)
   {
       //逐个调用parentWatchers容器中的Watcher的process函数
        for ( Watcher parentWatcher :parentWatchers )
        {
            TimeTrace timeTrace = new TimeTrace("connection-state-parent-process",tracer.get());
            parentWatcher.process(event);
            timeTrace.commit();
        }

        boolean wasConnected =isConnected.get();
        boolean newIsConnected = wasConnected;
        if ( event.getType() ==Watcher.Event.EventType.None )
        {
            newIsConnected =checkState(event.getState(), wasConnected);
        }

        //若当前连接状态不为false,则真正设置isConnected = true
        if ( newIsConnected != wasConnected )
        {
            isConnected.set(newIsConnected);
            connectionStartMs = System.currentTimeMillis();
        }
   }

这个watch事件响应函数主要做两件事:

(1)将parentWatchers容器中的所有Watcher都调用一次;

(2)检查并更新ConnectionState类中维护的ZooKeeper的连接状态isConnected。

那么parentWatchers容器中有哪些Watcher呢,目前只有CuratorFrameworkImpl构造函数中初始化CuratorZookeeperClient对象时传递进去的Watcher,如下所示:

this.client=newCuratorZookeeperClient(localZookeeperFactory,builder.getEnsembleProvider(), builder.getSessionTimeoutMs(),builder.getConnectionTimeoutMs(),new Watcher()
        {
            @Override
            public void process(WatchedEvent watchedEvent)
            {
                CuratorEvent event = newCuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED,watchedEvent.getState().getIntValue(),unfixForNamespace(watchedEvent.getPath()),null,null,null,null,null, watchedEvent,null);
                processEvent(event);
            }
        }, builder.getRetryPolicy(),builder.canBeReadOnly());

其中的processEvent()函数实际上是将CuratorListener列表中的所有事件响应函数全部调用一次,这个和异步执行ZooKeeper操作相关,具体不介绍了,详细可以参考这个例子:

public staticvoid     setDataAsync(CuratorFramework client,String path,byte[]payload)throwsException
   {
        // this is one method of getting event/async notifications
        CuratorListener listener =newCuratorListener()
        {
            @Override
            public void eventReceived(CuratorFramework client,CuratorEvent event) throws Exception
            {
                // examine event for details
            }
        };
       client.getCuratorListenable().addListener(listener);

        // set data for the given node asynchronously. The completion notification
        // is done via the CuratorListener.
       client.setData().inBackground().forPath(path, payload);
}

(2)CuratorFramework 的start()方法启动:

CuratorFramework的start方法会调用CuratorZookeeperClient对象的start方法,内部又调用ConnectionState的start方法,最后ConnectionState的start方法调用一个reset方法:

private synchronizedvoidreset() throwsException
{
    isConnected.set(false);
        connectionStartMs = System.currentTimeMillis();
        zooKeeper.closeAndReset();
        zooKeeper.getZooKeeper();  // initiateconnection
}

主要是最后两句代码,调用HandleHolder类对象zooKeeper的closeAndReset方法是为了实例化获取ZooKeeper对象的Helper对象,调用一次getZooKeeper方法是为了先第一次实例化好ZooKeeper对象,提高之后调用访问接口时的性能。

时间: 2024-10-31 02:27:21

Curator源码解析(二)初始化和启动分析的相关文章

Spring 源码解析之HandlerAdapter源码解析(二)

Spring 源码解析之HandlerAdapter源码解析(二) 前言 看这篇之前需要有Spring 源码解析之HandlerMapping源码解析(一)这篇的基础,这篇主要是把请求流程中的调用controller流程单独拿出来了 解决上篇文章遗留的问题 getHandler(processedRequest) 这个方法是如何查找到对应处理的HandlerExecutionChain和HandlerMapping的,比如说静态资源的处理和请求的处理肯定是不同的HandlerMapping ge

SpringMVC源码解析- HandlerAdapter初始化

HandlerAdapter初始化时,主要是进行注解解析器初始化注册;返回值处理类初始化;全局注解@ControllerAdvice内容读取并缓存. 目录: 注解解析器初始化注册:@ModelAttribute(往model中添加属性) 注解解析器初始化注册:@InitBinder(用于注册校验器,参数编辑器等) 返回值处理returnValueHandlers初始化 全局的@ControllerAdvice注解使用类的@ModelAttribute 和 @InitBinder信息读取并缓存 注

chenglei1986/DatePicker源码解析(二)

接上一篇文章chenglei1986/DatePicker源码解析(一),我们继续将剩余的部分讲完,其实剩余的内容,就是利用Numberpicker来组成一个datePicker,代码非常的简单 为了实现自定义布局的效果,我们给Datepciker定制了一个layout,大家可以定制自己的layout <?xml version="1.0" encoding="utf-8"?> <LinearLayout xmlns:android="h

Curator源码解析(一)源码结构和测试程序

Curator是Netflix开源的一套ZooKeeper客户端框架. Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层, 应用方在使用的时候需要自己处理很多事情, 于是在它的基础上包装了一下, 提供了一套更好用的客户端框架. Netflix在用ZooKeeper的过程中遇到的问题, 我们也遇到了, 所以开始研究一下, 首先从他在github上的源码, wiki文档以及Netflix的技术blog入手. 看完官方的文档之后, 发现Curator主要解决了三类

Curator源码解析(三)访问接口分析

接着上一篇,将分析测试程序中的访问接口部分. 2调用ZooKeeper访问接口 初始化和启动分析完了,操作接口调用代码如下: String path = ZKPaths.makePath(PATH, name); byte[] bytes =args[1].getBytes(); try { client.setData().forPath(path,bytes); } catch (KeeperException.NoNodeException e ) { client.create().cr

erlang下lists模块sort(排序)方法源码解析(二)

上接erlang下lists模块sort(排序)方法源码解析(一),到目前为止,list列表已经被分割成N个列表,而且每个列表的元素是有序的(从大到小) 下面我们重点来看看mergel和rmergel模块,因为我们先前主要分析的split_1_*对应的是rmergel,我们先从rmergel查看,如下 ....................................................... split_1(X, Y, [], R, Rs) -> rmergel([[Y, X

Curator源码解析(五)Curator的连接和重试机制

转载请注明出处: jiq?钦's technical Blog 本文将主要关注Curator是如何处理连接丢失和会话终止这两个关键问题的. 1.   连接丢失的处理 Curator中利用类ConnectionState来管理客户端到ZooKeeper集群的连接状态,其中用到原子布尔型变量来标识当前连接是否已经建立: private finalAtomicBoolean isConnected= newAtomicBoolean(false); 在事件处理函数中(ConnectionState实现

volley源码解析(二)--Request&lt;T&gt;类的介绍

在上一篇文章中,我们已经提到volley的使用方式和设计的整体思路,从这篇文章开始,我就要结合具体的源码来给大家说明volley功能的具体实现. 我们第一个要介绍的类是Request<T>这个一个抽象类,我将Request称为一个请求,通过继承Request<T>来自定义request,为volley提供了更加灵活的接口. Request<T>中的泛型T,是指解析response以后的结果.在上一篇文章中我们知道,ResponseDelivery会把response分派

Mybatis 源码解析(二) - Configuration.xml解析

文章个人学习源码所得,若存在不足或者错误之处,请大家指出. 上一章中叙述了Configuration.xml流化到Mybatis内存中的过程,那么接下来肯定就是Configuration.xml文件解析操作,在Mybatis中,这个解析的操作由SqlSesssionFactoryBuilder负责.接下来我们看看SqlSessionFactoryBuilder的方法签名: SqlSessionFactoryBuilder提供了9个签名方法,其中前8个方法都是Configuration.xml的解