最近一个项目用到Jedis客户端,需要对这个客户端进行改造。看了一下Jedis Cluster源码,做个记录
首先,说核心内容, 在Jedis源码中,关于cluster有个两个重要的map。一个是nodes,一个是slots
nodes: host:port ----> JedisPool
slots: slot ----> JedisPool
nodes存放的是key为host:port到JedisPool的映射
slots存放的 slot到JedisPool的映射
这里,JedisPool是用apache common pool存放jedis对象的pool,slot是通过Crc16算出对16384取余得到
上个Jedis Cluster的Demo吧
1 import redis.clients.jedis.HostAndPort; 2 import redis.clients.jedis.JedisCluster; 3 import java.util.HashSet; 4 import java.util.Set; 5 6 /** 7 * Created by guanxianseng on 2017/8/15. 8 * 9 * nodes: host:port -> JedisPool 10 * slots: slot -> JedisPool 11 */ 12 public class TestCluster { 13 public static void main(String[] args) { 14 Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>(); 15 jedisClusterNodes.add(new HostAndPort("192.168.211.131", 7340)); 16 jedisClusterNodes.add(new HostAndPort("192.168.211.131", 7341)); 17 jedisClusterNodes.add(new HostAndPort("192.168.211.131", 7342)); 18 JedisCluster jc = new JedisCluster(jedisClusterNodes); 19 jc.set("name", "guanxianseng"); 20 System.out.println(jc.get("name")); 21 } 22 }
输出
guanxianseng Process finished with exit code 0
这里IP是我的虚拟机的IP,开了两台虚拟机,部署的是三主三从的集群
首先,进入JedisCluster的构造函数,一路找下去,我们会看到这样的代码
1 public JedisClusterConnectionHandler(Set<HostAndPort> nodes, 2 final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) { 3 this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password); 4 initializeSlotsCache(nodes, poolConfig, password); 5 }
进入initializeSlotsCache方法
1 private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) { 2 for (HostAndPort hostAndPort : startNodes) { 3 Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort()); 4 if (password != null) { 5 jedis.auth(password); 6 } 7 try { 8 cache.discoverClusterNodesAndSlots(jedis); 9 break; 10 } catch (JedisConnectionException e) { 11 // try next nodes 12 } finally { 13 if (jedis != null) { 14 jedis.close(); 15 } 16 } 17 } 18 }
这里,获取集群节点的jedis对象,进入discoverClusterNodesAndSlots(jedis)
1 public void discoverClusterNodesAndSlots(Jedis jedis) { 2 w.lock(); 3 4 try { 5 reset(); 6 List<Object> slots = jedis.clusterSlots(); 7 8 for (Object slotInfoObj : slots) { 9 List<Object> slotInfo = (List<Object>) slotInfoObj; 10 11 if (slotInfo.size() <= MASTER_NODE_INDEX) { 12 continue; 13 } 14 15 List<Integer> slotNums = getAssignedSlotArray(slotInfo); 16 17 // hostInfos 18 int size = slotInfo.size(); 19 for (int i = MASTER_NODE_INDEX; i < size; i++) { 20 List<Object> hostInfos = (List<Object>) slotInfo.get(i); 21 if (hostInfos.size() <= 0) { 22 continue; 23 } 24 25 HostAndPort targetNode = generateHostAndPort(hostInfos); 26 setupNodeIfNotExist(targetNode); 27 if (i == MASTER_NODE_INDEX) { 28 assignSlotsToNode(slotNums, targetNode); 29 } 30 } 31 } 32 } finally { 33 w.unlock(); 34 } 35 }
第6行,其实就是执行slots命令。进入getAssignedSlotArray方法
private List<Integer> getAssignedSlotArray(List<Object> slotInfo) { List<Integer> slotNums = new ArrayList<Integer>(); for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1)) .intValue(); slot++) { slotNums.add(slot); } return slotNums; }
这里获取了,节点分配的slots
回到上面,进入generateHostAndPort方法
private HostAndPort generateHostAndPort(List<Object> hostInfos) { return new HostAndPort(SafeEncoder.encode((byte[]) hostInfos.get(0)), ((Long) hostInfos.get(1)).intValue()); }
这里获取到节点的host和port
回到上面,进入setupNodeIfNotExist(targetNode);
1 public JedisPool setupNodeIfNotExist(HostAndPort node) { 2 w.lock(); 3 try { 4 String nodeKey = getNodeKey(node); 5 JedisPool existingPool = nodes.get(nodeKey); 6 if (existingPool != null) return existingPool; 7 8 JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(), 9 connectionTimeout, soTimeout, password, 0, null, false, null, null, null); 10 nodes.put(nodeKey, nodePool); 11 return nodePool; 12 } finally { 13 w.unlock(); 14 } 15 }
这里设置我们一开始提到的nodes, host:port -------> JedisPool映射
继续回到上面,进入assignSlotsToNode(slotNums, targetNode);
1 public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) { 2 w.lock(); 3 try { 4 JedisPool targetPool = setupNodeIfNotExist(targetNode); 5 for (Integer slot : targetSlots) { 6 slots.put(slot, targetPool); 7 } 8 } finally { 9 w.unlock(); 10 } 11 }
这里设置了前面说的slots, slot ------> JedisPool的映射
这里初始化完成
执行set命令
1 @Override 2 public String set(final String key, final String value) { 3 return new JedisClusterCommand<String>(connectionHandler, maxAttempts) { 4 @Override 5 public String execute(Jedis connection) { 6 return connection.set(key, value); 7 } 8 }.run(key); 9 }
进入run(key);方法
1 public T run(String key) { 2 if (key == null) { 3 throw new JedisClusterException("No way to dispatch this command to Redis Cluster."); 4 } 5 6 return runWithRetries(SafeEncoder.encode(key), this.maxAttempts, false, false); 7 }
进入runWithRetries()
1 private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) { 2 if (attempts <= 0) { 3 throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?"); 4 } 5 6 Jedis connection = null; 7 try { 8 9 if (asking) { 10 // TODO: Pipeline asking with the original command to make it 11 // faster.... 12 connection = askConnection.get(); 13 connection.asking(); 14 15 // if asking success, reset asking flag 16 asking = false; 17 } else { 18 if (tryRandomNode) { 19 connection = connectionHandler.getConnection(); 20 } else { 21 connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key)); 22 } 23 } 24 25 return execute(connection); 26 27 } catch (JedisNoReachableClusterNodeException jnrcne) {
这里有点长,截取了前面一部分
1 connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
这里,计算key的slot,从slots获取Jedis对象
到这,基本已完成
总结一下,执行slots命令,缓存host:port --> JedisPool, slot ---->JedisPool映射。执行命令,key ---> slot ----> JedisPool ------->Jedis