Redis Cluster 的实现 - 初始化(2)

2) 在监听套接口上通过调用 aeCreateFileEvent() 设置事件处理器,从签名的调用可以看到设置的 ACCEPT 事件处理器为 clusterAcceptHandler(),

aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE, clusterAcceptHandler, NULL);
// 设置事件处理器    
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
      aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];
    
    // 在文件描述符 fd 上设置添加事件
    // 会根据不同系统平台选择相应的 I/O 模型
    // 如: Linux 下的 epoll
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
        
    fe->mask |= mask;
    
    // 分别针对 read/write 事件设置 事件处理器
    // 这里设置针对 READ 事件处理的为 clusterAcceptHandler()
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

如上设置添加 ACCEPT 事件处理器后,当有新的连接过来后,会回调 clusterAcceptHandler() 来处理新的连接。

// 设置每次回调时最多处理 1000 个连接
#define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
// fd 为 监听套接字
// privdata 为在调用 aeCreateFileEvent() 时设置的 clientdata 参数,这里为 NULL
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
    char cip[REDIS_IP_STR_LEN];
    clusterLink *link;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);
    /* If the server is starting up, don‘t accept cluster connections:
     * UPDATE messages may interact with the database content. */
    if (server.masterhost == NULL && server.loading) return;
    
    // 循环 MAX_CLUSTER_ACCEPTS_PER_CALL 次来接受新的连接
    while(max--) {
        
        // 客户端连接的描述符为 cfg
        // cip 为客户端地址
        // cport 为已连接 socket 的本地端口
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                redisLog(REDIS_VERBOSE,
                    "Accepting cluster node: %s", server.neterr);
            return;
        }
        
        // 这里同样设置 连接 socket 为 非阻塞
        anetNonBlock(NULL,cfd);
        
        // 并且打开 socket 选项 TCP_NODELAY(禁止 TCP 的 Nagle 算法)
        anetEnableTcpNoDelay(NULL,cfd);
        /* Use non-blocking I/O for cluster messages. */
        redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
        
        /* Create a link object we use to handle the connection.
         * It gets passed to the readable handler when data is available.
         * Initiallly the link->node pointer is set to NULL as we don‘t know
         * which node is, but the right node is references once we know the
         * node identity. */
        link = createClusterLink(NULL);
        link->fd = cfd;
        aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
    }
}

在成功接受客户端连接(这里其实表示其他 节点 的连接)后,创建 cluster 链路,并进行初始化,设置 read 事件处理器 clusterReadHandler。
    
  关于 redis cluster 的各个节点的初始化部分基本结束,下面就是等着接受连接,处理接收的数据,组件 cluster。
    
    [下一篇准备分析一下 clusterReadHandler 的实现。]

时间: 2024-10-16 20:50:23

Redis Cluster 的实现 - 初始化(2)的相关文章

Redis Cluster 的实现 - 初始化(1)

首先从 redis.c 源码的 main() 函数开始, 在调用的 initServer 函数中除了初始化 redis 节点本身的一些配置和环境之外,会根据是否设置 cluster_enabled 参数来对 cluster 进行初始化,如下: initServer    // 也就是 redis.conf 配置中的参数 cluster-enabled 如果设置为 yes,则进入 cluster 模式   -> if (server.cluster_enabled) clusterInit();

JFinal redis cluster集群插件

JFinal 框架到了2.1版本号,可是依旧仅仅支持redis的主从集群,没有看到Cluster集群的插件.笔者照着主从的插件方式,改了改,实现了个简单的插件,先使用起来,兴许会更新完好版本号. 插件地址:点击打开链接 附上源代码: package com.sxt.jfinal.rediscluster; import java.util.Set; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis

安全稳定实现redis cluster自动化迁移

背景 目前redis集群最火的是codis和redis cluster(官方),但官方自带工具并没有支持密码操作.那么需要密码认证使用redis cluster集群的同学要仔细看了哦. 相信大家很多人已经使用了redis cluster,而且也肯定会用到核心应用,你是否考虑过如下问题? redis cluster无密码,被改数据 redis cluster无密码,被flushall (你是否有要哭的冲动哈哈) redis cluster无密码,数据在光天化日(你对用户不负责) redis clu

Redis Cluster的实现和管理

Redis Cluster在redis3.0版本以上开始支持,以ruby环境运行.他可以把多个redis实例整合在一起,形成一个集群,集群内分配slot(分片槽),实现数据的分片存放.客户端只要以集群的模式连接上集群内任意一个节点,就可以操作整个集群. 集群角色有Master和Slave.Master之间分配slots,一共16384个slot.Slave向它指定的Master同步数据,实现备份.当其中的一个Master无法提供服务时,该Master的Slave讲提升为Master,保证集群间s

Redis Essentials 读书笔记 - 第九章: Redis Cluster and Redis Sentinel (Collective Intelligence)

Chapter 9. Redis Cluster and Redis Sentinel (Collective Intelligence) 上一章介绍了复制,一个master可以对应一个或多个slave(replica), 在以下的情况下是够用的: 1. master有足够内存容纳所有key 2. 通过slave可以扩展读,解决网络吞吐量的问题 3. 允许停止master的维护窗口时间 4. 通过slave做数据冗余 但复制解决不了自动failover和自动resharding的问题,在以下的情

Jedis cluster集群初始化源码剖析

Jedis cluster集群初始化源码剖析 环境 jar版本: spring-data-redis-1.8.4-RELEASE.jar.jedis-2.9.0.jar 测试环境: Redis 3.2.8,八个集群节点 applicationContext-redis-cluster.xml 配置文件: <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.spri

高可用Redis(十二):Redis Cluster

Redis Cluster是Redis官方提供的Redis集群功能 1.为什么要实现Redis Cluster 1.主从复制不能实现高可用 2.随着公司发展,用户数量增多,并发越来越多,业务需要更高的QPS,而主从复制中单机的QPS可能无法满足业务需求 3.数据量的考虑,现有服务器内存不能满足业务数据的需要时,单纯向服务器添加内存不能达到要求,此时需要考虑分布式需求,把数据分布到不同服务器上 4.网络流量需求:业务的流量已经超过服务器的网卡的上限值,可以考虑使用分布式来进行分流 5.离线计算,需

Redis Cluster 集群使用(3)

简介 Redis3.0版本之前,可以通过Redis Sentinel(哨兵)来实现高可用(HA),从3.0版本之后,官方推出了Redis Cluster,它的主要用途是实现数据分片(Data Sharding),不过同样可以实现HA,是官方当前推荐的方案.在Redis Sentinel模式中,每个节点需要保存全量数据,冗余比较多,而在Redis Cluster模式中,每个分片只需要保存一部分的数据,对于内存数据库来说,还是要尽量的减少冗余.在数据量太大的情况下,故障恢复需要较长时间. Redis

Kubernetes 通过statefulset部署redis cluster集群

Kubernetes 通过statefulset部署redis cluster集群 作者: 张首富 时间: 2019-02-19 个人博客地址: https://www.zhangshoufu.com QQ群: 895291458 需要有redis基础 Redis集群架构图 每个Mater 都可以拥有多个slave.当Master掉线后,redis cluster集群会从多个Slave中选举出来一个新的Matser作为代替,而旧的Master重新上线后变成 Master 的Slave. 部署re