ZooKeeper个人笔记watcher注册

每一个Watcher具有如下属性:

1.KeeperState

2.EventType

3.path

4.process(WatchedEvent evnet)回掉方法

Watcher干嘛的?用户监听session的状态,数据节点的状态等。

watcher种类:defaultWatcher,非defaultWatcher

dafaultWatcher是在创建ZooKeeper对象时传递的watcher参数。非defaultWatcher只得是,在调用getData,getChildren,exists函数时传递的watcher对象。

二者的相同点在于:都可以监听数据节点的状态,比如在getData中使用了defaultWatcher,那么当监听的节点内容发生改变时,defaultWatcher就会收到通知。如果没有使用了非defaultWatcher,也是同样的。

而这的不同点在于:defaultWatcher会监听session的生命周期,比如session创建成功了,失效了等,而非defaultWatcher不具有这个职责。其次defaultWatcher并不与某一个节点路径相互关联。

notification event,非notification event

客户端需要接受服务器发送过来的消息,第一种消息是类似于Watcher回掉这种的,我们叫做notification,他的特点是服务器主动发送消息给客户端的,比如客户端a在数据节点a上设置了getData监听,当客户端b修改了节点a后,服务器主动发送NodeDataChanged消息给客户端a。第二中消息是类似于create,getData这种,他们向服务器发送对应的请求后,然后将请求放进到pendingQueue中,然后等待服务器的响应,当接受到服务器的响应后,再从pendingQueue中取出请求,然后进行回掉。对于第二中,只有两类请求是不需要服务器响应的,ping,autu。

Watcher和AsyncCallback区别

Watcher用于监听节点的,比如getData对数据节点a设置了watcher,那么当a的数据内容发生改变时,客户端会收到NodeDataChanged通知,然后进行watcher的回掉。

AsyncCallback是在以异步方式使用ZooKeeper APi时,用户获取api的处理结果的。而这具有本质的不同,不要混淆。

同步getData源码走读:

 /**
     * Return the data and the stat of the node of the given path.
     * <p>
     * If the watch is non-null and the call is successful (no exception is
     * thrown), a watch will be left on the node with the given path. The watch
     * will be triggered by a successful operation that sets data on the node, or
     * deletes the node.
     * <p>
     * A KeeperException with error code KeeperException.NoNode will be thrown
     * if no node with the given path exists.
     *
     * @param path the given path
     * @param watcher explicit watcher
     * @param stat the stat of the node
     * @return the data of the node
     * @throws KeeperException If the server signals an error with a non-zero error code
     * @throws InterruptedException If the server transaction is interrupted.
     * @throws IllegalArgumentException if an invalid path is specified
     */
    public byte[] getData(final String path, Watcher watcher, Stat stat)
        throws KeeperException, InterruptedException
     {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);

        // the watch contains the un-chroot path
        WatchRegistration wcb = null;
        if (watcher != null) {
            wcb = new DataWatchRegistration(watcher, clientPath);
        }

        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getData);
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
        GetDataResponse response = new GetDataResponse();        //提交请求,等待请求处理完毕
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        if (stat != null) {
            DataTree.copyStat(response.getStat(), stat);
        }
        return response.getData();
    }
    public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();        //提交请求到outgoingQueue,outgoingQueue里面的请求是将要发送到服务器的请求,ClientCnxnSocket的doIO()会从outgoingQueue中取出队列发送到服务器
        Packet packet = queuePacket(h, r, request, response, null, null, null,
                    null, watchRegistration);        //阻塞等待,直到这个请求处理完成
        synchronized (packet) {
            while (!packet.finished) {
                packet.wait();
            }
        }
        return r;
    }

1.将请求头,请求体,响应等送入队列,返回一个packet,然后等待packet完成。

 Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration)
    {
        Packet packet = null;

        // Note that we do not generate the Xid for the packet yet. It is
        // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
        // where the packet is actually sent.        //这里不会为这个packet生成xid,当ClientCnxnSocket::doIO从outgoingQueue中取出packet才生成xid
        synchronized (outgoingQueue) {
            packet = new Packet(h, r, request, response, watchRegistration);
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath = clientPath;
            packet.serverPath = serverPath;            //如果会话已经关闭
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                // If the client is asking to close the session then
                // mark as closing                //如果客户端请求关闭会话
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                outgoingQueue.add(packet);
            }
        }
        sendThread.getClientCnxnSocket().wakeupCnxn();
        return packet;
    }

由于outgoingQueue并不是阻塞队列并且需要向其添加packet对象,所以需要对其synchronized。最后调用sendThread.getClientCnxnSocket().wakeupCnxn.

  /**
     * @return true if a packet was received
     * @throws InterruptedException
     * @throws IOException
     */
    void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
      throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }        //优先处理读响应
        if (sockKey.isReadable()) {
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from server sessionid 0x"
                                + Long.toHexString(sessionId)
                                + ", likely server has closed socket");
            }
            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    recvCount++;
                    readLength();
                } else if (!initialized) {
                    readConnectResult();
                    enableRead();
                    if (findSendablePacket(outgoingQueue,
                            cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                        // Since SASL authentication has completed (if client is configured to do so),
                        // outgoing packets waiting in the outgoingQueue can now be sent.
                        enableWrite();
                    }
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                    initialized = true;
                } else {
                    sendThread.readResponse(incomingBuffer);
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        }
        if (sockKey.isWritable()) {
            synchronized(outgoingQueue) {
                Packet p = findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress());

                if (p != null) {
                    updateLastSend();
                    // If we already started writing p, p.bb will already exist
                    if (p.bb == null) {
                        if ((p.requestHeader != null) &&
                                (p.requestHeader.getType() != OpCode.ping) &&
                                (p.requestHeader.getType() != OpCode.auth)) {
                            p.requestHeader.setXid(cnxn.getXid());
                        }
                        p.createBB();
                    }
                    sock.write(p.bb);
                    if (!p.bb.hasRemaining()) {
                        sentCount++;
                        outgoingQueue.removeFirstOccurrence(p);
                        if (p.requestHeader != null
                                && p.requestHeader.getType() != OpCode.ping
                                && p.requestHeader.getType() != OpCode.auth) {
                            synchronized (pendingQueue) {
                                pendingQueue.add(p);
                            }
                        }
                    }
                }
                if (outgoingQueue.isEmpty()) {
                    // No more packets to send: turn off write interest flag.
                    // Will be turned on later by a later call to enableWrite(),
                    // from within ZooKeeperSaslClient (if client is configured
                    // to attempt SASL authentication), or in either doIO() or
                    // in doTransport() if not.
                    disableWrite();
                } else {
                    // Just in case
                    enableWrite();
                }
            }
        }
    }

锁定outgoingQueue,获取一个可以发送的packet,如果则个packet还没有被发送,则p.bb!=null,然后生成xid。

xid初始值为1,每次发送一个packet时,都会递增xid,将其设置到请求头中。

最后,将packet写入到pendingQueue中。

   void readResponse(ByteBuffer incomingBuffer) throws IOException {
            ByteBufferInputStream bbis = new ByteBufferInputStream(
                    incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();

            replyHdr.deserialize(bbia, "header");            //虽然客户端会发送ping或者auth消息给服务器,但是客户端并不需要等待服务器的响应,也就是说他并没有将请求写入到pendingQueue中
            if (replyHdr.getXid() == -2) {
                // -2 is the xid for pings
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got ping response for sessionid: 0x"
                            + Long.toHexString(sessionId)
                            + " after "
                            + ((System.nanoTime() - lastPingSentNs) / 1000000)
                            + "ms");
                }
                return;
            }
            if (replyHdr.getXid() == -4) {
                // -4 is the xid for AuthPacket
                if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                    state = States.AUTH_FAILED;
                    eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
                            Watcher.Event.KeeperState.AuthFailed, null) );
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got auth sessionid:0x"
                            + Long.toHexString(sessionId));
                }
                return;
            }

            //notification类型的通知,watcher回掉相关逻辑。
            if (replyHdr.getXid() == -1) {
                // -1 means notification
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got notification sessionid:0x"
                        + Long.toHexString(sessionId));
                }
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");

                // convert from a server path to a client path
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if(serverPath.compareTo(chrootPath)==0)
                        event.setPath("/");
                    else if (serverPath.length() > chrootPath.length())
                        event.setPath(serverPath.substring(chrootPath.length()));
                    else {
                        LOG.warn("Got server path " + event.getPath()
                                + " which is too short for chroot path "
                                + chrootPath);
                    }
                }

                WatchedEvent we = new WatchedEvent(event);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x"
                            + Long.toHexString(sessionId));
                }
                //根据<clientPath,EventType>从ZKWatcherManager中取出相关的wacher,然后封装成:                //<EventType(NodeDataChanged),Path(/abc),watche1 for getData,WatchedEvent>
                //<EventType(NodeDataChanged),Path(/abc),watche1 for exists,WatchedEvent>
                eventThread.queueEvent( we );
                return;
            }

            // If SASL authentication is currently in progress, construct and
            // send a response packet immediately, rather than queuing a
            // response as with other packets.
            if (clientTunneledAuthenticationInProgress()) {
                GetSASLRequest request = new GetSASLRequest();
                request.deserialize(bbia,"token");
                zooKeeperSaslClient.respondToServer(request.getToken(),
                  ClientCnxn.this);
                return;
            }

            Packet packet;
            synchronized (pendingQueue) {
                if (pendingQueue.size() == 0) {
                    throw new IOException("Nothing in the queue, but got "
                            + replyHdr.getXid());
                }
                packet = pendingQueue.remove();
            }
            /*
             * Since requests are processed in order, we better get a response
             * to the first request!
             */
            try {
                if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                    packet.replyHeader.setErr(
                            KeeperException.Code.CONNECTIONLOSS.intValue());
                    throw new IOException("Xid out of order. Got Xid "
                            + replyHdr.getXid() + " with err " +
                            + replyHdr.getErr() +
                            " expected Xid "
                            + packet.requestHeader.getXid()
                            + " for a packet with details: "
                            + packet );
                }

                packet.replyHeader.setXid(replyHdr.getXid());
                packet.replyHeader.setErr(replyHdr.getErr());
                packet.replyHeader.setZxid(replyHdr.getZxid());
                if (replyHdr.getZxid() > 0) {
                    lastZxid = replyHdr.getZxid();
                }
                if (packet.response != null && replyHdr.getErr() == 0) {
                    packet.response.deserialize(bbia, "response");
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Reading reply sessionid:0x"
                            + Long.toHexString(sessionId) + ", packet:: " + packet);
                }
            } finally {                //客户端发送的非ping,auth请求,比如getData,setData等,注意事项,如果客户端注册了新的wather,需要将这个watcher保存到ZKWatcherManager中。如果是同步调用,直接通知客户端完成即可,如果是异步调用,就需要进行AsyncCallback回掉。
                finishPacket(packet);
            }
        }
    private void finishPacket(Packet p) {        //如果用户显示的注册了watcher,比如在getData中注册了非默认watcher,那么就将watcher添加到ZKWatcherManager中去,很重要的一步哦        //如果没有这一步,也就没有后面的watcher回掉了。
        if (p.watchRegistration != null) {
            p.watchRegistration.register(p.replyHeader.getErr());
        }
        //cb就是AsnycCallback,如果为null,表明是同步调用的接口,不需要异步回掉,因此,直接notifyAll即可。
        if (p.cb == null) {
            synchronized (p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {
            p.finished = true;
            eventThread.queuePacket(p);
        }
    }

如果watchRegistration!=null,

如果p.cb==null,说明这个请求已经处理完毕了,因此通知packet完成即可。

最后,将packet放进waitingEvents中,然后EventThread就可以从waitingEvents中取出packets,然后执行客户端逻辑。

 private void processEvent(Object event) {
          try {              //watcher回掉逻辑
              if (event instanceof WatcherSetEventPair) {
                  // each watcher will process the event
                  WatcherSetEventPair pair = (WatcherSetEventPair) event;
                  for (Watcher watcher : pair.watchers) {
                      try {
                          watcher.process(pair.event);
                      } catch (Throwable t) {
                          LOG.error("Error while calling watcher ", t);
                      }
                  }
              } else {                  //create,getData等需要异步回掉的
                  Packet p = (Packet) event;
                  int rc = 0;
                  String clientPath = p.clientPath;
                  if (p.replyHeader.getErr() != 0) {
                      rc = p.replyHeader.getErr();
                  }
                  if (p.cb == null) {
                      LOG.warn("Somehow a null cb got to EventThread!");
                  } else if (p.response instanceof ExistsResponse
                          || p.response instanceof SetDataResponse
                          || p.response instanceof SetACLResponse) {
                      StatCallback cb = (StatCallback) p.cb;
                      if (rc == 0) {
                          if (p.response instanceof ExistsResponse) {
                              cb.processResult(rc, clientPath, p.ctx,
                                      ((ExistsResponse) p.response)
                                              .getStat());
                          } else if (p.response instanceof SetDataResponse) {
                              cb.processResult(rc, clientPath, p.ctx,
                                      ((SetDataResponse) p.response)
                                              .getStat());
                          } else if (p.response instanceof SetACLResponse) {
                              cb.processResult(rc, clientPath, p.ctx,
                                      ((SetACLResponse) p.response)
                                              .getStat());
                          }
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null);
                      }
                  } else if (p.response instanceof GetDataResponse) {
                      DataCallback cb = (DataCallback) p.cb;
                      GetDataResponse rsp = (GetDataResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getData(), rsp.getStat());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null,
                                  null);
                      }
                  } else if (p.response instanceof GetACLResponse) {
                      ACLCallback cb = (ACLCallback) p.cb;
                      GetACLResponse rsp = (GetACLResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getAcl(), rsp.getStat());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null,
                                  null);
                      }
                  } else if (p.response instanceof GetChildrenResponse) {
                      ChildrenCallback cb = (ChildrenCallback) p.cb;
                      GetChildrenResponse rsp = (GetChildrenResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getChildren());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null);
                      }
                  } else if (p.response instanceof GetChildren2Response) {
                      Children2Callback cb = (Children2Callback) p.cb;
                      GetChildren2Response rsp = (GetChildren2Response) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getChildren(), rsp.getStat());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null, null);
                      }
                  } else if (p.response instanceof CreateResponse) {
                      StringCallback cb = (StringCallback) p.cb;
                      CreateResponse rsp = (CreateResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx,
                                  (chrootPath == null
                                          ? rsp.getPath()
                                          : rsp.getPath()
                                    .substring(chrootPath.length())));
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null);
                      }
                  } else if (p.cb instanceof VoidCallback) {
                      VoidCallback cb = (VoidCallback) p.cb;
                      cb.processResult(rc, clientPath, p.ctx);
                  }
              }
          } catch (Throwable t) {
              LOG.error("Caught unexpected throwable", t);
          }
       }
    }

客户端在节点a注册了如下watcher:

getData(a,watcher1)

exists(a,watcher2)

getChildren(a,watcher3)

因此与a关联的watcher有watcher1和watcher2。如果节点a被修改了,那么客户端会收到notification 类型的通知,这里应是NodeDataChanged事件类型,此时,客户端需要回掉watcher1和watcher2.也就是说,他需要根据<EventType,clientPath>来找到与节点a对应的所有watcher。

时间: 2024-08-04 20:38:29

ZooKeeper个人笔记watcher注册的相关文章

zookeeper学习笔记-zkclient,curator使用

开源客户端,原生api的不足 连接的创建是异步的,需要开发人员自行编码实现等待 连接没有自动的超时重连机制 Zk本身没提供序列化机制,需要开发人员自行指定,从而实现数据的序列化和反序列化 Watcher注册一次只会生效一次,需要不断的重复注册 Watcher的使用方式不符合java本身的术语,如果采用监听器方式,更容易理解 不支持递归创建树形节点 开源客户端---ZkClient介绍 Github上一个开源的zk客户端,由datameer的工程师Stefan Groschupf和Peter Vo

zookeeper(4)注册中心

案例 注册中心可以使用Eureka来实现,这个比较简单,可以看之前的例子spring-cloud. 那么使用zookeeper如何来实现注册中心呢? 基于spring cloud我们也可以非常简单的实现. 1.利用之前搭建的zookeeper集群,zookeeper集群 2.新建maven工程 1.POM文件如下: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://

ZooKeeper 并不适合做注册中心

zookeeper 的 CP 模型不适合注册中心 zookeeper 是一个非常优秀的项目,非常成熟,被大量的团队使用,但对于服务发现来讲,zookeeper 真的是一个错误的方案. 在 CAP 模型中,zookeeper 是 CP,意味着面对网络分区时,为了保持一致性,他是不可用的. 因为 zookeeper 是一个分布式协调系统,如果使用最终一致性(AP)的话,将是一个糟糕的设计,他的核心算法是 Zab,所有设计都是为了一致性. 对于协调系统,这是非常正确的,但是对于服务发现,可用性是第一位

zookeeper(5)--基于watcher原理实现带注册中心的RPC框架

一.带版本控制的注册中心RPC框架 server端 //注册中心接口 public interface IRegisterCenter { public void register(String serviceName,String serviceAddress); } //实现类 package zoorpc.zk; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework

zookeeper 学习笔记1(转)

本文转自https://www.cnblogs.com/fanguangdexiaoyuer/p/7077520.html 感谢作者 可以设置观察的操作:exists,getChildren,getData 可以触发观察的操作:create,delete,setData zookeeper观察机制; 服务端只存储事件的信息, 客户端存储事件的信息和Watcher的执行逻辑. ZooKeeper客户端是线程安全的. 每一个应用只需要实例化一个ZooKeeper客户端即可, 同一个ZooKeeper

ZooKeeper简要笔记

What is Zookeeper(官方定义) Zookeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.All of these kinds of services are used in some form or another by distr

ZooKeeper个人笔记之节点的监听

create public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException 1.不支持递归创建节点,比如你要创建/master/a,如果master不存在,你就不能创建a( KeeperException.NoNode). 2.不可以再ephemeral类型的节点下创建子节点(KeeperExcep

Zookeeper 学习笔记之 Leader Election

ZooKeeper四种节点类型: Persist Persist_Sequential Ephemeral Ephemeral_Sequential 在节点上可注册的Watch,客户端先得到通知再得到数据,Watch被fire后,不会再Watch到后续的变化. 基于ZooKeeper做Leader Election 非公平模式 - 客户端会在Persist父节点下创建Ephemeral的Leader节点,只不过是大家抢占式注册,先到先得.即使第一次排在前面,对第二次竞选也不会有影响,所以称为非公

dubbo+zookeeper搭建笔记

参考博客: http://blog.csdn.net/u013142781/article/details/50396621#reply http://blog.csdn.net/u013142781/article/details/50395650 https://my.oschina.net/penngo/blog/494838 http://blog.csdn.net/morning99/article/details/40426133 内容基本来自小宝鸽的博客,部分小地方自己稍作修改.