
/* When this function is called, there is a packet to process starting
 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
 * function should just handle the higher level stuff of processing the
 * packet, modifying the cluster state if needed.
 * The function returns 1 if the link is still valid after the packet
 * was processed, otherwise 0 if the link was freed since the packet
 * processing lead to some inconsistency error (for instance a PONG
 * received from the wrong sender ID). */

typedef struct {
    char sig[4];        /* Signature "RCmb" (Redis Cluster message bus). */ 开头标记RCmb (Redis集群消息总线)
    uint32_t totlen;    /* Total length of this message */  消息的总长度
    uint16_t ver;       /* Protocol version, currently set to 1. */ 协议版本,当前设置为1
    uint16_t port;      /* TCP base port number. */ TCP连接的端口号
    uint16_t type;      /* Message type */ 消息类型
    uint16_t count;     /* Only used for some kind of messages. */ 只对部分类型的消息使用
    uint64_t currentEpoch;  /* The epoch accordingly to the sending node. */ 对应发送节点的纪元
    uint64_t configEpoch;   /* The config epoch if it's a master, or the last
                               epoch advertised by its master if it is a
                               slave. */如果是主机就是配置的纪元,如果是从机,那就是主机建议设置的最后纪元
    uint64_t offset;    /* Master replication offset if node is a master or  如果是主机就是保存的复制偏移数据(等待复制)
                           processed replication offset if node is a slave. */ 如果是从机,那就是已处理的复制偏移量
    char sender[CLUSTER_NAMELEN]; /* Name of the sender node */  发送节点的名字
    unsigned char myslots[CLUSTER_SLOTS/8];具体的槽
    char slaveof[CLUSTER_NAMELEN]; 从机名
    char myip[NET_IP_STR_LEN];    /* Sender IP, if not all zeroed. */ 发送者的IP,如果不全是0
    char notused1[34];  /* 34 bytes reserved for future usage. */ 保留的34个字节,为将来做准备
    uint16_t cport;      /* Sender TCP cluster bus port */ 发送者的TCP集群总线端口
    uint16_t flags;      /* Sender node flags */ 发送者的节点标志
    unsigned char state; /* Cluster state from the POV of the sender */ 从发送者角度看的集群状态
    unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */ 消息的标志:
    union clusterMsgData data;  具体的集群消息数据
} clusterMsg;

/* Message flags better specify the packet content or are used to
 * provide some information about the node state. They are stored in
 * clusterMsg.mflags[0]. */
#define CLUSTERMSG_FLAG0_PAUSED (1<<0) /* Master paused for manual failover. */
#define CLUSTERMSG_FLAG0_FORCEACK (1<<1) /* Give ACK to AUTH_REQUEST even if
                                            master is up. */

/* Process the packet stored at link->rcvbuf and update the cluster state
 * accordingly. The caller owns the receive buffer and releases it.
 *
 * Returns 1 if 'link' is still valid once the packet has been handled, or 0
 * if the link was freed while processing it (for instance because the
 * handshake node it belonged to was deleted, or because a PONG carried a
 * sender ID that does not match the node the link is attached to). Malformed
 * or unexpected packets are silently ignored and 1 is returned. */
int clusterProcessPacket(clusterLink *link) {
    clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
    uint32_t totlen = ntohl(hdr->totlen);   /* Total length claimed by peer. */
    uint16_t type = ntohs(hdr->type);       /* Message type. */
    mstime_t now = mstime();                /* Time the packet is processed. */

    /* Only account for message types we know about: the counter array is
     * indexed by type. */
    if (type < CLUSTERMSG_TYPE_COUNT)
        server.cluster->stats_bus_messages_received[type]++;
    serverLog(LL_DEBUG,"--- Processing packet of type %d, %lu bytes",
        type, (unsigned long) totlen);

    /* Perform sanity checks */
    if (totlen < 16) return 1; /* At least signature, version, totlen, count. */
    if (totlen > sdslen(link->rcvbuf)) return 1; /* Claims more than we have. */

    if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) {
        /* Can't handle messages of different versions. */
        return 1;
    }

    uint16_t flags = ntohs(hdr->flags);     /* Node flags of the sender. */
    uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
    clusterNode *sender;

    /* For every message type verify that the declared total length matches
     * the length expected for that type. Mismatching packets are dropped. */
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
        uint16_t count = ntohs(hdr->count); /* Number of gossip sections. */
        uint32_t explen; /* expected length of this packet */

        explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        explen += (sizeof(clusterMsgDataGossip)*count);
        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_FAIL) {
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        explen += sizeof(clusterMsgDataFail);
        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        /* The struct ends with 8 placeholder bytes that are replaced on the
         * wire by the channel name followed by the message. */
        explen += sizeof(clusterMsgDataPublish) -
                8 +
                ntohl(hdr->data.publish.msg.channel_len) +
                ntohl(hdr->data.publish.msg.message_len);
        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
               type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
               type == CLUSTERMSG_TYPE_MFSTART)
    {
        /* Header-only messages. */
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        explen += sizeof(clusterMsgDataUpdate);
        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_MODULE) {
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        /* NOTE(review): the size of the PUBLISH payload struct is used here
         * for a MODULE message; this is only correct if both payload structs
         * have the same size -- confirm against their definitions. */
        explen += sizeof(clusterMsgDataPublish) -
                3 + ntohl(hdr->data.module.msg.len);
        if (totlen != explen) return 1;
    }

    /* Check if the sender is a known node. Note that for incoming connections
     * we don't store link->node information, but resolve the node by the
     * ID in the header each time in the current implementation. */
    sender = clusterLookupNode(hdr->sender);

    /* Update the last time we saw any data from this node. We
     * use this in order to avoid detecting a timeout from a node that
     * is just sending a lot of data in the cluster bus, for instance
     * because of Pub/Sub. */
    if (sender) sender->data_received = now;

    if (sender && !nodeInHandshake(sender)) {
        /* Update our currentEpoch if we see a newer epoch in the cluster. */
        senderCurrentEpoch = ntohu64(hdr->currentEpoch);
        senderConfigEpoch = ntohu64(hdr->configEpoch);
        if (senderCurrentEpoch > server.cluster->currentEpoch)
            server.cluster->currentEpoch = senderCurrentEpoch;
        /* Update the sender configEpoch if it is publishing a newer one. */
        if (senderConfigEpoch > sender->configEpoch) {
            sender->configEpoch = senderConfigEpoch;
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                 CLUSTER_TODO_FSYNC_CONFIG);
        }
        /* Update the replication offset info for this node. */
        sender->repl_offset = ntohu64(hdr->offset);
        sender->repl_offset_time = now;
        /* If we are a slave performing a manual failover and our master
         * sent its offset while already paused, populate the MF state.
         * mf_end != 0 means a manual failover is in progress, and a zero
         * mf_master_offset means the offset was not received yet. */
        if (server.cluster->mf_end &&
            nodeIsSlave(myself) &&
            myself->slaveof == sender &&
            hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
            server.cluster->mf_master_offset == 0)
        {
            server.cluster->mf_master_offset = sender->repl_offset;
            serverLog(LL_WARNING,
                "Received replication offset for paused "
                "master manual failover: %lld",
                server.cluster->mf_master_offset);
        }
    }

    /* Initial processing of PING and MEET requests replying with a PONG. */
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
        serverLog(LL_DEBUG,"Ping packet received: %p", (void*)link->node);

        /* We use incoming MEET messages in order to set the address
         * for 'myself', since only other cluster nodes will send us
         * MEET messages on handshakes, when the cluster joins, or
         * later if we changed address, and those nodes will use our
         * official address to connect to us. So by obtaining this address
         * from the socket is a simple way to discover / update our own
         * address in the cluster without it being hardcoded in the config.
         * However if we don't have an address at all, we update the address
         * even with a normal PING packet. If it's wrong it will be fixed
         * by MEET later. */
        if ((type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') &&
            server.cluster_announce_ip == NULL)
        {
            char ip[NET_IP_STR_LEN];

            /* Local address of the connection the packet arrived on. */
            if (connSockName(link->conn,ip,sizeof(ip),NULL) != -1 &&
                strcmp(ip,myself->ip))
            {
                memcpy(myself->ip,ip,NET_IP_STR_LEN);
                serverLog(LL_WARNING,"IP address for this node updated to %s",
                    myself->ip);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
            }
        }

        /* Add this node if it is new for us and the msg type is MEET.
         * In this stage we don't try to add the node with the right
         * flags, slaveof pointer, and so forth, as this details will be
         * resolved when we'll receive PONGs from the node. */
        if (!sender && type == CLUSTERMSG_TYPE_MEET) {
            clusterNode *node;

            node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
            nodeIp2String(node->ip,link,hdr->myip);
            node->port = ntohs(hdr->port);
            node->cport = ntohs(hdr->cport);
            clusterAddNode(node);
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
        }

        /* If this is a MEET packet from an unknown node, we still process
         * the gossip section here since we have to trust the sender because
         * of the message type. */
        if (!sender && type == CLUSTERMSG_TYPE_MEET)
            clusterProcessGossipSection(hdr,link);

        /* Anyway reply with a PONG */
        clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
    }

    /* PING, PONG, MEET: process config information. */
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
        serverLog(LL_DEBUG,"%s packet received: %p",
            type == CLUSTERMSG_TYPE_PING ? "ping" : "pong",
            (void*)link->node);
        if (link->node) {
            if (nodeInHandshake(link->node)) {
                /* If we already have this node, try to change the
                 * IP/port of the node with the new one. */
                if (sender) {
                    serverLog(LL_VERBOSE,
                        "Handshake: we already know node %.40s, "
                        "updating the address if needed.", sender->name);
                    if (nodeUpdateAddressIfNeeded(sender,link,hdr))
                    {
                        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                             CLUSTER_TODO_UPDATE_STATE);
                    }
                    /* Free this node as we already have it. This will
                     * cause the link to be freed as well. */
                    clusterDelNode(link->node);
                    return 0;
                }

                /* First thing to do is replacing the random name with the
                 * right node name if this was a handshake stage. */
                clusterRenameNode(link->node, hdr->sender);
                serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
                    link->node->name);
                link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
                link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
            } else if (memcmp(link->node->name,hdr->sender,
                        CLUSTER_NAMELEN) != 0)
            {
                /* If the reply has a non matching node ID we
                 * disconnect this node and set it as not having an associated
                 * address. */
                serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
                    link->node->name,
                    (int)(now-(link->node->ctime)),
                    link->node->flags);
                link->node->flags |= CLUSTER_NODE_NOADDR;
                link->node->ip[0] = '\0';
                link->node->port = 0;
                link->node->cport = 0;
                freeClusterLink(link);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
                return 0;
            }
        }

        /* Copy the CLUSTER_NODE_NOFAILOVER flag from what the sender
         * announced. This is a dynamic flag that we receive from the
         * sender, and the latest status must be trusted. We need it to
         * be propagated because the slave ranking used to understand the
         * delay of each slave in the voting process, needs to know
         * what are the instances really competing. */
        if (sender) {
            int nofailover = flags & CLUSTER_NODE_NOFAILOVER;
            sender->flags &= ~CLUSTER_NODE_NOFAILOVER;
            sender->flags |= nofailover;
        }

        /* Update the node address if it changed. */
        if (sender && type == CLUSTERMSG_TYPE_PING &&
            !nodeInHandshake(sender) &&
            nodeUpdateAddressIfNeeded(sender,link,hdr))
        {
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                 CLUSTER_TODO_UPDATE_STATE);
        }

        /* Update our info about the node */
        if (link->node && type == CLUSTERMSG_TYPE_PONG) {
            link->node->pong_received = now;
            /* The pending ping was answered: nothing is in flight anymore. */
            link->node->ping_sent = 0;

            /* The PFAIL condition can be reversed without external
             * help if it is momentary (that is, if it does not
             * turn into a FAIL state).
             * The FAIL condition is also reversible under specific
             * conditions detected by clearNodeFailureIfNeeded(). */
            if (nodeTimedOut(link->node)) {
                link->node->flags &= ~CLUSTER_NODE_PFAIL;
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE);
            } else if (nodeFailed(link->node)) {
                clearNodeFailureIfNeeded(link->node);
            }
        }

        /* Check for role switch: slave -> master or master -> slave. */
        if (sender) {
            /* An all-null 'slaveof' field means the sender has no master,
             * hence it is a master itself. */
            if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
                sizeof(hdr->slaveof)))
            {
                /* Node is a master. */
                clusterSetNodeAsMaster(sender);
            } else {
                /* Node is a slave. */
                clusterNode *master = clusterLookupNode(hdr->slaveof);

                if (nodeIsMaster(sender)) {
                    /* Master turned into a slave! Reconfigure the node. */
                    clusterDelNodeSlots(sender);
                    sender->flags &= ~(CLUSTER_NODE_MASTER|
                                       CLUSTER_NODE_MIGRATE_TO);
                    sender->flags |= CLUSTER_NODE_SLAVE;

                    /* Update config and state. */
                    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                         CLUSTER_TODO_UPDATE_STATE);
                }

                /* Master node changed for this slave? */
                if (master && sender->slaveof != master) {
                    if (sender->slaveof)
                        clusterNodeRemoveSlave(sender->slaveof,sender);
                    clusterNodeAddSlave(master,sender);
                    sender->slaveof = master;

                    /* Update config. */
                    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
                }
            }
        }

        /* Update our info about served slots.
         * Note: this MUST happen after we update the master/slave state
         * so that CLUSTER_NODE_MASTER flag will be set. */

        /* Many checks are only needed if the set of served slots this
         * instance claims is different compared to the set of slots we have
         * for it. Check this ASAP to avoid other computational expansive
         * checks later. */
        clusterNode *sender_master = NULL; /* Sender or its master if slave. */
        int dirty_slots = 0; /* Sender claimed slots don't match my view? */

        if (sender) {
            sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
            if (sender_master) {
                dirty_slots = memcmp(sender_master->slots,
                        hdr->myslots,sizeof(hdr->myslots)) != 0;
            }
        }

        /* 1) If the sender of the message is a master, and we detected that
         *    the set of slots it claims changed, scan the slots to see if we
         *    need to update our configuration. */
        if (sender && nodeIsMaster(sender) && dirty_slots)
            clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);

        /* 2) We also check for the reverse condition, that is, the sender
         *    claims to serve slots we know are served by a master with a
         *    greater configEpoch. If this happens we inform the sender.
         *
         * This is useful because sometimes after a partition heals, a
         * reappearing master may be the last one to claim a given set of
         * hash slots, but with a configuration that other instances know to
         * be deprecated. Example:
         *
         * A and B are master and slave for slots 1,2,3.
         * A is partitioned away, B gets promoted.
         * B is partitioned away, and A returns available.
         *
         * Usually B would PING A publishing its set of served slots and its
         * configEpoch, but because of the partition B can't inform A of the
         * new configuration, so other nodes that have an updated table must
         * do it. In this way A will stop to act as a master (or can try to
         * failover if there are the conditions to win the election). */
        if (sender && dirty_slots) {
            int j;

            for (j = 0; j < CLUSTER_SLOTS; j++) {
                if (bitmapTestBit(hdr->myslots,j)) {
                    /* Slots we already assign to the sender, or to nobody,
                     * are not in conflict. */
                    if (server.cluster->slots[j] == sender ||
                        server.cluster->slots[j] == NULL) continue;
                    /* The slot belongs to another node from our point of
                     * view: the higher configEpoch wins. */
                    if (server.cluster->slots[j]->configEpoch >
                        senderConfigEpoch)
                    {
                        serverLog(LL_VERBOSE,
                            "Node %.40s has old slots configuration, sending "
                            "an UPDATE message about %.40s",
                                sender->name, server.cluster->slots[j]->name);
                        clusterSendUpdate(sender->link,
                            server.cluster->slots[j]);

                        /* TODO: instead of exiting the loop send every other
                         * UPDATE packet for other nodes that are the new owner
                         * of sender's slots. */
                        break;
                    }
                }
            }
        }

        /* If our config epoch collides with the sender's try to fix
         * the problem. */
        if (sender &&
            nodeIsMaster(myself) && nodeIsMaster(sender) &&
            senderConfigEpoch == myself->configEpoch)
        {
            clusterHandleConfigEpochCollision(sender);
        }

        /* Get info from the gossip section */
        if (sender) clusterProcessGossipSection(hdr,link);
    } else if (type == CLUSTERMSG_TYPE_FAIL) {
        clusterNode *failing;

        if (sender) {
            failing = clusterLookupNode(hdr->data.fail.about.nodename);
            /* Nothing to do if the node is already flagged as FAIL, and we
             * never flag ourselves. */
            if (failing &&
                !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
            {
                serverLog(LL_NOTICE,
                    "FAIL message received from %.40s about %.40s",
                    hdr->sender, hdr->data.fail.about.nodename);
                failing->flags |= CLUSTER_NODE_FAIL;
                failing->fail_time = now;
                failing->flags &= ~CLUSTER_NODE_PFAIL;
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE);
            }
        } else {
            serverLog(LL_NOTICE,
                "Ignoring FAIL message from unknown node %.40s about %.40s",
                hdr->sender, hdr->data.fail.about.nodename);
        }
    } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
        robj *channel, *message;
        uint32_t channel_len, message_len;

        /* Don't bother creating useless objects if there are no
         * Pub/Sub subscribers. */
        if (dictSize(server.pubsub_channels) ||
           listLength(server.pubsub_patterns))
        {
            channel_len = ntohl(hdr->data.publish.msg.channel_len);
            message_len = ntohl(hdr->data.publish.msg.message_len);
            /* bulk_data holds the channel name immediately followed by the
             * message payload. */
            channel = createStringObject(
                        (char*)hdr->data.publish.msg.bulk_data,channel_len);
            message = createStringObject(
                        (char*)hdr->data.publish.msg.bulk_data+channel_len,
                        message_len);
            pubsubPublishMessage(channel,message);
            decrRefCount(channel);
            decrRefCount(message);
        }
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
        if (!sender) return 1;  /* We don't know that node. */
        clusterSendFailoverAuthIfNeeded(sender,hdr);
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
        if (!sender) return 1;  /* We don't know that node. */
        /* We consider this vote only if the sender is a master serving
         * a non zero number of slots, and its currentEpoch is greater or
         * equal to epoch where this node started the election. */
        if (nodeIsMaster(sender) && sender->numslots > 0 &&
            senderCurrentEpoch >= server.cluster->failover_auth_epoch)
        {
            server.cluster->failover_auth_count++;
            /* Maybe we reached a quorum here, set a flag to make sure
             * we check ASAP. */
            clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
        }
    } else if (type == CLUSTERMSG_TYPE_MFSTART) {
        /* This message is acceptable only if I'm a master and the sender
         * is one of my slaves. */
        if (!sender || sender->slaveof != myself) return 1;
        /* Manual failover requested from slaves. Initialize the state
         * accordingly. */
        resetManualFailover();
        server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
        server.cluster->mf_slave = sender;
        pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT));
        serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
            sender->name);
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        clusterNode *n; /* The node the update is about. */
        uint64_t reportedConfigEpoch =
                    ntohu64(hdr->data.update.nodecfg.configEpoch);

        if (!sender) return 1;  /* We don't know the sender. */
        n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
        if (!n) return 1;   /* We don't know the reported node. */
        if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */

        /* If in our current config the node is a slave, set it as a master. */
        if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);

        /* Update the node's configEpoch. */
        n->configEpoch = reportedConfigEpoch;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_FSYNC_CONFIG);

        /* Check the bitmap of served slots and update our
         * config accordingly. */
        clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
            hdr->data.update.nodecfg.slots);
    } else if (type == CLUSTERMSG_TYPE_MODULE) {
        if (!sender) return 1;  /* Protect the module from unknown nodes. */
        /* We need to route this message back to the right module subscribed
         * for the right message type. */
        uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
        uint32_t len = ntohl(hdr->data.module.msg.len);
        uint8_t type = hdr->data.module.msg.type; /* Shadows the outer 'type'. */
        unsigned char *payload = hdr->data.module.msg.bulk_data;
        moduleCallClusterReceivers(sender->name,module_id,type,payload,len);
    } else {
        serverLog(LL_WARNING,"Received unknown packet type: %d", type);
    }
    return 1;
}

/* Process the gossip section of PING or PONG packets.
 * Note that this function assumes that the packet is already sanity-checked
 * by the caller, not in the content of the gossip section, but in the
 * length.
 *
 * 'hdr' is the received message and 'link' the link it arrived on. The
 * sender is taken from link->node when set, otherwise it is looked up by
 * the name found in the header. */
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
    uint16_t count = ntohs(hdr->count);
    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
    clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);

    while(count--) {
        uint16_t flags = ntohs(g->flags);
        clusterNode *node;
        sds ci;

        /* Build the human readable flags only when they will be logged. */
        if (server.verbosity == LL_DEBUG) {
            ci = representClusterNodeFlags(sdsempty(), flags);
            serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d@%d %s",
                g->nodename,
                g->ip,
                ntohs(g->port),
                ntohs(g->cport),
                ci);
            sdsfree(ci);
        }

        /* Update our state accordingly to the gossip sections */
        node = clusterLookupNode(g->nodename);
        if (node) {
            /* We already know this node.
               Handle failure reports, only when the sender is a master. */
            if (sender && nodeIsMaster(sender) && node != myself) {
                if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
                    if (clusterNodeAddFailureReport(node,sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s as not reachable.",
                            sender->name, node->name);
                    }
                    markNodeAsFailingIfNeeded(node);
                } else {
                    if (clusterNodeDelFailureReport(node,sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s is back online.",
                            sender->name, node->name);
                    }
                }
            }

            /* If from our POV the node is up (no failure flags are set),
             * we have no pending ping for the node, nor we have failure
             * reports for this node, update the last pong time with the
             * one we see from the other nodes. */
            if (!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
                node->ping_sent == 0 &&
                clusterNodeFailureReportsCount(node) == 0)
            {
                mstime_t pongtime = ntohl(g->pong_received);
                pongtime *= 1000; /* Convert back to milliseconds. */

                /* Replace the pong time with the received one only if
                 * it's greater than our view but is not in the future
                 * (with 500 milliseconds tolerance) from the POV of our
                 * clock. */
                if (pongtime <= (server.mstime+500) &&
                    pongtime > node->pong_received)
                {
                    node->pong_received = pongtime;
                }
            }

            /* If we already know this node, but it is not reachable, and
             * we see a different address in the gossip section of a node that
             * can talk with this other node, update the address, disconnect
             * the old link if any, so that we'll attempt to connect with the
             * new address. */
            if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
                !(flags & CLUSTER_NODE_NOADDR) &&
                !(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
                (strcasecmp(node->ip,g->ip) ||
                 node->port != ntohs(g->port) ||
                 node->cport != ntohs(g->cport)))
            {
                if (node->link) freeClusterLink(node->link);
                memcpy(node->ip,g->ip,NET_IP_STR_LEN);
                node->port = ntohs(g->port);
                node->cport = ntohs(g->cport);
                node->flags &= ~CLUSTER_NODE_NOADDR;
            }
        } else {
            /* If it's not in NOADDR state and we don't have it, we
             * add it to our trusted dict with exact nodeid and flag.
             * Note that we cannot simply start a handshake against
             * this IP/PORT pairs, since IP/PORT can be reused already,
             * otherwise we risk joining another cluster.
             *
             * Note that we require that the sender of this gossip message
             * is a well known node in our cluster, otherwise we risk
             * joining another cluster. */
            if (sender &&
                !(flags & CLUSTER_NODE_NOADDR) &&
                !clusterBlacklistExists(g->nodename))
            {
                node = createClusterNode(g->nodename, flags);
                memcpy(node->ip,g->ip,NET_IP_STR_LEN);
                node->port = ntohs(g->port);
                node->cport = ntohs(g->cport);
                clusterAddNode(node);
            }
        }

        /* Next node */
        g++;
    }
}

/* Update the node address to the IP address that can be extracted
 * from link->fd, or if hdr->myip is non empty, to the address the node
 * is announcing us. The port is taken from the packet header as well.
 *
 * If the address or port changed, disconnect the node link so that we'll
 * connect again to the new address.
 *
 * If the ip/port pair are already correct no operation is performed at
 * all.
 *
 * The function returns 0 if the node address is still the same,
 * otherwise 1 is returned. */
int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
                              clusterMsg *hdr)
{
    char ip[NET_IP_STR_LEN] = {0};
    int port = ntohs(hdr->port);
    int cport = ntohs(hdr->cport);

    /* We don't proceed if the link is the same as the sender link, as this
     * function is designed to see if the node link is consistent with the
     * symmetric link that is used to receive PINGs from the node.
     *
     * As a side effect this function never frees the passed 'link', so
     * it is safe to call during packet processing. */
    if (link == node->link) return 0;

    nodeIp2String(ip,link,hdr->myip);
    if (node->port == port && node->cport == cport &&
        strcmp(ip,node->ip) == 0) return 0;

    /* IP / port is different, update it. */
    memcpy(node->ip,ip,sizeof(ip));
    node->port = port;
    node->cport = cport;
    /* Drop the outgoing link: it still points to the old address. */
    if (node->link) freeClusterLink(node->link);
    node->flags &= ~CLUSTER_NODE_NOADDR;
    serverLog(LL_WARNING,"Address updated for node %.40s, now %s:%d",
        node->name, node->ip, node->port);

    /* Check if this is our master and we have to change the
     * replication target as well. */
    if (nodeIsSlave(myself) && myself->slaveof == node)
        replicationSetMaster(node->ip, node->port);
    return 1;
}

/* Remove a node from the cluster. The function performs the high level
 * cleanup, calling freeClusterNode() for the low level cleanup.
 * Here we do the following:
 * 1) Mark all the slots handled by it as unassigned.
 * 2) Remove all the failure reports sent by this node and referenced by
 *    other nodes.
 * 3) Free the node with freeClusterNode() that will in turn remove it
 *    from the hash table and from the list of slaves of its master, if
 *    it is a slave node.
void clusterDelNode(clusterNode *delnode) {
    int j;
    dictIterator *di;
    dictEntry *de;

    /* 1) Mark slots as unassigned. */ 标记槽为未分配
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (server.cluster->importing_slots_from[j] == delnode)
            server.cluster->importing_slots_from[j] = NULL;
        if (server.cluster->migrating_slots_to[j] == delnode)
            server.cluster->migrating_slots_to[j] = NULL;
        if (server.cluster->slots[j] == delnode)
            clusterDelSlot(j);  清理槽的分配

    /* 2) Remove failure reports. */ 移除故障报告
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (node == delnode) continue;
        clusterNodeDelFailureReport(node,delnode);  清除该节点的故障报告

    /* 3) Free the node, unlinking it from the cluster. */

/* Delete the specified slot marking it as unassigned.
 * Returns C_OK if the slot was assigned, otherwise if the slot was
 * already unassigned C_ERR is returned. */
int clusterDelSlot(int slot) {
    clusterNode *n = server.cluster->slots[slot];

    if (!n) return C_ERR; 该槽未分配,无需清理,返回错误
    serverAssert(clusterNodeClearSlotBit(n,slot) == 1); 清除标志
    server.cluster->slots[slot] = NULL; 未分配
    return C_OK;

/* Clear the slot bit and return the old value. */
int clusterNodeClearSlotBit(clusterNode *n, int slot) {
    int old = bitmapTestBit(n->slots,slot);
    bitmapClearBit(n->slots,slot);
    if (old) n->numslots--; /* Only count slots that were really owned. */
    return old;
}

/* Low level cleanup of the node structure. Only called by clusterDelNode(). */
void freeClusterNode(clusterNode *n) {
    sds nodename;
    int j;

    /* If the node has associated slaves, we have to set
     * all the slaves->slaveof fields to NULL (unknown). */
    for (j = 0; j < n->numslaves; j++)
        n->slaves[j]->slaveof = NULL;

    /* Remove this node from the list of slaves of its master. */
    if (nodeIsSlave(n) && n->slaveof) clusterNodeRemoveSlave(n->slaveof,n);

    /* Unlink from the set of nodes. The temporary sds key is only needed
     * for the lookup and is released right after. */
    nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
    serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
    sdsfree(nodename);

    /* Release link and associated data structures. */
    if (n->link) freeClusterLink(n->link);
    listRelease(n->fail_reports);
    zfree(n->slaves);
    zfree(n);
}

/* This is only used after the handshake. When we connect a given IP/PORT
 * as a result of CLUSTER MEET we don't have the node name yet, so we
 * pick a random one, and will fix it when we receive the PONG request using
 * this function. */
这个函数只在握手之后使用,作为命令CLUSTER MEET的结果,当我们连接一对给定的IP/PORT,我们还没有节点的名字,
void clusterRenameNode(clusterNode *node, char *newname) {
    int retval;
    sds s = sdsnewlen(node->name, CLUSTER_NAMELEN);

    serverLog(LL_DEBUG,"Renaming node %.40s into %.40s",
        node->name, newname);
    retval = dictDelete(server.cluster->nodes, s); 从节点哈希表中删除该名字对应的节点
    serverAssert(retval == DICT_OK);
    memcpy(node->name, newname, CLUSTER_NAMELEN);
    clusterAddNode(node); 添加新的节点到集群
/* This function is called only if a node is marked as FAIL, but we are able
 * to reach it again. It checks if there are the conditions to undo the FAIL
 * state.
 *
 * NOTE(review): the serverLog() heads and their node->name argument were
 * elided in this excerpt and are restored here; the LL_NOTICE level is an
 * assumption. Any follow-up scheduling (state update / config save) after
 * clearing the flag is not visible in this excerpt -- confirm. */
void clearNodeFailureIfNeeded(clusterNode *node) {
    mstime_t now = mstime();

    /* The caller must only pass nodes currently flagged as FAIL. */
    serverAssert(nodeFailed(node));

    /* For slaves, and for masters serving no slots, we always clear the
     * FAIL flag if we can contact the node again. */
    if (nodeIsSlave(node) || node->numslots == 0) {
        serverLog(LL_NOTICE,
            "Clear FAIL state for node %.40s: %s is reachable again.",
                node->name,
                nodeIsSlave(node) ? "replica" : "master without slots");
        node->flags &= ~CLUSTER_NODE_FAIL;
    }

    /* If it is a master and...
     * 1) The FAIL state is old enough.
     * 2) It is yet serving slots from our point of view (not failed over).
     * Apparently no one is going to fix these slots, clear the FAIL flag. */
    if (nodeIsMaster(node) && node->numslots > 0 &&
        (now - node->fail_time) >
        (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT))
    {
        serverLog(LL_NOTICE,
            "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.",
                node->name);
        node->flags &= ~CLUSTER_NODE_FAIL;
    }
}

/* Reconfigure the specified node 'n' as a master. This function is called when
 * a node that we believed to be a slave is now acting as master in order to
 * update the state of the node. */
void clusterSetNodeAsMaster(clusterNode *n) {
    /* Already a master: nothing to do. */
    if (nodeIsMaster(n)) return;

    if (n->slaveof) {
        /* Detach the node from its former master's list of slaves. */
        clusterNodeRemoveSlave(n->slaveof,n);
        /* For nodes other than ourselves, flag it as a master that is a
         * valid target for replica migration. */
        if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO;
    }
    n->flags &= ~CLUSTER_NODE_SLAVE;
    n->flags |= CLUSTER_NODE_MASTER;
    n->slaveof = NULL;

    /* Update config and state.
     * NOTE(review): the statement under this comment was elided in the
     * excerpt; restored using the same TODO flags used elsewhere in this
     * file -- confirm the exact flag set. */
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                         CLUSTER_TODO_UPDATE_STATE);
}

/* Delete all the slots associated with the specified node.
 * The number of deleted slots is returned. */
int clusterDelNodeSlots(clusterNode *node) {
    int deleted = 0, j;

    for (j = 0; j < CLUSTER_SLOTS; j++) {
        /* Only slots whose bit is set in this node's bitmap are removed. */
        if (clusterNodeGetSlotBit(node,j)) {
            clusterDelSlot(j);
            deleted++;
        }
    }
    return deleted;
}
/* Remove 'slave' from the slaves array of 'master'.
 * Returns C_OK if the slave was found and removed, C_ERR if it was not
 * in the array. */
int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
    int j;

    for (j = 0; j < master->numslaves; j++) {
        if (master->slaves[j] == slave) {
            /* Not the last element: shift the following entries down by
             * one to close the gap (regions overlap, hence memmove). */
            if ((j+1) < master->numslaves) {
                int remaining_slaves = (master->numslaves - j) - 1;
                memmove(master->slaves+j,master->slaves+(j+1),
                        (sizeof(*master->slaves) * remaining_slaves));
            }
            master->numslaves--;
            /* A master left with no slaves is no longer flagged as a
             * replica-migration target. */
            if (master->numslaves == 0)
                master->flags &= ~CLUSTER_NODE_MIGRATE_TO;
            return C_OK;
        }
    }
    return C_ERR;
}
/* This function is called when we receive a master configuration via a
 * PING, PONG or UPDATE packet. What we receive is a node, a configEpoch of the
 * node, and the set of slots claimed under this configEpoch.
 * What we do is to rebind the slots with newer configuration compared to our
 * local configuration, and if needed, we turn ourself into a replica of the
 * node (see the function comments for more info).
 * The 'sender' is the node for which we received a configuration update.
 * Sometimes it is not actually the "Sender" of the information, like in the
 * case we receive the info via an UPDATE packet.
 *
 * NOTE(review): early returns, closing braces, the dirty-slot counter
 * increment and one serverLog() head were elided in this excerpt and are
 * restored here; the LL_WARNING level of the restored log is an assumption. */
void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
    int j;
    clusterNode *curmaster, *newmaster = NULL;
    /* The dirty slots list is a list of slots for which we lose the ownership
     * while having still keys inside. This usually happens after a failover
     * or after a manual cluster reconfiguration operated by the admin.
     *
     * If the update message is not able to demote a master to slave (in this
     * case we'll resync with the master updating the whole key space), we
     * need to delete all the keys in the slots we lost ownership. */
    uint16_t dirty_slots[CLUSTER_SLOTS];
    int dirty_slots_count = 0;

    /* Here we set curmaster to this node or the node this node
     * replicates to if it's a slave. In the for loop we are
     * interested to check if slots are taken away from curmaster. */
    curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;

    /* Information about ourselves is never applied. */
    if (sender == myself) {
        serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
        return;
    }

    for (j = 0; j < CLUSTER_SLOTS; j++) {
        /* Only slots claimed in the received bitmap need further checks. */
        if (bitmapTestBit(slots,j)) {
            /* The slot is already bound to the sender of this message. */
            if (server.cluster->slots[j] == sender) continue;

            /* The slot is in importing state, it should be modified only
             * manually via redis-trib (example: a resharding is in progress
             * and the migrating side slot was already closed and is advertising
             * a new config. We still want the slot to be closed manually). */
            if (server.cluster->importing_slots_from[j]) continue;

            /* We rebind the slot to the new node claiming it if:
             * 1) The slot was unassigned or the new node claims it with a
             *    greater configEpoch.
             * 2) We are not currently importing the slot. */
            if (server.cluster->slots[j] == NULL ||
                server.cluster->slots[j]->configEpoch < senderConfigEpoch)
            {
                /* Was this slot mine, and still contains keys? Mark it as
                 * a dirty slot. */
                if (server.cluster->slots[j] == myself &&
                    countKeysInSlot(j) &&
                    sender != myself)
                {
                    dirty_slots[dirty_slots_count] = j;
                    dirty_slots_count++;
                }

                /* The slot is being taken from our (or our master's) set:
                 * remember the sender as the candidate new master. */
                if (server.cluster->slots[j] == curmaster)
                    newmaster = sender;
                clusterDelSlot(j);
                clusterAddSlot(sender,j);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE|
                                     CLUSTER_TODO_FSYNC_CONFIG);
            }
        }
    }

    /* After updating the slots configuration, don't do any actual change
     * in the state of the server if a module disabled Redis Cluster
     * keys redirections. */
    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
        return;

    /* If at least one slot was reassigned from a node to another node
     * with a greater configEpoch, it is possible that:
     * 1) We are a master left without slots. This means that we were
     *    failed over and we should turn into a replica of the new
     *    master.
     * 2) We are a slave and our master is left without slots. We need
     *    to replicate to the new slots owner. */
    if (newmaster && curmaster->numslots == 0) {
        serverLog(LL_WARNING,
            "Configuration change detected. Reconfiguring myself "
            "as a replica of %.40s", sender->name);
        clusterSetMaster(sender);
    } else if (dirty_slots_count) {
        /* If we are here, we received an update message which removed
         * ownership for certain slots we still have keys about, but still
         * we are serving some slots, so this master node was not demoted to
         * a slave.
         *
         * In order to maintain a consistent state between keys and slots
         * we need to remove all the keys from the slots we lost. */
        for (j = 0; j < dirty_slots_count; j++)
            delKeysInSlot(dirty_slots[j]);
    }
}
/* Send an UPDATE message to the specified link carrying the specified 'node'
 * slots configuration. The node name, slots bitmap, and configEpoch info
 * are included. */
void clusterSendUpdate(clusterLink *link, clusterNode *node) {
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;

    /* No link to send on: nothing to do. */
    if (link == NULL) return;
    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_UPDATE);
    memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
    hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
    /* NOTE(review): the slots copy promised by the header comment was
     * missing from this excerpt; the 'slots' field name in nodecfg is
     * assumed -- confirm against the clusterMsgDataUpdate definition. */
    memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
    clusterSendMessage(link,(unsigned char*)buf,ntohl(hdr->totlen));
}

/* Build the message header. hdr must point to a buffer at least
 * sizeof(clusterMsg) in bytes.
 *
 * NOTE(review): the announce-ip copy, the slots bitmap copy and an 'else'
 * were elided in this excerpt and are restored here from the surrounding
 * comments and the header layout -- confirm. */
void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
    int totlen = 0;
    uint64_t offset;
    clusterNode *master;

    /* If this node is a master, we send its slots bitmap and configEpoch.
     * If this node is a slave we send the master's information instead (the
     * node is flagged as slave so the receiver knows that it is NOT really
     * in charge for this slots. */
    master = (nodeIsSlave(myself) && myself->slaveof) ?
              myself->slaveof : myself;

    /* Start from an all-zero header, then fill the fixed fields. */
    memset(hdr,0,sizeof(*hdr));
    hdr->ver = htons(CLUSTER_PROTO_VER);
    hdr->sig[0] = 'R';
    hdr->sig[1] = 'C';
    hdr->sig[2] = 'm';
    hdr->sig[3] = 'b';
    hdr->type = htons(type);
    memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);

    /* If cluster-announce-ip option is enabled, force the receivers of our
     * packets to use the specified address for this node. Otherwise if the
     * first byte is zero, they'll do auto discovery. */
    if (server.cluster_announce_ip) {
        strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
        hdr->myip[NET_IP_STR_LEN-1] = '\0';
    }

    /* Handle cluster-announce-port as well. The base port depends on
     * whether the cluster bus uses TLS. */
    int port = server.tls_cluster ? server.tls_port : server.port;
    int announced_port = server.cluster_announce_port ?
                         server.cluster_announce_port : port;
    int announced_cport = server.cluster_announce_bus_port ?
                          server.cluster_announce_bus_port :
                          (port + CLUSTER_PORT_INCR);

    memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
    /* We replicate from someone: advertise our master's name. */
    if (myself->slaveof != NULL)
        memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
    hdr->port = htons(announced_port);
    hdr->cport = htons(announced_cport);
    hdr->flags = htons(myself->flags);
    hdr->state = server.cluster->state;

    /* Set the currentEpoch and configEpochs. */
    hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
    hdr->configEpoch = htonu64(master->configEpoch);

    /* Set the replication offset: the processed offset for a slave,
     * the master replication offset otherwise. */
    if (nodeIsSlave(myself))
        offset = replicationGetSlaveOffset();
    else
        offset = server.master_repl_offset;
    hdr->offset = htonu64(offset);

    /* Set the message flags: a master with mf_end set advertises
     * the PAUSED flag. */
    if (nodeIsMaster(myself) && server.cluster->mf_end)
        hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;

    /* Compute the message length for certain messages. For other messages
     * this is up to the caller. */
    if (type == CLUSTERMSG_TYPE_FAIL) {
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        totlen += sizeof(clusterMsgDataFail);
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        totlen += sizeof(clusterMsgDataUpdate);
    }
    hdr->totlen = htonl(totlen);
    /* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */
}

/* Put stuff into the send buffer.
 * It is guaranteed that this function will never have as a side effect
 * the link to be invalidated, so it is safe to call this function
 * from event handlers that will do stuff with the same link later. */
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
    /* The send buffer is currently empty and there is data to queue:
     * install the write handler on the connection. */
    if (sdslen(link->sndbuf) == 0 && msglen != 0)
        connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1);

    /* Append the message to the link's pending output. */
    link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);

    /* Populate sent messages stats.
     * NOTE(review): the guard on the next statement was elided in this
     * excerpt (the increment is indented under it); the bound name
     * CLUSTERMSG_TYPE_COUNT is assumed -- confirm it matches the size of
     * stats_bus_messages_sent. */
    clusterMsg *hdr = (clusterMsg*) msg;
    uint16_t type = ntohs(hdr->type);
    if (type < CLUSTERMSG_TYPE_COUNT)
        server.cluster->stats_bus_messages_sent[type]++;
}
/* This function is called when this node is a master, and we receive from
 * another master a configuration epoch that is equal to our configuration
 * epoch.
 * It is not possible that different slaves get the same config
 * epoch during a failover election, because the slaves need to get voted
 * by a majority. However when we perform a manual resharding of the cluster
 * the node will assign a configuration epoch to itself without to ask
 * for agreement. Usually resharding happens when the cluster is working well
 * and is supervised by the sysadmin, however it is possible for a failover
 * to happen exactly while the node we are resharding a slot to assigns itself
 * a new configuration epoch, but before it is able to propagate it.
 * So technically it is possible in this condition that two nodes end with
 * the same configuration epoch.
 * Another possibility is that there are bugs in the implementation causing
 * this to happen.
 * Moreover when a new cluster is created, all the nodes start with the same
 * configEpoch. This collision resolution code allows nodes to automatically
 * end with a different configEpoch at startup automatically.
 * In all the cases, we want a mechanism that resolves this issue automatically
 * as a safeguard. The same configuration epoch for masters serving different
 * set of slots is not harmful, but it is if the nodes end serving the same
 * slots for some reason (manual errors or software bugs) without a proper
 * failover procedure.
 * In general we want a system that eventually always ends with different
 * masters having different configuration epochs whatever happened, since
 * nothing is worse than a split-brain condition in a distributed system.
 * When this function gets called, what happens is that if this node
 * has the lexicographically smaller Node ID compared to the other node
 * with the conflicting epoch (the 'sender' node), it will assign itself
 * the greatest configuration epoch currently detected among nodes plus 1.
 * This means that even if there are multiple nodes colliding, the node
 * with the greatest Node ID never moves forward, so eventually all the nodes
 * end with a different configuration epoch. */
/* Resolve a configEpoch collision with the master 'sender': when both nodes
 * are masters with the same configEpoch and the sender's Node ID compares
 * greater than ours, bump currentEpoch and adopt it as our configEpoch.
 *
 * NOTE(review): the currentEpoch increment (the documented "plus 1") and the
 * serverLog() head were elided in this excerpt and are restored here; the
 * LL_VERBOSE level is an assumption. */
void clusterHandleConfigEpochCollision(clusterNode *sender) {
    /* Prerequisites: nodes have the same configEpoch and are both masters. */
    if (sender->configEpoch != myself->configEpoch ||
        !nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
    /* Don't act if the colliding node has a smaller Node ID. */
    if (memcmp(sender->name,myself->name,CLUSTER_NAMELEN) <= 0) return;
    /* Get the next ID available at the best of this node knowledge. */
    server.cluster->currentEpoch++;
    myself->configEpoch = server.cluster->currentEpoch;
    /* Persist the new epoch to the config file before going on. */
    clusterSaveConfigOrDie(1);
    serverLog(LL_VERBOSE,
        "WARNING: configEpoch collision with node %.40s."
        " configEpoch set to %llu",
        sender->name,
        (unsigned long long) myself->configEpoch);
}
/* Vote for the node asking for our vote if there are the conditions.
 *
 * NOTE(review): in this excerpt every denial path had lost its serverLog()
 * head, the node->name argument and the trailing 'return'; they are restored
 * here (LL_WARNING level assumed). Whether the vote must also be persisted
 * to the config file before replying is not visible here -- confirm. */
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
    clusterNode *master = node->slaveof;
    uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
    uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
    unsigned char *claimed_slots = request->myslots;
    /* Non-zero when the request is flagged as a forced (manual) failover. */
    int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
    int j;

    /* IF we are not a master serving at least 1 slot, we don't have the
     * right to vote, as the cluster size in Redis Cluster is the number
     * of masters serving at least one slot, and quorum is
     * (cluster size / 2) + 1. */
    if (nodeIsSlave(myself) || myself->numslots == 0) return;

    /* Request epoch must be >= our currentEpoch.
     * Note that it is impossible for it to actually be greater since
     * our currentEpoch was updated as a side effect of receiving this
     * request, if the request epoch was greater. */
    if (requestCurrentEpoch < server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
            "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
            node->name,
            (unsigned long long) requestCurrentEpoch,
            (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    /* I already voted for this epoch? Return ASAP. */
    if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: already voted for epoch %llu",
                node->name,
                (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    /* Node must be a slave and its master down.
     * The master can be non failing if the request is flagged
     * with CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
    if (nodeIsMaster(node) || master == NULL ||
        (!nodeFailed(master) && !force_ack))
    {
        if (nodeIsMaster(node)) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: it is a master node",
                    node->name);
        } else if (master == NULL) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: I don't know its master",
                    node->name);
        } else if (!nodeFailed(master)) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: its master is up",
                    node->name);
        }
        return;
    }

    /* We did not vote for a slave about this master for two
     * times the node timeout. This is not strictly needed for correctness
     * of the algorithm but makes the base case more linear. */
    if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
    {
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: "
                "can't vote about this master before %lld milliseconds",
                node->name,
                (long long) ((server.cluster_node_timeout*2)-
                             (mstime() - node->slaveof->voted_time)));
        return;
    }

    /* The slave requesting the vote must have a configEpoch for the claimed
     * slots that is >= the one of the masters currently serving the same
     * slots in the current configuration. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        /* Slot not claimed in the request: skip it. */
        if (bitmapTestBit(claimed_slots, j) == 0) continue;
        /* Slot unowned, or owned under an epoch not newer than the
         * requester's: no objection for this slot. */
        if (server.cluster->slots[j] == NULL ||
            server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
        {
            continue;
        }
        /* If we reached this point we found a slot that in our current slots
         * is served by a master with a greater configEpoch than the one claimed
         * by the slave requesting our vote. Refuse to vote for this slave. */
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: "
                "slot %d epoch (%llu) > reqEpoch (%llu)",
                node->name, j,
                (unsigned long long) server.cluster->slots[j]->configEpoch,
                (unsigned long long) requestConfigEpoch);
        return;
    }

    /* We can vote for this slave: record the epoch and time of the vote,
     * then send the acknowledgement. */
    server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
    node->slaveof->voted_time = mstime();
    clusterSendFailoverAuth(node);
    serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
        node->name, (unsigned long long) server.cluster->currentEpoch);
}

/* Send a FAILOVER_AUTH_ACK message to the specified node. */
void clusterSendFailoverAuth(clusterNode *node) {
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t totlen;

    /* Without a link there is no way to deliver the vote. */
    if (!node->link) return;
    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
    /* The ACK carries only the fixed header, no type-specific payload. */
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    hdr->totlen = htonl(totlen);
    clusterSendMessage(node->link,(unsigned char*)buf,totlen);
}
/* Information about a cluster node: its flags, IP, port and, for the flag
 * case noted below, a master ID.
 * The historical alias 'mdouleClusterNodeInfo' is a misspelling; it is kept
 * so existing users keep compiling, and the correctly spelled alias is
 * added next to it. */
typedef struct moduleClusterNodeInfo {
    int flags;
    char ip[NET_IP_STR_LEN];
    int port;
    char master_id[40]; /* Only if flags & REDISMODULE_NODE_MASTER is true. */
} moduleClusterNodeInfo, mdouleClusterNodeInfo;

/* We have an array of message types: each bucket is a linked list of
 * configured receivers. The array is indexed by a uint8_t message type,
 * so it needs UINT8_MAX+1 buckets to make type 255 a valid index. */
static moduleClusterReceiver *clusterReceivers[UINT8_MAX+1];

/* Dispatch the message to the right module receiver. */
void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len) {
    moduleClusterReceiver *r = clusterReceivers[type]; 定义接收者数组
    while(r) {
        if (r->module_id == module_id) { 匹配模块ID
            RedisModuleCtx ctx = REDISMODULE_CTX_INIT; 
            ctx.module = r->module;
            ctx.client = moduleFreeContextReusedClient;
            selectDb(ctx.client, 0);
            r->callback(&ctx,sender_id,type,payload,len); 调用注册的回调函数
        r = r->next;
/* This function is called every time we get a failure report from a node.
 * The side effect is to populate the fail_reports list (or to update
 * the timestamp of an existing report).
 * 'failing' is the node that is in failure state according to the
 * 'sender' node.
 * The function returns 0 if it just updates a timestamp of an existing
 * failure report from the same sender. 1 is returned if a new failure
 * report is created. */
int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
    list *l = failing->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;

    /* If a failure report from the same sender already exists, just update
     * the timestamp. The iterator must be positioned before first use. */
    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (fr->node == sender) {
            fr->time = mstime();
            return 0;
        }
    }

    /* Otherwise create a new report and append it to the list. */
    fr = zmalloc(sizeof(*fr));
    fr->node = sender;
    fr->time = mstime();
    listAddNodeTail(l,fr);
    return 1;
}
/* This function checks if a given node should be marked as FAIL.
 * It happens if the following conditions are met:
 * 1) We received enough failure reports from other master nodes via gossip.
 *    Enough means that the majority of the masters signaled the node is
 *    down recently.
 * 2) We believe this node is in PFAIL state.
 * If a failure is detected we also inform the whole cluster about this
 * event trying to force every other node to set the FAIL flag for the node.
 * Note that the form of agreement used here is weak, as we collect the majority
 * of masters state during some time, and even if we force agreement by
 * propagating the FAIL message, because of partitions we may not reach every
 * node. However:
 * 1) Either we reach the majority and eventually the FAIL state will propagate
 *    to all the cluster.
 * 2) Or there is no majority so no slave promotion will be authorized and the
 *    FAIL flag will be cleared after some time.
2)要么 没有达到多数票,这样从节点不会被提升,(主节点)故障标记将会在将来清除
void markNodeAsFailingIfNeeded(clusterNode *node) {
    int failures;
    int needed_quorum = (server.cluster->size / 2) + 1; 法定人数  集群大小的一半 + 1

    if (!nodeTimedOut(node)) return; /* We can reach it. */ 不超时,我们可以交流
    if (nodeFailed(node)) return; /* Already FAILing. */ 已经标记故障

    failures = clusterNodeFailureReportsCount(node); 统计报告故障的节点数
    /* Also count myself as a voter if I'm a master. */ 如果本节点是主节点,也要计数
    if (nodeIsMaster(myself)) failures++;
    if (failures < needed_quorum) return; /* No weak agreement from masters. */ 

        "Marking node %.40s as failing (quorum reached).", node->name);

    /* Mark the node as failing. */ 标记该节点为故障
    node->flags &= ~CLUSTER_NODE_PFAIL;
    node->flags |= CLUSTER_NODE_FAIL;
    node->fail_time = mstime();

    /* Broadcast the failing node name to everybody, forcing all the other
     * reachable nodes to flag the node as FAIL. */
    if (nodeIsMaster(myself)) clusterSendFail(node->name);

/* Send a FAIL message to all the nodes we are able to contact.
 * The FAIL message is sent when we detect that a node is failing
 * (CLUSTER_NODE_PFAIL) and we also receive a gossip confirmation of this:
 * we switch the node state to CLUSTER_NODE_FAIL and ask all the other
 * nodes to do the same ASAP. */
那么就就发送这个故障信息: 我们转变节点的状态到CLUSTER_NODE_FAIL并且请求所有其它节点尽快做相同的事情
void clusterSendFail(char *nodename) {
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;

    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL); 组装故障信息
    clusterBroadcastMessage(buf,ntohl(hdr->totlen)); 广播故障信息

/* Send a message to all the nodes that are part of the cluster having
 * a connected link.
 * It is guaranteed that this function will never have as a side effect
 * some node->link to be invalidated, so it is safe to call this function
 * from event handlers that will do stuff with node links later. */
void clusterBroadcastMessage(void *buf, size_t len) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes); 遍历所有节点
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (!node->link) continue; 没有联系的
        if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE)) 自己活着处于握手状态
        clusterSendMessage(node->link,buf,len); 给连接节点发送消息
/* Remove the failing report for 'node' if it was previously considered
 * failing by 'sender'. This function is called when a node informs us via
 * gossip that a node is OK from its point of view (no FAIL or PFAIL flags).
 *
 * Note that this function is called relatively often as it gets called even
 * when there are no nodes failing, and is O(N), however when the cluster is
 * fine the failure reports list is empty so the function runs in constant
 * time.
 *
 * The function returns 1 if the failure report was found and removed.
 * Otherwise 0 is returned. */
int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender) {
    list *l = node->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;

    /* Search for a failure report from this sender. The iterator must be
     * positioned before first use. */
    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (fr->node == sender) break;
    }
    if (!ln) return 0; /* No failure report from this sender. */

    /* Remove the failure report, then drop any expired ones as well. */
    listDelNode(l,ln);
    clusterNodeCleanupFailureReports(node);
    return 1;
}
/* Return the number of external nodes that believe 'node' is failing,
 * not including this node, that may have a PFAIL or FAIL state for this
 * node as well. */
int clusterNodeFailureReportsCount(clusterNode *node) {
    /* Drop expired reports first so that only recent ones are counted. */
    clusterNodeCleanupFailureReports(node);
    return listLength(node->fail_reports);
}
/* Remove failure reports that are too old, where too old means reasonably
 * older than the global node timeout. Note that anyway for a node to be
 * flagged as FAIL we need to have a local PFAIL state that is at least
 * older than the global node timeout, so we don't just trust the number
 * of failure reports from other nodes. */
void clusterNodeCleanupFailureReports(clusterNode *node) {
    list *l = node->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;
    /* Maximum age of a report: a multiple of the node timeout. */
    mstime_t maxtime = server.cluster_node_timeout *
                     CLUSTER_FAIL_REPORT_VALIDITY_MULT;
    mstime_t now = mstime();

    /* The iterator must be positioned before first use. */
    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        /* Reports older than maxtime are discarded. */
        if (now - fr->time > maxtime) listDelNode(l,ln);
    }
}
/* Return non-zero if the specified node ID exists in the blacklist.
 * You don't need to pass an sds string here, any pointer to 40 bytes
 * will work. */
如果指定的节点ID存在黑名单中,返回非0. 在这里不需要传递sds字符串,任何指向40个字节的指针即可工作
int clusterBlacklistExists(char *nodeid) {
    sds id = sdsnewlen(nodeid,CLUSTER_NAMELEN);
    int retval;

    clusterBlacklistCleanup(); 删除过时的黑名单节点
    retval = dictFind(server.cluster->nodes_black_list,id) != NULL; 查找是否在黑名单哈希表中
    return retval;

/* Before of the addNode() or Exists() operations we always remove expired
 * entries from the black list. This is an O(N) operation but it is not a
 * problem since add / exists operations are called very infrequently and
 * the hash table is supposed to contain very little elements at max.
 * However without the cleanup during long uptimes and with some automated
 * node add/removal procedures, entries could accumulate.
 *
 * NOTE(review): the body of the expiry check and the iterator release were
 * elided in this excerpt and are restored here -- confirm. */
void clusterBlacklistCleanup(void) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes_black_list);
    while((de = dictNext(di)) != NULL) {
        int64_t expire = dictGetUnsignedIntegerVal(de);

        /* Entries whose expire time is in the past are deleted. */
        if (expire < server.unixtime)
            dictDelete(server.cluster->nodes_black_list,dictGetKey(de));
    }
    dictReleaseIterator(di);
}
/* Set the specified node 'n' as master for this node.
 * If this node is currently a master, it is turned into a slave. */
void clusterSetMaster(clusterNode *n) {
    serverAssert(n != myself);确定不是本身
    serverAssert(myself->numslots == 0); 服务的槽数为0,否则不能变为从节点

    if (nodeIsMaster(myself)) { 本身是主节点
        myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO); 取消 主节点 和 可迁移标志 
        myself->flags |= CLUSTER_NODE_SLAVE; 
        clusterCloseAllSlots();  清理所有槽的迁移和导入状态
    } else {
        if (myself->slaveof) 是从节点,且有主节点
            clusterNodeRemoveSlave(myself->slaveof,myself); 从之前的主节点的列表移除该从节点
    myself->slaveof = n;
    replicationSetMaster(n->ip, n->port); 从新主节点赋值数据
    resetManualFailover(); 一切结束,重置手工故障转移状态