Jedis Cluster源码分析

最近一个项目用到Jedis客户端,需要对这个客户端进行改造。看了一下Jedis Cluster源码,做个记录

首先,说核心内容, 在Jedis源码中,关于cluster有个两个重要的map。一个是nodes,一个是slots

nodes:  host:port  ---->  JedisPool

slots:  slot ----> JedisPool

nodes存放的是key为host:port到JedisPool的映射

slots存放的 slot到JedisPool的映射

这里,JedisPool是用apache common pool存放jedis对象的pool,slot是通过Crc16算出对16384取余得到

上个Jedis Cluster的Demo吧

 1 import redis.clients.jedis.HostAndPort;
 2 import redis.clients.jedis.JedisCluster;
 3 import java.util.HashSet;
 4 import java.util.Set;
 5
 6 /**
 7  * Created by guanxianseng on 2017/8/15.
 8  *
 9  * nodes: host:port -> JedisPool
10  * slots: slot -> JedisPool
11  */
12 public class TestCluster {
13     public static void main(String[] args) {
14         Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();
15         jedisClusterNodes.add(new HostAndPort("192.168.211.131", 7340));
16         jedisClusterNodes.add(new HostAndPort("192.168.211.131", 7341));
17         jedisClusterNodes.add(new HostAndPort("192.168.211.131", 7342));
18         JedisCluster jc = new JedisCluster(jedisClusterNodes);
19         jc.set("name", "guanxianseng");
20         System.out.println(jc.get("name"));
21     }
22 }

输出

guanxianseng

Process finished with exit code 0

这里IP是我的虚拟机的IP,开了两台虚拟机,部署的是三主三从的集群

首先,进入JedisCluster的构造函数,一路找下去,我们会看到这样的代码

1 public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
2                                        final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {
3     this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password);
4     initializeSlotsCache(nodes, poolConfig, password);
5   }

进入initializeSlotsCache方法

 1 private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) {
 2     for (HostAndPort hostAndPort : startNodes) {
 3       Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
 4       if (password != null) {
 5         jedis.auth(password);
 6       }
 7       try {
 8         cache.discoverClusterNodesAndSlots(jedis);
 9         break;
10       } catch (JedisConnectionException e) {
11         // try next nodes
12       } finally {
13         if (jedis != null) {
14           jedis.close();
15         }
16       }
17     }
18   }

这里,获取集群节点的jedis对象,进入discoverClusterNodesAndSlots(jedis)

 1 public void discoverClusterNodesAndSlots(Jedis jedis) {
 2     w.lock();
 3
 4     try {
 5       reset();
 6       List<Object> slots = jedis.clusterSlots();
 7
 8       for (Object slotInfoObj : slots) {
 9         List<Object> slotInfo = (List<Object>) slotInfoObj;
10
11         if (slotInfo.size() <= MASTER_NODE_INDEX) {
12           continue;
13         }
14
15         List<Integer> slotNums = getAssignedSlotArray(slotInfo);
16
17         // hostInfos
18         int size = slotInfo.size();
19         for (int i = MASTER_NODE_INDEX; i < size; i++) {
20           List<Object> hostInfos = (List<Object>) slotInfo.get(i);
21           if (hostInfos.size() <= 0) {
22             continue;
23           }
24
25           HostAndPort targetNode = generateHostAndPort(hostInfos);
26           setupNodeIfNotExist(targetNode);
27           if (i == MASTER_NODE_INDEX) {
28             assignSlotsToNode(slotNums, targetNode);
29           }
30         }
31       }
32     } finally {
33       w.unlock();
34     }
35   }

第6行,其实就是执行slots命令。进入getAssignedSlotArray方法

private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
    List<Integer> slotNums = new ArrayList<Integer>();
    for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1))
        .intValue(); slot++) {
      slotNums.add(slot);
    }
    return slotNums;
  }

这里获取了,节点分配的slots

回到上面,进入generateHostAndPort方法

private HostAndPort generateHostAndPort(List<Object> hostInfos) {
    return new HostAndPort(SafeEncoder.encode((byte[]) hostInfos.get(0)),
        ((Long) hostInfos.get(1)).intValue());
  }

这里获取到节点的host和port

回到上面,进入setupNodeIfNotExist(targetNode);

 1 public JedisPool setupNodeIfNotExist(HostAndPort node) {
 2     w.lock();
 3     try {
 4       String nodeKey = getNodeKey(node);
 5       JedisPool existingPool = nodes.get(nodeKey);
 6       if (existingPool != null) return existingPool;
 7
 8       JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),
 9           connectionTimeout, soTimeout, password, 0, null, false, null, null, null);
10       nodes.put(nodeKey, nodePool);
11       return nodePool;
12     } finally {
13       w.unlock();
14     }
15   }

这里设置我们一开始提到的nodes, host:port   -------> JedisPool映射

继续回到上面,进入assignSlotsToNode(slotNums, targetNode);

 1 public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
 2     w.lock();
 3     try {
 4       JedisPool targetPool = setupNodeIfNotExist(targetNode);
 5       for (Integer slot : targetSlots) {
 6         slots.put(slot, targetPool);
 7       }
 8     } finally {
 9       w.unlock();
10     }
11   }

这里设置了前面说的slots, slot ------> JedisPool的映射

这里初始化完成

执行set命令

1 @Override
2   public String set(final String key, final String value) {
3     return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
4       @Override
5       public String execute(Jedis connection) {
6         return connection.set(key, value);
7       }
8     }.run(key);
9   }

进入run(key);方法

1 public T run(String key) {
2     if (key == null) {
3       throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");
4     }
5
6     return runWithRetries(SafeEncoder.encode(key), this.maxAttempts, false, false);
7   }

进入runWithRetries()

 1 private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) {
 2     if (attempts <= 0) {
 3       throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
 4     }
 5
 6     Jedis connection = null;
 7     try {
 8
 9       if (asking) {
10         // TODO: Pipeline asking with the original command to make it
11         // faster....
12         connection = askConnection.get();
13         connection.asking();
14
15         // if asking success, reset asking flag
16         asking = false;
17       } else {
18         if (tryRandomNode) {
19           connection = connectionHandler.getConnection();
20         } else {
21           connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
22         }
23       }
24
25       return execute(connection);
26
27     } catch (JedisNoReachableClusterNodeException jnrcne) {

这里有点长,截取了前面一部分

1 connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));

这里,计算key的slot,从slots获取Jedis对象

到这,基本已完成

总结一下,执行slots命令,缓存host:port --> JedisPool, slot ---->JedisPool映射。执行命令,key ---> slot ----> JedisPool   ------->Jedis

 
时间: 2024-07-30 13:48:18

Jedis Cluster源码分析的相关文章

storm操作zookeeper源码分析-cluster.clj

storm操作zookeeper的主要函数都定义在命名空间backtype.storm.cluster中(即cluster.clj文件中).backtype.storm.cluster定义了两个重要protocol:ClusterState和StormClusterState.clojure中的protocol可以看成java中的接口,封装了一组方法.ClusterState协议中封装了一组与zookeeper进行交互的基础函数,如获取子节点函数,获取子节点数据函数等,ClusterState协

Jedis源码分析

对于日常开发,Redis由于单线程的并发模型.丰富的数据结构和简单的API,深受广大程序员的喜爱.Redis提供了多种语言的API,像java.c和python等.之前一直都是使用redis,但是没有多redis的API有一个系统的认识.忙里偷闲,撸一下Redis相关的API的实现,由于我是一个java猿,那么我主要学习了一下jedis的源码,来分析一下Redis的读写流程. 一.Jedis项目结构 : 代码是比较简单的,而且很多类也没有那么多的抽象和继承,其实是比较好懂的.commands包里

Akka源码分析-Cluster-Distributed Publish Subscribe in Cluster

在ClusterClient源码分析中,我们知道,他是依托于"Distributed Publish Subscribe in Cluster"来实现消息的转发的,那本文就来分析一下Pub/Sub是如何实现的. 还记得之前分析Cluster源码的文章吗?其实Cluster只是把集群内各个节点的信息通过gossip协议公布出来,并把节点的信息分发出来.但各个actor的地址还是需要开发者自行获取或设计的,比如我要跟worker通信,那就需要知道这个actor在哪个节点,通过actorPa

memcache-client-forjava 源码分析之MemcachedCacheManager

接上文<memcache-client-forjava 源码分析之DefaultCacheImpl分析>,主要分析ICache另外一个针对Memcached缓存实现,重点实现了memcached的高可用能力. 由于底层访问复用了java_memcached-release包的实现,memcache-client-forjava只是在上层做了简单封装.本文重点分析下如何进行的封装,以提高自己的设计经验.个人认为,java_memcached-release源码阅读,比spymemcached更简

solr源码分析之solrclound

一.简介 SolrCloud是Solr4.0版本以后基于Solr和Zookeeper的分布式搜索方案.SolrCloud是Solr的基于Zookeeper一种部署方式.Solr可以以多种方式部署,例如单机方式,多机Master-Slaver方式. 二.特色功能 SolrCloud有几个特色功能: 集中式的配置信息使用ZK进行集中配置.启动时可以指定把Solr的相关配置文件上传Zookeeper,多机器共用.这些ZK中的配置不会再拿到本地缓存,Solr直接读取ZK中的配置信息.配置文件的变动,所有

storm启动nimbus源码分析-nimbus.clj

nimbus是storm集群的"控制器",是storm集群的重要组成部分.我们可以通用执行bin/storm nimbus >/dev/null 2>&1 &来启动nimbus.bin/storm是一个python脚本,在这个脚本中定义了一个nimbus函数: nimbus函数 def nimbus(klass="backtype.storm.daemon.nimbus"):    """Syntax: [s

Spring Boot 揭秘与实战 源码分析 - 开箱即用,内藏玄机

文章目录 1. 开箱即用,内藏玄机 2. 总结 3. 源代码 Spring Boot提供了很多"开箱即用"的依赖模块,那么,Spring Boot 如何巧妙的做到开箱即用,自动配置的呢? 开箱即用,内藏玄机 Spring Boot提供了很多"开箱即用"的依赖模块,都是以spring-boot-starter-xx作为命名的.例如,之前提到的 spring-boot-starter-redis.spring-boot-starter-data-mongodb.spri

【Zookeeper】源码分析之Watcher机制(一)

一.前言 前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类. 二.总体框图 对于Watcher机制而言,主要涉及的类主要如下. 说明: Watcher,接口类型,其定义了process方法,需子类实现. Event,接口类型,Watcher的内部类,无任何方法. KeeperState,枚举类型,Event的内部类,表示Zookeeper所处的状态. EventType,枚举类型,Event的内部类,表示Zookeeper中发生的事

storm启动supervisor源码分析-supervisor.clj

supervisor是storm集群重要组成部分,supervisor主要负责管理各个"工作节点".supervisor与zookeeper进行通信,通过zookeeper的"watch机制"可以感知到是否有新的任务需要认领或哪些任务被重新分配.我们可以通用执行bin/storm supervisor >/dev/null 2>&1 &来启动supervisor.bin/storm是一个python脚本,在这个脚本中定义了一个superv