redis6.0.5之cluster.c阅读笔记2-报文处理
************************************************************************************************
/* When this function is called, there is a packet to process starting
 * at link->rcvbuf. Releasing the buffer is up to the caller, so this
 * function should just handle the higher level stuff of processing the
 * packet, modifying the cluster state if needed.
当这个函数被调用时,缓冲区link->rcvbuf中已有一个待处理的数据报。释放这个缓冲区由调用者负责,
所以这个函数只需负责数据报更高层次的处理,必要时修改集群状态。
* The function returns 1 if the link is still valid after the packet
* was processed, otherwise 0 if the link was freed since the packet
* processing lead to some inconsistency error (for instance a PONG
* received from the wrong sender ID). */
如果报文处理之后连接仍然有效,函数返回1;
否则,如果报文处理导致了某些不一致的错误(例如收到来自错误发送者ID的PONG回复)而使连接被释放,函数返回0。
等待处理的报文结构体如下:
/* Header of every message sent on the cluster bus, followed by a
 * type-specific payload in 'data'. Multi-byte integer fields travel in
 * network byte order (the receiver converts them with ntohs()/ntohl()/
 * ntohu64()). */
typedef struct {
    char sig[4];        /* Signature "RCmb" (Redis Cluster message bus). */
    uint32_t totlen;    /* Total length of this message */
    uint16_t ver;       /* Protocol version, currently set to 1. */
    uint16_t port;      /* TCP base port number. */
    uint16_t type;      /* Message type */
    uint16_t count;     /* Only used for some kind of messages (e.g. the
                           number of gossip sections in PING/PONG/MEET). */
    uint64_t currentEpoch;  /* The epoch accordingly to the sending node. */
    uint64_t configEpoch;   /* The config epoch if it's a master, or the last
                               epoch advertised by its master if it is a
                               slave. */
    uint64_t offset;    /* Master replication offset if node is a master or
                           processed replication offset if node is a slave. */
    char sender[CLUSTER_NAMELEN]; /* Name of the sender node */
    unsigned char myslots[CLUSTER_SLOTS/8]; /* Bitmap of served slots,
                                               one bit per slot. */
    char slaveof[CLUSTER_NAMELEN]; /* Name of the sender's master, or
                                      CLUSTER_NODE_NULL_NAME if the sender
                                      is a master. */
    char myip[NET_IP_STR_LEN];    /* Sender IP, if not all zeroed. */
    char notused1[34];  /* 34 bytes reserved for future usage. */
    uint16_t cport;      /* Sender TCP cluster bus port */
    uint16_t flags;      /* Sender node flags */
    unsigned char state; /* Cluster state from the POV of the sender */
    unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */
    union clusterMsgData data; /* Type-specific payload. */
} clusterMsg;
/* Message flags better specify the packet content or are used to
 * provide some information about the node state. The FLAG0 flags below
 * are carried in clusterMsg.mflags[0]. */
#define CLUSTERMSG_FLAG0_PAUSED (1<<0) /* Master paused for manual failover. */
#define CLUSTERMSG_FLAG0_FORCEACK (1<<1) /* Give ACK to AUTH_REQUEST even if
                                            master is up. */
/* Process one cluster bus packet stored at the start of link->rcvbuf.
 *
 * Releasing the receive buffer is up to the caller: this function only
 * applies the packet to the cluster state.
 *
 * Returns 1 if the link is still valid after processing (this includes
 * packets silently dropped because they fail the sanity checks), or 0 if
 * the link was freed while processing, for instance because a PONG was
 * received with a mismatching sender ID, or because a node still in
 * handshake turned out to be a node we already know. */
int clusterProcessPacket(clusterLink *link) {
    clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
    uint32_t totlen = ntohl(hdr->totlen);
    uint16_t type = ntohs(hdr->type);
    mstime_t now = mstime();

    /* Per-type counters only exist for known message types. */
    if (type < CLUSTERMSG_TYPE_COUNT)
        server.cluster->stats_bus_messages_received[type]++;
    serverLog(LL_DEBUG,"--- Processing packet of type %d, %lu bytes",
        type, (unsigned long) totlen);

    /* Perform sanity checks */
    if (totlen < 16) return 1; /* At least signature, version, totlen, count. */
    if (totlen > sdslen(link->rcvbuf)) return 1; /* More than we buffered. */

    if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) {
        /* Can't handle messages of different versions. */
        return 1;
    }

    uint16_t flags = ntohs(hdr->flags); /* Sender node flags. */
    uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
    clusterNode *sender;

    /* Check that totlen matches the length implied by the message type.
     * Packets with an unexpected length are dropped. */
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
        uint16_t count = ntohs(hdr->count); /* Number of gossip sections. */
        uint32_t explen; /* expected length of this packet */

        /* Fixed header followed by 'count' gossip sections. */
        explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        explen += (sizeof(clusterMsgDataGossip)*count);
        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_FAIL) {
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        explen += sizeof(clusterMsgDataFail);
        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
        /* channel_len and message_len come straight from the wire: sum them
         * in 64 bits. With a 32 bit accumulator, huge values could wrap
         * around to a length equal to totlen, and the lengths would later be
         * used to read far past the end of the packet. */
        uint64_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        /* "- 8": bulk_data is an 8 byte placeholder whose place is taken by
         * the channel name followed by the message. */
        explen += sizeof(clusterMsgDataPublish) - 8;
        explen += (uint64_t) ntohl(hdr->data.publish.msg.channel_len);
        explen += (uint64_t) ntohl(hdr->data.publish.msg.message_len);
        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
               type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
               type == CLUSTERMSG_TYPE_MFSTART)
    {
        /* These messages carry no payload beyond the header. */
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        explen += sizeof(clusterMsgDataUpdate);
        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_MODULE) {
        /* Same overflow concern as for PUBLISH. The size is taken from the
         * module message struct itself (the one read by the handler below),
         * whose trailing bulk_data placeholder (assumed 3 bytes, matching
         * the "- 3") is replaced by 'len' bytes of payload. */
        uint64_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        explen += sizeof(hdr->data.module.msg) - 3;
        explen += (uint64_t) ntohl(hdr->data.module.msg.len);
        if (totlen != explen) return 1;
    }

    /* Check if the sender is a known node. Note that for incoming connections
     * we don't store link->node information, but resolve the node by the
     * ID in the header each time in the current implementation. */
    sender = clusterLookupNode(hdr->sender);

    /* Update the last time we saw any data from this node. We
     * use this in order to avoid detecting a timeout from a node that
     * is just sending a lot of data in the cluster bus, for instance
     * because of Pub/Sub. */
    if (sender) sender->data_received = now;

    /* Epochs are only trusted from known nodes that completed the
     * handshake; otherwise senderCurrentEpoch/senderConfigEpoch stay 0. */
    if (sender && !nodeInHandshake(sender)) {
        /* Update our currentEpoch if we see a newer epoch in the cluster. */
        senderCurrentEpoch = ntohu64(hdr->currentEpoch);
        senderConfigEpoch = ntohu64(hdr->configEpoch);
        if (senderCurrentEpoch > server.cluster->currentEpoch)
            server.cluster->currentEpoch = senderCurrentEpoch;
        /* Update the sender configEpoch if it is publishing a newer one. */
        if (senderConfigEpoch > sender->configEpoch) {
            sender->configEpoch = senderConfigEpoch;
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                 CLUSTER_TODO_FSYNC_CONFIG);
        }
        /* Update the replication offset info for this node. */
        sender->repl_offset = ntohu64(hdr->offset);
        sender->repl_offset_time = now;
        /* If we are a slave performing a manual failover and our master
         * sent its offset while already paused, populate the MF state.
         * A zero mf_master_offset means we did not record it yet. */
        if (server.cluster->mf_end &&
            nodeIsSlave(myself) &&
            myself->slaveof == sender &&
            hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
            server.cluster->mf_master_offset == 0)
        {
            server.cluster->mf_master_offset = sender->repl_offset;
            serverLog(LL_WARNING,
                "Received replication offset for paused "
                "master manual failover: %lld",
                server.cluster->mf_master_offset);
        }
    }

    /* Initial processing of PING and MEET requests replying with a PONG. */
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
        serverLog(LL_DEBUG,"Ping packet received: %p", (void*)link->node);

        /* We use incoming MEET messages in order to set the address
         * for 'myself', since only other cluster nodes will send us
         * MEET messages on handshakes, when the cluster joins, or
         * later if we changed address, and those nodes will use our
         * official address to connect to us. So by obtaining this address
         * from the socket is a simple way to discover / update our own
         * address in the cluster without it being hardcoded in the config.
         *
         * However if we don't have an address at all, we update the address
         * even with a normal PING packet. If it's wrong it will be fixed
         * by MEET later. An explicitly announced IP always wins. */
        if ((type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') &&
            server.cluster_announce_ip == NULL)
        {
            char ip[NET_IP_STR_LEN];

            /* Local address of the socket the packet arrived on. */
            if (connSockName(link->conn,ip,sizeof(ip),NULL) != -1 &&
                strcmp(ip,myself->ip))
            {
                memcpy(myself->ip,ip,NET_IP_STR_LEN);
                serverLog(LL_WARNING,"IP address for this node updated to %s",
                    myself->ip);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
            }
        }

        /* Add this node if it is new for us and the msg type is MEET.
         * In this stage we don't try to add the node with the right
         * flags, slaveof pointer, and so forth, as this details will be
         * resolved when we'll receive PONGs from the node. */
        if (!sender && type == CLUSTERMSG_TYPE_MEET) {
            clusterNode *node;

            node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
            nodeIp2String(node->ip,link,hdr->myip);
            node->port = ntohs(hdr->port);
            node->cport = ntohs(hdr->cport);
            clusterAddNode(node);
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
        }

        /* If this is a MEET packet from an unknown node, we still process
         * the gossip section here since we have to trust the sender because
         * of the message type. */
        if (!sender && type == CLUSTERMSG_TYPE_MEET)
            clusterProcessGossipSection(hdr,link);

        /* Anyway reply with a PONG */
        clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
    }

    /* PING, PONG, MEET: process config information. */
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
        serverLog(LL_DEBUG,"%s packet received: %p",
            type == CLUSTERMSG_TYPE_PING ? "ping" :
                (type == CLUSTERMSG_TYPE_MEET ? "meet" : "pong"),
            (void*)link->node);
        if (link->node) {
            if (nodeInHandshake(link->node)) {
                /* If we already have this node, try to change the
                 * IP/port of the node with the new one. */
                if (sender) {
                    serverLog(LL_VERBOSE,
                        "Handshake: we already know node %.40s, "
                        "updating the address if needed.", sender->name);
                    if (nodeUpdateAddressIfNeeded(sender,link,hdr))
                    {
                        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                             CLUSTER_TODO_UPDATE_STATE);
                    }
                    /* Free this node as we already have it. This will
                     * cause the link to be freed as well. */
                    clusterDelNode(link->node);
                    return 0;
                }

                /* First thing to do is replacing the random name with the
                 * right node name if this was a handshake stage. */
                clusterRenameNode(link->node, hdr->sender);
                serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
                    link->node->name);
                link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
                link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
            } else if (memcmp(link->node->name,hdr->sender,
                              CLUSTER_NAMELEN) != 0)
            {
                /* If the reply has a non matching node ID we
                 * disconnect this node and set it as not having an associated
                 * address. */
                serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
                    link->node->name,
                    (int)(now-(link->node->ctime)),
                    link->node->flags);
                link->node->flags |= CLUSTER_NODE_NOADDR;
                link->node->ip[0] = '\0';
                link->node->port = 0;
                link->node->cport = 0;
                freeClusterLink(link);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
                return 0;
            }
        }

        /* Copy the CLUSTER_NODE_NOFAILOVER flag from what the sender
         * announced. This is a dynamic flag that we receive from the
         * sender, and the latest status must be trusted. We need it to
         * be propagated because the slave ranking used to understand the
         * delay of each slave in the voting process, needs to know
         * what are the instances really competing. */
        if (sender) {
            int nofailover = flags & CLUSTER_NODE_NOFAILOVER;
            sender->flags &= ~CLUSTER_NODE_NOFAILOVER;
            sender->flags |= nofailover;
        }

        /* Update the node address if it changed. */
        if (sender && type == CLUSTERMSG_TYPE_PING &&
            !nodeInHandshake(sender) &&
            nodeUpdateAddressIfNeeded(sender,link,hdr))
        {
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                 CLUSTER_TODO_UPDATE_STATE);
        }

        /* Update our info about the node */
        if (link->node && type == CLUSTERMSG_TYPE_PONG) {
            link->node->pong_received = now;
            /* Our ping was answered: nothing is pending any more. */
            link->node->ping_sent = 0;

            /* The PFAIL condition can be reversed without external
             * help if it is momentary (that is, if it does not
             * turn into a FAIL state).
             *
             * The FAIL condition is also reversible under specific
             * conditions detected by clearNodeFailureIfNeeded(). */
            if (nodeTimedOut(link->node)) {
                link->node->flags &= ~CLUSTER_NODE_PFAIL;
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE);
            } else if (nodeFailed(link->node)) {
                clearNodeFailureIfNeeded(link->node);
            }
        }

        /* Check for role switch: slave -> master or master -> slave. */
        if (sender) {
            /* A null 'slaveof' name means the sender claims to be a master. */
            if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
                sizeof(hdr->slaveof)))
            {
                /* Node is a master. */
                clusterSetNodeAsMaster(sender);
            } else {
                /* Node is a slave. */
                clusterNode *master = clusterLookupNode(hdr->slaveof);

                if (nodeIsMaster(sender)) {
                    /* Master turned into a slave! Reconfigure the node:
                     * drop its slots, clear the MASTER and MIGRATE_TO
                     * flags and mark it as a slave. */
                    clusterDelNodeSlots(sender);
                    sender->flags &= ~(CLUSTER_NODE_MASTER|
                                       CLUSTER_NODE_MIGRATE_TO);
                    sender->flags |= CLUSTER_NODE_SLAVE;

                    /* Update config and state. */
                    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                         CLUSTER_TODO_UPDATE_STATE);
                }

                /* Master node changed for this slave? */
                if (master && sender->slaveof != master) {
                    if (sender->slaveof)
                        clusterNodeRemoveSlave(sender->slaveof,sender);
                    clusterNodeAddSlave(master,sender);
                    sender->slaveof = master;

                    /* Update config. */
                    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
                }
            }
        }

        /* Update our info about served slots.
         *
         * Note: this MUST happen after we update the master/slave state
         * so that CLUSTER_NODE_MASTER flag will be set. */

        /* Many checks are only needed if the set of served slots this
         * instance claims is different compared to the set of slots we have
         * for it. Check this ASAP to avoid other computational expansive
         * checks later. */
        clusterNode *sender_master = NULL; /* Sender or its master if slave. */
        int dirty_slots = 0; /* Sender claimed slots don't match my view? */

        if (sender) {
            sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
            if (sender_master) {
                dirty_slots = memcmp(sender_master->slots,
                        hdr->myslots,sizeof(hdr->myslots)) != 0;
            }
        }

        /* 1) If the sender of the message is a master, and we detected that
         *    the set of slots it claims changed, scan the slots to see if we
         *    need to update our configuration. */
        if (sender && nodeIsMaster(sender) && dirty_slots)
            clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);

        /* 2) We also check for the reverse condition, that is, the sender
         *    claims to serve slots we know are served by a master with a
         *    greater configEpoch. If this happens we inform the sender.
         *
         * This is useful because sometimes after a partition heals, a
         * reappearing master may be the last one to claim a given set of
         * hash slots, but with a configuration that other instances know to
         * be deprecated. Example:
         *
         * A and B are master and slave for slots 1,2,3.
         * A is partitioned away, B gets promoted.
         * B is partitioned away, and A returns available.
         *
         * Usually B would PING A publishing its set of served slots and its
         * configEpoch, but because of the partition B can't inform A of the
         * new configuration, so other nodes that have an updated table must
         * do it. In this way A will stop to act as a master (or can try to
         * failover if there are the conditions to win the election). */
        if (sender && dirty_slots) {
            int j;

            for (j = 0; j < CLUSTER_SLOTS; j++) {
                if (bitmapTestBit(hdr->myslots,j)) {
                    /* Slots owned by the sender or unassigned are fine. */
                    if (server.cluster->slots[j] == sender ||
                        server.cluster->slots[j] == NULL) continue;
                    /* Another master owns it: notify the sender only if
                     * that owner has a newer config epoch. */
                    if (server.cluster->slots[j]->configEpoch >
                        senderConfigEpoch)
                    {
                        serverLog(LL_VERBOSE,
                            "Node %.40s has old slots configuration, sending "
                            "an UPDATE message about %.40s",
                                sender->name, server.cluster->slots[j]->name);
                        clusterSendUpdate(sender->link,
                            server.cluster->slots[j]);

                        /* TODO: instead of exiting the loop send every other
                         * UPDATE packet for other nodes that are the new owner
                         * of sender's slots. */
                        break;
                    }
                }
            }
        }

        /* If our config epoch collides with the sender's try to fix
         * the problem (see clusterHandleConfigEpochCollision()). */
        if (sender &&
            nodeIsMaster(myself) && nodeIsMaster(sender) &&
            senderConfigEpoch == myself->configEpoch)
        {
            clusterHandleConfigEpochCollision(sender);
        }

        /* Get info from the gossip section */
        if (sender) clusterProcessGossipSection(hdr,link);
    } else if (type == CLUSTERMSG_TYPE_FAIL) {
        clusterNode *failing;

        if (sender) {
            failing = clusterLookupNode(hdr->data.fail.about.nodename);
            /* Mark the node as FAIL unless it already is, or it is us. */
            if (failing &&
                !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
            {
                serverLog(LL_NOTICE,
                    "FAIL message received from %.40s about %.40s",
                    hdr->sender, hdr->data.fail.about.nodename);
                failing->flags |= CLUSTER_NODE_FAIL;
                failing->fail_time = now;
                failing->flags &= ~CLUSTER_NODE_PFAIL;
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE);
            }
        } else {
            serverLog(LL_NOTICE,
                "Ignoring FAIL message from unknown node %.40s about %.40s",
                hdr->sender, hdr->data.fail.about.nodename);
        }
    } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
        robj *channel, *message;
        uint32_t channel_len, message_len;

        /* Don't bother creating useless objects if there are no
         * Pub/Sub subscribers. */
        if (dictSize(server.pubsub_channels) ||
           listLength(server.pubsub_patterns))
        {
            /* The payload is the channel name immediately followed by the
             * message; both lengths were validated against totlen above. */
            channel_len = ntohl(hdr->data.publish.msg.channel_len);
            message_len = ntohl(hdr->data.publish.msg.message_len);
            channel = createStringObject(
                        (char*)hdr->data.publish.msg.bulk_data,channel_len);
            message = createStringObject(
                        (char*)hdr->data.publish.msg.bulk_data+channel_len,
                        message_len);
            pubsubPublishMessage(channel,message);
            decrRefCount(channel);
            decrRefCount(message);
        }
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
        if (!sender) return 1;  /* We don't know that node. */
        /* Possibly grant our vote to the sender's failover. */
        clusterSendFailoverAuthIfNeeded(sender,hdr);
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
        if (!sender) return 1;  /* We don't know that node. */
        /* We consider this vote only if the sender is a master serving
         * a non zero number of slots, and its currentEpoch is greater or
         * equal to epoch where this node started the election. */
        if (nodeIsMaster(sender) && sender->numslots > 0 &&
            senderCurrentEpoch >= server.cluster->failover_auth_epoch)
        {
            server.cluster->failover_auth_count++;
            /* Maybe we reached a quorum here, set a flag to make sure
             * we check ASAP. */
            clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
        }
    } else if (type == CLUSTERMSG_TYPE_MFSTART) {
        /* This message is acceptable only if I'm a master and the sender
         * is one of my slaves. */
        if (!sender || sender->slaveof != myself) return 1;
        /* Manual failover requested from slaves. Initialize the state
         * accordingly: set the deadline, remember the requesting slave and
         * pause clients. */
        resetManualFailover();
        server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
        server.cluster->mf_slave = sender;
        pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT));
        serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
            sender->name);
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        clusterNode *n; /* The node the update is about. */
        uint64_t reportedConfigEpoch =
                    ntohu64(hdr->data.update.nodecfg.configEpoch);

        if (!sender) return 1;  /* We don't know the sender. */
        n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
        if (!n) return 1;   /* We don't know the reported node. */
        if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */

        /* If in our current config the node is a slave, set it as a master. */
        if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);

        /* Update the node's configEpoch. */
        n->configEpoch = reportedConfigEpoch;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_FSYNC_CONFIG);

        /* Check the bitmap of served slots and update our
         * config accordingly. */
        clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
            hdr->data.update.nodecfg.slots);
    } else if (type == CLUSTERMSG_TYPE_MODULE) {
        if (!sender) return 1; /* Protect the module from unknown nodes. */
        /* We need to route this message back to the right module subscribed
         * for the right message type. */
        uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
        uint32_t len = ntohl(hdr->data.module.msg.len);
        /* Module-defined message type; named so it does not shadow the
         * cluster bus 'type' of the packet. */
        uint8_t module_type = hdr->data.module.msg.type;
        unsigned char *payload = hdr->data.module.msg.bulk_data;
        moduleCallClusterReceivers(sender->name,module_id,module_type,payload,len);
    } else {
        serverLog(LL_WARNING,"Received unknown packet type: %d", type);
    }
    return 1;
}
************************************************************************************************
/* Process the gossip section of PING or PONG packets.
* Note that this function assumes that the packet is already sanity-checked
* by the caller, not in the content of the gossip section, but in the
* length. */
处理PING或PONG报文中gossip协议部分。注意该函数假定报文已经经过调用者合法性检查,
不是说对gossip写一部分的检查,而是针对整个报文的长度的合法性检查
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
uint16_t count = ntohs(hdr->count);
clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip; 获取数据部分
clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender); 查找节点
while(count--) {
uint16_t flags = ntohs(g->flags);
clusterNode *node;
sds ci;
if (server.verbosity == LL_DEBUG) { 调试级别,输出调试信息
ci = representClusterNodeFlags(sdsempty(), flags); 拼接标志,用逗号分隔
serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d@%d %s",
g->nodename,
g->ip,
ntohs(g->port),
ntohs(g->cport),
ci);
sdsfree(ci);
}
/* Update our state accordingly to the gossip sections */
根据gossip协议部分更新我们的状态
node = clusterLookupNode(g->nodename);
if (node) {
/* We already know this node.
Handle failure reports, only when the sender is a master. */
我们已经知道的节点,只有当发送者是一个主节点时候,处理故障的报告
if (sender && nodeIsMaster(sender) && node != myself) { 发送者不空 并且 是主节点 并且 报告节点不是自己
if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
if (clusterNodeAddFailureReport(node,sender)) { 把发送者发送的故障节点添加到故障列表
serverLog(LL_VERBOSE,
"Node %.40s reported node %.40s as not reachable.",
sender->name, node->name);
}
markNodeAsFailingIfNeeded(node); 如果有需要标记该节点为故障节点
} else {
if (clusterNodeDelFailureReport(node,sender)) { 将节点从故障列表移除
serverLog(LL_VERBOSE,
"Node %.40s reported node %.40s is back online.",
sender->name, node->name);
}
}
}
/* If from our POV the node is up (no failure flags are set),
* we have no pending ping for the node, nor we have failure
* reports for this node, update the last pong time with the
* one we see from the other nodes. */
如果从我们的角度看节点是在线的(无故障标志设置),我们没有等待该节点的ping命令(即等待ping的返回pong),
也没有对这个节点的故障报告,那么用我们能看到的其它节点中的pong回复时间来更新最后一个pong时间
if (!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) && 没有故障标志
node->ping_sent == 0 && 没有等待的ping命令
clusterNodeFailureReportsCount(node) == 0) 没有该节点的故障报告
{
mstime_t pongtime = ntohl(g->pong_received); 最后一次收到该节点pong的时间
pongtime *= 1000; /* Convert back to milliseconds. */ 转化成毫秒
/* Replace the pong time with the received one only if
* it's greater than our view but is not in the future
* (with 500 milliseconds tolerance) from the POV of our
* clock. */
如果接收到节点的pong时间大于我们当前的时间,而且从我们的时钟看没有超出很远(500毫秒以内),
那么用接收到节点的pong时间代替当下的pong时间
if (pongtime <= (server.mstime+500) &&
pongtime > node->pong_received)
{
node->pong_received = pongtime;
}
}
/* If we already know this node, but it is not reachable, and
* we see a different address in the gossip section of a node that
* can talk with this other node, update the address, disconnect
* the old link if any, so that we'll attempt to connect with the
* new address. */
如果我们已经知道这个节点,但是它不可达,不过我们在另外节点gossip部分中看到了一个不同地址,
可以和另外的节点进行交流,更新该地址,如果存在断开旧的连接,因此我们将尝试连接新的地址
if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) && 标志故障
!(flags & CLUSTER_NODE_NOADDR) && 存在地址
!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) && gossip信息中无故障标志
(strcasecmp(node->ip,g->ip) ||
node->port != ntohs(g->port) ||
node->cport != ntohs(g->cport)))
{ 如果节点保存的通信地址和gossip中的信息不一致,那就需要更新
if (node->link) freeClusterLink(node->link); 释放旧的连接信息
memcpy(node->ip,g->ip,NET_IP_STR_LEN); ip地址
node->port = ntohs(g->port); 服务端口
node->cport = ntohs(g->cport); 通信端口
node->flags &= ~CLUSTER_NODE_NOADDR; 去掉无地址标志
}
} else {
/* If it's not in NOADDR state and we don't have it, we
* add it to our trusted dict with exact nodeid and flag.
* Note that we cannot simply start a handshake against
* this IP/PORT pairs, since IP/PORT can be reused already,
* otherwise we risk joining another cluster.
节点不是处于无地址状态,并且我们还没有这个节点,我们把准确的节点id和标志添加它到我们信任的字典。
注意到我们不能简单的对这个IP/PROT对开始一个握手协议,因为IP/PROT可能被重复使用,否则我们就有加入另一个集群的风险
* Note that we require that the sender of this gossip message
* is a well known node in our cluster, otherwise we risk
* joining another cluster. */
请注意,我们要求此gossip协议消息的发送者是集群中的已知节点,否则我们将冒加入另一个集群的风险。
if (sender && 存在发送者节点
!(flags & CLUSTER_NODE_NOADDR) && 存在地址
!clusterBlacklistExists(g->nodename)) 不在黑名单中
{
clusterNode *node;
node = createClusterNode(g->nodename, flags); 创建新节点
memcpy(node->ip,g->ip,NET_IP_STR_LEN);
node->port = ntohs(g->port);
node->cport = ntohs(g->cport);
clusterAddNode(node); 添加新节点
}
}
/* Next node */ 指向下个节点的gossip信息
g++;
}
}
************************************************************************************************
/* Update the node address to the IP address that can be extracted
* from link->fd, or if hdr->myip is non empty, to the address the node
* is announcing us. The port is taken from the packet header as well.
如果能够从link->fd从抽取或者hdr->myip非空,那么更新节点的ip地址信息为
节点宣告给我们的。端口也是从报文头部获取。
* If the address or port changed, disconnect the node link so that we'll
* connect again to the new address.
如果地址和端口修改了,断开节点的连接,这样我们将会重新连接新地址
* If the ip/port pair are already correct no operation is performed at
* all.
如果所有的ip/port对都时正产的,那么就不执行任何操作
* The function returns 0 if the node address is still the same,
* otherwise 1 is returned. */
函数返回0如果节点地址是一样的,否则返回1(有变化)
int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
clusterMsg *hdr)
{
char ip[NET_IP_STR_LEN] = {0};
int port = ntohs(hdr->port);
int cport = ntohs(hdr->cport);
/* We don't proceed if the link is the same as the sender link, as this
* function is designed to see if the node link is consistent with the
* symmetric link that is used to receive PINGs from the node.
如果连接和发送者的连接是同一个,我们不处理,因为这个函数被设计为节点连接是否同
用来从节点收取的pong消息的对称连接 信息是否一致。
* As a side effect this function never frees the passed 'link', so
* it is safe to call during packet processing. */
作为副作用这个函数不释放传入的参数link,因此在报文处理中调用是安全的。
if (link == node->link) return 0; 是同一个连接,无需处理
nodeIp2String(ip,link,hdr->myip); 获取ip地址信息
if (node->port == port && node->cport == cport &&
strcmp(ip,node->ip) == 0) return 0; 信息一致,返回0
/* IP / port is different, update it. */ IP或者端口有变化,更新
memcpy(node->ip,ip,sizeof(ip));
node->port = port;
node->cport = cport;
if (node->link) freeClusterLink(node->link); 存在节点连接,清空
node->flags &= ~CLUSTER_NODE_NOADDR; 清除无地址信息标志
serverLog(LL_WARNING,"Address updated for node %.40s, now %s:%d",
node->name, node->ip, node->port);
/* Check if this is our master and we have to change the
* replication target as well. */
检查这个是否我们的主节点,那么我们需要改变复制的目标
if (nodeIsSlave(myself) && myself->slaveof == node) 是我们的主节点
replicationSetMaster(node->ip, node->port); 重新设置复制链路
return 1;
}
************************************************************************************************
/* Remove a node from the cluster. The function performs the high level
* cleanup, calling freeClusterNode() for the low level cleanup.
* Here we do the following:
*
* 1) Mark all the slots handled by it as unassigned.
* 2) Remove all the failure reports sent by this node and referenced by
* other nodes.
* 3) Free the node with freeClusterNode() that will in turn remove it
* from the hash table and from the list of slaves of its master, if
* it is a slave node.
*/
从集群删除节点。这个函数执行最高层次的清理,为底层的清理调用函数freeClusterNode。
这里是我们要做的事情:
1)标记该节点服务的所有槽为未分配
2)移除所有从该节点和其相关节点发出的故障报告
3)使用函数freeClusterNode释放节点,如果该节点是从节点,那么从哈希表和从主节点的从节点列表中删除。
void clusterDelNode(clusterNode *delnode) {
int j;
dictIterator *di;
dictEntry *de;
/* 1) Mark slots as unassigned. */ 标记槽为未分配
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (server.cluster->importing_slots_from[j] == delnode)
server.cluster->importing_slots_from[j] = NULL;
if (server.cluster->migrating_slots_to[j] == delnode)
server.cluster->migrating_slots_to[j] = NULL;
if (server.cluster->slots[j] == delnode)
clusterDelSlot(j); 清理槽的分配
}
/* 2) Remove failure reports. */ 移除故障报告
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
if (node == delnode) continue;
clusterNodeDelFailureReport(node,delnode); 清除该节点的故障报告
}
dictReleaseIterator(di);
/* 3) Free the node, unlinking it from the cluster. */
释放节点,从集群中取消关联这个节点
freeClusterNode(delnode);
}
/* Delete the specified slot marking it as unassigned.
* Returns C_OK if the slot was assigned, otherwise if the slot was
* already unassigned C_ERR is returned. */
删除指定的槽标记,还原槽为未分配状态。返回成功如果槽原来是分配的。
如果该槽原来没有被分配,那么返回错误
int clusterDelSlot(int slot) {
clusterNode *n = server.cluster->slots[slot];
if (!n) return C_ERR; 该槽未分配,无需清理,返回错误
serverAssert(clusterNodeClearSlotBit(n,slot) == 1); 清除标志
server.cluster->slots[slot] = NULL; 未分配
return C_OK;
}
/* Clear the slot bit and return the old value. */
清除槽的标志,返回原来的值
int clusterNodeClearSlotBit(clusterNode *n, int slot) {
int old = bitmapTestBit(n->slots,slot);
bitmapClearBit(n->slots,slot); 清除bitmap中的标志
if (old) n->numslots--; 原来存在的情况,总数减1
return old;
}
/* Low level cleanup of the node structure. Only called by clusterDelNode().
 *
 * Detaches the node from its slaves and from its master, removes it from
 * the nodes table, then releases its link and the memory it owns. */
void freeClusterNode(clusterNode *n) {
    /* Slaves of this node no longer have a known master. */
    for (int i = 0; i < n->numslaves; i++)
        n->slaves[i]->slaveof = NULL;

    /* If n is a slave, remove it from its master's list of slaves. */
    if (nodeIsSlave(n) && n->slaveof)
        clusterNodeRemoveSlave(n->slaveof,n);

    /* Unlink from the set of nodes (the table is keyed by node name). */
    sds nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
    serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
    sdsfree(nodename);

    /* Release link and associated data structures. */
    if (n->link) freeClusterLink(n->link);
    listRelease(n->fail_reports);
    zfree(n->slaves);
    zfree(n);
}
************************************************************************************************
/* This is only used after the handshake. When we connect a given IP/PORT
* as a result of CLUSTER MEET we don't have the node name yet, so we
* pick a random one, and will fix it when we receive the PONG request using
* this function. */
这个函数只在握手之后使用,作为命令CLUSTER MEET的结果,当我们连接一对给定的IP/PORT,我们还没有节点的名字,
因此我们随便挑了个随机名字,当我们收到pong消息的时候,就会用这个函数修复这个随机名字
void clusterRenameNode(clusterNode *node, char *newname) {
    serverLog(LL_DEBUG,"Renaming node %.40s into %.40s",
        node->name, newname);

    /* The nodes dict is keyed by name, so the entry stored under the old
     * (random) name is removed before the name is overwritten. */
    sds oldkey = sdsnewlen(node->name, CLUSTER_NAMELEN);
    int rc = dictDelete(server.cluster->nodes, oldkey);
    sdsfree(oldkey);
    serverAssert(rc == DICT_OK);

    /* Adopt the real name and insert the node again under it. */
    memcpy(node->name, newname, CLUSTER_NAMELEN);
    clusterAddNode(node);
}
************************************************************************************************
/* This function is called only if a node is marked as FAIL, but we are able
* to reach it again. It checks if there are the conditions to undo the FAIL
* state. */
只有当一个节点已被标记为故障(FAIL)、但我们又能再次访问到它时,才会调用这个函数。
它检查是否有条件可以取消故障的标志
void clearNodeFailureIfNeeded(clusterNode *node) {
    mstime_t now = mstime();
    /* How old the FAIL flag of a slot-serving master must be before it is
     * cleared even though nobody failed it over. */
    mstime_t undo_after =
        server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT;

    /* Callers must only invoke this for nodes currently flagged FAIL. */
    serverAssert(nodeFailed(node));

    /* Replicas, and masters that serve no slots, have FAIL cleared as soon
     * as they are reachable again. */
    int is_replica = nodeIsSlave(node);
    if (is_replica || node->numslots == 0) {
        serverLog(LL_NOTICE,
            "Clear FAIL state for node %.40s: %s is reachable again.",
            node->name,
            is_replica ? "replica" : "master without slots");
        node->flags &= ~CLUSTER_NODE_FAIL;
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
    }

    /* A master that, from our point of view, still serves slots keeps FAIL
     * until the flag is older than undo_after. Past that point nobody
     * appears to be taking over its slots, so the flag is cleared. */
    if (nodeIsMaster(node) && node->numslots > 0 &&
        now - node->fail_time > undo_after)
    {
        serverLog(LL_NOTICE,
            "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.",
            node->name);
        node->flags &= ~CLUSTER_NODE_FAIL;
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
    }
}
************************************************************************************************
/* Reconfigure the specified node 'n' as a master. This function is called when
* a node that we believed to be a slave is now acting as master in order to
* update the state of the node. */
重新配置指定的节点n为主节点。当我们确认一个从节点现在扮演一个主节点,这个函数被调用来更新节点的状态
void clusterSetNodeAsMaster(clusterNode *n) {
    /* Already a master: nothing to reconfigure. */
    if (nodeIsMaster(n)) return;

    clusterNode *oldmaster = n->slaveof;
    if (oldmaster != NULL) {
        /* Detach the node from its previous master's replica list. */
        clusterNodeRemoveSlave(oldmaster,n);
        /* Promoted nodes other than myself get CLUSTER_NODE_MIGRATE_TO
         * (presumably marking them as replica-migration targets). */
        if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO;
    }

    /* Flip the role flags and drop the master reference. */
    n->flags = (n->flags & ~CLUSTER_NODE_SLAVE) | CLUSTER_NODE_MASTER;
    n->slaveof = NULL;

    /* Persist the new configuration and refresh the cluster state. */
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
}
************************************************************************************************
/* Delete all the slots associated with the specified node.
* The number of deleted slots is returned. */
删除与指定节点关联的所有槽。返回删除槽的数量
int clusterDelNodeSlots(clusterNode *node) {
    int deleted = 0;

    for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
        /* Only slots whose bit is set in the node's own bitmap. */
        if (!clusterNodeGetSlotBit(node,slot)) continue;
        clusterDelSlot(slot);
        deleted++;
    }
    return deleted;
}
************************************************************************************************
从主节点的从节点列表移除从节点
/* Remove 'slave' from the replica array of 'master'.
 * Returns C_OK if it was found and removed, C_ERR otherwise. */
int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
    int idx;

    /* Locate the replica in the master's array. */
    for (idx = 0; idx < master->numslaves; idx++)
        if (master->slaves[idx] == slave) break;
    if (idx >= master->numslaves) return C_ERR;

    /* Close the gap by shifting the trailing entries one position left.
     * memmove is required because source and destination overlap. */
    int tail = master->numslaves - idx - 1;
    if (tail > 0)
        memmove(master->slaves + idx, master->slaves + idx + 1,
                sizeof(*master->slaves) * tail);
    master->numslaves--;

    /* A master left without replicas drops CLUSTER_NODE_MIGRATE_TO. */
    if (master->numslaves == 0)
        master->flags &= ~CLUSTER_NODE_MIGRATE_TO;
    return C_OK;
}
************************************************************************************************
/* This function is called when we receive a master configuration via a
* PING, PONG or UPDATE packet. What we receive is a node, a configEpoch of the
* node, and the set of slots claimed under this configEpoch.
当我们通过PING,PONG或者UPDATE报文接收到一个主节点配置时,这个函数被调用。
我们接收到的是一个节点,一个节点的纪元,在这个纪元声明的槽集
* What we do is to rebind the slots with newer configuration compared to our
* local configuration, and if needed, we turn ourself into a replica of the
* node (see the function comments for more info).
我们要做的是用比本地配置更新的配置重新绑定槽,如果有需要,我们改变自己为节点的从节点(更多信息请看函数中的注释)
* The 'sender' is the node for which we received a configuration update.
* Sometimes it is not actually the "Sender" of the information, like in the
* case we receive the info via an UPDATE packet. */
sender是我们接收到的更新配置的节点。有时它实际上并不是信息的“发送者”,比如我们通过更新包接收信息。
void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
    int j;
    clusterNode *curmaster, *newmaster = NULL;

    /* dirty_slots collects slots whose ownership we lose while they still
     * hold keys. This usually happens after a failover, or after a manual
     * reconfiguration by the admin. If the update does not demote us to a
     * replica (in that case we resync with the new master and the whole
     * keyspace is replaced), the keys in those slots must be deleted. */
    uint16_t dirty_slots[CLUSTER_SLOTS];
    int dirty_slots_count = 0;

    /* curmaster is this node if it is a master, otherwise the master it
     * replicates from. The loop checks whether slots are being taken away
     * from curmaster. myself->slaveof can be NULL for a replica whose master
     * is unknown (freeClusterNode() clears the slaveof field of a deleted
     * node's replicas), so curmaster may be NULL here. */
    curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;

    if (sender == myself) {
        serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
        return;
    }

    for (j = 0; j < CLUSTER_SLOTS; j++) {
        /* Only slots claimed in the received bitmap are considered. */
        if (!bitmapTestBit(slots,j)) continue;

        /* The slot is already bound to the sender of this message. */
        if (server.cluster->slots[j] == sender) continue;

        /* The slot is in importing state. It should be modified only
         * manually via redis-trib. Example: a resharding is in progress,
         * the migrating side already closed the slot and is advertising
         * the new config, but we still want the slot to be closed
         * manually. */
        if (server.cluster->importing_slots_from[j]) continue;

        /* Rebind the slot only if it is unassigned or the sender claims it
         * with a greater configEpoch. (Not importing it is guaranteed by
         * the check above.) */
        if (server.cluster->slots[j] != NULL &&
            server.cluster->slots[j]->configEpoch >= senderConfigEpoch)
            continue;

        /* Was this slot ours, and does it still contain keys? Mark it as a
         * dirty slot. */
        if (server.cluster->slots[j] == myself &&
            countKeysInSlot(j) &&
            sender != myself)
        {
            dirty_slots[dirty_slots_count] = j;
            dirty_slots_count++;
        }

        /* A slot taken away from curmaster makes 'sender' our candidate new
         * master. When curmaster is NULL, an unassigned slot would also
         * compare equal. That used to set newmaster and then dereference a
         * NULL curmaster below, so a real curmaster is required. */
        if (curmaster != NULL && server.cluster->slots[j] == curmaster)
            newmaster = sender;

        /* Move the slot to the sender and schedule a config save/fsync. */
        clusterDelSlot(j);
        clusterAddSlot(sender,j);
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
    }

    /* After updating the slots configuration, don't do any actual change
     * in the state of the server if a module disabled Redis Cluster
     * keys redirections. */
    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
        return;

    /* If at least one slot moved from curmaster to the sender:
     * 1) We are a master left without slots: we were failed over and should
     *    turn into a replica of the new master.
     * 2) We are a replica whose master is left without slots: replicate
     *    from the new slots owner.
     * newmaster is only set when curmaster is non-NULL (see the loop). */
    if (newmaster && curmaster->numslots == 0) {
        serverLog(LL_WARNING,
            "Configuration change detected. Reconfiguring myself "
            "as a replica of %.40s", sender->name);
        clusterSetMaster(sender);
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
    } else if (dirty_slots_count) {
        /* We lost ownership of some slots that still hold keys, but we keep
         * serving other slots, so we were not demoted to a replica. To keep
         * keys and slots consistent, delete the keys of the lost slots. */
        for (j = 0; j < dirty_slots_count; j++)
            delKeysInSlot(dirty_slots[j]);
    }
}
************************************************************************************************
/* Send an UPDATE message to the specified link carrying the specified 'node'
* slots configuration. The node name, slots bitmap, and configEpoch info
* are included. */
向指定的连接link发送一条UPDATE消息,消息携带指定节点node的槽配置,包括节点名称、槽位图和配置纪元。
void clusterSendUpdate(clusterLink *link, clusterNode *node) {
    /* Without a link there is nobody to send the update to. */
    if (link == NULL) return;

    clusterMsg msg;
    clusterMsg *hdr = &msg;

    /* clusterBuildMessageHdr() also sets totlen for UPDATE messages. */
    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_UPDATE);

    /* Payload: node name, configEpoch (network byte order), slots bitmap. */
    memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
    hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
    memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));

    clusterSendMessage(link,(unsigned char*)&msg,ntohl(hdr->totlen));
}
/* Build the message header. hdr must point to a buffer at least
* sizeof(clusterMsg) in bytes. */
组装消息头,指针hdr必须指向一个缓存至少结构体clusterMsg长度个字节
void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
    int totlen = 0;
    uint64_t offset;
    clusterNode *master;

    /* If this node is a master, we send its slots bitmap and configEpoch.
     * If this node is a slave we send the master's information instead (the
     * node is flagged as slave so the receiver knows that it is NOT really
     * in charge for this slots). */
    master = (nodeIsSlave(myself) && myself->slaveof) ?
              myself->slaveof : myself;

    /* Start from an all-zero header, then fill the fixed fields: protocol
     * version, "RCmb" signature, message type (network byte order) and the
     * sender's node name. */
    memset(hdr,0,sizeof(*hdr));
    hdr->ver = htons(CLUSTER_PROTO_VER);
    hdr->sig[0] = 'R';
    hdr->sig[1] = 'C';
    hdr->sig[2] = 'm';
    hdr->sig[3] = 'b';
    hdr->type = htons(type);
    memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);

    /* If cluster-announce-ip option is enabled, force the receivers of our
     * packets to use the specified address for this node. Otherwise if the
     * first byte is zero, they'll do auto discovery.
     * strncpy() does not terminate a string that fills the whole field, so
     * the last byte is forced to '\0' explicitly. */
    memset(hdr->myip,0,NET_IP_STR_LEN);
    if (server.cluster_announce_ip) {
        strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
        hdr->myip[NET_IP_STR_LEN-1] = '\0';
    }

    /* Handle cluster-announce-port as well. The base port is the TLS port
     * when cluster TLS is enabled. Without cluster-announce-bus-port, the
     * bus port is the base port plus CLUSTER_PORT_INCR. */
    int port = server.tls_cluster ? server.tls_port : server.port;
    int announced_port = server.cluster_announce_port ?
                         server.cluster_announce_port : port;
    int announced_cport = server.cluster_announce_bus_port ?
                          server.cluster_announce_bus_port :
                          (port + CLUSTER_PORT_INCR);

    /* Slots bitmap of 'master' (ourselves, or our master if we replicate). */
    memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
    /* Name of our master; left all-zero when myself->slaveof is NULL. */
    memset(hdr->slaveof,0,CLUSTER_NAMELEN);
    if (myself->slaveof != NULL)
        memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
    hdr->port = htons(announced_port);
    hdr->cport = htons(announced_cport);
    hdr->flags = htons(myself->flags);
    hdr->state = server.cluster->state;

    /* Set the currentEpoch and configEpochs. configEpoch comes from
     * 'master', so a replica advertises its master's configEpoch. */
    hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
    hdr->configEpoch = htonu64(master->configEpoch);

    /* Set the replication offset: a replica sends the offset returned by
     * replicationGetSlaveOffset(), a master sends master_repl_offset. */
    if (nodeIsSlave(myself))
        offset = replicationGetSlaveOffset();
    else
        offset = server.master_repl_offset;
    hdr->offset = htonu64(offset);

    /* Set the message flags. A master with mf_end set (presumably: a manual
     * failover in progress) advertises CLUSTERMSG_FLAG0_PAUSED. */
    if (nodeIsMaster(myself) && server.cluster->mf_end)
        hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;

    /* Compute the message length for certain messages. For other messages
     * this is up to the caller. FAIL and UPDATE are the fixed header
     * (clusterMsg minus the data union) plus their payload struct. */
    if (type == CLUSTERMSG_TYPE_FAIL) {
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        totlen += sizeof(clusterMsgDataFail);
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        totlen += sizeof(clusterMsgDataUpdate);
    }
    hdr->totlen = htonl(totlen);
    /* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */
}
/* Put stuff into the send buffer.
将内容放到发送缓冲区
* It is guaranteed that this function will never have as a side effect
* the link to be invalidated, so it is safe to call this function
* from event handlers that will do stuff with the same link later. */
可以保证此函数不会产生使链接失效的副作用,
因此可以安全地从稍后还会继续使用同一链接的事件处理程序中调用此函数
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
    /* Install the write handler only when the send buffer goes from empty
     * to non-empty. If data is already queued, a handler is presumably
     * installed already. Nothing is written synchronously here. */
    int was_empty = sdslen(link->sndbuf) == 0;
    if (was_empty && msglen != 0)
        connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1);

    /* Always append the message to the link's send buffer. */
    link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);

    /* Populate sent messages stats (per message type). */
    uint16_t type = ntohs(((clusterMsg*) msg)->type);
    if (type < CLUSTERMSG_TYPE_COUNT)
        server.cluster->stats_bus_messages_sent[type]++;
}
************************************************************************************************
/* This function is called when this node is a master, and we receive from
* another master a configuration epoch that is equal to our configuration
* epoch.
*
* BACKGROUND
*
* It is not possible that different slaves get the same config
* epoch during a failover election, because the slaves need to get voted
* by a majority. However when we perform a manual resharding of the cluster
* the node will assign a configuration epoch to itself without to ask
* for agreement. Usually resharding happens when the cluster is working well
* and is supervised by the sysadmin, however it is possible for a failover
* to happen exactly while the node we are resharding a slot to assigns itself
* a new configuration epoch, but before it is able to propagate it.
*
* So technically it is possible in this condition that two nodes end with
* the same configuration epoch.
*
* Another possibility is that there are bugs in the implementation causing
* this to happen.
*
* Moreover when a new cluster is created, all the nodes start with the same
* configEpoch. This collision resolution code allows nodes to automatically
* end with a different configEpoch at startup automatically.
*
* In all the cases, we want a mechanism that resolves this issue automatically
* as a safeguard. The same configuration epoch for masters serving different
* set of slots is not harmful, but it is if the nodes end serving the same
* slots for some reason (manual errors or software bugs) without a proper
* failover procedure.
*
* In general we want a system that eventually always ends with different
* masters having different configuration epochs whatever happened, since
* nothing is worse than a split-brain condition in a distributed system.
*
* BEHAVIOR
*
* When this function gets called, what happens is that if this node
* has the lexicographically smaller Node ID compared to the other node
* with the conflicting epoch (the 'sender' node), it will assign itself
* the greatest configuration epoch currently detected among nodes plus 1.
*
* This means that even if there are multiple nodes colliding, the node
* with the greatest Node ID never moves forward, so eventually all the nodes
* end with a different configuration epoch.
*/
void clusterHandleConfigEpochCollision(clusterNode *sender) {
    /* A collision requires two masters that share the same configEpoch. */
    int same_epoch = sender->configEpoch == myself->configEpoch;
    int both_masters = nodeIsMaster(sender) && nodeIsMaster(myself);
    if (!same_epoch || !both_masters) return;

    /* Only the node with the lexicographically smaller ID moves forward:
     * do nothing unless the sender's ID is greater than ours. */
    if (memcmp(sender->name,myself->name,CLUSTER_NAMELEN) <= 0) return;

    /* Take the next epoch this node knows about and persist it at once. */
    myself->configEpoch = ++server.cluster->currentEpoch;
    clusterSaveConfigOrDie(1);
    serverLog(LL_VERBOSE,
        "WARNING: configEpoch collision with node %.40s."
        " configEpoch set to %llu",
        sender->name,
        (unsigned long long) myself->configEpoch);
}
************************************************************************************************
/* Vote for the node asking for our vote if there are the conditions. */
如果有条件(还没有投过票),为请求我们投票的节点投票
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
    clusterNode *master = node->slaveof;
    /* Epochs advertised by the requesting replica (host byte order). */
    uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
    uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
    /* Slots the requester claims (its master's slots bitmap). */
    unsigned char *claimed_slots = request->myslots;
    /* Set for manual failover: vote even if the master is not failing. */
    int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
    int j;

    /* IF we are not a master serving at least 1 slot, we don't have the
     * right to vote, as the cluster size in Redis Cluster is the number
     * of masters serving at least one slot.
     * NOTE(review): the upstream comment says the quorum is "cluster size
     * + 1"; presumably a majority (size/2 + 1) is meant. Confirm against
     * the election code. */
    if (nodeIsSlave(myself) || myself->numslots == 0) return;

    /* Request epoch must be >= our currentEpoch.
     * Note that it is impossible for it to actually be greater since
     * our currentEpoch was updated as a side effect of receiving this
     * request, if the request epoch was greater. */
    if (requestCurrentEpoch < server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
            "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
            node->name,
            (unsigned long long) requestCurrentEpoch,
            (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    /* I already voted for this epoch? Return ASAP.
     * At most one vote is granted per currentEpoch. */
    if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: already voted for epoch %llu",
                node->name,
                (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    /* Node must be a slave and its master down.
     * The master can be non failing if the request is flagged
     * with CLUSTERMSG_FLAG0_FORCEACK (manual failover).
     * The inner if/else chain only selects the log message for the reason
     * the vote is denied. */
    if (nodeIsMaster(node) || master == NULL ||
        (!nodeFailed(master) && !force_ack))
    {
        if (nodeIsMaster(node)) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: it is a master node",
                    node->name);
        } else if (master == NULL) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: I don't know its master",
                    node->name);
        } else if (!nodeFailed(master)) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: its master is up",
                    node->name);
        }
        return;
    }

    /* Rate limit: do not vote for any replica of this master if we already
     * voted for one within the last two node timeouts (tracked in the
     * master's voted_time). This is not strictly needed for correctness of
     * the algorithm but makes the base case more linear. */
    if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
    {
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: "
                "can't vote about this master before %lld milliseconds",
                node->name,
                (long long) ((server.cluster_node_timeout*2)-
                             (mstime() - node->slaveof->voted_time)));
        return;
    }

    /* The slave requesting the vote must have a configEpoch for the claimed
     * slots that is >= the one of the masters currently serving the same
     * slots in the current configuration. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        /* Skip slots the request does not claim. */
        if (bitmapTestBit(claimed_slots, j) == 0) continue;
        /* Unassigned slots, and slots whose owner's configEpoch is not
         * greater than the requested one, pass the check. */
        if (server.cluster->slots[j] == NULL ||
            server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
        {
            continue;
        }
        /* If we reached this point we found a slot that in our current slots
         * is served by a master with a greater configEpoch than the one claimed
         * by the slave requesting our vote. Refuse to vote for this slave. */
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: "
                "slot %d epoch (%llu) > reqEpoch (%llu)",
                node->name, j,
                (unsigned long long) server.cluster->slots[j]->configEpoch,
                (unsigned long long) requestConfigEpoch);
        return;
    }

    /* We can vote for this slave. Record the voted epoch and the vote time
     * on the failing master (used by the rate limit above), schedule a
     * config save + fsync, and send the FAILOVER_AUTH_ACK. */
    server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
    node->slaveof->voted_time = mstime();
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
    clusterSendFailoverAuth(node);
    serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
        node->name, (unsigned long long) server.cluster->currentEpoch);
}
/* Send a FAILOVER_AUTH_ACK message to the specified node. */
发送信息FAILOVER_AUTH_ACK给指定的节点(即给该节点投票)
void clusterSendFailoverAuth(clusterNode *node) {
    /* No link to the requesting replica: the vote cannot be delivered. */
    if (!node->link) return;

    clusterMsg msg;
    clusterBuildMessageHdr(&msg,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);

    /* FAILOVER_AUTH_ACK carries no payload: its length is the header
     * without the data union. */
    uint32_t totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    msg.totlen = htonl(totlen);
    clusterSendMessage(node->link,(unsigned char*)&msg,totlen);
}
************************************************************************************************
/* NOTE(review): presumably describes a cluster node for the module API
 * (the field comment refers to REDISMODULE_NODE_MASTER); nothing in this
 * excerpt uses it. The typedef name 'mdouleClusterNodeInfo' looks like a
 * misspelling of the struct tag 'moduleClusterNodeInfo'. It is kept as is
 * because other code may refer to it. */
typedef struct moduleClusterNodeInfo {
    int flags;
    char ip[NET_IP_STR_LEN];
    int port;
    /* NOTE(review): 40 presumably equals CLUSTER_NAMELEN; confirm. */
    char master_id[40]; /* Only if flags & REDISMODULE_NODE_MASTER is true. */
} mdouleClusterNodeInfo;
/* We have an array of message types: each bucket is a linked list of
* configured receivers. */
我们拥有一组消息类型:每个桶都是已配置接收器的链接列表
static moduleClusterReceiver *clusterReceivers[UINT8_MAX];
/* Dispatch the message to the right module receiver. */
将消息分配到正确的模块接收者
void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len) {
moduleClusterReceiver *r = clusterReceivers[type]; 定义接收者数组
while(r) {
if (r->module_id == module_id) { 匹配模块ID
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.module = r->module;
ctx.client = moduleFreeContextReusedClient;
selectDb(ctx.client, 0);
r->callback(&ctx,sender_id,type,payload,len); 调用注册的回调函数
moduleFreeContext(&ctx);
return;
}
r = r->next;
}
}
************************************************************************************************
/* This function is called every time we get a failure report from a node.
* The side effect is to populate the fail_reports list (or to update
* the timestamp of an existing report).
每次当我们从节点得到一个故障报告就调用这个函数。这个函数的副作用就是填充故障报告列表(或者更新存在报告的时间戳)
* 'failing' is the node that is in failure state according to the
* 'sender' node.
failing是发送者sender认为处于故障状态的节点。
* The function returns 0 if it just updates a timestamp of an existing
* failure report from the same sender. 1 is returned if a new failure
* report is created. */
函数返回0如果只是更新一个已经存在同发送者故障报告的时间戳。返回1如果是创建了一个新的故障报告。
int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
    list *reports = failing->fail_reports;
    listIter iter;
    listNode *cur;

    /* An existing report from the same sender only gets a fresh timestamp. */
    listRewind(reports,&iter);
    while ((cur = listNext(&iter)) != NULL) {
        clusterNodeFailReport *existing = cur->value;
        if (existing->node != sender) continue;
        existing->time = mstime();
        return 0;
    }

    /* First report from this sender: append a new entry. */
    clusterNodeFailReport *report = zmalloc(sizeof(*report));
    report->node = sender;
    report->time = mstime();
    listAddNodeTail(reports,report);
    return 1;
}
************************************************************************************************
/* This function checks if a given node should be marked as FAIL.
* It happens if the following conditions are met:
这个函数检查给定的节点是否应该标记为故障。如果满足下面的条件就会发生:
* 1) We received enough failure reports from other master nodes via gossip.
* Enough means that the majority of the masters signaled the node is
* down recently.
1)我们从其它主节点通过gossip协议收到了足够多的故障报告。足够多意味着大部分主节点最近标记该节点下线。
* 2) We believe this node is in PFAIL state.
2)我们相信该节点处于PFAIL状态
* If a failure is detected we also inform the whole cluster about this
* event trying to force every other node to set the FAIL flag for the node.
如果检测到故障,我们还会把这个事件通知整个集群,尝试强制其它每个节点都为该节点设置故障标志
* Note that the form of agreement used here is weak, as we collect the majority
* of masters state during some time, and even if we force agreement by
* propagating the FAIL message, because of partitions we may not reach every
* node. However:
注意此处使用的共识形式很弱:我们只是在一段时间内收集大多数主节点的状态;即使通过传播FAIL消息来强制达成一致,
由于网络分区,我们也可能无法到达每个节点。然而:
* 1) Either we reach the majority and eventually the FAIL state will propagate
* to all the cluster.
1)要么我们达到多数,最终失败状态将传播到所有集群
* 2) Or there is no majority so no slave promotion will be authorized and the
* FAIL flag will be cleared after some time.
*/
2)要么 没有达到多数票,这样从节点不会被提升,(主节点)故障标记将会在将来清除
void markNodeAsFailingIfNeeded(clusterNode *node) {
    /* Majority of the cluster size. */
    int needed_quorum = (server.cluster->size / 2) + 1;

    /* Still reachable from our point of view, or already FAIL: nothing to do. */
    if (!nodeTimedOut(node) || nodeFailed(node)) return;

    /* Valid reports from other nodes, plus our own vote if we are a master. */
    int failures = clusterNodeFailureReportsCount(node) +
                   (nodeIsMaster(myself) ? 1 : 0);
    if (failures < needed_quorum) return; /* No weak agreement from masters. */

    serverLog(LL_NOTICE,
        "Marking node %.40s as failing (quorum reached).", node->name);

    /* Promote PFAIL to FAIL and remember when it happened. */
    node->flags &= ~CLUSTER_NODE_PFAIL;
    node->flags |= CLUSTER_NODE_FAIL;
    node->fail_time = mstime();

    /* A master broadcasts the FAIL so that every reachable node flags the
     * failing node too. */
    if (nodeIsMaster(myself)) clusterSendFail(node->name);
    clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}
/* Send a FAIL message to all the nodes we are able to contact.
* The FAIL message is sent when we detect that a node is failing
* (CLUSTER_NODE_PFAIL) and we also receive a gossip confirmation of this:
* we switch the node state to CLUSTER_NODE_FAIL and ask all the other
* nodes to do the same ASAP. */
发送故障信息到每个我们可以联系的节点。
当我们检测到一个节点处于故障状态(CLUSTER_NODE_PFAIL)并且我们收到了gossip协议的归该故障的确认信息,
那么就就发送这个故障信息: 我们转变节点的状态到CLUSTER_NODE_FAIL并且请求所有其它节点尽快做相同的事情
void clusterSendFail(char *nodename) {
clusterMsg buf[1];
clusterMsg *hdr = (clusterMsg*) buf;
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL); 组装故障信息
memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
clusterBroadcastMessage(buf,ntohl(hdr->totlen)); 广播故障信息
}
/* Send a message to all the nodes that are part of the cluster having
* a connected link.
向属于具有连接链路的群集的所有节点发送消息
* It is guaranteed that this function will never have as a side effect
* some node->link to be invalidated, so it is safe to call this function
* from event handlers that will do stuff with node links later. */
可以保证此函数不会产生使某些node->link失效的副作用,因此可以安全地从稍后还会使用节点连接的事件处理器中调用此函数。
void clusterBroadcastMessage(void *buf, size_t len) {
dictIterator *di;
dictEntry *de;
di = dictGetSafeIterator(server.cluster->nodes); 遍历所有节点
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
if (!node->link) continue; 没有联系的
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE)) 自己活着处于握手状态
continue;
clusterSendMessage(node->link,buf,len); 给连接节点发送消息
}
dictReleaseIterator(di);
}
************************************************************************************************
/* Remove the failing report for 'node' if it was previously considered
* failing by 'sender'. This function is called when a node informs us via
* gossip that a node is OK from its point of view (no FAIL or PFAIL flags).
如果节点node之前被发送者sender报告为故障,则移除这条故障报告。当某个节点通过gossip协议告知我们,
从它的视角看某个节点是正常的(没有FAIL或PFAIL标志)时,会调用此函数。
* Note that this function is called relatively often as it gets called even
* when there are no nodes failing, and is O(N), however when the cluster is
* fine the failure reports list is empty so the function runs in constant
* time.
请注意,此函数被调用的频率相对较高,因为即使在没有节点发生故障时也会被调用,
并且为O(N),但是当集群状况良好时,故障报告列表为空,因此该函数以常量时间运行
* The function returns 1 if the failure report was found and removed.
* Otherwise 0 is returned. */
函数返回1如果故障报告被发现和移除,否则返回0
int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender) {
    list *reports = node->fail_reports;
    listNode *found = NULL;
    listNode *cur;
    listIter iter;

    /* Look for the report filed by 'sender'. */
    listRewind(reports,&iter);
    while ((cur = listNext(&iter)) != NULL) {
        clusterNodeFailReport *fr = cur->value;
        if (fr->node == sender) {
            found = cur;
            break;
        }
    }
    if (found == NULL) return 0; /* No failure report from this sender. */

    /* Drop it, then also purge any reports that have expired. */
    listDelNode(reports,found);
    clusterNodeCleanupFailureReports(node);
    return 1;
}
************************************************************************************************
/* Return the number of external nodes that believe 'node' is failing,
* not including this node, that may have a PFAIL or FAIL state for this
* node as well. */
返回认为node处于故障状态的外部节点数量,不包括本节点(本节点自身也可能将该节点标记为PFAIL或FAIL状态)
int clusterNodeFailureReportsCount(clusterNode *node) {
    /* Expired reports must not be counted, so purge them first. */
    clusterNodeCleanupFailureReports(node);
    return (int) listLength(node->fail_reports);
}
************************************************************************************************
/* Remove failure reports that are too old, where too old means reasonably
* older than the global node timeout. Note that anyway for a node to be
* flagged as FAIL we need to have a local PFAIL state that is at least
* older than the global node timeout, so we don't just trust the number
* of failure reports from other nodes. */
移除太旧的故障报告,太旧意味着比全局节点超时时间旧。注意到对于任何要标记为FAIL的节点,
我们至少有一个超过全局节点超时时间的本地PFAIL状态(意思就是本地变成PFAIL的时间比全局节点超时时间更久),
因此我们实际上不仅仅相信其它节点的故障报告数量
void clusterNodeCleanupFailureReports(clusterNode *node) {
    list *l = node->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;
    /* A report stays valid for CLUSTER_FAIL_REPORT_VALIDITY_MULT times the
     * configured node timeout. The multiplier is defined outside this
     * excerpt; the accompanying notes state it is 2 -- confirm in cluster.h. */
    mstime_t maxtime = server.cluster_node_timeout *
                       CLUSTER_FAIL_REPORT_VALIDITY_MULT;
    mstime_t now = mstime();

    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        /* Delete reports older than the validity window. Removing the node
         * just returned by listNext() is allowed by the adlist iterator. */
        if (now - fr->time > maxtime) listDelNode(l,ln);
    }
}
************************************************************************************************
/* Return non-zero if the specified node ID exists in the blacklist.
* You don't need to pass an sds string here, any pointer to 40 bytes
* will work. */
如果指定的节点ID存在于黑名单中,返回非0。这里不需要传入sds字符串,任何指向40字节数据的指针都可以
int clusterBlacklistExists(char *nodeid) {
    /* Copy exactly CLUSTER_NAMELEN bytes into a temporary sds key, so the
     * caller may pass any pointer to a raw node name of that length. */
    sds id = sdsnewlen(nodeid,CLUSTER_NAMELEN);
    int retval;

    /* Purge expired blacklist entries before the lookup. */
    clusterBlacklistCleanup();
    retval = dictFind(server.cluster->nodes_black_list,id) != NULL;
    sdsfree(id);
    return retval;
}
/* Before of the addNode() or Exists() operations we always remove expired
* entries from the black list. This is an O(N) operation but it is not a
* problem since add / exists operations are called very infrequently and
* the hash table is supposed to contain very little elements at max.
* However without the cleanup during long uptimes and with some automated
* node add/removal procedures, entries could accumulate. */
在执行addNode或Exists操作之前,我们总是从黑名单中删除过期的条目。
这是一个O(N)操作,但这不是一个问题,因为很少调用add/exists操作,
并且哈希表最多只包含很少的元素。但是,如果在长时间正常运行期间没有清理,
并且使用一些自动节点添加/删除过程,条目可能会累积。
void clusterBlacklistCleanup(void) {
    dictIterator *di;
    dictEntry *de;

    /* A safe iterator is used because entries are deleted while iterating. */
    di = dictGetSafeIterator(server.cluster->nodes_black_list);
    while((de = dictNext(di)) != NULL) {
        /* Each entry's integer value is its expiration time, compared with
         * server.unixtime (presumably Unix seconds -- confirm). */
        int64_t expire = dictGetUnsignedIntegerVal(de);

        /* Expired: remove the node from the blacklist. */
        if (expire < server.unixtime)
            dictDelete(server.cluster->nodes_black_list,dictGetKey(de));
    }
    dictReleaseIterator(di);
}
************************************************************************************************
/* Set the specified node 'n' as master for this node.
* If this node is currently a master, it is turned into a slave. */
设置指定的节点n为这个节点的主节点。如果这个节点当前是主节点,那它就变为从节点
void clusterSetMaster(clusterNode *n) {
    serverAssert(n != myself);           /* A node cannot replicate itself. */
    serverAssert(myself->numslots == 0); /* Must serve no slots to become a replica. */

    if (nodeIsMaster(myself)) {
        /* Master -> replica: clear the MASTER and MIGRATE_TO flags, set the
         * SLAVE flag, and clear slot migrating/importing state. */
        myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
        myself->flags |= CLUSTER_NODE_SLAVE;
        clusterCloseAllSlots();
    } else {
        /* Already a replica: detach from the previous master, if any. */
        if (myself->slaveof)
            clusterNodeRemoveSlave(myself->slaveof,myself);
    }

    /* Record the new master and join its replica list. */
    myself->slaveof = n;
    clusterNodeAddSlave(n,myself);
    /* Start replicating (copying data) from the new master. */
    replicationSetMaster(n->ip, n->port);
    /* Finally, reset the manual failover state. */
    resetManualFailover();
}
************************************************************************************************