ZooKeeperNet源码解析

转载请注明出处:jiq?钦‘s technical Blog

ZooKeeperNet是ZooKeeper的.NET客户端,下载地址:https://github.com/ewhauser/zookeeper

测试程序

ZooKeeper zkClient =
new ZooKeeper(URL,new
TimeSpan(0, 0, 0, 10000), watcher);

if(zkClient.Exists("/config/configJ",null) ==null)

{

zkClient.Create("/config/configJ", json.GetBytes(),Ids.OPEN_ACL_UNSAFE,
CreateMode.Persistent);

}

初始化和启动

首先看ZooKeeper构造函数:

public ZooKeeper(string connectstring,TimeSpan sessionTimeout,
IWatcherwatcher, long sessionId,
byte[] sessionPasswd)

{

LOG.Info(string.Format("Initiating client connection, connectstring={0}sessionTimeout={1} watcher={2} sessionId={3} sessionPasswd={4}",connectstring, sessionTimeout,
watcher, sessionId, (sessionPasswd == null ?
"<null>": "<hidden>")));

//之后连接状态发生变化后会放入ClientConnectionEventConsumer回调

watchManager.defaultWatcher =watcher;

cnxn = newClientConnection(connectstring,sessionTimeout,this, watchManager, sessionId,sessionPasswd);

cnxn.Start();

}

创建了一个ClientConnection对象,继续看其构造函数:

public ClientConnection(string hosts,TimeSpan sessionTimeout,
ZooKeeperzooKeeper,
ZKWatchManager watcher,long sessionId,
byte[]sessionPasswd)

{

this.hosts= hosts;

this.zooKeeper= zooKeeper;

this.watcher= watcher;

SessionTimeout = sessionTimeout;

SessionId = sessionId;

SessionPassword = sessionPasswd;

// parseout chroot, if any

hosts = SetChrootPath();

GetHosts(hosts);

SetTimeouts(sessionTimeout);

CreateConsumer();

CreateProducer();

}

关键是看最后两句代码,实现代码为:

private
void CreateConsumer()

{

consumer = newClientConnectionEventConsumer(this);

}

private
voidCreateProducer()

{

producer = newClientConnectionRequestProducer(this);

}

这两个是什么东西?

(1)先看ClientConnectionEventConsumer:

public ClientConnectionEventConsumer(ClientConnectionconn)

{

this.conn= conn;

eventThread = new
Thread(new SafeThreadStart(PollEvents).Run){ Name ="ZK-EventThread " +conn.zooKeeper.Id, IsBackground =true };

}

创建了一个线程,线程的主体是:

public void PollEvents()

{

try

{

while(!waitingEvents.IsCompleted)

{

object@event = waitingEvents.Take();

try

{

if (@eventis
ClientConnection.WatcherSetEventPair)

{

// each watcher will process the event

ClientConnection.WatcherSetEventPairpair = (ClientConnection.WatcherSetEventPair)
@event;

foreach (IWatcherwatcherin pair.watchers)

{

try

{

watcher.Process([email protected]);

}

catch (Exceptiont)

{

LOG.Error("Error while calling watcher ", t);

}

}

}

}

catch(OperationCanceledException)

{

//ignored

}

catch(Exception t)

{

LOG.Error("Caught unexpected throwable", t);

}

}

}

catch(ThreadInterruptedException e)

{

LOG.Error("Event thread exiting due to interruption",e);

}

LOG.Info("EventThreadshut down");

}

核心代码作用是不断从waitingEvents中取出@event,将其中的IWatcher的process函数通通调用一遍,看来ClientConnectionEventConsumer的作用是维护IWatcher集合,并调用它们对应的事件响应函数,至于事件是如何add到waitingEvents中的,稍后再看。

(2)在看看ClientConnectionRequestProducer:

public ClientConnectionRequestProducer(ClientConnectionconn)

{

this.conn= conn;

zooKeeper = conn.zooKeeper;

requestThread = new
Thread(new SafeThreadStart(SendRequests).Run){ Name ="ZK-SendThread" +conn.zooKeeper.Id, IsBackground =true };

}

同样创建了一个线程,线程的主体是:

public void SendRequests()

{

DateTimenow =
DateTime.Now;

DateTimelastHeard = now;

DateTimelastSend = now;

//当state != CLOSED && state != AUTH_FAILED循环

while(zooKeeper.State.IsAlive())

{

try

{

//TcpClient是否为空

if(client ==
null)

{

// don‘t re-establish connection if we are closing

if (conn.closing)

{

break;

}

StartConnect();
//建立到Zookeeper的TCP连接

lastSend = now;

lastHeard = now;

}

//若还未变为CONNECTED状态,判断是否超时

TimeSpanidleRecv = now - lastHeard;

TimeSpanidleSend = now - lastSend;

TimeSpanto = conn.readTimeout - idleRecv;

if(zooKeeper.State !=ZooKeeper.States.CONNECTED)

{

to =conn.connectTimeout - idleRecv;

}

if(to <=
TimeSpan.Zero)

{

throw
new SessionTimeoutException(

string.Format("Clientsession timed out, have not heard from server in {0}ms for sessionid0x{1:X}", idleRecv, conn.SessionId));

}

//连接完成

if(zooKeeper.State ==ZooKeeper.States.CONNECTED)

{

TimeSpan timeToNextPing =new
TimeSpan(0, 0, 0, 0,Convert.ToInt32(conn.readTimeout.TotalMilliseconds/ 2 - idleSend.TotalMilliseconds));

if(timeToNextPing <=TimeSpan.Zero)

{

SendPing();

lastSend = now;

EnableWrite();

}

else

{

if (timeToNextPing < to)

{

to =timeToNextPing;

}

}

}

// Everythingbelow and until we get back to the select is

//non blocking, so time is effectively a constant. That is

//Why we just have to do this once, here

now = DateTime.Now;

if(outgoingQueue.Count > 0)

{

// We have something to send so it‘s the same

// as if we do the send now.

lastSend = now;

}

//从TCP监听端口进行数据读写操作

if(doIO(to))

{

lastHeard = now;

}

if(zooKeeper.State ==ZooKeeper.States.CONNECTED)

{

if (outgoingQueue.Count > 0)

{

EnableWrite();

}

else

{

DisableWrite();

}

}

}

catch(Exception e)

{

if(conn.closing)

{

if (LOG.IsDebugEnabled)

{

// closing so this is expected

LOG.Debug(string.Format("Anexception was thrown while closing send thread for session 0x{0:X} : {1}",conn.SessionId, e.Message));

}

break;

}

//this is ugly, you have a better way speak up

if(e
is KeeperException.SessionExpiredException)

{

LOG.Info(e.Message +
",closing socket connection");

}

elseif (eis
SessionTimeoutException)

{

LOG.Info(e.Message +RETRY_CONN_MSG);

}

else
if (e is System.IO.EndOfStreamException)

{

LOG.Info(e.Message +RETRY_CONN_MSG);

}

else

{

LOG.Warn(string.Format("Session0x{0:X} for server {1}, unexpected error{2}", conn.SessionId,null, RETRY_CONN_MSG), e);

}

Cleanup();

if(zooKeeper.State.IsAlive())

{

conn.consumer.QueueEvent(newWatchedEvent(KeeperState.Disconnected,EventType.None,null));

}

now = DateTime.Now;

lastHeard = now;

lastSend = now;

client = null;

}

}

Cleanup();

if(zooKeeper.State.IsAlive())

{

conn.consumer.QueueEvent(newWatchedEvent(KeeperState.Disconnected,EventType.None,null));

}

if(LOG.IsDebugEnabled) LOG.Debug("SendThreadexitedloop.");

}

只要连接状态不为CLOSED和AUTH_FAILED两种,就一直循环,主要做三件事:

(1)       建立到Zookeeper的TCP连接TcpClient;

(2)      判断当前在连接状态还未变为CONNECTED前是否超时,若超时就抛异常;

(3)      从TCP监听端口进行数据读写操作。

对于第一件事,如果TcpClient类对象client为空的话,调用StartConnect函数,定义如下:

private
void StartConnect()

{

//设置连接状态为CONNECTING

zooKeeper.State = ZooKeeper.States.CONNECTING;

currentConnectIndex =nextAddrToTry;

//选择一个Zookeeper地址

IPEndPointaddr = conn.serverAddrs[nextAddrToTry];

nextAddrToTry++;

if(nextAddrToTry == conn.serverAddrs.Count)

{

nextAddrToTry = 0;

}

LOG.Info("Openingsocket connection to server " + addr);

//创建TcpClient连接

client = newTcpClient();

client.LingerState = newLingerOption(false, 0);

client.NoDelay = true;

ConnectSocket(addr); //建立到Zookeeper的TCP连接

PrimeConnection(client);

initialized = false;

}

最终调用ConnectSocket函数,这个函数作用是最终建立到Zookeeper的TCP连接:

private
void ConnectSocket(IPEndPoint addr)

{

boolconnected =
false;

ManualResetEventsocketConnectTimeout =new
ManualResetEvent(false);

ThreadPool.QueueUserWorkItem(state=>

{

try

{

//尝试建立到addr的TCP连接

client.Connect(addr);

connected = true;

socketConnectTimeout.Set();

}

//ReSharper disable EmptyGeneralCatchClause

catch

//ReSharper restore EmptyGeneralCatchClause

{

}

});

socketConnectTimeout.WaitOne(10000);

if(connected)
return;

thrownewInvalidOperationException(string.Format("Couldnot make socket
connection to {0}:{1}", addr.Address, addr.Port));

}

到这里实例化ClientConnection的过程就结束了,再看看其Start方法:

public void Start()

{

zooKeeper.State = ZooKeeper.States.CONNECTING;

consumer.Start();

producer.Start();

}

启动很简单,首先设置连接状态为CONNECTING,然后分别启动consumer和producer。这两者的Start其实很简单就是启动在各自构造函数中创建的那个线程。

先看ClientConnectionEventConsumer:

public ClientConnectionEventConsumer(ClientConnectionconn)

{

this.conn= conn;

eventThread = new
Thread(new SafeThreadStart(PollEvents).Run){ Name ="ZK-EventThread " +conn.zooKeeper.Id, IsBackground =true };

}

public
voidStart()

{

eventThread.Start();

}

在看看ClientConnectionRequestProducer:

public ClientConnectionRequestProducer(ClientConnectionconn)

{

this.conn= conn;

zooKeeper = conn.zooKeeper;

requestThread = new
Thread(new SafeThreadStart(SendRequests).Run){ Name ="ZK-SendThread" +conn.zooKeeper.Id, IsBackground =true };

}

public
voidStart()

{

zooKeeper.State = ZooKeeper.States.CONNECTING;

requestThread.Start();

}

至此启动便完成了,但是细心的读者可能有两个疑问:

(1)      连接状态什么时候能够从CONNECTING变为CONNECTED?

(2)      传递到ZooKeeper构造函数中的IWatcher对象的process事件什么时候响应?

回答者两个问题要看ClientConnectionRequestProducer的线程SendRequests。

里面有个doIO方法会一直从TCP监听端口读取响应数据。

booldoIO(TimeSpan to)

{

boolpacketReceived =
false;

if(client ==
null) thrownewIOException("Socket is null!");

//判断当前套接字连接是否有可读取数据

if(client.Client.Poll(Convert.ToInt32(to.TotalMilliseconds/ 1000000),SelectMode.SelectRead))

{

packetReceived = true;

inttotal = 0;

//开始读取数据

intcurrent = total = client.GetStream().Read(incomingBuffer, total,incomingBuffer.Length - total);

while(total < incomingBuffer.Length && current > 0)

{

current =client.GetStream().Read(incomingBuffer, total, incomingBuffer.Length - total);

total += current;

}

if(current <= 0)

{

thrownewEndOfStreamException(string.Format("Unableto read additional
data from server sessionid 0x{0:X}, likely server has closedsocket",

conn.SessionId));

}

if(lenBuffer ==
null)

{

lenBuffer = incomingBuffer;

recvCount++;

ReadLength();

}

elseif (!initialized)

{

//若是还未初始化完成,就读取连接状态的response

ReadConnectResult();

if(!outgoingQueue.IsEmpty()) EnableWrite();

lenBuffer = null;

incomingBuffer = newbyte[4];

initialized = true;

}

else

{

//读取其它response

ReadResponse();

lenBuffer = null;

incomingBuffer = newbyte[4];

}

}

//判断当前套接字连接是否有可写数据

elseif (writeEnabled && client.Client.Poll(Convert.ToInt32(to.TotalMilliseconds / 1000000),SelectMode.SelectWrite))

{

lock(outgoingQueueLock)

{

if(!outgoingQueue.IsEmpty())

{

Packet first = outgoingQueue.First.Value;

client.GetStream().Write(first.data, 0, first.data.Length);

sentCount++;

outgoingQueue.RemoveFirst();

if (first.header !=null&& first.header.Type != (int)OpCode.Ping &&

first.header.Type!= (int)OpCode.Auth)

{

pendingQueue.AddLast(first);

}

}

}

}

if(outgoingQueue.IsEmpty())

{

DisableWrite();

}

else

{

EnableWrite();

}

returnpacketReceived;

}

看其中的ReadConnectResult方法:

private
void ReadConnectResult()

{

using(var reader =newEndianBinaryReader(EndianBitConverter.Big,newMemoryStream(incomingBuffer),Encoding.UTF8))

{

BinaryInputArchivebbia =BinaryInputArchive.GetArchive(reader);

ConnectResponseconRsp =new
ConnectResponse();

//读取response

conRsp.Deserialize(bbia,
"connect");

negotiatedSessionTimeout =conRsp.TimeOut;

if(negotiatedSessionTimeout <= 0)

{

zooKeeper.State =
ZooKeeper.States.CLOSED;

conn.consumer.QueueEvent(newWatchedEvent(KeeperState.Expired,EventType.None,null));

thrownewSessionExpiredException(string.Format("Unableto reconnect
to ZooKeeper service, session 0x{0:X} has expired",conn.SessionId));

}

//修改连接状态为CONNECTED

conn.readTimeout = newTimeSpan(0, 0,0, 0, negotiatedSessionTimeout*2/3);

conn.connectTimeout = newTimeSpan(0, 0,0, negotiatedSessionTimeout/conn.serverAddrs.Count);

conn.SessionId =conRsp.SessionId;

conn.SessionPassword =conRsp.Passwd;

zooKeeper.State = ZooKeeper.States.CONNECTED;

LOG.Info(string.Format("Sessionestablishment complete on server {0:X}, negotiated timeout = {1}",conn.SessionId, negotiatedSessionTimeout));

//将ZKWatchManager中的watcher封装并Add到ClientConnectionEventConsumer的waitingEvents集合

conn.consumer.QueueEvent(newWatchedEvent(KeeperState.SyncConnected,EventType.None,
null));

}

}

可以看到如果读取到连接完成的response,将会修改当前连接状态为CONNECTED,同时调用QueueEvent函数将之前new Zookeeper时赋值给WatchManager的IWatcher加入到ClientConnectionEventConsumer的waitingEvents集合,以便其线程中可以取出来调用。

至此初始化和启动分析完成。

核心是通过.NET中System.Net.Sockets类库中的TcpClient类与Zookeeper服务端进行TCP通信,轮训从TCP监听端口读取或者写入数据,和Java服务端通信只需要双方商量好数据通信格式,能够相互序列化和反序列化即可正确通信。

数据操作接口

以GetData函数为例:

public byte[] GetData(stringpath,IWatcher watcher,
Stat stat)

{

stringclientPath = path;

PathUtils.ValidatePath(clientPath);

// thewatch contains the un-chroot path

WatchRegistrationwcb =null;

if(watcher !=
null)

{

wcb = newDataWatchRegistration(watchManager, watcher,clientPath);

}

stringserverPath = PrependChroot(clientPath);

RequestHeaderh =
new RequestHeader();

h.Type = (int)OpCode.GetData;

GetDataRequestrequest =new
GetDataRequest();

request.Path = serverPath;

request.Watch = watcher != null;

GetDataResponseresponse =new
GetDataResponse();

//提交请求

ReplyHeaderr = cnxn.SubmitRequest(h, request, response, wcb);

if(r.Err != 0)

{

throwKeeperException.Create((KeeperException.Code)Enum.ToObject(typeof(KeeperException.Code),r.Err),
clientPath);

}

if(stat !=
null)

{

DataTree.CopyStat(response.Stat,stat);

}

//返回response的数据

returnresponse.Data;

}

主要关注三个问题:

(1)      获取数据请求何时、如何发送到ZooKeeper服务器?

(2)      请求的回复Response何时、如何返回?

(3)      传递进来的IWatcher事件何时触发?

上面GetData函数的核心是调用SubmitRequest函数提交请求,同时构造好一个response也一同传递进去,可以看到最后返回的是response.Data,这就说明SubmitRequest函数提交请求后,内部将数据写入套接字的同时,阻塞等待response的返回,我们实际来看一下,看看请求数据包发出后发送回来的响应数据是如何填充到传递进去的response对象中的。

下面是SubmitRequest函数的实现代码,实际上就是将数据Packet包放入到ClientConnectionRequestProducer的队列outgoingQueue中,下面是实现代码:

public
ReplyHeader SubmitRequest(RequestHeader h,IRecordrequest,
IRecord response,ZooKeeper.WatchRegistrationwatchRegistration)

{

ReplyHeaderr =
new ReplyHeader();

Packetp = QueuePacket(h, r, request, response,null,
null, watchRegistration,null,null);

lock(p)

{

while(!p.Finished)

{

if(!Monitor.Wait(p, SessionTimeout))

throw
new TimeoutException(string.Format("Therequest {0} timed out while waiting for a resposne from the server.",request));

}

}

returnr;

}

下面是QueuePacket函数的实现代码:

public
PacketQueuePacket(RequestHeader h,
ReplyHeader r, IRecordrequest,
IRecord response, string clientPath,
stringserverPath, ZooKeeper.WatchRegistration watchRegistration,object callback,
objectctx)

{

returnproducer.QueuePacket(h, r, request, response, clientPath, serverPath,watchRegistration);

}

下面是QueuePacket函数的实现代码:

public
Packet QueuePacket(RequestHeader h,
ReplyHeaderr, IRecord request,
IRecordresponse, string clientPath,
string serverPath, ZooKeeper.WatchRegistration watchRegistration)

{

lock(outgoingQueueLock)

{

//lockhere for XID?

if(h.Type != (int)OpCode.Ping&& h.Type != (int)OpCode.Auth)

{

h.Xid = Xid;

}

Packetp =
new Packet(h,r, request, response,
null, watchRegistration,clientPath, serverPath);

p.clientPath = clientPath;

p.serverPath = serverPath;

if(!zooKeeper.State.IsAlive())

{

ConLossPacket(p);

}

else

{

//将Packet放入outgoingQueue队列

outgoingQueue.AddLast(p);

}

returnp;

}

}

可以看到最终在这里传递进来的request和response被包装成了Packet对象,然后被放入到了ClientConnectionRequestProducer的outgoingQueue队列中。

还记得ClientConnectionRequestProducer中开启的线程SendRequests中不断循环调用的doIO函数么?这个函数处理到Zookeeper的TCP连接端口的数据读取和写入,再看看这个函数中处理数据读取和写入的代码:

//判断当前套接字连接是否有可读取数据

if(client.Client.Poll(Convert.ToInt32(to.TotalMilliseconds/ 1000000),SelectMode.SelectRead))

{

packetReceived = true;

inttotal = 0;

//开始读取数据

intcurrent = total = client.GetStream().Read(incomingBuffer, total,incomingBuffer.Length - total);

while(total < incomingBuffer.Length && current > 0)

{

current = client.GetStream().Read(incomingBuffer,total, incomingBuffer.Length - total);

total += current;

}

if(current <= 0)

{

thrownewEndOfStreamException(string.Format("Unableto read additional
data from server sessionid 0x{0:X}, likely server has closedsocket",

conn.SessionId));

}

if(lenBuffer ==
null)

{

lenBuffer = incomingBuffer;

recvCount++;

ReadLength();

}

elseif (!initialized)

{

//若是还未初始化完成,就读取连接状态的response

ReadConnectResult();

if(!outgoingQueue.IsEmpty()) EnableWrite();

lenBuffer = null;

incomingBuffer = newbyte[4];

initialized = true;

}

else

{

//读取其它response

ReadResponse();

lenBuffer = null;

incomingBuffer = newbyte[4];

}

}

//判断当前套接字连接是否有可写数据

elseif (writeEnabled && client.Client.Poll(Convert.ToInt32(to.TotalMilliseconds / 1000000),SelectMode.SelectWrite))

{

lock(outgoingQueueLock)

{

if(!outgoingQueue.IsEmpty())

{

Packetfirst = outgoingQueue.First.Value;

client.GetStream().Write(first.data, 0, first.data.Length);

sentCount++;

outgoingQueue.RemoveFirst();

if (first.header !=null&& first.header.Type != (int)OpCode.Ping &&

first.header.Type!= (int)OpCode.Auth)

{

pendingQueue.AddLast(first);

}

}

}

}

可以看到数据写入时如果outgoingQueue队列不为空,就从中取出Packet,将请求数据写入到套接字连接的流中,同时将这个Packet放入到了pendingQueue队列中。

这个队列是返回response的关键,当response返回时,会调用上面的doIO函数中的数据读取处理部分,其中调用了ReadResponse函数,实现代码如下:

private
void ReadResponse()

{

using(MemoryStream ms =newMemoryStream(incomingBuffer))

using(var reader =newEndianBinaryReader(EndianBitConverter.Big,ms,Encoding.UTF8))

{

BinaryInputArchivebbia =BinaryInputArchive.GetArchive(reader);

ReplyHeaderreplyHdr =new
ReplyHeader();

replyHdr.Deserialize(bbia,
"header");

if(replyHdr.Xid == -2)

{

//-2 is the xid for pings

if(LOG.IsDebugEnabled)

{

LOG.Debug(string.Format("Gotping response for sessionid: 0x{0:X} after {1}ms", conn.SessionId,(DateTime.Now.Nanos()
-lastPingSentNs)/1000000));

}

return;

}

if(replyHdr.Xid == -4)

{

//-2 is the xid for AuthPacket

//TODO: process AuthPacket here

if(LOG.IsDebugEnabled)

{

LOG.Debug(string.Format("Gotauth sessionid:0x{0:X}", conn.SessionId));

}

return;

}

if(replyHdr.Xid == -1)

{

//-1 means notification

if(LOG.IsDebugEnabled)

{

LOG.Debug(string.Format("Gotnotification sessionid:0x{0}", conn.SessionId));

}

WatcherEvent@event =new
WatcherEvent();

//读取响应结果

@event.Deserialize(bbia,
"response");

//convert from a server path to a client path

if(conn.ChrootPath !=null)

{

stringserverPath = @event.Path;

if (serverPath.CompareTo(conn.ChrootPath) == 0)

@event.Path =
"/";

else

@event.Path =serverPath.Substring(conn.ChrootPath.Length);

}

WatchedEventwe =new
WatchedEvent(@event);

if(LOG.IsDebugEnabled)

{

LOG.Debug(string.Format("Got{0} for sessionid 0x{1:X}", we, conn.SessionId));

}

//将构造好的watch事件放入响应事件处理队列,由线程处理

conn.consumer.QueueEvent(we);

return;

}

if(pendingQueue.IsEmpty())

{

thrownewIOException(string.Format("Nothingin the queue, but got
{0}", replyHdr.Xid));

}

//从pendingQueue队列取出Packet

Packetpacket;

lock(pendingQueueLock)

{

packet =pendingQueue.First.Value;

pendingQueue.RemoveFirst();

}

/*

* Since requests are processed inorder, we better get a response

* to the first request!

*/

try

{

if(packet.header.Xid != replyHdr.Xid)

{

packet.replyHeader.Err= (int)KeeperException.Code.CONNECTIONLOSS;

thrownewIOException(string.Format("Xidout of order. Got {0}
expected {1}", replyHdr.Xid,packet.header.Xid));

}

packet.replyHeader.Xid =replyHdr.Xid;

packet.replyHeader.Err =replyHdr.Err;

packet.replyHeader.Zxid =replyHdr.Zxid;

if(replyHdr.Zxid > 0)

{

lastZxid =replyHdr.Zxid;

}

//将读取到的response数据填充到取出的Packet中结束

if(packet.response !=null &&replyHdr.Err == 0)

{

packet.response.Deserialize(bbia,
"response");

}

if(LOG.IsDebugEnabled)

{

LOG.Debug(string.Format("Readingreply sessionid:0x{0:X}, packet:: {1}", conn.SessionId, packet));

}

}

finally

{

FinishPacket(packet);

}

}

}

可以看到这里面的关键就是通过一个队列pendingQueue来返回读取到的response的数据。

结论

通过分析发现,有三点原因需要强化客户端到ZooKeeper集群的连接管理:

(1)当实例化ZooKeeper对象建立到服务器的连接时,是通过不断循环,等待连接建立的回应,等待超过一定超时时限后建立连接失败。所以我们在使用ZooKeeper对象进行操作前有必要进行连接是否已经建立的测试,如果仍没有建立完成连接需要等待连接完成(即IWatcher事件响应),同样这个等待也需要超时时限。

(2)如果是ZooKeeper集群,连接到其中一个节点失败时,或者已经建立了与某个节点的连接,但是因为某种原因这个节点变得不可用,需要自动重连到其它可用的节点,所以重连机制也需要;

(3)对于每一个节点操作(GetData,Delete等),可能失败,此时需要保障有一个到可用ZooKeeper节点的可用连接,其次需要在操作失败时能够按照一定策略重试。

时间: 2024-10-11 06:45:19

ZooKeeperNet源码解析的相关文章

ChrisRenke/DrawerArrowDrawable源码解析

转载请注明出处http://blog.csdn.net/crazy__chen/article/details/46334843 源码下载地址http://download.csdn.net/detail/kangaroo835127729/8765757 这次解析的控件DrawerArrowDrawable是一款侧拉抽屉效果的控件,在很多应用上我们都可以看到(例如知乎),控件的github地址为https://github.com/ChrisRenke/DrawerArrowDrawable

五.jQuery源码解析之jQuery.extend(),jQuery.fn.extend()

给jQuery做过扩展或者制作过jQuery插件的人这两个方法东西可能不陌生.jQuery.extend([deep],target,object1,,object2...[objectN]) jQuery.fn.extend([deep],target,object1,,object2...[objectN])这两个属性都是用于合并两个或多个对象的属性到target对象.deep是布尔值,表示是否进行深度合并,默认是false,不执行深度合并.通过这种方式可以在jQuery或jQuery.fn

eclipse中导入jdk源码、SpringMVC注解@RequestParam、SpringMVC文件上传源码解析、ajax上传excel文件

eclipse中导入jdk源码:http://blog.csdn.net/evolly/article/details/18403321, http://www.codingwhy.com/view/799.html. ------------------------------- SpringMVC注解@RequestParam:http://825635381.iteye.com/blog/2196911. --------------------------- SpringMVC文件上传源

String源码解析(一)

本篇文章内的方法介绍,在方法的上面的注释讲解的很清楚,这里只阐述一些要点. Java中的String类的定义如下: 1 public final class String 2 implements java.io.Serializable, Comparable<String>, CharSequence { ...} 可以看到,String是final的,而且继承了Serializable.Comparable和CharSequence接口. 正是因为这个特性,字符串对象可以被共享,例如下面

Flume-ng源码解析之Channel组件

如果还没看过Flume-ng源码解析之启动流程,可以点击Flume-ng源码解析之启动流程 查看 1 接口介绍 组件的分析顺序是按照上一篇中启动顺序来分析的,首先是Channel,然后是Sink,最后是Source,在开始看组件源码之前我们先来看一下两个重要的接口,一个是LifecycleAware ,另一个是NamedComponent 1.1 LifecycleAware @[email protected] interface LifecycleAware {  public void s

Spring源码解析-applicationContext

Demo uml类图 ApplicationContext ApplicationListener 源码解析 主流程 obtainFreshBeanFactory prepareBeanFactory invokeBeanFactoryPostProcessors registerBeanPostProcessors registerListeners finishRefresh 总结 在已经有BeanFactory可以完成Ioc功能情况下,spring又提供了ApplicationContex

socketserver源码解析和协程版socketserver

来,贴上一段代码让你仰慕一下欧socketserver的魅力,看欧怎么完美实现多并发的魅力 client import socket ip_port = ('127.0.0.1',8009) sk = socket.socket() sk.connect(ip_port) sk.settimeout(5) while True: data = sk.recv(1024) print('receive:',data.decode()) inp = input('please input:') sk

Handler机制(四)---Handler源码解析

Handler的主要用途有两个:(1).在将来的某个时刻执行消息或一个runnable,(2)把消息发送到消息队列. 主要依靠post(Runnable).postAtTime(Runnable, long).postDelayed(Runnable, long).sendEmptyMessage(int).sendMessage(Message).sendMessageAtTime(Message).sendMessageDelayed(Message, long)这些方法来来完成消息调度.p

Android EventBus源码解析, 带你深入理解EventBus

上一篇带大家初步了解了EventBus的使用方式,详见:Android EventBus实战 没听过你就out了,本篇博客将解析EventBus的源码,相信能够让大家深入理解该框架的实现,也能解决很多在使用中的疑问:为什么可以这么做?为什么这么做不好呢? 1.概述 一般使用EventBus的组件类,类似下面这种方式: [java] view plain copy public class SampleComponent extends Fragment { @Override public vo