Curator源码解析(五)Curator的连接和重试机制

转载请注明出处: jiq?钦‘s
technical Blog

本文将主要关注Curator是如何处理连接丢失和会话终止这两个关键问题的。

1.   连接丢失的处理

Curator中利用类ConnectionState来管理客户端到ZooKeeper集群的连接状态,其中用到原子布尔型变量来标识当前连接是否已经建立:

private
final
AtomicBoolean isConnected=
newAtomicBoolean(false);

在事件处理函数中(ConnectionState实现了Watcher接口)修改isConnected的值:

@Override

publicvoidprocess(WatchedEvent event)

{

//逐个调用parentWatchers容器中的Watcher的process函数

for ( Watcher parentWatcher :
parentWatchers )

{

TimeTrace timeTrace = new TimeTrace("connection-state-parent-process",
tracer.get());

parentWatcher.process(event);

timeTrace.commit();

}

//记录旧连接状态

boolean wasConnected =
isConnected.get();

boolean newIsConnected = wasConnected;

if ( event.getType() ==Watcher.Event.EventType.None )

{

//获取新连接状态

newIsConnected =checkState(event.getState(), wasConnected);

}

//若状态发生变化,则修改

if (
newIsConnected
!= wasConnected )

{

isConnected.set(newIsConnected);

connectionStartMs = System.currentTimeMillis();

}

}

其中checkState函数获取当前连接状态是否为已连接:

private
boolean
checkState(Event.KeeperState state, boolean wasConnected)

{

boolean isConnected = wasConnected;

boolean checkNewConnectionString =
true;

switch ( state )

{

default:

//连接丢失

case Disconnected:

{

isConnected = false;

break;

}

//连接建立

case SyncConnected:

case ConnectedReadOnly:

{

isConnected = true;

break;

}

//验证失败

case AuthFailed:

{

isConnected = false;

log.error("Authentication failed");

break;

}

//会话终止

case Expired:

{

isConnected = false;

checkNewConnectionString = false;

handleExpiredSession();

break;

}

case SaslAuthenticated:

{

// NOP

break;

}

}

if ( checkNewConnectionString &&
zooKeeper.hasNewConnectionString())

{

handleNewConnectionString();

}

return isConnected;

}

若平时发生连接丢失,isConnected(标识当前连接状态)被置为false,ZooKeeper自动重连回来之后isConnected被置为true,所以在平时连接与否无关紧要,但是当发起ZooKeeper操作(like getChildren,get/setData,
create, delete)时,若发生连接丢失的情况,则会抛出ConnectionLossexception,那么Curator这个时候是如何处理的呢?

下面以SetData操作来看,下面是Curator执行SetData操作的代码:

TimeTrace   trace = client.getZookeeperClient().startTracer("SetDataBuilderImpl-Foreground");

Stat resultStat =RetryLoop.callWithRetry

(

client.getZookeeperClient(),

new Callable<Stat>()

{

@Override

public Stat call()
throws Exception

{

return
client.getZooKeeper().setData(path, data,
version);

}

}

);

可以看到真正的setData操作被包装到了callWithRetry函数中:

public
static
<T>T callWithRetry(CuratorZookeeperClient client, Callable<T> proc)
throws Exception

{

T result = null;

RetryLoop retryLoop =client.newRetryLoop();

while ( retryLoop.shouldContinue() )

{

try

{

//检测当前连接状态,若未连接则等待一定时间直到连接完成

client.internalBlockUntilConnectedOrTimedOut();

//调用带返回值的Callable方法

result = proc.call();

retryLoop.markComplete();

}

catch ( Exception e )

{

retryLoop.takeException(e);

}

}

return result;

}

这个函数其实很简单,步骤如下:

(1)     检测当前是否已连接,若已连接则执行下一句代码,否则等待一定时间;

(2)     执行真正的ZooKeeper操作;

(3)     执行成功,标记为执行完成。

若执行ZooKeeper操作的时候发生任何异常,将会执行takeException函数:

public
void
takeException(Exception exception) throws Exception

{

boolean rethrow =
true;

if (
isRetryException
(exception) )

{

//是否允许继续重试

if (
retryPolicy.allowRetry(retryCount++,System.currentTimeMillis() -
startTimeMs, sleeper))

{

rethrow = false;

}

}

if ( rethrow )

{

throw exception;

}

}

---如果isRetryException函数判断抛出的异常是否是“连接丢失异常/会话终止异常”,如果是则判断是否允许重试(其实传递进来的重试策略就是简单地进行三次重试),允许重试的话就不抛出异常,返回,继续下一轮循环。

---如果isRetryException函数判断不属于“连接丢失异常/会话终止异常”,比如是其他的一些其他异常(create操作可能引起NodeExists 异常, delete操作可能引起NoNode异常),那么将继续把异常抛出,callWithRetry函数将因为异常而结束返回。

这就是Curator处理连接丢失的策略,平时仅仅是通过在watch事件响应函数中记录连接状态isConnected,执行ZooKeeper操作的时候,先等待连接状态isConnected变为true再执行操作,若执行期间若发生异常,仅仅在当异常类型为“连接丢失/会话终止”时进行重试,反复几次。

这种机制个人认为已经足够应付所有场景。

2.   会话终止的处理

和连接丢失一样,我们需要分别来分析平时和执行ZooKeeper操作时发生“会话终止”异常Curator怎么来处理。

还是看ConnectionState类中watch事件响应函数,其中有这么一段代码:

//会话终止

case Expired:

{

isConnected = false;

checkNewConnectionString = false;

handleExpiredSession();

break;

}

关键是看handleExpiredSession函数:

private
void
handleExpiredSession()

{

try

{

reset();

}

catch ( Exception e )

{

queueBackgroundException(e);

}

}

就是一个reset函数:

private
synchronizedvoid
reset()
throws
Exception

{

log.debug("reset");

instanceIndex.incrementAndGet();

isConnected.set(false);

connectionStartMs = System.currentTimeMillis();

zooKeeper.closeAndReset();

zooKeeper.getZooKeeper();  
// initiateconnection

}

关键是看最后两句代码,先是执行HandleHolder的closeAndReset函数:

void closeAndReset()
throws Exception

{

internalClose();

helper = new Helper()

{

private
volatile
ZooKeeper zooKeeperHandle =
null;

private
volatile
String connectionString =
null;

@Override

public ZooKeeper getZooKeeper()
throws Exception

{

synchronized(this)

{

if (
zooKeeperHandle == null)

{

connectionString =
ensembleProvider.getConnectionString();

zooKeeperHandle =
zookeeperFactory.newZooKeeper(connectionString,
sessionTimeout, watcher,
canBeReadOnly);

}

helper =
new
Helper()

{

@Override

public ZooKeepergetZooKeeper()
throwsException

{

return
zooKeeperHandle;

}

@Override

public StringgetConnectionString()

{

return
connectionString;

}

};

return
zooKeeperHandle;

}

}

@Override

public String getConnectionString()

{

return
connectionString;

}

};

}

如果对这个函数不清楚,可以回过头看看之前文章讲的Curator的初始化和启动的源码分析,HandleHolder是原生ZooKeeper对象的持有者,维护ZooKeeper对象的单例就是通过这个函数。

一旦发生会话终止异常,ZooKeeper句柄会被自动关闭,所以之前初始化的helper对象中的zooKeeperHandle变量将会变得不可用,所以需要调用这个closeAndReset函数重新初始化helper对象,然后再调用一次getZooKeeper函数执行zookeeperFactory.newZooKeeper初始化好ZooKeeper句柄。

注意:会话终止后ZooKeeper句柄会被自动关闭,但并不是被置为null了,所以在用原来的helper对象的getZooKeeper方法返回的句柄是不可用的。

再看看当执行ZooKeeper操作时发生了会话终止时怎么处理。如果执行ZooKeeper时发生了会话终止,watch事件响应函数中会中心构建ZooKeeper句柄,callWithRetry函数中不但判断当前发生的是连接丢失异常时会进行重试,判断是会话终止异常也会进行重试。

所以说Curator处理会话终止的方法,就是在收到Expired事件时候重新构建HandleHolder中维护的ZooKeeper句柄。

关于临时节点和watch事件

特别注意,有的应用程序创建的临时节点和注册的watch事件至关重要,无法忍受丢失的情况,若发生会话终止,它们必定会被ZooKeeper服务器端删除掉,并且Curator无法帮助你重新还原回来。

这个时候就需要应用程序自己处理,在收到会话终止异常之后,重新注册关键的watch事件,以及重新创建关键的临时节点。

时间: 2024-12-28 10:45:46

Curator源码解析(五)Curator的连接和重试机制的相关文章

Spring 源码解析之DispatcherServlet源码解析(五)

Spring 源码解析之DispatcherServlet源码解析(五) 前言 本文需要有前四篇文章的基础,才能够清晰易懂,有兴趣可以先看看详细的流程,这篇文章可以说是第一篇文章,也可以说是前四篇文章的的汇总,Spring的整个请求流程都是围绕着DispatcherServlet进行的 类结构图 根据类的结构来说DispatcherServlet本身也是继承了HttpServlet的,所有的请求都是根据这一个Servlet来进行转发的,同时解释了为什么需要在web.xml进行如下配置,因为Spr

Curator源码解析(四)ZooKeeper存在的连接问题

都说Curator的连接机制比较牛逼,所以在分析Curator的连接和重试机制之前,我想先搞清楚原生的ZooKeeper的连接存在哪些问题. 下面是我查阅资料总结的结果,转载请注明出处: jiq?钦's technical Blog Curator虽然提供所谓的高层抽象API来简化了ZooKeeper的使用,但更重要的是封装了管理到ZooKeeper集群的连接以及重试机制的复杂性,下面我们来详细分析一下Curator在这方面都是怎么做的,不过在这之前先要搞清楚ZooKeeper目前在连接方面有哪

Curator源码解析(一)源码结构和测试程序

Curator是Netflix开源的一套ZooKeeper客户端框架. Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层, 应用方在使用的时候需要自己处理很多事情, 于是在它的基础上包装了一下, 提供了一套更好用的客户端框架. Netflix在用ZooKeeper的过程中遇到的问题, 我们也遇到了, 所以开始研究一下, 首先从他在github上的源码, wiki文档以及Netflix的技术blog入手. 看完官方的文档之后, 发现Curator主要解决了三类

Curator源码解析(三)访问接口分析

接着上一篇,将分析测试程序中的访问接口部分. 2调用ZooKeeper访问接口 初始化和启动分析完了,操作接口调用代码如下: String path = ZKPaths.makePath(PATH, name); byte[] bytes =args[1].getBytes(); try { client.setData().forPath(path,bytes); } catch (KeeperException.NoNodeException e ) { client.create().cr

Tomcat请求处理过程(Tomcat源码解析五)

前面已经分析完了Tomcat的启动和关闭过程,本篇就来接着分析一下Tomcat中请求的处理过程. 在开始本文之前,咋们首先来看看一个Http请求处理的过程,一般情况下是浏览器发送http请求->建立Socket连接->通过Socket读取数据->根据http协议解析数据->调用后台服务完成响应,详细的流程图如上图所示,等读者读完本篇,应该就清楚了上图所表达的意思.Tomcat既是一个HttpServer也是一个Servlet 容器,那么这里必然也涉及到如上过程,首先根据HTTP协议

iOS即时通讯之CocoaAsyncSocket源码解析五

接上篇:iOS即时通讯之CocoaAsyncSocket源码解析四         原文 正文待补...

Istio技术与实践01: 源码解析之Pilot多云平台服务发现机制

服务模型 首先,Istio作为一个(微)服务治理的平台,和其他的微服务模型一样也提供了Service,ServiceInstance这样抽象服务模型.如Service的定义中所表达的,一个服务有一个全域名,可以有一个或多个侦听端口. type Service struct { // Hostname of the service, e.g. "catalog.mystore.com" Hostname Hostname `json:"hostname"` Addre

Curator源码解析(二)初始化和启动分析

上一篇文章这里已经列出了Curator的一个使用的例子,这篇文章将详细分析其初始化和启动部分. 测试程序分析 1      初始化和启动 (1) newClient方法返回CuratorFramework接口对象: public staticCuratorFramework newClient(String connectString, int sessionTimeoutMs,int connectionTimeoutMs,RetryPolicy retryPolicy) { return b

AFNetworking (3.1.0) 源码解析 &lt;五&gt;

这次主要开始讲解一下文件夹Serialization下的类AFURLRequestSerialization. AFURLRequestSerialization类遵守`AFURLRequestSerialization`和`AFURLResponseSerialization`协议,提供一个查询字符串/表单编码的参数序列化和默认请求头的具体的基本的实现,以及响应状态代码和内容类型验证.也就是对发出的请求进行一些处理. 处理HTTP的任何请求或响应序列化被鼓励归入“AFHTTPRequestSe