redis6.0.5之cluster.c阅读笔记1-集群初始化


*********************************************************************************
在server.c文件中,服务器启动的时候,如果是集群模式,那么会调用函数clusterInit
void initServer(void) {
   。。。。。。

    if (server.cluster_enabled) clusterInit(); 如果是集群模式,这里开始初始化集群
  。。。。。。。
}
*********************************************************************************
void clusterInit(void) {
    int saveconf = 0;

    server.cluster = zmalloc(sizeof(clusterState));  给集群的状态分配空间
    server.cluster->myself = NULL;
    server.cluster->currentEpoch = 0; 开始的纪元
    server.cluster->state = CLUSTER_FAIL;  初始状态为失败
    server.cluster->size = 1;
    server.cluster->todo_before_sleep = 0;
    server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL); 初始化 名字->节点 字典
    server.cluster->nodes_black_list =
        dictCreate(&clusterNodesBlackListDictType,NULL); 节点黑名单
    server.cluster->failover_auth_time = 0;
    server.cluster->failover_auth_count = 0;
    server.cluster->failover_auth_rank = 0;
    server.cluster->failover_auth_epoch = 0;
    server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE; 故障转移失败原因
    server.cluster->lastVoteEpoch = 0; 最近一次投票纪元
    for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {  集群消息类型
        server.cluster->stats_bus_messages_sent[i] = 0;
        server.cluster->stats_bus_messages_received[i] = 0;
    }
    server.cluster->stats_pfail_nodes = 0;
    memset(server.cluster->slots,0, sizeof(server.cluster->slots)); 清空槽
    clusterCloseAllSlots(); 清理所有槽的迁移和导入状态

    /* Lock the cluster config file to make sure every node uses
     * its own nodes.conf. */ 锁住集群的配置文件,确保每个节点使用自己的配置nodes.conf
    if (clusterLockConfig(server.cluster_configfile) == C_ERR) 锁文件是否成功,失败退出
        exit(1);

    /* Load or create a new nodes configuration. */ 加载或者创建一个新节点的配置文件
    if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
        /* No configuration found. We will just use the random name provided
         * by the createClusterNode() function. */
        没有发现节点配置文件,我们将使用函数createClusterNode创建的随机名字
        myself = server.cluster->myself =
            createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER); 创建新的节点配置
        serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
            myself->name);
        clusterAddNode(myself);添加节点到集群字典
        saveconf = 1; 设置保存标志
    }
    if (saveconf) clusterSaveConfigOrDie(1); 保存节点配置文件

    /* We need a listening TCP port for our cluster messaging needs. */
    我们需要一个监听TCP端口来满足我们的集群消息需要
    server.cfd_count = 0;

    /* Port sanity check II
     * The other handshake port check is triggered too late to stop
     * us from trying to use a too-high cluster port number. */
端口合规检查II
另外一个握手端口检查触发的太晚,不能阻止我们尝试使用更高的集群端口
    int port = server.tls_cluster ? server.tls_port : server.port; 是否使用安全端口
    if (port > (65535-CLUSTER_PORT_INCR)) { 集群的通信端口需要在服务端口上加10000,如果超过了65535,就报错
        serverLog(LL_WARNING, "Redis port number too high. "
                   "Cluster communication port is 10,000 port "
                   "numbers higher than your Redis port. "
                   "Your Redis port number must be "
                   "lower than 55535.");
        exit(1);
    }
    if (listenToPort(port+CLUSTER_PORT_INCR,
        server.cfd,&server.cfd_count) == C_ERR) 监听端口
    {
        exit(1); 失败退出
    } else {
        int j;

        for (j = 0; j < server.cfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
                clusterAcceptHandler, NULL) == AE_ERR) 
            对每个文件描述符设置读取事件,因为为非阻塞的异步,所以需要回调函数处理
                    serverPanic("Unrecoverable error creating Redis Cluster "
                                "file event.");
        }
    }

    /* The slots -> keys map is a radix tree. Initialize it here. */
    用来保存键值对的槽是一颗基树,在这里初始化
    server.cluster->slots_to_keys = raxNew();
    memset(server.cluster->slots_keys_count,0,
           sizeof(server.cluster->slots_keys_count)); 初始化键数目为0个

    /* Set myself->port / cport to my listening ports, we'll just need to
     * discover the IP address via MEET messages. */
    设置本身的服务端口/通信端口 到我的监听端口,我们将通过MEET消息发现IP地址
    myself->port = port;
    myself->cport = port+CLUSTER_PORT_INCR; 通信端口 
    if (server.cluster_announce_port) 如果设置了cluster_announce_port,那么就使用设置值
        myself->port = server.cluster_announce_port;
    if (server.cluster_announce_bus_port) 如果设置了cluster_announce_bus_port,那么就使用设置值
        myself->cport = server.cluster_announce_bus_port;

    server.cluster->mf_end = 0; 无手动故障转移
    resetManualFailover(); 重置手工故障转移状态
    clusterUpdateMyselfFlags();
}
*********************************************************************************
/* Clear the migrating / importing state for all the slots.
 * This is useful at initialization and when turning a master into slave. */
清理所有槽的迁移和导入状态,这个函数在初始化和一个节点从主转为从的时候有用
void clusterCloseAllSlots(void) {
    memset(server.cluster->migrating_slots_to,0,
        sizeof(server.cluster->migrating_slots_to)); 
    memset(server.cluster->importing_slots_from,0,
        sizeof(server.cluster->importing_slots_from));
}
*********************************************************************************
/* Lock the cluster config using flock(), and leaks the file descritor used to
 * acquire the lock so that the file will be locked forever.
使用函数flock锁住集群配置,获取文件的锁,通过泄漏文件描述符,这样能一直保持住锁
 * This works because we always update nodes.conf with a new version
 * in-place, reopening the file, and writing to it in place (later adjusting
 * the length with ftruncate()).
这样能工作,是因为我们总是用新版本更新节点配置文件nodes.conf,
重新打开文件,写入数据(后面通过函数ftruncate调整长度)
 * On success C_OK is returned, otherwise an error is logged and
 * the function returns C_ERR to signal a lock was not acquired. */
成功返回C_OK,否则会记录错误同时返回C_ERR提示没有获取到锁
int clusterLockConfig(char *filename) {
/* flock() does not exist on Solaris 
 * and a fcntl-based solution won't help, as we constantly re-open that file,
 * which will release _all_ locks anyway
 */
函数flock不存在在Solaris系统,而且基于fcntl函数的解决方案也不行,
因为当我们不断的重新打开那个文件时,会释放所有的锁。
#if !defined(__sun)
    /* To lock it, we need to open the file in a way it is created if
     * it does not exist, otherwise there is a race condition with other
     * processes. */
     为了锁住文件,我们需要使用 假设文件不存在时创建的方式 打开文件,否则容易和其它进程产生竞争条件
    int fd = open(filename,O_WRONLY|O_CREAT,0644);
    if (fd == -1) {
        serverLog(LL_WARNING,
            "Can't open %s in order to acquire a lock: %s",
            filename, strerror(errno));
        return C_ERR;
    }

    if (flock(fd,LOCK_EX|LOCK_NB) == -1) {  锁住文件,如果锁不住就返回失败
        if (errno == EWOULDBLOCK) {
            serverLog(LL_WARNING,
                 "Sorry, the cluster configuration file %s is already used "
                 "by a different Redis Cluster node. Please make sure that "
                 "different nodes use different cluster configuration "
                 "files.", filename);
        } else {
            serverLog(LL_WARNING,
                "Impossible to lock %s: %s", filename, strerror(errno));
        }
        close(fd);
        return C_ERR;
    }
    /* Lock acquired: leak the 'fd' by not closing it, so that we'll retain the
     * lock to the file as long as the process exists. */
     已经获取锁: 通过不关闭fd,内存泄漏,这样只要进程存在,我们就能保持对文件的锁
#endif /* __sun */

    return C_OK;
}
*********************************************************************************
/* Load the cluster config from 'filename'.
通过文件名加载集群配置文件
 * If the file does not exist or is zero-length (this may happen because
 * when we lock the nodes.conf file, we create a zero-length one for the
 * sake of locking if it does not already exist), C_ERR is returned.
 * If the configuration was loaded from the file, C_OK is returned. */
如果文件不存在或者是空的(发生这种情况是因为我们锁住了节点配置文件,因为文件不存在我们创建了一个的文件),返回错误
如果成功加载文件中的配置,返回成功
int clusterLoadConfig(char *filename) {
    FILE *fp = fopen(filename,"r");  以只读方式打开文件
    struct stat sb;
    char *line;
    int maxline, j;

    if (fp == NULL) {
        if (errno == ENOENT) {
            return C_ERR;
        } else {
            serverLog(LL_WARNING,
                "Loading the cluster node config from %s: %s",
                filename, strerror(errno));
            exit(1);
        }
    }

    /* Check if the file is zero-length: if so return C_ERR to signal
     * we have to write the config. */
     检查文件是否为空:为空就返回错误表示我们需要写配置文件
    if (fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
        fclose(fp);
        return C_ERR;
    }

    /* Parse the file. Note that single lines of the cluster config file can
     * be really long as they include all the hash slots of the node.
     * This means in the worst possible case, half of the Redis slots will be
     * present in a single line, possibly in importing or migrating state, so
     * together with the node ID of the sender/receiver.
分析文件。注意到集群配置文件的单行可能很长,因为它包含了节点的所有哈希槽。
这就意味着在最坏的情况,半数的redis槽可能在同一行,可能处于导入或者迁移状态,
同时与发送方/接收方的节点ID一起
     * To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */
为了简单起见,我们直接为每行分配1024+CLUSTER_SLOTS*128个字节   #define CLUSTER_SLOTS 16384
    maxline = 1024+CLUSTER_SLOTS*128;    
    line = zmalloc(maxline);
    while(fgets(line,maxline,fp) != NULL) {  读入每行进行分析
        int argc;
        sds *argv;
        clusterNode *n, *master;
        char *p, *s;

        /* Skip blank lines, they can be created either by users manually
         * editing nodes.conf or by the config writing process if stopped
         * before the truncate() call. */
        跳过空白行,这些空白行可能由用户手工编辑文件nodes.conf或者通过写入配置文件但是在调用函数truncate停止
        if (line[0] == '\n' || line[0] == '\0') continue;

        /* Split the line into arguments for processing. */ 为了处理将行分割成参数
        argv = sdssplitargs(line,&argc);
        if (argv == NULL) goto fmterr;

        /* Handle the special "vars" line. Don't pretend it is the last
         * line even if it actually is when generated by Redis. */
         处理特殊的vars行,不用假装它是最后一行,即使它实际上是最后一行,这行是由redis自动产生的
本机器上的一个节点的配置文件如下所示:
[root@redis2 soft]# cat nodes-6379.conf 
a76d900cc3d3e6d026e50961b802daf9e6e3b860 10.100.13.81:6379@16379 slave a9b18e0f93ee0a90901c7b4c94bee89394f44867 0 1639538056918 8 connected
3bfc005b03062aec4b87d2b9fa6db246149be661 10.100.13.111:6379@16379 master - 0 1639538052912 9 connected 0-5460
4b84fbde91ba4b50a0da0ab574ad94f739b4b68c 10.100.13.113:6379@16379 master - 0 1639538050910 7 connected
a9b18e0f93ee0a90901c7b4c94bee89394f44867 10.100.13.109:6379@16379 master - 0 1639538054915 8 connected 5461-16383
41ae2e6271a8e5e139b3d097daa1cb5075ee8a03 10.100.13.160:6379@16379 slave 3bfc005b03062aec4b87d2b9fa6db246149be661 0 1639538057577 9 connected
e3e3ff648f5924b417d434e9a4f9169a1ea2ba8c 10.100.13.88:6379@16379 myself,slave a9b18e0f93ee0a90901c7b4c94bee89394f44867 0 1639537649000 2 connected
vars currentEpoch 9 lastVoteEpoch 0
         
        if (strcasecmp(argv[0],"vars") == 0) {  最后一行
            if (!(argc % 2)) goto fmterr;
            for (j = 1; j < argc; j += 2) {
                if (strcasecmp(argv[j],"currentEpoch") == 0) { 当前纪元
                    server.cluster->currentEpoch =
                            strtoull(argv[j+1],NULL,10);
                } else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) { 最后投票纪元
                    server.cluster->lastVoteEpoch =
                            strtoull(argv[j+1],NULL,10);
                } else {
                    serverLog(LL_WARNING,
                        "Skipping unknown cluster config variable '%s'",
                        argv[j]);
                }
            }
            sdsfreesplitres(argv,argc);
            continue;
        }

        /* Regular config lines have at least eight fields */ 正常的一行至少有8个字段
        if (argc < 8) { 少于8个字段的行不正常
            sdsfreesplitres(argv,argc);
            goto fmterr;
        }

        /* Create this node if it does not exist */ 如果这个节点不存在就创建
        n = clusterLookupNode(argv[0]); 查找节点是否存在
        if (!n) {
            n = createClusterNode(argv[0],0); 新建节点
            clusterAddNode(n); 将节点加入到节点哈希表
        }
        /* Address and port */ 地址和端口
        if ((p = strrchr(argv[1],':')) == NULL) { 通过中间的:符号判断
            sdsfreesplitres(argv,argc);
            goto fmterr;
        }
        *p = '\0';
        memcpy(n->ip,argv[1],strlen(argv[1])+1);获取前面的IP
        char *port = p+1; 端口开始位置
        char *busp = strchr(port,'@');判读@符号
        if (busp) {
            *busp = '\0'; 结束位置
            busp++;
        }
        n->port = atoi(port); 字符串转为整数
        /* In older versions of nodes.conf the "@busport" part is missing.
         * In this case we set it to the default offset of 10000 from the
         * base port. */
         在老版本的节点配置文件中, "@busport"部分是没有的。在这种情况下,我们默认从基本的端口设置偏移值10000
        n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;

        /* Parse flags */ 解析标志
        p = s = argv[2]; 第三个参数
        while(p) {  这里可能有多个参数,都是用逗号分隔,需要循环处理
            p = strchr(s,','); 
            if (p) *p = '\0';
            if (!strcasecmp(s,"myself")) {
                serverAssert(server.cluster->myself == NULL);
                myself = server.cluster->myself = n; 
                n->flags |= CLUSTER_NODE_MYSELF;
            } else if (!strcasecmp(s,"master")) {
                n->flags |= CLUSTER_NODE_MASTER;
            } else if (!strcasecmp(s,"slave")) {
                n->flags |= CLUSTER_NODE_SLAVE;
            } else if (!strcasecmp(s,"fail?")) {
                n->flags |= CLUSTER_NODE_PFAIL;
            } else if (!strcasecmp(s,"fail")) {
                n->flags |= CLUSTER_NODE_FAIL;
                n->fail_time = mstime();
            } else if (!strcasecmp(s,"handshake")) {
                n->flags |= CLUSTER_NODE_HANDSHAKE;
            } else if (!strcasecmp(s,"noaddr")) {
                n->flags |= CLUSTER_NODE_NOADDR;
            } else if (!strcasecmp(s,"nofailover")) {
                n->flags |= CLUSTER_NODE_NOFAILOVER;
            } else if (!strcasecmp(s,"noflags")) {
                /* nothing to do */
            } else {
                serverPanic("Unknown flag in redis cluster config file");
            }
            if (p) s = p+1; 如果还没有结束,指向下个通过逗号分隔的字段
        }

        /* Get master if any. Set the master and populate master's
         * slave list. */ 如果是master,设置主机并且填充主机的从机列表
        if (argv[3][0] != '-') {
            master = clusterLookupNode(argv[3]); 通过名字查找节点
            if (!master) {如果主机为空
                master = createClusterNode(argv[3],0); 创建节点
                clusterAddNode(master);加入到节点哈希表
            }
            n->slaveof = master; 从机的主机
            clusterNodeAddSlave(master,n); 将从节点添加到主节点的从节点列表
        }

        /* Set ping sent / pong received timestamps */ 设置ping发出和pong收到的时间
        if (atoi(argv[4])) n->ping_sent = mstime();
        if (atoi(argv[5])) n->pong_received = mstime();

        /* Set configEpoch for this node. */ 为当前节点设置纪元
        n->configEpoch = strtoull(argv[6],NULL,10);

        /* Populate hash slots served by this instance. */ 该实例服务的哈希槽
        for (j = 8; j < argc; j++) {
            int start, stop;

            if (argv[j][0] == '[') {
                /* Here we handle migrating / importing slots */ 这里我们处理 迁移 或者 导入的 槽
                int slot;
                char direction;
                clusterNode *cn;

                p = strchr(argv[j],'-');
                serverAssert(p != NULL);
                *p = '\0';
                direction = p[1]; /* Either '>' or '<' */  方向 > 或者 <
                slot = atoi(argv[j]+1);跳过[符,开始的slot
                if (slot < 0 || slot >= CLUSTER_SLOTS) { slot是否超出正常范围
                    sdsfreesplitres(argv,argc);
                    goto fmterr;
                }
                p += 3; 指向节点名字开始位置,因为这里的两种格式为 ->- 或 -<- 后面跟着节点名,
                cn = clusterLookupNode(p); 查找节点
                if (!cn) {
                    cn = createClusterNode(p,0); 不存在就新建
                    clusterAddNode(cn);
                }
                if (direction == '>') {
                    server.cluster->migrating_slots_to[slot] = cn; 迁出
                } else {
                    server.cluster->importing_slots_from[slot] = cn; 导入
                }
                continue;
            } else if ((p = strchr(argv[j],'-')) != NULL) {
                *p = '\0';
                start = atoi(argv[j]); 开始的slot
                stop = atoi(p+1); 结束的slot
            } else {
                start = stop = atoi(argv[j]); 只有一个slot
            }
            if (start < 0 || start >= CLUSTER_SLOTS ||
                stop < 0 || stop >= CLUSTER_SLOTS)
            {
                sdsfreesplitres(argv,argc);
                goto fmterr;
            }
            while(start <= stop) clusterAddSlot(n, start++); 给节点一个一个添加slot
        }

        sdsfreesplitres(argv,argc);
    }
    /* Config sanity check */ 配置合法性检测
    if (server.cluster->myself == NULL) goto fmterr;

    zfree(line);
    fclose(fp);

    serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name); 打印40个字符

    /* Something that should never happen: currentEpoch smaller than
     * the max epoch found in the nodes configuration. However we handle this
     * as some form of protection against manual editing of critical files. */
     下面这个些事情可能发生: 当前纪元小于节点配置中的最大纪元。
     然而我们这样做主要是为了保护手工编辑节点文件导致的问题(就是手工修改节点文件导致纪元变化)
    if (clusterGetMaxEpoch() > server.cluster->currentEpoch) { 获取当前最大的纪元
        server.cluster->currentEpoch = clusterGetMaxEpoch();
    }
    return C_OK;

fmterr:
    serverLog(LL_WARNING,
        "Unrecoverable error: corrupted cluster config file.");
    zfree(line);
    if (fp) fclose(fp);
    exit(1);
}
*********************************************************************************
/* Node lookup by name */ 通过名字查找节点
clusterNode *clusterLookupNode(const char *name) {
    sds s = sdsnewlen(name, CLUSTER_NAMELEN);
    dictEntry *de;

    de = dictFind(server.cluster->nodes,s); 在字典中通过名字查找节点
    sdsfree(s);
    if (de == NULL) return NULL;
    return dictGetVal(de);
}
*********************************************************************************
/* Create a new cluster node, with the specified flags.
 * If "nodename" is NULL this is considered a first handshake and a random
 * node name is assigned to this node (it will be fixed later when we'll
 * receive the first pong).
使用特定的标志创建一个新的集群节点。
如果节点名字是空的,那就是第一次,就会使用随机名字赋值给这个节点(当我们收到第一个回复的pong时就会修复)
 * The node is created and returned to the user, but it is not automatically
 * added to the nodes hash table. */
节点被创建并且被返回给用户,但不会自动加入到节点的哈希表(意味着需要后面自己加)
clusterNode *createClusterNode(char *nodename, int flags) {
    clusterNode *node = zmalloc(sizeof(*node));

    if (nodename)
        memcpy(node->name, nodename, CLUSTER_NAMELEN);
    else
        getRandomHexChars(node->name, CLUSTER_NAMELEN); 获取随机的名字 40个字符
    node->ctime = mstime(); 创建时间
    node->configEpoch = 0; 开始纪元
    node->flags = flags;
    memset(node->slots,0,sizeof(node->slots)); 清空节点的槽
    node->numslots = 0;
    node->numslaves = 0;
    node->slaves = NULL;
    node->slaveof = NULL;
    node->ping_sent = node->pong_received = 0;
    node->data_received = 0;
    node->fail_time = 0;
    node->link = NULL;
    memset(node->ip,0,sizeof(node->ip));
    node->port = 0;
    node->cport = 0;
    node->fail_reports = listCreate();
    node->voted_time = 0;
    node->orphaned_time = 0;
    node->repl_offset_time = 0;
    node->repl_offset = 0;
    listSetFreeMethod(node->fail_reports,zfree);
    return node;
}
*********************************************************************************
/* Add a node to the nodes hash table (server.cluster->nodes), keyed by a
 * fresh copy of its CLUSTER_NAMELEN bytes name.
 *
 * Returns C_OK if dictAdd() reported DICT_OK, C_ERR otherwise.
 *
 * NOTE(review): when dictAdd() fails the key created here is not released
 * by this function; presumably dictAdd() does not take ownership of it on
 * failure, in which case the key is leaked -- confirm against dict.c. */
int clusterAddNode(clusterNode *node) {
    int retval;

    retval = dictAdd(server.cluster->nodes,
            sdsnewlen(node->name,CLUSTER_NAMELEN), node);
    return (retval == DICT_OK) ? C_OK : C_ERR;
}
*********************************************************************************
/* Add 'slave' to the slaves array of 'master'.
 *
 * Returns C_ERR, without changing anything, if 'slave' is already listed.
 * Otherwise the array is grown by one element, the slave is appended, the
 * master gets the CLUSTER_NODE_MIGRATE_TO flag and C_OK is returned. */
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
    int j;

    /* If it's already a slave, don't add it again. */
    for (j = 0; j < master->numslaves; j++)
        if (master->slaves[j] == slave) return C_ERR;
    /* Not found: make room for one more pointer and append. */
    master->slaves = zrealloc(master->slaves,
        sizeof(clusterNode*)*(master->numslaves+1));
    master->slaves[master->numslaves] = slave;
    master->numslaves++;
    master->flags |= CLUSTER_NODE_MIGRATE_TO;
    return C_OK;
}
*********************************************************************************
/* Add the specified slot to the list of slots that node 'n' will
 * serve. Return C_OK if the operation ended with success.
 * If the slot is already assigned to another instance this is considered
 * an error and C_ERR is returned. */
将特定的槽加入到提供服务的节点的槽列表中。如果操作成功返回C_OK。
如果槽已经分配给零食的实例,那被人为是错误的并且返回C_ERR
int clusterAddSlot(clusterNode *n, int slot) {
    if (server.cluster->slots[slot]) return C_ERR;
    clusterNodeSetSlotBit(n,slot);
    server.cluster->slots[slot] = n;
    return C_OK;
}

/* Set the slot bit and return the old value. */
设置槽的比特位,返回旧值
int clusterNodeSetSlotBit(clusterNode *n, int slot) {
    int old = bitmapTestBit(n->slots,slot); 测试比特位,顺便返回原来的比特位的值
    bitmapSetBit(n->slots,slot); 设置slot位置设置比特位
    if (!old) { 没有旧值
        n->numslots++;  那么该节点服务的槽就多了一个
        /* When a master gets its first slot, even if it has no slaves,
         * it gets flagged with MIGRATE_TO, that is, the master is a valid
         * target for replicas migration, if and only if at least one of
         * the other masters has slaves right now.
         *当一个主机节点获取它的首个槽时,即使它没有任何从机,它会获得MIGRATE_TO的标志,
         那就是说,这个主机是有效的可以做复制迁移的,当且仅当现在另外的主机至少有一个从机
         * Normally masters are valid targerts of replica migration if:
         * 1. The used to have slaves (but no longer have).
         * 2. They are slaves failing over a master that used to have slaves.
         正常的主机都是有效的复制迁移目标,如果满足下面条件:
         1.他们以前拥有过从机(但是现在没有)
         2.他们是从机,但是他们的主机下线了
         * However new masters with slots assigned are considered valid
         * migration tagets if the rest of the cluster is not a slave-less.
        然而如果集群的其余部分不是没有从机的,那么分配了插槽的新主机被视为有效的迁移标记
         * See https://github.com/antirez/redis/issues/3043 for more info. */
        if (n->numslots == 1 && clusterMastersHaveSlaves())  至少有一个槽,并且集群有从机(存在迁移的可能性)
            n->flags |= CLUSTER_NODE_MIGRATE_TO; 就设置标志CLUSTER_NODE_MIGRATE_TO
    }
    return old;
}
判断集群主机是否有从机
/* Return non-zero if there is at least one master with slaves in the cluster.
 * Otherwise zero is returned. Used by clusterNodeSetSlotBit() to set the
 * MIGRATE_TO flag the when a master gets the first slot. */
如果集群中至少有一台主机有从机,就返回非0,否则返回0.
该函数由函数clusterNodeSetSlotBit调用,用来给主机获取第一个槽的时候设置标志MIGRATE_TO
int clusterMastersHaveSlaves(void) {
    dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
    dictEntry *de;
    int slaves = 0;
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (nodeIsSlave(node)) continue;  
        slaves += node->numslaves; 遍历主机,累加从节点数目
    }
    dictReleaseIterator(di);
    return slaves != 0;
}

/* Test bit 'pos' in a generic bitmap. Return 1 if the bit is set,
 * otherwise 0.
 *
 * Bit 'pos' lives in byte pos/8, at bit position pos%8 counting from the
 * least significant bit. The caller must make sure the bitmap is large
 * enough to contain that byte. */
int bitmapTestBit(unsigned char *bitmap, int pos) {
    off_t byte = pos/8; /* Index of the byte holding the bit. */
    int bit = pos&7;    /* Index of the bit inside that byte. */
    return (bitmap[byte] & (1<<bit)) != 0;
}

/* Set the bit at position 'pos' in a bitmap.
 *
 * Bit 'pos' lives in byte pos/8, at bit position pos%8 counting from the
 * least significant bit. Other bits are left untouched. */
void bitmapSetBit(unsigned char *bitmap, int pos) {
    off_t byte = pos/8; /* Index of the byte holding the bit. */
    int bit = pos&7;    /* Index of the bit inside that byte. */
    bitmap[byte] |= 1<<bit;
}

/* Clear the bit at position 'pos' in a bitmap.
 *
 * Bit 'pos' lives in byte pos/8, at bit position pos%8 counting from the
 * least significant bit. Other bits are left untouched. */
void bitmapClearBit(unsigned char *bitmap, int pos) {
    off_t byte = pos/8; /* Index of the byte holding the bit. */
    int bit = pos&7;    /* Index of the bit inside that byte. */
    bitmap[byte] &= ~(1<<bit);
}
*********************************************************************************
/* Return the greatest configEpoch found in the cluster, or the current
 * epoch if greater than any node configEpoch. */
返回集群中能找到的最大的纪元,或者当前状态的纪元如果它比任何节点的纪元都大
获取集群当前最大的纪元
uint64_t clusterGetMaxEpoch(void) {
    uint64_t max = 0;
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        if (node->configEpoch > max) max = node->configEpoch;  找所有节点最大的纪元
    }
    dictReleaseIterator(di);
    if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch; 和当前状态的纪元比较
    return max;
}
*********************************************************************************
/* Cluster node configuration is exactly the same as CLUSTER NODES output.
集群节点的配置同命令CLUSTER NODES的输出是一致的。
 * This function writes the node config and returns 0, on error -1
 * is returned.
这个函数写入节点配置返回0,有错误返回-1
 * Note: we need to write the file in an atomic way from the point of view
 * of the POSIX filesystem semantics, so that if the server is stopped
 * or crashes during the write, we'll end with either the old file or the
 * new one. Since we have the full payload to write available we can use
 * a single write to write the whole file. If the pre-existing file was
 * bigger we pad our payload with newlines that are anyway ignored and truncate
 * the file afterward. */
注意:我们写这个文件从POSIX文件系统的语义来看是用原子的方式,所以如果服务器在写的过程中停止或者崩溃,
我们或者用旧文件结束或者使用新文件。因为我们拥有完整的负荷来写,所以我们可以使用一次写将整个文件写入。
如果之前存在的文件更大,我们用新行添加我们的内容,这些新行会被忽略,最后我们使用截断文件
int clusterSaveConfig(int do_fsync) {
    sds ci;
    size_t content_size;
    struct stat sb;
    int fd;

    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;

    /* Get the nodes description and concatenate our "vars" directive to
     * save currentEpoch and lastVoteEpoch. */
获取节点描述拼接我们的"vars"来保存当前纪元和最后投票纪元
    ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE);所有描述节点的内容
    ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
        (unsigned long long) server.cluster->currentEpoch,
        (unsigned long long) server.cluster->lastVoteEpoch); 最后添加的一行
    content_size = sdslen(ci);

    if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))
        == -1) goto err; 打开文件失败

    /* Pad the new payload if the existing file length is greater. */
    填充新的内容如果已经存在的文件长度更大
    if (fstat(fd,&sb) != -1) {
        if (sb.st_size > (off_t)content_size) { 原来的文件大于当前将写入的文件
            ci = sdsgrowzero(ci,sb.st_size); 增长空间到sb.st_size长度
            memset(ci+content_size,'\n',sb.st_size-content_size); 将后面的字符全部用\n覆盖
        }
    }
    if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err; 如果文件没有写完返回错误
    if (do_fsync) {
        server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
        fsync(fd); 将文件写入到磁盘
    }

    /* Truncate the file if needed to remove the final \n padding that
     * is just garbage. */
     如果有需要截断文件,因为后面填充的\n都时垃圾字符
     content_size != sdslen(ci) 这个条件判断ci的长度是否有变动,因为通过函数sdsgrowzero,可能会有变动。
     在有变动的情况下,将后面的垃圾填充字符\n全部去掉
    if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {
        /* ftruncate() failing is not a critical error. */ 去掉垃圾字符的函数ftruncate即使失败也无所谓
    }
    close(fd);
    sdsfree(ci);
    return 0;

err:
    if (fd != -1) close(fd); 存在打开文件,需要关闭释放资源,否则造成内存泄漏
    sdsfree(ci);
    return -1;
}

/* Save the cluster configuration via clusterSaveConfig(), passing
 * 'do_fsync' through. If saving fails a warning is logged and the process
 * exits with status 1. */
void clusterSaveConfigOrDie(int do_fsync) {
    if (clusterSaveConfig(do_fsync) == -1) {
        serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
        exit(1);
    }
}
*********************************************************************************
/* Generate a csv-alike representation of the nodes we are aware of,
 * including the "myself" node, and return an SDS string containing the
 * representation (it is up to the caller to free it).
生成所有我们知道节点的类似cvs格式的表示,包括自身节点,返回一个包含表示的SDS字符串(该字符串由调用者释放)
 * All the nodes matching at least one of the node flags specified in
 * "filter" are excluded from the output, so using zero as a filter will
 * include all the known nodes in the representation, including nodes in
 * the HANDSHAKE state.
所有的节点只要有一个标志确定在过滤器中,那么就会别排除在外,
因此使用标志0作为过滤器就可以在表示中包括所有已知节点,包括处于HANDSHAKE状态的节点
 * The representation obtained using this function is used for the output
 * of the CLUSTER NODES function, and as format for the cluster
 * configuration file (nodes.conf) for a given node. */
用这个函数获取的表示,可以被命令CLUSTER NODES用来作为输出,同给定节点的集群配置文件(nodes.conf)一样格式
sds clusterGenNodesDescription(int filter) {
    sds ci = sdsempty(), ni;
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) { 遍历节点
        clusterNode *node = dictGetVal(de);

        if (node->flags & filter) continue; 如果被过滤,就不包括
        ni = clusterGenNodeDescription(node); 获取节点描述信息
        ci = sdscatsds(ci,ni);
        sdsfree(ni); 
        ci = sdscatlen(ci,"\n",1);
    }
    dictReleaseIterator(di);
    return ci;
}
*********************************************************************************
/* Generate a csv-alike representation of the specified cluster node.
 * See clusterGenNodesDescription() top comment for more information.
生成特定集群节点的类似cvs格式的表示,可以参看函数clusterGenNodesDescription上面的注释获取更多信息
 * The function returns the string representation as an SDS string. */
这个函数返回字符串表示当做一个SDS字符串
sds clusterGenNodeDescription(clusterNode *node) {
    int j, start;
    sds ci;

    /* Node coordinates */ 节点位置
    ci = sdscatprintf(sdsempty(),"%.40s %s:%d@%d ",
        node->name, 名字
        node->ip,  ip
        node->port, 服务端口
        node->cport);  通信端口

    /* Flags */ 标志
    ci = representClusterNodeFlags(ci, node->flags); 拼接标志

    /* Slave of... or just "-" */  是谁的从机 或者 只是"-"
    if (node->slaveof)  是从机
        ci = sdscatprintf(ci," %.40s ",node->slaveof->name); 拼接主机名
    else
        ci = sdscatlen(ci," - ",3); 否则用" - "代替

    /* Latency from the POV of this node, config epoch, link status */
    此节点视觉下的的延迟、配置纪元、链路状态
    ci = sdscatprintf(ci,"%lld %lld %llu %s",
        (long long) node->ping_sent,  ping命令发送时间
        (long long) node->pong_received, 接受pong时间
        (unsigned long long) node->configEpoch, 当前纪元
        (node->link || node->flags & CLUSTER_NODE_MYSELF) ?
                    "connected" : "disconnected");  链路状态

    /* Slots served by this instance */ 由这个节点服务的槽
    start = -1;
    for (j = 0; j < CLUSTER_SLOTS; j++) { 遍历全部的槽
        int bit;

        if ((bit = clusterNodeGetSlotBit(node,j)) != 0) { 开始属于本节点的槽
            if (start == -1) start = j; 
        }
        if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) { 开始不为-1 并且 (该槽不属于本节点 或者 已经是最后一个槽了)
            if (bit && j == CLUSTER_SLOTS-1) j++; 槽属于本节点,但是是最后一个槽

            if (start == j-1) { 如果只有一个槽
                ci = sdscatprintf(ci," %d",start);
            } else { 有连续的多个槽
                ci = sdscatprintf(ci," %d-%d",start,j-1);
            }
            start = -1;
        }
    }

    /* Just for MYSELF node we also dump info about slots that
     * we are migrating to other instances or importing from other
     * instances. */
对于本身节点,我们也转储关于迁移到其它节点或者从其它节点导入的槽信息
    if (node->flags & CLUSTER_NODE_MYSELF) { 是否是本身节点
        for (j = 0; j < CLUSTER_SLOTS; j++) { 遍历所有的槽
            if (server.cluster->migrating_slots_to[j]) { 存在迁移到其它节点的槽
                ci = sdscatprintf(ci," [%d->-%.40s]",j,
                    server.cluster->migrating_slots_to[j]->name);
            } else if (server.cluster->importing_slots_from[j]) { 存在从其它节点迁移过来的槽
                ci = sdscatprintf(ci," [%d-<-%.40s]",j,
                    server.cluster->importing_slots_from[j]->name);
            }
        }
    }
    return ci;
}
*********************************************************************************
/* Maps each cluster node flag bit to the textual name appended by
 * representClusterNodeFlags(). Every name already carries its trailing
 * comma, so the names can simply be concatenated. */
static struct redisNodeFlags redisNodeFlagsTable[] = {
    { .flag = CLUSTER_NODE_MYSELF,     .name = "myself,"     },
    { .flag = CLUSTER_NODE_MASTER,     .name = "master,"     },
    { .flag = CLUSTER_NODE_SLAVE,      .name = "slave,"      },
    { .flag = CLUSTER_NODE_PFAIL,      .name = "fail?,"      },
    { .flag = CLUSTER_NODE_FAIL,       .name = "fail,"       },
    { .flag = CLUSTER_NODE_HANDSHAKE,  .name = "handshake,"  },
    { .flag = CLUSTER_NODE_NOADDR,     .name = "noaddr,"     },
    { .flag = CLUSTER_NODE_NOFAILOVER, .name = "nofailover," }
};

/* Concatenate the comma separated list of node flags to the given SDS
 * string 'ci'. */
拼接由逗号分隔的节点标志列表,赋值给字符串变量ci
sds representClusterNodeFlags(sds ci, uint16_t flags) {
    size_t orig_len = sdslen(ci);
    int i, size = sizeof(redisNodeFlagsTable)/sizeof(struct redisNodeFlags);
    for (i = 0; i < size; i++) { 遍历节点标志表
        struct redisNodeFlags *nodeflag = redisNodeFlagsTable + i;
        if (flags & nodeflag->flag) ci = sdscat(ci, nodeflag->name); 如果有该标志,就拼接
    }
    /* If no flag was added, add the "noflags" special flag. */
    如果没有标志被添加,那么添加一个特殊的"noflags"
    if (sdslen(ci) == orig_len) ci = sdscat(ci,"noflags,"); 
    sdsIncrLen(ci,-1); /* Remove trailing comma. */ 移除结尾的逗号,注意逗号是带在标志上的
    return ci;
}
*********************************************************************************
/* Look up 'slot' in the slots bitmap of node 'n' and return what
 * bitmapTestBit() reports for it (callers in this file treat a non-zero
 * result as "the node serves this slot"). */
int clusterNodeGetSlotBit(clusterNode *n, int slot) {
    int bit = bitmapTestBit(n->slots,slot);
    return bit;
}
*********************************************************************************
/* Initialize a set of file descriptors to listen to the specified 'port'
 * binding the addresses specified in the Redis server configuration.
初始化一组文件描述符,用来监听配置在redis服务中特定的绑定端口
 * The listening file descriptors are stored in the integer array 'fds'
 * and their number is set in '*count'.
监听的文件描述符被存储在整数数组fds中,他们的数量被保存在*count中
 * The addresses to bind are specified in the global server.bindaddr array
 * and their number is server.bindaddr_count. If the server configuration
 * contains no specific addresses to bind, this function will try to
 * bind * (all addresses) for both the IPv4 and IPv6 protocols.
被绑定的地址由全局数组变量server.bindaddr确定,它们的数量为server.bindaddr_count。
如果服务配置没有包含特定绑定的地址,这个函数将尝试绑定所有的地址,而且同时包括ipv4和ipv6两种协议
 * On success the function returns C_OK.
成功的情况下返回C_OK.
 * On error the function returns C_ERR. For the function to be on
 * error, at least one of the server.bindaddr addresses was
 * impossible to bind, or no bind addresses were specified in the server
 * configuration but the function is not able to bind * for at least
 * one of the IPv4 or IPv6 protocols. */
错误的情况下返回C_ERR。对于函数出现错误,至少其中一个服务地址不能被绑定,
或者没有指定的绑定地址同时函数不能绑定ipv4或者ipv6中的任何一个协议的所有地址
int listenToPort(int port, int *fds, int *count) {
    int j;

    /* Force binding of 0.0.0.0 if no bind address is specified, always
     * entering the loop if j == 0. */ 如果没有指定绑定地址,强制绑定0.0.0.0,当j=0,总是会进入循环
    if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
    for (j = 0; j < server.bindaddr_count || j == 0; j++) {
        if (server.bindaddr[j] == NULL) { 没有绑定地址
            int unsupported = 0;
            /* Bind * for both IPv6 and IPv4, we enter here only if
             * server.bindaddr_count == 0. */ 
        同时绑定ipv4和ipv6的所有地址,我们能进入这里是因为server.bindaddr_count == 0
            fds[*count] = anetTcp6Server(server.neterr,port,NULL,
                server.tcp_backlog); 创建ipv6的监听
            if (fds[*count] != ANET_ERR) {
                anetNonBlock(NULL,fds[*count]); 设置非阻塞
                (*count)++;
            } else if (errno == EAFNOSUPPORT) { 协议不支持
                unsupported++;
                serverLog(LL_WARNING,"Not listening to IPv6: unsupported");
            }

            if (*count == 1 || unsupported) {
                /* Bind the IPv4 address as well. */ 绑定ipv4的地址
                fds[*count] = anetTcpServer(server.neterr,port,NULL,
                    server.tcp_backlog);
                if (fds[*count] != ANET_ERR) {
                    anetNonBlock(NULL,fds[*count]);
                    (*count)++;
                } else if (errno == EAFNOSUPPORT) {
                    unsupported++;
                    serverLog(LL_WARNING,"Not listening to IPv4: unsupported");
                }
            }
            /* Exit the loop if we were able to bind * on IPv4 and IPv6,
             * otherwise fds[*count] will be ANET_ERR and we'll print an
             * error and return to the caller with an error. */
             如果我们绑定了 IPv4 或者 IPv6,就退出循环,否则错误信息会存储到fds[*count],
             我们将打印出错误信息同时返回给调用者一个错误
             
             上面如果绑定成功或者协议不支持,这两种情况,都会加1,从而这里的判断为真,退出循环,
             如果绑定不成功,但是错误不是协议不支持(那就有可能是其它情况,这个时候循环不退出,继续尝试下一次)
            if (*count + unsupported == 2) break;
        } else if (strchr(server.bindaddr[j],':')) { 存在:为ipv6格式
            /* Bind IPv6 address. */
            fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
                server.tcp_backlog);
        } else { 不存在为ipv4格式
            /* Bind IPv4 address. */
            fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
                server.tcp_backlog);
        }
        if (fds[*count] == ANET_ERR) { 绑定监听端口存在错误
            serverLog(LL_WARNING,
                "Could not create server TCP listening socket %s:%d: %s",
                server.bindaddr[j] ? server.bindaddr[j] : "*",
                port, server.neterr);
                if (errno == ENOPROTOOPT     || errno == EPROTONOSUPPORT ||
                    errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT ||
                    errno == EAFNOSUPPORT    || errno == EADDRNOTAVAIL)
                    continue; 这些情况下重试
            return C_ERR;
        }
        anetNonBlock(NULL,fds[*count]); 设置非阻塞模式
        (*count)++;
    }
    return C_OK;
}
*********************************************************************************
/* Event loop callback invoked on the listening descriptor 'fd': accepts
 * pending cluster node connections, at most MAX_CLUSTER_ACCEPTS_PER_CALL
 * per invocation. 'el', 'privdata' and 'mask' are unused. */
#define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char cip[NET_IP_STR_LEN];
    int cport;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    /* If the server is starting up, don't accept cluster connections:
     * UPDATE messages may interact with the database content. */
    if (server.masterhost == NULL && server.loading) return;

    for (int budget = MAX_CLUSTER_ACCEPTS_PER_CALL; budget > 0; budget--) {
        int cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            /* EWOULDBLOCK is not logged; any other error is. Either way
             * we are done for this call. */
            if (errno != EWOULDBLOCK)
                serverLog(LL_VERBOSE,
                    "Error accepting cluster node: %s", server.neterr);
            return;
        }

        /* Wrap the accepted socket in a TLS or plain connection object,
         * depending on the tls_cluster setting. */
        connection *conn;
        if (server.tls_cluster)
            conn = connCreateAcceptedTLS(cfd,1);
        else
            conn = connCreateAcceptedSocket(cfd);

        /* Use non-blocking I/O for cluster messages, and enable
         * TCP_NODELAY on the connection. */
        connNonBlock(conn);
        connEnableTcpNoDelay(conn);

        serverLog(LL_VERBOSE,"Accepting cluster node connection from %s:%d", cip, cport);

        /* Accept the connection now. connAccept() may call our handler
         * directly or schedule it for later depending on connection
         * implementation. */
        if (connAccept(conn, clusterConnAcceptHandler) != C_ERR) continue;

        /* Accept failed: log the reason if the connection is in error
         * state, release it and stop accepting for this call. */
        if (connGetState(conn) == CONN_STATE_ERROR)
            serverLog(LL_VERBOSE,
                    "Error accepting cluster node connection: %s",
                    connGetLastError(conn));
        connClose(conn);
        return;
    }
}
*********************************************************************************
/* Callback passed to connAccept() by clusterAcceptHandler(). If the
 * connection did not reach the connected state it is logged and closed,
 * otherwise a link object is attached to it and the read handler is
 * installed. */
static void clusterConnAcceptHandler(connection *conn) {
    if (connGetState(conn) != CONN_STATE_CONNECTED) {
        serverLog(LL_VERBOSE,
                "Error accepting cluster node connection: %s", connGetLastError(conn));
        connClose(conn);
        return;
    }

    /* Create a link object we use to handle the connection.
     * It gets passed to the readable handler when data is available.
     * Initially the link->node pointer is set to NULL as we don't know
     * which node it is, but the right node is referenced once we know the
     * node identity. */
    clusterLink *link = createClusterLink(NULL);
    link->conn = conn;

    /* Store the link as the connection private data, so the read handler
     * can get it back from the connection. */
    connSetPrivateData(conn, link);

    /* Register read handler. */
    connSetReadHandler(conn, clusterReadHandler);
}
*********************************************************************************
创建空关联对象
clusterLink *createClusterLink(clusterNode *node) {
    clusterLink *link = zmalloc(sizeof(*link));
    link->ctime = mstime();
    link->sndbuf = sdsempty();
    link->rcvbuf = sdsempty();
    link->node = node;
    link->conn = NULL;
    return link;
}
*********************************************************************************
/* Read data. Try to read the first field of the header first to check the
 * full length of the packet. When a whole packet is in memory this function
 * will call the function to process the packet. And so forth. */
读取数据。率先尝试通过读取头部第一个字段来检查这个数据报的全部长度。
当整个数据包在内存时候,这个函数调用函数处理这个报。等等。
void clusterReadHandler(connection *conn) {
    clusterMsg buf[1];
    ssize_t nread;
    clusterMsg *hdr;
    clusterLink *link = connGetPrivateData(conn);返回要处理的数据
    unsigned int readlen, rcvbuflen;

    while(1) { /* Read as long as there is data to read. */ 只要还有数据,就一直读取
        rcvbuflen = sdslen(link->rcvbuf);
        if (rcvbuflen < 8) {
            /* First, obtain the first 8 bytes to get the full message
             * length. */ 首先,获取开始的8字节数据 来得到整个消息的长度
            readlen = 8 - rcvbuflen;
        } else {
            /* Finally read the full message. */ 最后读取所有的消息
            hdr = (clusterMsg*) link->rcvbuf;
            if (rcvbuflen == 8) { 有8个字节的数据
                /* Perform some sanity check on the message signature
                 * and length. */ 执行对消息的签名和长度进行合法性检查
#define CLUSTERMSG_MIN_LEN (sizeof(clusterMsg)-sizeof(union clusterMsgData))
                if (memcmp(hdr->sig,"RCmb",4) != 0 || 是否以RCmb开头
                    ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN) 长度至少要能覆盖结构体clusterMsg的长度
                {
                    serverLog(LL_WARNING,
                        "Bad message length or signature received "
                        "from Cluster bus.");
                    handleLinkIOError(link);
                    return;
                }
            }
            readlen = ntohl(hdr->totlen) - rcvbuflen; 剩余要读取的字节数
            if (readlen > sizeof(buf)) readlen = sizeof(buf); 超过了缓存的容量,只读取缓存大小的字节数
        }

        nread = connRead(conn,buf,readlen); 读取
        if (nread == -1 && (connGetState(conn) == CONN_STATE_CONNECTED)) return; /* No more data ready. */ 没有数据可读

        if (nread <= 0) { 读取发生错误
            /* I/O error... */
            serverLog(LL_DEBUG,"I/O error reading from node link: %s",
                (nread == 0) ? "connection closed" : connGetLastError(conn));
            handleLinkIOError(link); 释放连接资源
            return;
        } else {
            /* Read data and recast the pointer to the new buffer. */
            读取数据并且将指针强制转化指向新的缓存
            link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
            hdr = (clusterMsg*) link->rcvbuf; 强转结构体
            rcvbuflen += nread;  加上新读取的字节数
        }

        /* Total length obtained? Process this packet. */
        已经获取整个报文长度,处理这个报文
        if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
            if (clusterProcessPacket(link)) {  这里有个长函数,大概近500行,处理报文,我们接下来单独阅读
                sdsfree(link->rcvbuf);
                link->rcvbuf = sdsempty();
            } else {
                return; /* Link no longer valid. */ 连接不再有效
            }
        }
    }
}
*********************************************************************************
/* This function is called when we detect the link with this node is lost.
   We set the node as no longer connected. The Cluster Cron will detect
   this connection and will try to get it connected again.
当我们检测到节点的连接丢失了这个函数就会被调用。我们设置这个节点不再连接(即断开)。
集群的定时任务会检测到这个连接的信息,从而尝试再次连接
   Instead if the node is a temporary node used to accept a query, we
   completely free the node on error. */
相反如果该节点是用于接受查询的临时节点,则我们会在出错时完全释放该节点
void handleLinkIOError(clusterLink *link) {
    freeClusterLink(link);
}
*********************************************************************************
/* Free a cluster link, but does not free the associated node of course.
 * This function will just make sure that the original node associated
 * with this link will have the 'link' field set to NULL. */
释放集群的连接,但是不能释放关联的节点。这个函数只是确保与此连接关联的原始节点的连接字段被设置为空
void freeClusterLink(clusterLink *link) {
    if (link->conn) {
        connClose(link->conn);
        link->conn = NULL;
    }
    sdsfree(link->sndbuf);
    sdsfree(link->rcvbuf);
    if (link->node)
        link->node->link = NULL;
    zfree(link);
}
*********************************************************************************
/* Reset the manual failover state. This works for both masters and slaves
 * as all the state about manual failover is cleared.
 *
 * The function can be used both to initialize the manual failover state at
 * startup or to abort a manual failover in progress. */
void resetManualFailover(void) {
    /* A manual failover deadline is set and clients are currently paused:
     * zero the pause end time and call clientsArePaused() again, just for
     * its side effect (presumably it notices the expired deadline and
     * resumes the clients; verify in clientsArePaused()). */
    int must_unpause = server.cluster->mf_end && clientsArePaused();
    if (must_unpause) {
        server.clients_pause_end_time = 0;
        clientsArePaused();
    }

    /* Clear every manual failover field. mf_end == 0 means no manual
     * failover is in progress. */
    server.cluster->mf_master_offset = 0;
    server.cluster->mf_slave = NULL;
    server.cluster->mf_can_start = 0;
    server.cluster->mf_end = 0;
}
*********************************************************************************
/* Some flags (currently just the NOFAILOVER flag) may need to be updated
 * in the "myself" node based on the current configuration of the node,
 * that may change at runtime via CONFIG SET. This function changes the
 * set of flags in myself->flags accordingly. */
可能需要根据节点的当前配置在"myself"节点中更新某些标志(现在只有NOFAILOVER标志),
这些标志可能在运行时通过命令CONFIG SET更改。这个函数会相应地更改myself->flags中的标志集
void clusterUpdateMyselfFlags(void) {
    int oldflags = myself->flags;
    int nofailover = server.cluster_slave_no_failover ?
                     CLUSTER_NODE_NOFAILOVER : 0;
    myself->flags &= ~CLUSTER_NODE_NOFAILOVER; 去掉默认的标志
    myself->flags |= nofailover; 重新设置标志
    if (myself->flags != oldflags) { 标志有变化
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE); 设置将要做的事情的标志
    }
}

/* Add the CLUSTER_TODO_* bits in 'flags' to the set stored in
 * server.cluster->todo_before_sleep. Bits already set are left untouched. */
void clusterDoBeforeSleep(int flags) {
    server.cluster->todo_before_sleep =
        server.cluster->todo_before_sleep | flags;
}
*********************************************************************************