zookeeper session tracker机制分析

说到zookeeper session管理 ,免不了要问

  • 什么是session?
  • session id/session是如何产生的?
  • session 信息如何存储?

本文以session tracker线程【详见SessionTrackerImpl】的运行机制作为主线,并尝试解答一些相关问题

1)session基础

在介绍session tracker线程之前先回答几个问题

1.1) 什么是session?

zookeeper中session意味着一个物理连接,客户端connect成功之后,会发送一个connect型请求,此时就会有session 产生(下面会具体讲)

1.2)sessionid是如何产生的?

在SessionTrackerImpl实例化的时候就会调用下面的函数【详见SessionTrackerImpl.initializeNextSession】


1

2

3

4

5

6

public static long initializeNextSession(long id) {

       long nextSid = 0;

       nextSid = (System.currentTimeMillis() << 24) >> 8;

       nextSid =  nextSid | (id <<56);

       return nextSid;

   }

产生的值会存入nextSessionId属性,以后一旦有新的连接(session)产生,就会nextSessionId++

1.3)session是如何产生的?

接到一个连接类型的请求【详见ZooKeeperServer.processConnectRequest】


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

int sessionTimeout = connReq.getTimeOut();

        byte passwd[] = connReq.getPasswd();

        int minSessionTimeout = getMinSessionTimeout();

        if (sessionTimeout < minSessionTimeout) {

            sessionTimeout = minSessionTimeout;

        }

        int maxSessionTimeout = getMaxSessionTimeout();

        if (sessionTimeout > maxSessionTimeout) {

            sessionTimeout = maxSessionTimeout;

        }

        cnxn.setSessionTimeout(sessionTimeout);

        // We don‘t want to receive any packets until we are sure that the

        // session is setup

        cnxn.disableRecv();

        long sessionId = connReq.getSessionId();

        if (sessionId != 0) {

            long clientSessionId = connReq.getSessionId();

            LOG.info("Client attempting to renew session 0x"

                    + Long.toHexString(clientSessionId)

                    " at " + cnxn.getRemoteSocketAddress());

            serverCnxnFactory.closeSession(sessionId);

            cnxn.setSessionId(sessionId);

            reopenSession(cnxn, sessionId, passwd, sessionTimeout);

        else {

            LOG.info("Client attempting to establish new session at "

                    + cnxn.getRemoteSocketAddress());

            createSession(cnxn, passwd, sessionTimeout);

        }

1.3.1)确定session的timeout和id

【详见SessionTrackerImpl.createSession】


1

2

3

4

synchronized public long createSession(int sessionTimeout) {

        addSession(nextSessionId, sessionTimeout);

        return nextSessionId++;

    }

可见产生session需要两个元素,一个是sessionid,一个是timeout

  • timeout由客户端确定,但必须在服务器规定的最大的timeout(ticktime*20)和最小的timeout(ticktime*2)之间
  • 如果客户端没有指定sessionid,那么就会产生一个新的session【详见ZooKeeperServer.createSession】,否则会reopen【详见ZooKeeperServer.reopenSession】
  • sessionid的产生上面解释过了

1.3.2)实例化session及相关关系存放

【详见SessionTrackerImpl.addSession】


1

2

3

4

5

6

7

8

9

10

sessionsWithTimeout.put(id, sessionTimeout);

        if (sessionsById.get(id) == null) {

            SessionImpl s = new SessionImpl(id, sessionTimeout, 0);

            sessionsById.put(id, s);

            if (LOG.isTraceEnabled()) {

                ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,

                        "SessionTrackerImpl --- Adding session 0x"

                        + Long.toHexString(id) + " " + sessionTimeout);

            }

        }

  • 一个重要的数据结构sessionsWithTimeout存放sessionid和timeout的映射
  • 另一个重要的数据结构sessionsById存放sessionid和SessionImpl实例的映射

1.3.3)确定session实例的tickTime及sessionSets关系维护

【详见SessionTrackerImpl.touchSession】


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

long expireTime = roundToInterval(System.currentTimeMillis() + timeout);

        if (s.tickTime >= expireTime) {

            // Nothing needs to be done

            return true;

        }

        SessionSet set = sessionSets.get(s.tickTime);

        if (set != null) {

            set.sessions.remove(s);

        }

        s.tickTime = expireTime;

        set = sessionSets.get(s.tickTime);

        if (set == null) {

            set = new SessionSet();

            sessionSets.put(expireTime, set);

        }

        set.sessions.add(s);

  • 根据当前时间和timeout计算本session 的expireTime即tickTime
  • 一个重要的数据结构sessionSets 存放过期时间和一组session实例(相同过期时间)的映射的建立及维护
  • session实例的tickTime的确定

2)session tracker线程的机制

在zookeeper服务体系中,专门有一个线程(session tracker)维护session【详见SessionTrackerImpl.run】,重要代码如下


1

2

3

4

5

6

7

8

9

10

11

12

13

14

currentTime = System.currentTimeMillis();

if (nextExpirationTime > currentTime) {

    this.wait(nextExpirationTime - currentTime);

    continue;

}

SessionSet set;

set = sessionSets.remove(nextExpirationTime);

if (set != null) {

    for (SessionImpl s : set.sessions) {

        sessionsById.remove(s.sessionId);

        expirer.expire(s);

    }

}

nextExpirationTime += expirationInterval;

可见SessionTrackerImpl这个线程会一直轮询的清除过期session

  • 每次轮询都会比较currentTime和nextExpirationTime,如果还未到nextExpirationTime,就等,否则往下走
  • 将sessionSets中的以nextExpirationTime为key的那组session移出
  • 遍历session,从sessionsById移除session,并调用相关的过期处理(下面会讲)
  • 调整下载比较的时间,即nextExpirationTime += expirationInterval;

3) session维护相关问题

3.1)清除session如何实现?

【详见ZooKeeperServer.close】


1

2

3

private void close(long sessionId) {

       submitRequest(null, sessionId, OpCode.closeSession, 0nullnull);

   }

3.1.1)构造一个Request实例

3.1.2)调用PrepRequestProcessor.processRequest放入submittedRequests队列

3.1.3)PrepRequestProcessor线程的处理


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,

                            zks.getTime(), type);

                                                   

switch (type) {

                                                  

    //省略N行代码......

                                                  

    case OpCode.closeSession:

        // We don‘t want to do this check since the session expiration thread

        // queues up this operation without being the session owner.

        // this request is the last of the session so it should be ok

        //zks.sessionTracker.checkSession(request.sessionId, request.getOwner());

        HashSet<String> es = zks.getZKDatabase()

                .getEphemerals(request.sessionId);

        synchronized (zks.outstandingChanges) {

            for (ChangeRecord c : zks.outstandingChanges) {

                if (c.stat == null) {

                    // Doing a delete

                    es.remove(c.path);

                else if (c.stat.getEphemeralOwner() == request.sessionId) {

                    es.add(c.path);

                }

            }

            for (String path2Delete : es) {

                addChangeRecord(new ChangeRecord(request.hdr.getZxid(),

                        path2Delete, null0null));

            }

                                                   

            zks.sessionTracker.setSessionClosing(request.sessionId);

        }

                                                   

        LOG.info("Processed session termination for sessionid: 0x"

                + Long.toHexString(request.sessionId));

        break;

      • 设置request.hdr,这个很重要,
        • 在FinalRequestProcessor.processRequest会有相应的处理

1

2

3

4

5

6

if (request.hdr != null) {

              TxnHeader hdr = request.hdr;

              Record txn = request.txn;

  

              rc = zks.processTxn(hdr, txn);

           }

?

    • 一旦某个session关闭,与session相关的EPHEMERAL类型的节点都得清除
    • 并且通过调用sessionTracker.setSessionClosing将session设置为关闭,使得后续此session上的请求无效

3.1.4)SessionTrackerImpl相关数据结构的清理

【详见SessionTrackerImpl.removeSession】


1

2

3

4

5

6

7

8

9

10

11

12

synchronized public void removeSession(long sessionId) {

    SessionImpl s = sessionsById.remove(sessionId);

    sessionsWithTimeout.remove(sessionId);

    if (LOG.isTraceEnabled()) {

        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,

                "SessionTrackerImpl --- Removing session 0x"

                + Long.toHexString(sessionId));

    }

    if (s != null) {

        sessionSets.get(s.tickTime).sessions.remove(s);

    }

}

分别对sessionsById、sessionsWithTimeout、sessionSets进行处理

3.2)session owner咋回事?

如果不是在集群环境,即没有LearnerHandler线程,session 的owner就是一个常量实例ServerCnxn.me

3.3)sessionsWithTimeout这个数据结构的用途?

sessionsWithTimeout存放的是sessionid和timeout,此数据结构会和ZKDatabase中相通,会被持久化

如果某个session timeout为60s,如果空闲了30s,意味着还能空闲30s,此时服务重启,那么此session的timeout又变为60s

3.4)touch session是干吗的?

每次一旦该session有请求,就会touch,意味着session的过期时间变为(基本等于当前时间+timeout)

具体算法为


1

2

3

4

private long roundToInterval(long time) {

        // We give a one interval grace period

        return (time / expirationInterval + 1) * expirationInterval;

    }

time为System.currentTimeMillis() + timeout

expirationInterval默认为ticktime

3.5)check session是干吗的

基本上所有的事务型操作,都会调用用来验证当前请求的session是否关闭,owner是否正确

4)小结

  • SessionTrackerImpl作为一个单独的线程专门处理过期session
  • SessionTrackerImpl有3个重要的数据结构sessionsById、sessionSets、sessionsWithTimeout,其中sessionsWithTimeout会被持久化
  • SessionTrackerImpl提供了几个常用的API
    • createSession
    • addSession
    • touchSession
    • removeSession
    • checkSession
    • setOwner
    • dumpSessions
时间: 2024-10-06 12:44:32

zookeeper session tracker机制分析的相关文章

从session实现机制分析模拟请求验证码的可行性(转)

悲剧了,发现写完这篇blog没有配上这个格调超高的标题. 1.0问题背景 现在要实现一个带验证码网站的的自动登陆功能.验证码识别过程不再这篇文章的讨论之中.(之后有篇文章我会详细的总结验证码的识别过程).现在问题来了,怎么拿到你本次请求登陆页面的验证码图片? 2.0方案分析 现在有几种思路: (1)请求登陆页面,截取验证码图片,类似截屏,seleinum,webbrower的DrawToBitmap()等. (2)还是webbrower,将图片复制到剪切板在从剪切板中搞出来 HTMLContro

zookeeper节点Watch机制实例展示

znode以某种方式发生变化时,“观察”(watch)机制可以让客户端得到通知.可以针对ZooKeeper服务的“操作”来设置观察,该服务的其他 操作可以触发观察. 实现Watcher,复写process方法,处理收到的变更 /** * Watcher Server,处理收到的变更 * @param watchedEvent */ @Override public void process(WatchedEvent watchedEvent) { LOG.info("收到事件通知:"

基于LNMT的Session持久机制的多种方案实现及深入分析

1. Session机制 Session就是会话,就是指的是对话双方或者交互双方建立的信息通道,从建立连接到断开连接的整个过程,称为一个会话.它是一种网络持久化的机制. HTTP协议是一种无状态协议,客户端和服务器建立连接传输完数据后即断开连接,客户端再次发起连接后,服务器端无法知道这个连接是否和上一个连接有什么关系,它只能认为是不同的连接. 为了解决这个问题,一般采用的方法有两种: 客户端保持,即cookie机制 服务器端保持,即session机制.为了保持这个状态信息,它要让客户端至少保留一

【Zookeeper】源码分析目录

Zookeeper源码分析目录如下 1. [Zookeeper]源码分析之序列化 2. [Zookeeper]源码分析之持久化(一)之FileTxnLog 3. [Zookeeper]源码分析之持久化(二)之FileSnap 4. [Zookeeper]源码分析之持久化(三)之FileTxnSnapLog 5. [Zookeeper]源码分析之Watcher机制(一) 6. [Zookeeper]源码分析之Watcher机制(二)之WatchManager 7. [Zookeeper]源码分析之

Android4.0 Surface机制分析

1. java层面的Surface 对于Surface我们的认识主要是android的类Surface, android的文档描述Surface是"Handle onto a raw buffer that is being managed by the screen compositor",这个描述透漏出两个信息:首先,Surface是一个raw buffer的句柄,通过它去管理一个raw buffer,其次,Surface本身是由screen compositor来管理的.但是ra

[译]MySQL不加锁实现一致性读的机制分析

原文直通车:Consistent Nonlocking Reads   MySQL的一致性读的机制是是这样实现的:InnoDB引擎为一个事务Tx提供一个在时间T1的版本快照(T1就是在本 事务中首次执行查询语句的时间点).事务Tx中可以查询到时间点T1之前提交的数据,时间点T1之后提交的数据在 Tx中是看不到的.唯一的例外Ex是在事务Tx中可以看到在本事务中提交的数据(即便是在T1时间点还没有提交的数据).   先建一个表,边理论边实践,具体看下MySQL是如何工作的. mysql> creat

品味ZooKeeper之Watcher机制_2

品味ZooKeeper之Watcher机制 本文思维导图如下: 前言 Watcher机制是zookeeper最重要三大特性数据节点Znode+Watcher机制+ACL权限控制中的其中一个,它是zk很多应用场景的一个前提,比如集群管理.集群配置.发布/订阅. Watcher机制涉及到客户端与服务器(注意,不止一个机器,一般是集群,这里先认为一个整体分析)的两者数据通信与消息通信,除此之外还涉及到客户端的watchManager. 下面正式进入主题. 1.watcher原理框架 由图看出,zk的w

QT开发(六十三)——QT事件机制分析

QT开发(六十三)--QT事件机制分析 一.事件机制 事件是由系统或者QT平台本身在不同的时刻发出的.当用户按下鼠标.敲下键盘,或者是窗口需要重新绘制的时候,都会发出一个相应的事件.一些事件在对用户操作做出响应时发出,如键盘事件等:另一些事件则是由系统自动发出,如计时器事件. 事件的出现,使得程序代码不会按照原始的线性顺序执行.线性顺序的程序设计风格不适合处理复杂的用户交互,如用户交互过程中,用户点击"打开文件"将开始执行打开文件的操作,用户点击"保存文件"将开始执

Linux通信之poll机制分析

poll机制分析 韦东山 2009.12.10 所有的系统调用,基于都可以在它的名字前加上"sys_"前缀,这就是它在内核中对应的函数.比如系统调用open.read.write.poll,与之对应的内核函数为:sys_open.sys_read.sys_write.sys_poll. 一.内核框架: 对于系统调用poll或select,它们对应的内核函数都是sys_poll.分析sys_poll,即可理解poll机制. sys_poll函数位于fs/select.c文件中,代码如下: