首先从 redis.c 源码的 main() 函数开始, 在调用的 initServer 函数中除了初始化 redis 节点本身的一些配置和环境之外,会根据是否设置 cluster_enabled 参数来对 cluster 进行初始化,如下:
initServer // 也就是 redis.conf 配置中的参数 cluster-enabled 如果设置为 yes,则进入 cluster 模式 -> if (server.cluster_enabled) clusterInit();
下面看一下 clusterInit 函数如何将 redis 带入 cluster 模式的:
void clusterInit(void) { int saveconf = 0; /* 对 redisServer 结构中的 clusterState 进行初始化 */ server.cluster = zmalloc(sizeof(clusterState)); server.cluster->myself = NULL; server.cluster->currentEpoch = 0; // 初始为 FAIL 状态 server.cluster->state = REDIS_CLUSTER_FAIL; // master 节点数 server.cluster->size = 1; server.cluster->todo_before_sleep = 0; server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL); server.cluster->nodes_black_list = dictCreate(&clusterNodesBlackListDictType,NULL); server.cluster->failover_auth_time = 0; server.cluster->failover_auth_count = 0; server.cluster->failover_auth_rank = 0; server.cluster->failover_auth_epoch = 0; server.cluster->lastVoteEpoch = 0; server.cluster->stats_bus_messages_sent = 0; server.cluster->stats_bus_messages_received = 0; memset(server.cluster->slots,0, sizeof(server.cluster->slots)); clusterCloseAllSlots(); /* Lock the cluster config file to make sure every node uses * its own nodes.conf. */ if (clusterLockConfig(server.cluster_configfile) == REDIS_ERR) exit(1); /* Load or create a new nodes configuration. */ // 加载或创建一个新的 节点配置 文件 // 如果加载失败,则通过 createClusterNode 来创建一个 flags = MYSELF|MASTER 的新节点 if (clusterLoadConfig(server.cluster_configfile) == REDIS_ERR) { /* No configuration found. We will just use the random name provided * by the createClusterNode() function. */ myself = server.cluster->myself = createClusterNode(NULL,REDIS_NODE_MYSELF|REDIS_NODE_MASTER); redisLog(REDIS_NOTICE,"No cluster configuration found, I‘m %.40s", myself->name); // 将此节点加入 cluster 的 nodes hash table 中 // 该 nodes 维护的是一张 nodeName -> node 的 hash 表 clusterAddNode(myself); saveconf = 1; } // 具体的配置文件名由参数 cluster-config-file 来指定 if (saveconf) clusterSaveConfigOrDie(1); /* We need a listening TCP port for our cluster messaging needs. */ server.cfd_count = 0; /* Port sanity check II * The other handshake port check is triggered too late to stop * us from trying to use a too-high cluster port number. */ // 这里是做端口校验,要求 redis 面向客户端的监听端口必须小于 55535,这样才能避免出现 // cluster 通道的监听端口 > 65535 的情形 if (server.port > (65535-REDIS_CLUSTER_PORT_INCR)) { redisLog(REDIS_WARNING, "Redis port number too high. " "Cluster communication port is 10,000 port " "numbers higher than your Redis port. " "Your Redis port number must be " "lower than 55535."); exit(1); } // 打开 cluster 通道的 非阻塞监听端口 if (listenToPort(server.port+REDIS_CLUSTER_PORT_INCR, server.cfd,&server.cfd_count) == REDIS_ERR) { exit(1); } else { int j; // 在一个或多个非阻塞监听套接字上创建 ACCEPT 事件处理器 // 可以根据系统平台选择合适的 事件模型(如:Linux 上的 epoll,具体查看 aeApiAddEvent 源码) // 这里指定了 clusterAcceptHandler 函数作为 ACCEPT 事件处理器 for (j = 0; j < server.cfd_count; j++) { if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE, clusterAcceptHandler, NULL) == AE_ERR) redisPanic("Unrecoverable error creating Redis Cluster " "file event."); } } /* The slots -> keys map is a sorted set. Init it. */ server.cluster->slots_to_keys = zslCreate(); /* Set myself->port to my listening port, we‘ll just need to discover * the IP address via MEET messages. */ myself->port = server.port; resetManualFailover(); }
从该部分源码可以看出 Redis Cluster 初始化部分的核心主要是 nodes.conf 的加载,以及 cluster bus 通道的监听服务的启动 这两部分:
1,nodes.conf 的加载或创建:从源码中可以看到是首先加载节点配置(查看 clusterLoadConfig() 函数实现,这里不做展开),如果不存在或文件内容长度为0,则新创建该节点配置,并且初始创建只是包含本节点的配置信息。nodes.conf 配置示例如下:
$ cat nodes.conf
8868592d98d84b7cf5752cc0b97af4ac807d1a12 127.0.0.1:7007 slave bfc910f924d772fe03d9fe6a19aabd73d5730d26 0 1410882108055 8 connected
f5bdda1518cd3826100a30f5953ed82a5861ed48 127.0.0.1:7002 slave bfc910f924d772fe03d9fe6a19aabd73d5730d26 0 1410882107151 8 connected
82578e8ec9747e46cbb4b8cc2484c71b9b2c91f4 127.0.0.1:7001 master - 0 1410882106146 2 connected 6461-10922
61dfb1055760d5dcf6519e35435d60dc5b207940 127.0.0.1:7004 slave 82578e8ec9747e46cbb4b8cc2484c71b9b2c91f4 0 1410882107651 5 connected
6d1ebedad33bb31ffbaa99bad095eef4a5920857 127.0.0.1:7006 master - 0 1410882106648 0 connected
bfc910f924d772fe03d9fe6a19aabd73d5730d26 127.0.0.1:7005 master - 0 1410882106648 8 connected 11923-16383
35e0f6fdadbf81a00a1d6d1843698613e653867b 127.0.0.1:7003 slave 123ed65d59ff22370f2f09546f410d31207789f6 0 1410882106146 7 connected
123ed65d59ff22370f2f09546f410d31207789f6 127.0.0.1:7000 myself,master - 0 0 7 connected 0-6460 10923-11922
vars currentEpoch 8 lastVoteEpoch 8
可以看到这里列出了两种类型的信息:
1) 集群中所有节点的配置信息,各字段分别是:
0 NodeID:6d1ebedad33bb31ffbaa99bad095eef4a5920857
1 节点的IP:Port:127.0.0.1:7006
2 flag:标识节点的一些选项,可选的值为 master|slave|myself|fail|fail?|handshake|noaddr|noflags,对应于 clusterNode.flags
3 master 节点ID:如果为 slave 节点,则显示出 master 节点的标识,否则显示 - 表示该节点为 master 节点
4 最近一次发送 ping 的时间
5 最近一次接收 pong 的时间
6 configEpoch
7 连接状态
8 对于 master 节点,最后一个字段记录了处理的 slots 范围; 对于 migration, 格式为 [slots-> nodeId,表示将 slots 指定的槽位迁移到 nodeId 上去;
2) vars 开头的行,当前主要记录了两个变量 currentEpoch 和 lastVoteEpoch 的值(下面单独分析 epoch)