redis集群源码阅读 之 集群握手

集群节点的启动仍然是使用redis-server命令,但需要使用集群模式启动。启动完之后各个节点分别在各自的集群内,可以通过cluster meet命令将两个节点加入到同一个集群。集群相关的命令通过[email protected]这个api现实。下面主要通过源码分析来看看A节点向B节点发送cluster meet命令的过程。

  • 处理cluster meet命令的整个流程

  

if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
        /* CLUSTER MEET <ip> <port> */
        // 将给定地址的节点添加到当前节点所处的集群里面

        long long port;

        // 检查 port 参数的合法性
        if (getLongLongFromObject(c->argv[3], &port) != REDIS_OK) {
            addReplyErrorFormat(c,"Invalid TCP port specified: %s",
                                (char*)c->argv[3]->ptr);
            return;
        }

        // 尝试与给定地址的节点进行连接
        if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 &&
            errno == EINVAL)
        {
            // 连接失败
            addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
                            (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
        } else {
            // 连接成功
            addReply(c,shared.ok);
        }

    }

  可以看到检查ip:port之后就是进程cluster handshake。

  • 节点之间握手

   B节点首先根据提供的ip创建一个带有REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET标志的集群节点,并把A节点加入到集群中。并给A节点赋予一个随机的名字。

   

 n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
 memcpy(n->ip,norm_ip,sizeof(n->ip));
 n->port = port;

 // 将节点添加到集群当中
 clusterAddNode(n);

   当以集群模式启动redis-server时,在时间事件循环serverCron中,以每秒10次的频率会执行clusterCron。 在clusterCron中会对集群的节点(cluster->nodes)做一些检查和处理。

    刚才创建的A节点,还是新建的状态,node->link还是NULL。 此时会向A节点创建一个tcp连接,用于后面节点之间的通信。

  

 if (node->link == NULL) {
            int fd;
            mstime_t old_ping_sent;
            clusterLink *link;

            fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
                node->port+REDIS_CLUSTER_PORT_INCR,
                    server.bindaddr_count ? server.bindaddr[0] : NULL);
            if (fd == -1) {
                redisLog(REDIS_DEBUG, "Unable to connect to "
                    "Cluster Node [%s]:%d -> %s", node->ip,
                    node->port+REDIS_CLUSTER_PORT_INCR,
                    server.neterr);
                continue;
            }
            link = createClusterLink(node);
            link->fd = fd;
            node->link = link;
            aeCreateFileEvent(server.el,link->fd,AE_READABLE,
                    clusterReadHandler,link);
            /* Queue a PING in the new connection ASAP: this is crucial
             * to avoid false positives in failure detection.
             *
             * If the node is flagged as MEET, we send a MEET message instead
             * of a PING one, to force the receiver to add us in its node
             * table. */
            // 向新连接的节点发送 PING 命令,防止节点被识进入下线
            // 如果节点被标记为 MEET ,那么发送 MEET 命令,否则发送 PING 命令
            old_ping_sent = node->ping_sent;
            clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
                    CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);}

  其中ClusterLink用于处理与A节点之间的读写(有读写缓存)操作。创建完tcp之后会立即发送一条CLUSTERMSG_TYPE_MEET类型Ping命令给节点A。

    A节点收到B节点发来的Ping消息(处理消息的API为clusterProcessPacket)

  

if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
  redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node);

  if (!sender && type == CLUSTERMSG_TYPE_MEET) {
    clusterNode *node;

    // 创建 HANDSHAKE 状态的新节点
    node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);

    // 设置 IP 和端口
    nodeIp2String(node->ip,link);
    node->port = ntohs(hdr->port);

    // 将新节点添加到集群
    clusterAddNode(node);

    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
  }

  /* Get info from the gossip section */
  // 分析并取出消息中的 gossip 节点信息
  clusterProcessGossipSection(hdr,link);

  /* Anyway reply with a PONG */
  // 向目标节点返回一个 PONG
  clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
}

  会将B节点加入到cluster->nodes中,并设置标志位REDIS_NODE_HANDSHAKE。然后回复B节点一个带有A节点信息的PONG命令。B节点收到A节点发来的Pong命令,就会更新A节点的信息。

  同时,在A节点的clusterCron,也会处理新创建的节点B(作为客户端连上B节点的cfd,发送ping命令),至此握手完成。

  • 一些小细节

  节点的cfd是以port+REDIS_CLUSTER_PORT_INCR为端口创建的socket描述符,充当的是服务器。 同时,集群中的其他节点会向目标节点的port+REDIS_CLUSTER_PORT_INCR端口建立一个tcp连接,此时充当的是客户端。也就  是说每个节点即是服务端又是客户端。

  

  时间事件的实现

  

  

原文地址:https://www.cnblogs.com/coderht/p/10705959.html

时间: 2024-08-02 19:59:55

redis集群源码阅读 之 集群握手的相关文章

Redis源码阅读-Adlist双向链表

Redis源码阅读-链表部分- 链表数据结构在Adlist.h   Adlist.c Redis的链表是双向链表,内部定义了一个迭代器. 双向链表的函数主要是链表创建.删除.节点插入.头插入.尾插入.第N个节点.节点迭代遍历.链表复制.链表rotate.节点删除 typedef struct listNode { struct listNode *prev; struct listNode *next; void *value; //定义为void *类型,方便用户自行使用自己的数据结构 } l

Redis源码阅读(一)事件机制

Redis源码阅读(一)事件机制 Redis作为一款NoSQL非关系内存数据库,具有很高的读写性能,且原生支持的数据类型丰富,被广泛的作为缓存.分布式数据库.消息队列等应用.此外Redis还有许多高可用特性,包括数据持久化,主从模式备份等等,可以满足对数据完整有一定要求的场景. 而且Redis的源码结构简单清晰,有大量材料可以参阅:通过阅读Redis源码,掌握一些常用技术在Redis中的实现,相信会对个人编程水平有很大帮助.这里记录下我阅读Redis源码的心得.从我自己比较关心的几个技术点出发,

Redis源码阅读(二)高可用设计——复制

Redis源码阅读(二)高可用设计-复制 复制的概念:Redis的复制简单理解就是一个Redis服务器从另一台Redis服务器复制所有的Redis数据库数据,能保持两台Redis服务器的数据库数据一致. 使用场景:复制机制很实用,在客户端并发访问量很大,单台Redis扛不住的情况下,可以部署多台Redis复制相同的数据,共同对外提供服务,提高Redis并发访问处理能力.当然这种通过复制方式部署多台Redis以提高并发处理能力的方式只适用于客户端大部分访问为读数据请求的场景.此外,Redis从2.

Redis源码阅读一:简单动态字符串SDS

源码阅读基于Redis4.0.9 SDS介绍 redis 127.0.0.1:6379> SET dbname redis OK redis 127.0.0.1:6379> GET dbname "redis" 从上面的例子可以看到,key为dbname的值是一个字符串"redis" Redis源码是用c写成,但并没有使用c的字符串.c的字符串有以下缺点: 没有储存字符串长度的变量,获取长度只能靠遍历字符串 扩容麻烦.没有相应保护,容易造成缓冲区溢出 更

redis源码阅读——动态字符串sds

redis中动态字符串sds相关的文件为:sds.h与sds.c 一.数据结构 redis中定义了自己的数据类型"sds",用于描述 char*,与一些数据结构 1 typedef char *sds; 2 3 /* Note: sdshdr5 is never used, we just access the flags byte directly. 4 * However is here to document the layout of type 5 SDS strings. *

redis 5.0.7 源码阅读——双向链表

redis中动态字符串sds相关的文件为:adlist.h与adlist.c 一.数据结构 redis里定义的双向链表,与普通双向链表大致相同 单个节点: 1 typedef struct listNode { 2 struct listNode *prev; 3 struct listNode *next; 4 void *value; 5 } listNode; 链表: 1 typedef struct list { 2 listNode *head; 3 listNode *tail; 4

redis 5.0.7 源码阅读——跳跃表skiplist

redis中并没有专门给跳跃表两个文件.在5.0.7的版本中,结构体的声明与定义.接口的声明在server.h中,接口的定义在t_zset.c中,所有开头为zsl的函数. 一.数据结构 单个节点: typedef struct zskiplistNode { //key,唯一 sds ele; //分值,可重复 double score; //后退指针 struct zskiplistNode *backward; //层 struct zskiplistLevel { //前进指针 struc

Flume-NG源码阅读之HBaseSink

关于HBase的sink的所有内容均在org.apache.flume.sink.hbase包下. 每个sink包括自己定制的,都extends AbstractSink implements Configurable. 一.首先是configure(Context context)方法.该方法是对HBaseSink的参数初始化.主要包括以下几个: tableName:要写入的HBase数据表名,不能为空: columnFamily:数据表对应的列簇名,这个sink目前只支持一个列簇,不能为空:

Hadoop源码阅读环境搭建

Hadoop源码阅读环境搭建 一.说明 作为一个学习hadoop的同学,必须在本机上搭建hadoop源码阅读环境,这样,在方便阅读源码的同时也方便进行调试和源码修改.好了,下面开始搭建环境. 1.环境说明:hadoop 版本:1.2.1. IDE:eclipse.操作系统:centos 2.网上有人是通过eclipse的新建项目指定目录的方式将hadoop目录转换成Eclipse工程同时导入eclipse,具体做法如下: File-->new-->Java Project-->勾掉Use