redis6.0.5之cluster.c阅读笔记3-定时任务
************************************************************************************************
在server.c中,有集群的定时任务,每秒执行10次
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
。。。。。。
/* Run the Redis Cluster cron. */
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
}
。。。。。。
}
************************************************************************************************
/* -----------------------------------------------------------------------------
* CLUSTER cron job 集群定时任务
* -------------------------------------------------------------------------- */
/* This is executed 10 times every second */ 这个函数每秒执行10次,因为整个serverCron的频率是10HZ,即每秒10次
void clusterCron(void) {
dictIterator *di;
dictEntry *de;
int update_state = 0;
int orphaned_masters; /* How many masters there are without ok slaves. */ 没有有效从节点的主节点数量
int max_slaves; /* Max number of ok slaves for a single master. */ 单个主节点拥有的最大有效从节点数
int this_slaves; /* Number of ok slaves for our master (if we are slave). */我们主节点拥有的有效从节点数(如果我是从节点)
mstime_t min_pong = 0, now = mstime();
clusterNode *min_pong_node = NULL;
static unsigned long long iteration = 0;
mstime_t handshake_timeout;
iteration++; /* Number of times this function was called so far. */ 到目前位置该函数被调用的次数
/* We want to take myself->ip in sync with the cluster-announce-ip option.
* The option can be set at runtime via CONFIG SET, so we periodically check
* if the option changed to reflect this into myself->ip. */
我们需要将参数myself->ip同配置项cluster-announce-ip保持同步。
这个配置项可以通过命令CONFIG SET在运行时设置,
因此我们定期检查配置项是否改变,如果改变就需要反映到参数myself->ip上来。
{
static char *prev_ip = NULL;
char *curr_ip = server.cluster_announce_ip;
int changed = 0;
if (prev_ip == NULL && curr_ip != NULL) changed = 1; 之前没有ip地址,现在设置了ip地址
else if (prev_ip != NULL && curr_ip == NULL) changed = 1;之前有ip地址,现在去掉了ip地址
else if (prev_ip && curr_ip && strcmp(prev_ip,curr_ip)) changed = 1; 都有ip地址,但是不同
if (changed) { 如果有变化的情况下
if (prev_ip) zfree(prev_ip);
prev_ip = curr_ip; 指向同样的地址
if (curr_ip) {
/* We always take a copy of the previous IP address, by
* duplicating the string. This way later we can check if
* the address really changed. */
我们总是通过复制字符串来复制上一个IP地址。这样以后我们就可以检查地址是否真的改变了
prev_ip = zstrdup(prev_ip);深拷贝
strncpy(myself->ip,server.cluster_announce_ip,NET_IP_STR_LEN);
myself->ip[NET_IP_STR_LEN-1] = '\0';
} else {
myself->ip[0] = '\0'; /* Force autodetection. */ 强制自动检测
}
}
}
/* The handshake timeout is the time after which a handshake node that was
* not turned into a normal node is removed from the nodes. Usually it is
* just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
* the value of 1 second. */
握手超时是指从节点中移除未转变为正常节点的握手节点的时间。
通常使用值NODE_TIMEOUT,但当NODE_TIMEOUT值太小时,我们使用1秒的值。
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000; 小于1秒使用1秒
/* Update myself flags. */ 更新自己的标志
clusterUpdateMyselfFlags(); 更新自己的配置,目前主要为CLUSTER_NODE_NOFAILOVER标识
/* Check if we have disconnected nodes and re-establish the connection.
* Also update a few stats while we are here, that can be used to make
* better decisions in other part of the code. */
检查我们是否有断开连接的节点并且尝试重新连接。
也同时更新一些统计,这可以用来在代码的其他部分做出更好的决策。
di = dictGetSafeIterator(server.cluster->nodes);
server.cluster->stats_pfail_nodes = 0;
while((de = dictNext(di)) != NULL) { 遍历集群的 节点
clusterNode *node = dictGetVal(de);
/* Not interested in reconnecting the link with myself or nodes
* for which we have no address. */
对重新连接自己或我们没有地址的节点不感兴趣
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;
if (node->flags & CLUSTER_NODE_PFAIL) 节点准故障标志
server.cluster->stats_pfail_nodes++;准故障节点统计数加1
/* A Node in HANDSHAKE state has a limited lifespan equal to the
* configured node timeout. */
处于握手状态的节点的有限寿命等于配置的节点超时
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
clusterDelNode(node); 删除该节点
continue;
}
if (node->link == NULL) { 节点的连接为空
clusterLink *link = createClusterLink(node); 创建新连接
link->conn = server.tls_cluster ? connCreateTLS() : connCreateSocket(); 是否是ssl连接
connSetPrivateData(link->conn, link); 设置关联数据
if (connConnect(link->conn, node->ip, node->cport, NET_FIRST_BIND_ADDR,
clusterLinkConnectHandler) == -1) { 进行连接
/* We got a synchronous error from connect before
* clusterSendPing() had a chance to be called.
* If node->ping_sent is zero, failure detection can't work,
* so we claim we actually sent a ping now (that will
* be really sent as soon as the link is obtained). */
在有机会调用函数clusterSendPing之前,我们从连接得到了一个同步的错误。
如果参数node->ping_sent 的值为0,那么故障检测就不能工作,
因此我们声称发送了一个ping命令(一旦我们获得连接就马上补发)
if (node->ping_sent == 0) node->ping_sent = mstime(); 声称发送了ping命令,实际后续补发
serverLog(LL_DEBUG, "Unable to connect to "
"Cluster Node [%s]:%d -> %s", node->ip,
node->cport, server.neterr);
freeClusterLink(link);
continue;
}
node->link = link;
}
}
dictReleaseIterator(di);
/* Ping some random node 1 time every 10 iterations, so that we usually ping
* one random node every second. */
每10次迭代ping一个随机选择的节点,因此我们通常每秒ping一个随机几点
if (!(iteration % 10)) { 每10次执行一次
int j;
/* Check a few random nodes and ping the one with the oldest
* pong_received time. */
检查一些随机节点,对最老时间pong_received的节点进行ping
for (j = 0; j < 5; j++) {
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
/* Don't ping nodes disconnected or with a ping currently active. */
不要对断开连接或者当前正在ping的节点ping
if (this->link == NULL || this->ping_sent != 0) continue; 无连接 或者 已经发出ping但还没有收到回复
if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE)) 是自己 或者 处于握手状态的不ping
continue;
if (min_pong_node == NULL || min_pong > this->pong_received) {
min_pong_node = this; 老的节点
min_pong = this->pong_received; 老的时间
}
}
if (min_pong_node) { 存在
serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING); 对节点发送ping命令
}
}
/* Iterate nodes to check if we need to flag something as failing.
* This loop is also responsible to:
* 1) Check if there are orphaned masters (masters without non failing
* slaves).
* 2) Count the max number of non failing slaves for a single master.
* 3) Count the number of slaves for our master, if we are a slave. */
遍历节点,检查我们是否需要一些事情标记为故障。这个循环主要负责如下事情:
1)检查是否有孤独的主节点(就是没有有效从节点的主节点)
2)统计单一主节点拥有的最大有效从节点数
3)如果我是一个从节点,统计我们的主节点的从节点数量
orphaned_masters = 0;
max_slaves = 0;
this_slaves = 0;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
now = mstime(); /* Use an updated time at every iteration. */ 每次迭代都使用更新的事件
if (node->flags &
(CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
continue; 是自身 或者 节点没有地址 或者 处于握手状态 返回
/* Orphaned master check, useful only if the current instance
* is a slave that may migrate to another master. */
检查孤独节点,仅当当前实例是可能迁移到其他主节点的从节点时才有用。
自身是从节点 并且 节点是主节点 并且 节点不是故障的
if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
int okslaves = clusterCountNonFailingSlaves(node); 统计有效的从节点数
/* A master is orphaned if it is serving a non-zero number of
* slots, have no working slaves, but used to have at least one
* slave, or failed over a master that used to have slaves. */
一个主节点是孤独的 如果它服务不少于1个槽,没有可工作的从节点,
但是曾经至少有一个从节点,或者 曾经有从节点的主节点进过了故障转移
if (okslaves == 0 && node->numslots > 0 && 有效的从节点数为0 并且 服务的槽数大于0
node->flags & CLUSTER_NODE_MIGRATE_TO) 可以迁入标志
{
orphaned_masters++;
}
if (okslaves > max_slaves) max_slaves = okslaves; 找出最大的有效从节点数
if (nodeIsSlave(myself) && myself->slaveof == node) 如果这个主节点恰好是我们的主节点
this_slaves = okslaves;
}
/* If we are not receiving any data for more than half the cluster
* timeout, reconnect the link: maybe there is a connection
* issue even if the node is alive. */
如果在超过一半的集群超时时间内没有收到任何数据,请重新连接链接:即使节点处于活动状态,也可能存在连接问题。
mstime_t ping_delay = now - node->ping_sent; ping发出去的时间间隔
mstime_t data_delay = now - node->data_received; 收到数据的时间间隔
if (node->link && /* is connected */ 有连接
now - node->link->ctime >
server.cluster_node_timeout && /* was not already reconnected */ 连接建立时间超过了集群节点超时时间
node->ping_sent && /* we already sent a ping */ 已经发送了ping命令
node->pong_received < node->ping_sent && /* still waiting pong */ 还在等待ping回复
/* and we are waiting for the pong more than timeout/2 */ 等到pong的时间超过了 超时时间的一半
ping_delay > server.cluster_node_timeout/2 &&
/* and in such interval we are not seeing any traffic at all. */
并且在这样的时间间隔内,我们看不到任何堵塞情况
data_delay > server.cluster_node_timeout/2) 回复时间超过了集群超时的一半
{
/* Disconnect the link, it will be reconnected automatically. */ 取消连接,将在后面自动尝试重连
freeClusterLink(node->link);
}
/* If we have currently no active ping in this instance, and the
* received PONG is older than half the cluster timeout, send
* a new ping now, to ensure all the nodes are pinged without
* a too big delay. */
如果这个实例当下没有活跃的ping,并且回复的pong超过了集群超时的一半,现在发送一个ping命令,
确保所有节点在没有太大延迟的情况下发出ping
if (node->link && 存在连接
node->ping_sent == 0 && 没有发出ping
(now - node->pong_received) > server.cluster_node_timeout/2) 收到pong的时间超过集群超时的一半时间
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
/* If we are a master and one of the slaves requested a manual
* failover, ping it continuously. */
如果是一个主节点,并且一个从节点请求进行手工故障转移,持续ping该主节点
if (server.cluster->mf_end && 存在故障转移超时设置
nodeIsMaster(myself) && 本身是主节点
server.cluster->mf_slave == node && 从节点的主节点 是该节点
node->link) 主节点连接存在
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
/* Check only if we have an active ping for this instance. */
对这个实例,检查我们是否有活跃的ping
if (node->ping_sent == 0) continue;
/* Check if this node looks unreachable.
* Note that if we already received the PONG, then node->ping_sent
* is zero, so can't reach this code at all, so we don't risk of
* checking for a PONG delay if we didn't sent the PING.
检查此节点是否看起来无法访问,注意到 如果我们已经收到了PONG,那么设置node->ping_sent为0,
因此不可能达到这里的代码,所以我们如果不发送ping,就不会有检查PONG延迟的风险。
* We also consider every incoming data as proof of liveness, since
* our cluster bus link is also used for data: under heavy data
* load pong delays are possible. */
我们也考虑每个进来数据作为存活的证明,由于我们的集群总线连接也用于数据:
在高数据负载下,可能会出现pong延迟。
mstime_t node_delay = (ping_delay < data_delay) ? ping_delay :
data_delay;
if (node_delay > server.cluster_node_timeout) { 延迟超过了集群节点超时时间
/* Timeout reached. Set the node as possibly failing if it is
* not already in this state. */
达到超时,设置节点可能为故障,如果它还没有处于这个状态。
if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) { 没有处于准故障或者故障状态
serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
node->name);
node->flags |= CLUSTER_NODE_PFAIL; 设置准故障状态
update_state = 1;
}
}
}
dictReleaseIterator(di);
/* If we are a slave node but the replication is still turned off,
* enable it if we know the address of our master and it appears to
* be up. */
如果我们是一个从节点,但是复制仍处于关闭,如果我们知道它主节点的地址并且已经上线了,那么就打开复制
if (nodeIsSlave(myself) && 是从节点
server.masterhost == NULL && 主节点地址为空
myself->slaveof && 存在主节点
nodeHasAddr(myself->slaveof)) 主节点有地址
{
replicationSetMaster(myself->slaveof->ip, myself->slaveof->port); 从指定主节点地址复制数据
}
/* Abourt a manual failover if the timeout is reached. */
终止手工故障转移如果已经超时了
manualFailoverCheckTimeout(); 终止手工故障转移
if (nodeIsSlave(myself)) { 如果是从节点
clusterHandleManualFailover(); 处理手工故障转移
if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
clusterHandleSlaveFailover(); 开启从节点故障转移
/* If there are orphaned slaves, and we are a slave among the masters
* with the max number of non-failing slaves, consider migrating to
* the orphaned masters. Note that it does not make sense to try
* a migration if there is no master with at least *two* working
* slaves. */
如果存在孤独的从节点(这里应该为笔误,应该为主节点),并且我们具有最多有效从节点的主节点的一个从节点,
考虑迁移到从节点到孤独的主节点。注意如果没有主节点有超过两个有效的从节点时,迁移是没有意义的,
(因为只有一个从节点的话,迁移之后自己也会变成孤独的主节点,所以没有意义)
if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
存在孤独的主节点 并且 最大从节点数大于等于2 并且 该从节点恰好是具有最大从节点的主节点的一个从节点
clusterHandleSlaveMigration(max_slaves); 迁移从节点
}
if (update_state || server.cluster->state == CLUSTER_FAIL) 状态有更新 或者 集群处于失败的情况
clusterUpdateState(); 更新状态
}
************************************************************************************************
/* A connect handler that gets called when a connection to another node
* gets established.
*/
当和另外一个节点建立连接时候调用连接处理器
void clusterLinkConnectHandler(connection *conn) {
clusterLink *link = connGetPrivateData(conn); 获取连接伴随的数据
clusterNode *node = link->node;
/* Check if connection succeeded */ 检查连接是否存活
if (connGetState(conn) != CONN_STATE_CONNECTED) { 判断连接状态
serverLog(LL_VERBOSE, "Connection with Node %.40s at %s:%d failed: %s",
node->name, node->ip, node->cport,
connGetLastError(conn));
freeClusterLink(link); 失败需要释放资源
return;
}
/* Register a read handler from now on */ 从现在开始注册读取处理器
connSetReadHandler(conn, clusterReadHandler); 用来处理返回的数据报
/* Queue a PING in the new connection ASAP: this is crucial
* to avoid false positives in failure detection.
尽快将PING在新连接中排队:这对于避免故障检测中的误报至关重要
* If the node is flagged as MEET, we send a MEET message instead
* of a PING one, to force the receiver to add us in its node
* table. */
如果节点标记为MEET,我们发送一个MEET的消息代替一个PING消息,强制接受节点添加我们到他的节点表中
mstime_t old_ping_sent = node->ping_sent;
clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING); 发送新的PING
if (old_ping_sent) {
/* If there was an active ping before the link was
* disconnected, we want to restore the ping time, otherwise
* replaced by the clusterSendPing() call. */
如果连接断开之前已经有活跃的ping,我们想要恢复PING时间,否则将被函数clusterSendPing调用替换
node->ping_sent = old_ping_sent;
}
/* We can clear the flag after the first packet is sent.
* If we'll never receive a PONG, we'll never send new packets
* to this node. Instead after the PONG is received and we
* are no longer in meet/handshake status, we want to send
* normal PING packets. */
再第一个报文发出之后我们能够清理标志。如果我们从来没有接受过一个PONG回复,我们将不再发送新的报文到这个节点。
相反经过一个PONG被收到后,我们不再处于 meet/handshake 状态,我们需要发出一个正常的PING报文
node->flags &= ~CLUSTER_NODE_MEET; 去掉MEET标志
serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
node->name, node->ip, node->cport);
}
************************************************************************************************
/* Send a PING or PONG packet to the specified node, making sure to add enough
* gossip informations. */
发出一个PING或者PONG报文到指定的节点,确保添加足够的gossip协议信息
void clusterSendPing(clusterLink *link, int type) {
unsigned char *buf;
clusterMsg *hdr;
int gossipcount = 0; /* Number of gossip sections added so far. */截止目前添加的gossip协议段信息的数量
int wanted; /* Number of gossip sections we want to append if possible. */ 如果可能我们想要添加的gossip协议段数量
int totlen; /* Total packet length. */ 保本总长度
/* freshnodes is the max number of nodes we can hope to append at all:
* nodes available minus two (ourself and the node we are sending the
* message to). However practically there may be less valid nodes since
* nodes in handshake state, disconnected, are not considered. */
参数freshnodes是我们期望可以添加的最大节点数量:
课添加的节点数需要减去2(我们自己和要发送消息到的对方节点).
然而实际可用的节点不多,因为处于握手状态,断开连接都不会考虑
int freshnodes = dictSize(server.cluster->nodes)-2; 所有节点数减去2
/* How many gossip sections we want to add? 1/10 of the number of nodes
* and anyway at least 3. Why 1/10?
多少gossip协议段我们想要添加?节点数量的十分之一或者至少3个。为什么是十分之一呢?
* If we have N masters, with N/10 entries, and we consider that in
* node_timeout we exchange with each other node at least 4 packets
* (we ping in the worst case in node_timeout/2 time, and we also
* receive two pings from the host), we have a total of 8 packets
* in the node_timeout*2 falure reports validity time. So we have
* that, for a single PFAIL node, we can expect to receive the following
* number of failure reports (in the specified window of time):
如果我们拥有N个节点,那么就是n/10个段,我们考虑到在节点超时之内,我们和其它节点交互超过至少4个报文。
(我们最坏在节点超时一半的时间发送ping命令),我们在节点超时时间两倍的故障报告有效时间内一共有8个报文。
因此我们拥有如下情况,对于单个PFAIL节点,我们期望接收到下面数量的故障报告(在指定窗口时间内)
* PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:
*
* PROB = probability of being featured in a single gossip entry, 节点出现在单个gossip协议段一个实体中的概率
* which is 1 / NUM_OF_NODES. 节点数量的倒数
* ENTRIES = 10.
* TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS. 8倍的主节点数量
*
* If we assume we have just masters (so num of nodes and num of masters
* is the same), with 1/10 we always get over the majority, and specifically
* 80% of the number of nodes, to account for many masters failing at the
* same time.
如果假设我们只有主节点(这样节点数量和主节点数量一致),只需要十分之一就可以得到大多数,
即达到大概80%的节点数量,在同时间段有大部分主节点说明故障节点。
因为根据公式 PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS =
(1 / NUM_OF_NODES) * (NUM_OF_NODES * 1/10) * (2 * 4 * NUM_OF_MASTERS) =
(8/10) * NUM_OF_MASTERS
所以用十分之一的量,在两倍节点失效的时间内,大概可以收到百分之八十节点的故障报告
* Since we have non-voting slaves that lower the probability of an entry
* to feature our node, we set the number of entries per packet as
* 10% of the total nodes we have. */
因为我们拥有没有投票权的从节点 ,这降低了进入标记我们节点的概率,
我们为每个报设置段的数量 为 总节点数的百分之10,
wanted = floor(dictSize(server.cluster->nodes)/10); 节点数的十分之一 去下整
if (wanted < 3) wanted = 3; 如果数量小于3,那么就为3
if (wanted > freshnodes) wanted = freshnodes; 超过了最大期望数,那就为最大期望数
/* Include all the nodes in PFAIL state, so that failure reports are
* faster to propagate to go from PFAIL to FAIL state. */
包括处于PFAIL状态的所有节点,以便故障报告能够更快地从PFAIL传播到故障状态。
int pfail_wanted = server.cluster->stats_pfail_nodes;
/* Compute the maxium totlen to allocate our buffer. We'll fix the totlen
* later according to the number of gossip sections we really were able
* to put inside the packet. */
计算最大总长度来分配我们的缓冲区。我们稍后会根据真正存放在包中的gossip协议部分的数量来来设置这个总长度的值
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); 结构体的长度 减去 指针长度
totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted)); 指针指向的实际数据长度
/* Note: clusterBuildMessageHdr() expects the buffer to be always at least
* sizeof(clusterMsg) or more. */
注意: 函数clusterBuildMessageHdr期望缓存总是至少sizeof(clusterMsg)或者更多
if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg); 如果少于结构体的长度,那么就设置为结构体长度
buf = zcalloc(totlen); 分配空间
hdr = (clusterMsg*) buf; 指向缓存头
/* Populate the header. */ 填充头
if (link->node && type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime(); 设置ping时间
clusterBuildMessageHdr(hdr,type);组装头部信息
/* Populate the gossip fields */ 填充gossip协议域
int maxiterations = wanted*3; 最大迭代次数
while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
dictEntry *de = dictGetRandomKey(server.cluster->nodes); 从中随机抽取一个节点
clusterNode *this = dictGetVal(de);
/* Don't include this node: the whole packet header is about us
* already, so we just gossip about other nodes. */
不要包括本身节点:整个报文头部就是关于我们本身的,因此只需要gossip其它节点的信息
if (this == myself) continue;
/* PFAIL nodes will be added later. */ 稍后添加PFAIL状态的节点
if (this->flags & CLUSTER_NODE_PFAIL) continue;
/* In the gossip section don't include:
* 1) Nodes in HANDSHAKE state.
* 3) Nodes with the NOADDR flag set.
* 4) Disconnected nodes if they don't have configured slots.
*/
在gossip协议段不包括如下情况:
1)处于握手状态的节点
3)标志没有地址的节点
4)断开的节点如果他们没有配置槽
if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) || 处于握手状态 或者 没有地址
(this->link == NULL && this->numslots == 0)) 没有连接并且服务的槽数为0
{
freshnodes--; /* Tecnically not correct, but saves CPU. */ 剔除不符合要求的节点,节省算力,
continue;
}
/* Do not add a node we already have. */
不要添加我们已经加入的节点
if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;
/* Add it */ 添加该节点
clusterSetGossipEntry(hdr,gossipcount,this);
freshnodes--; 要组装的节点少1
gossipcount++; 协议段节点计数加1
}
/* If there are PFAIL nodes, add them at the end. */ 最后添加存在的状态为PFAIL的节点
if (pfail_wanted) {
dictIterator *di;
dictEntry *de;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL && pfail_wanted > 0) {
clusterNode *node = dictGetVal(de);
if (node->flags & CLUSTER_NODE_HANDSHAKE) continue; 跳过握手状态的节点
if (node->flags & CLUSTER_NODE_NOADDR) continue;跳过没有地址的节点
if (!(node->flags & CLUSTER_NODE_PFAIL)) continue; 跳过非PFAIL状态的节点
clusterSetGossipEntry(hdr,gossipcount,node); 设置gossip协议中的节点实体
freshnodes--;
gossipcount++;
/* We take the count of the slots we allocated, since the
* PFAIL stats may not match perfectly with the current number
* of PFAIL nodes. */
我们计算分配的槽数,因为PFAIL统计数据可能与当前PFAIL节点数不完全匹配。(以分配的空间数为准)
pfail_wanted--;
}
dictReleaseIterator(di);
}
/* Ready to send... fix the totlen fiend and queue the message in the
* output buffer. */
准备去发送。。。先确定总长度这个容易忘记的值,然后将消息放入输出缓存中排队
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
hdr->count = htons(gossipcount);
hdr->totlen = htonl(totlen);
clusterSendMessage(link,buf,totlen);
zfree(buf);
}
************************************************************************************************
统计有效的从节点数
int clusterCountNonFailingSlaves(clusterNode *n) {
int j, okslaves = 0;
for (j = 0; j < n->numslaves; j++)
if (!nodeFailed(n->slaves[j])) okslaves++; 不是故障的从节点,有效从节点数加1
return okslaves;
}
************************************************************************************************
/* If a manual failover timed out, abort it. */
如果手工故障转移超时了,终止它
void manualFailoverCheckTimeout(void) {
if (server.cluster->mf_end && server.cluster->mf_end < mstime()) { 存在超时时间 并且 超时
serverLog(LL_WARNING,"Manual failover timed out.");
resetManualFailover(); 一切结束,重置手工故障转移状态
}
}
************************************************************************************************
/* This function is called from the cluster cron function in order to go
* forward with a manual failover state machine. */
这个函数被集群的定时任务调用,目的是为了推动手工故障转移的状态机
void clusterHandleManualFailover(void) {
/* Return ASAP if no manual failover is in progress. */
如果没有手工故障转移在进程中,尽快返回
if (server.cluster->mf_end == 0) return;
/* If mf_can_start is non-zero, the failover was already triggered so the
* next steps are performed by clusterHandleSlaveFailover(). */
如果mf_can_start是非0,那么故障转移已经触发,因此下一步由clusterHandleSlaveFailover执行
if (server.cluster->mf_can_start) return;
if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */ 等待主节点赋值偏移量
if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) { 主节点偏移量已经全部复制完毕
/* Our replication offset matches the master replication offset
* announced after clients were paused. We can start the failover. */
我们的复制偏移量等于主节点的复制偏移量,宣告客户端暂停后,我们可以启动故障转移
server.cluster->mf_can_start = 1;开启
serverLog(LL_WARNING,
"All master replication stream processed, "
"manual failover can start.");
}
}
************************************************************************************************
/* Return non zero if the node is already present in the gossip section of the
* message pointed by 'hdr' and having 'count' gossip entries. Otherwise
* zero is returned. Helper for clusterSendPing(). */
如果节点已经存在hdr指针指向的消息并且拥有count个gossip段中的gossip协议段中,那么返回1,
否则返回0.是函数clusterSendPing的助手。
int clusterNodeIsInGossipSection(clusterMsg *hdr, int count, clusterNode *n) {
int j;
for (j = 0; j < count; j++) {
if (memcmp(hdr->data.ping.gossip[j].nodename,n->name,
CLUSTER_NAMELEN) == 0) break; 对比名字
}
return j != count;
}
************************************************************************************************
/* Set the i-th entry of the gossip section in the message pointed by 'hdr'
* to the info of the specified node 'n'. */
设置由指针hdr指向的消息,gossip协议段中第i个实体, 指向指定节点n的信息。
void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
clusterMsgDataGossip *gossip;
gossip = &(hdr->data.ping.gossip[i]); 指向第i个实体
memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN); 设置节点名
gossip->ping_sent = htonl(n->ping_sent/1000); ping发送时间转化成秒
gossip->pong_received = htonl(n->pong_received/1000);pong接收时间转化成秒
memcpy(gossip->ip,n->ip,sizeof(n->ip));设置ip
gossip->port = htons(n->port); 服务口
gossip->cport = htons(n->cport); 消息口
gossip->flags = htons(n->flags); 标志
gossip->notused1 = 0;
}
************************************************************************************************
/* This function is called if we are a slave node and our master serving
* a non-zero amount of hash slots is in FAIL state.
如果我们是一个从节点并且我们的主节点服务超过1个哈希槽处故障状态,那么就调用这个函数
* The gaol of this function is:
* 1) To check if we are able to perform a failover, is our data updated?
* 2) Try to get elected by masters.
* 3) Perform the failover informing all the other nodes.
*/
本函数的目标:
1)检查我们是否能够执行一次故障转移,我们的数据已经更新了?
2)尝试通过主节点选举
3)执行故障转移 通知所有的其它节点
void clusterHandleSlaveFailover(void) {
mstime_t data_age;
mstime_t auth_age = mstime() - server.cluster->failover_auth_time; 发出投票过去时间
int needed_quorum = (server.cluster->size / 2) + 1; 法定数量 为 总节点数的一半 +1
int manual_failover = server.cluster->mf_end != 0 && 可以开始手工故障转移
server.cluster->mf_can_start;
mstime_t auth_timeout, auth_retry_time;
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER; 去掉准备处理手工转移的标志
/* Compute the failover timeout (the max time we have to send votes
* and wait for replies), and the failover retry time (the time to wait
* before trying to get voted again).
*计算故障转移超时(我们发送投票和等到回复的最大时间),和故障转移的尝试时间(再次尝试投票前等待的时间)
* Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds. 超时时间是 NODE_TIMEOUT*2 和 2秒 中大的那个
* Retry is two times the Timeout. 尝试时间 是 两倍超时时间
*/
auth_timeout = server.cluster_node_timeout*2;
if (auth_timeout < 2000) auth_timeout = 2000;
auth_retry_time = auth_timeout*2;
/* Pre conditions to run the function, that must be met both in case
* of an automatic or manual failover:
* 1) We are a slave.
* 2) Our master is flagged as FAIL, or this is a manual failover.
* 3) We don't have the no failover configuration set, and this is
* not a manual failover.
* 4) It is serving slots. */
函数执行的前提条件,无论是自动故障切换还是手动故障切换,都必须满足这一要求:
1)我们是从节点
2)我们的主节点被标记为故障,或者这是一次手工故障切换
3)我们没有设置不允许手工故障切换的配置,并且 这不是一次手工故障切换
4) 它为槽提供服务
if (nodeIsMaster(myself) || 我是主节点
myself->slaveof == NULL || 主节点为空
(!nodeFailed(myself->slaveof) && !manual_failover) || 主节点没有故障 并且 不允许手工故障切换
(server.cluster_slave_no_failover && !manual_failover) || 不会自动进行主从切换 并且 不允许手工故障切换
myself->slaveof->numslots == 0) 主节点服务的槽数为0
{
/* There are no reasons to failover, so we set the reason why we
* are returning without failing over to NONE. */
无理由进行故障切换,因此我们设置了为什么我们返回而没有失败的原因
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
return;
}
/* Set data_age to the number of seconds we are disconnected from
* the master. */
设置data_age为断开主节点连接的秒数
if (server.repl_state == REPL_STATE_CONNECTED) {
data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
* 1000; 当前时间和最后一次交互时间的差值
} else {
data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000; 当前时间和下线时间的差值
}
/* Remove the node timeout from the data age as it is fine that we are
* disconnected from our master at least for the time it was down to be
* flagged as FAIL, that's the baseline. */
从ata_age中减去节点超时时间,因为当我们从主节点断开到标记为故障至少需要经过节点超时的时间,
而标记节点故障的之前,我们还认为连接是正常的, 所以标记节点故障是基线,这个基线的时间就是节点超时时间。
if (data_age > server.cluster_node_timeout)
data_age -= server.cluster_node_timeout;
/* Check if our data is recent enough according to the slave validity
* factor configured by the user.
根据用户配置的参数cluster_slave_validity_factor检查我们的数据是否足够新
* Check bypassed for manual failovers. */
if (server.cluster_slave_validity_factor && 存在从节点的故障转移的最大数据时间限制
data_age >
(((mstime_t)server.repl_ping_slave_period * 1000) + 这里初始化是10秒
(server.cluster_node_timeout * server.cluster_slave_validity_factor))) 这里是10倍的节点超时时间
{ 最后更新的时间 大于 10秒 + 10倍的节点超时时间 表示从节点数据更新过旧,不适合再进行故障切换
if (!manual_failover) { 不是手工强制故障切换,
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE); 记录不能故障切换原因
return;
}
}
/* If the previous failover attempt timedout and the retry time has
* elapsed, we can setup a new one. */
如果上一次故障转移尝试时间已过,且重试时间已过,则可以设置新的故障切换尝试时间
if (auth_age > auth_retry_time) { 自 上次投票 已经过去了 超过了两倍的节点超时时间
server.cluster->failover_auth_time = mstime() +
500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
固定顺延500毫秒,让节点故障的消息得以传播
random() % 500; /* Random delay between 0 and 500 milliseconds. */
随机延迟0到500毫秒之间的一个值(这样防止从节点同时进行投票)
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_sent = 0;
server.cluster->failover_auth_rank = clusterGetSlaveRank(); 获取从节点的排名
/* We add another delay that is proportional to the slave rank.
* Specifically 1 second * rank. This way slaves that have a probably
* less updated replication offset, are penalized. */
我们根据从节点的排名按比例添加另外一个延时。指定为 1s * rank.
更新复制偏移量较少的从节点会受到惩罚。(排名越靠前,就越有机会成为主节点)
server.cluster->failover_auth_time +=
server.cluster->failover_auth_rank * 1000; 上面的随机值对排名一样的从节点有用
/* However if this is a manual failover, no delay is needed. */
然而如果是手工切换,那么就不需要计算延迟
if (server.cluster->mf_end) { 存在手工切换超时设置
server.cluster->failover_auth_time = mstime(); 开始计时
server.cluster->failover_auth_rank = 0;
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); 设置开始处理故障切换
}
serverLog(LL_WARNING,
"Start of election delayed for %lld milliseconds "
"(rank #%d, offset %lld).",
server.cluster->failover_auth_time - mstime(),
server.cluster->failover_auth_rank,
replicationGetSlaveOffset()); 从节点复制的偏移量
/* Now that we have a scheduled election, broadcast our offset
* to all the other slaves so that they'll updated their offsets
* if our offset is better. */
现在我们已经开启了选举,将我们的偏移量广播给所有其他从节点,这样,如果我们的偏移量更好,它们就会更新它们的偏移量
clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES); 广播当前从节点偏移量信息
return;
}
/* It is possible that we received more updated offsets from other
* slaves for the same master since we computed our election delay.
* Update the delay if our rank changed.
因为我们计算了选举延迟,所以从其它从节点接收到更多更新偏移量是有可能的,
这种情况下如果排名改变了就更新延迟
* Not performed if this is a manual failover. */ 如果是手工故障切换不用执行
if (server.cluster->failover_auth_sent == 0 && 还没有开始请求投票
server.cluster->mf_end == 0) 非手工故障切换
{
int newrank = clusterGetSlaveRank(); 重新 获取节点排名
if (newrank > server.cluster->failover_auth_rank) { 新的排名更加靠后
long long added_delay =
(newrank - server.cluster->failover_auth_rank) * 1000;
server.cluster->failover_auth_time += added_delay; 增加新延迟时间
server.cluster->failover_auth_rank = newrank;
serverLog(LL_WARNING,
"Replica rank updated to #%d, added %lld milliseconds of delay.",
newrank, added_delay);
}
}
/* Return ASAP if we can't still start the election. */
如果我们还不能开启选举,尽快返回
if (mstime() < server.cluster->failover_auth_time) { 还没有到下次选举时间,
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY); 记录不乏其故障切换的原因
return;
}
/* Return ASAP if the election is too old to be valid. */
如果选举时间太长而无法生效,请尽快返回
if (auth_age > auth_timeout) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED); 故障切换超时
return;
}
/* Ask for votes if needed. */
如果需要故障切换,请求投票
if (server.cluster->failover_auth_sent == 0) { 还没有发出投票请求
server.cluster->currentEpoch++; 当前纪元加1
server.cluster->failover_auth_epoch = server.cluster->currentEpoch; 设置投票纪元
serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
(unsigned long long) server.cluster->currentEpoch);
clusterRequestFailoverAuth();请求投票
server.cluster->failover_auth_sent = 1; 标志已经开始请求投票了
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_FSYNC_CONFIG);
return; /* Wait for replies. */ 等待回复
}
/* Check if we reached the quorum. */ 检查我们是否达到法定票数
if (server.cluster->failover_auth_count >= needed_quorum) {
/* We have the quorum, we can finally failover the master. */
我拥有了法定票数,我们最终可以切换故障主节点了
serverLog(LL_WARNING,
"Failover election won: I'm the new master.");
/* Update my configEpoch to the epoch of the election. */ 将我节点的配置纪元更新为选举纪元
if (myself->configEpoch < server.cluster->failover_auth_epoch) {
myself->configEpoch = server.cluster->failover_auth_epoch;
serverLog(LL_WARNING,
"configEpoch set to %llu after successful failover",
(unsigned long long) myself->configEpoch);
}
/* Take responsibility for the cluster slots. */
对集群的槽负责,需要接替服务主节点曾经服务的槽
clusterFailoverReplaceYourMaster();
} else {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES); 等待故障切换投票结果
}
}
************************************************************************************************
/* -----------------------------------------------------------------------------
* CLUSTER slave migration 集群从节点迁移
*
* Slave migration is the process that allows a slave of a master that is
* already covered by at least another slave, to "migrate" to a master that
* is orpaned, that is, left with no working slaves.
从节点迁移是指允许已被至少另一个从节点覆盖的主节点的从节点“迁移”到孤单的主节点(即没有有效从节点)的过程。
* ------------------------------------------------------------------------- */
/* This function is responsible to decide if this replica should be migrated
* to a different (orphaned) master. It is called by the clusterCron() function
* only if:
此函数负责决定是否应将此复制副本迁移(从节点)到其他(孤独)主节点。
只有在以下情况下,函数clusterCron才会调用它:
* 1) We are a slave node.
1我们是一个从节点
* 2) It was detected that there is at least one orphaned master in
* the cluster
2我们检测到在集群中至少有一个孤独主节点
* 3) We are a slave of one of the masters with the greatest number of
* slaves.
3我们是具有最多从节点的主节点的其中一个从节点
* This checks are performed by the caller since it requires to iterate
* the nodes anyway, so we spend time into clusterHandleSlaveMigration()
* if definitely needed.
该检查由调用方执行,因为它无论如何都需要迭代节点,所以如果确实需要的话,
进入ClusterHandleSlaveMigation之后我们会再花时间检查。
* The fuction is called with a pre-computed max_slaves, that is the max
* number of working (not in FAIL state) slaves for a single master.
该函数使用预先计算好的参数max_slaves,即单个主节点的最大有效(非故障状态)从节点数量
* Additional conditions for migration are examined inside the function.
*/迁移的额外条件在函数内部检查。
void clusterHandleSlaveMigration(int max_slaves) {
int j, okslaves = 0;
clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL;
dictIterator *di;
dictEntry *de;
/* Step 1: Don't migrate if the cluster state is not ok. */
步骤1: 如果集群状态有问题,不要迁移
if (server.cluster->state != CLUSTER_OK) return;
/* Step 2: Don't migrate if my master will not be left with at least
* 'migration-barrier' slaves after my migration. */
步骤2:不要迁移,如果在我迁移之后,我的主节点没有留下至少migration-barrier个从节点。
if (mymaster == NULL) return;
for (j = 0; j < mymaster->numslaves; j++) 遍历主节点的从节点
if (!nodeFailed(mymaster->slaves[j]) &&
!nodeTimedOut(mymaster->slaves[j])) okslaves++; 有效节点数
if (okslaves <= server.cluster_migration_barrier) return; 如果小于等于配置参数cluster_migration_barrier,就不迁移
/* Step 3: Identify a candidate for migration, and check if among the
* masters with the greatest number of ok slaves, I'm the one with the
* smallest node ID (the "candidate slave").
*
* Note: this means that eventually a replica migration will occur
* since slaves that are reachable again always have their FAIL flag
* cleared, so eventually there must be a candidate. At the same time
* this does not mean that there are no race conditions possible (two
* slaves migrating at the same time), but this is unlikely to
* happen, and harmless when happens. */
步骤3:确定迁移的候选集,并检查在拥有最多有效从节点的主节点中,
我是否是节点ID最小的(“候选从机”)。
注意:
这意味着最终将发生副本迁移,因为再次可访问的从节点总是清除其失败标志,因此最终必须有一个候选从节点。
同时,这并不意味着没有可能的竞争条件(两个从节点同时迁移),但这不太可能发生,即使发生也是无害的。
candidate = myself;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
int okslaves = 0, is_orphaned = 1;
/* We want to migrate only if this master is working, orphaned, and
* used to have slaves or if failed over a master that had slaves
* (MIGRATE_TO flag). This way we only migrate to instances that were
* supposed to have replicas. */
我们想要迁移到的目标主节点 应该是 有效的 孤独的 过去有从节点的
或者 有从节点的失败的主节点(有MIGRATE_TO标志)。
通过这种方式,我们只迁移到应该具有副本的实例。
if (nodeIsSlave(node) || nodeFailed(node)) is_orphaned = 0; 从节点 或者 故障节点
if (!(node->flags & CLUSTER_NODE_MIGRATE_TO)) is_orphaned = 0; 不能迁移的主节点
/* Check number of working slaves. */ 价差有效的从节点数
if (nodeIsMaster(node)) okslaves = clusterCountNonFailingSlaves(node); 统计主节点的有效从节点数
if (okslaves > 0) is_orphaned = 0; 有效从节点数大于0,不孤独
if (is_orphaned) { 是孤独主节点
if (!target && node->numslots > 0) target = node; 服务的槽数大于0
/* Track the starting time of the orphaned condition for this
* master. */
跟踪该主节点孤独状态开始的时间
if (!node->orphaned_time) node->orphaned_time = mstime(); 设置孤独开始时间
} else {
node->orphaned_time = 0;
}
/* Check if I'm the slave candidate for the migration: attached
* to a master with the maximum number of slaves and with the smallest
* node ID. */
检查我是否是迁移从节点候选:连接到具有最大从节点的主节点和具有最小的节点ID
if (okslaves == max_slaves) { 连接到具有最大从节点数的主节点
for (j = 0; j < node->numslaves; j++) {
if (memcmp(node->slaves[j]->name,
candidate->name,
CLUSTER_NAMELEN) < 0) 如果存在比候选节点更小的ID
{
candidate = node->slaves[j];
}
}
}
}
dictReleaseIterator(di);
/* Step 4: perform the migration if there is a target, and if I'm the
* candidate, but only if the master is continuously orphaned for a
* couple of seconds, so that during failovers, we give some time to
* the natural slaves of this instance to advertise their switch from
* the old master to the new one. */
步骤4:如果存在目标(符合要求的主节点)就执行迁移,如果我是候选者,但是前提是主节点持续几秒都处于孤独状态,
因此,在故障切换过程中,我们会给这个实例中的依赖从节点一些时间来宣告他们从旧主节点到新主主节点的转换。
if (target && candidate == myself && 目标主节点存在 并且 候选节点是我自己
(mstime()-target->orphaned_time) > CLUSTER_SLAVE_MIGRATION_DELAY && 当前时间 超过 主节点孤独开始时间 5秒
!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER)) 模块没有设置不能故障切换标志
{
serverLog(LL_WARNING,"Migrating to orphaned master %.40s",
target->name);
clusterSetMaster(target); 设置target为当前节点的主节点
}
}
************************************************************************************************
/* -----------------------------------------------------------------------------
* Cluster state evaluation function 集群状态求值函数
* -------------------------------------------------------------------------- */
/* The following are defines that are only used in the evaluation function
* and are based on heuristics. Actually the main point about the rejoin and
* writable delay is that they should be a few orders of magnitude larger
* than the network latency. */
以下是仅在求值函数中使用且基于启发式的定义。
实际上,关于重新加入和可写延迟的主要观点是,它们应该比网络延迟大几个数量级。
#define CLUSTER_MAX_REJOIN_DELAY 5000
#define CLUSTER_MIN_REJOIN_DELAY 500
#define CLUSTER_WRITABLE_DELAY 2000
void clusterUpdateState(void) {
int j, new_state;
int reachable_masters = 0;
static mstime_t among_minority_time;
static mstime_t first_call_time = 0;
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE; 取消更新状态
/* If this is a master node, wait some time before turning the state
* into OK, since it is not a good idea to rejoin the cluster as a writable
* master, after a reboot, without giving the cluster a chance to
* reconfigure this node. Note that the delay is calculated starting from
* the first call to this function and not since the server start, in order
* to don't count the DB loading time. */
如果这是一个主节点,请等待一段时间,然后再将状态转换为OK,
因为在重新启动后,不给群集机会重新配置此节点的情况下,以可写主节点的身份重新加入群集不是一个好主意。
(先等一段时间,让该节点和其它节点通信确认配置情况)
请注意,延迟是从第一次调用此函数开始计算的,而不是从服务器启动开始计算的,以便不计算DB加载时间。
if (first_call_time == 0) first_call_time = mstime();
if (nodeIsMaster(myself) && 主节点
server.cluster->state == CLUSTER_FAIL && 集群处于故障状态
mstime() - first_call_time < CLUSTER_WRITABLE_DELAY) return; 小于可写延迟 先返回等待
/* Start assuming the state is OK. We'll turn it into FAIL if there
* are the right conditions. */
开始假设状态正常。如果有合适的条件,我们会把它变成故障
new_state = CLUSTER_OK;
/* Check if all the slots are covered. */ 检查是否所有的槽被分配
if (server.cluster_require_full_coverage) { 检查配置是否要求必须所有槽被分配
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (server.cluster->slots[j] == NULL ||
server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL)) 每个槽必须要有可用服务节点
{
new_state = CLUSTER_FAIL; 存在槽没有可用服务节点的情况下,就宣告故障
break;
}
}
}
/* Compute the cluster size, that is the number of master nodes
* serving at least a single slot.
*计算集群大小,就是 至少服务一个槽的主节点个数
* At the same time count the number of reachable masters having
* at least one slot. */
同时统计至少拥有一个槽的可达主节点的数量
{
dictIterator *di;
dictEntry *de;
server.cluster->size = 0;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
if (nodeIsMaster(node) && node->numslots) { 确认是主节点 并且 服务槽数大于0
server.cluster->size++; 有效主节点数加1
if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0)
reachable_masters++; 可达主节点数加1
}
}
dictReleaseIterator(di);
}
/* If we are in a minority partition, change the cluster state
* to FAIL. */ 如果我们处于少部分节点集群中,改变集群的状态为故障(集群分成了多部分)
{
int needed_quorum = (server.cluster->size / 2) + 1; 法定数量
if (reachable_masters < needed_quorum) { 可达主节点数少于法定节点数量
new_state = CLUSTER_FAIL; 设置故障状态
among_minority_time = mstime(); 设置处于少数部分开始时间
}
}
/* Log a state change */ 记录一个词状态变换
if (new_state != server.cluster->state) {
mstime_t rejoin_delay = server.cluster_node_timeout;
/* If the instance is a master and was partitioned away with the
* minority, don't let it accept queries for some time after the
* partition heals, to make sure there is enough time to receive
* a configuration update. */
如果实例是一个主节点,并且与少数实例一起被分区,
那么在分区修复后的一段时间内不要让它接受查询,以确保有足够的时间接收配置更新。
这个时间在区间 CLUSTER_MIN_REJOIN_DELAY 到 CLUSTER_MAX_REJOIN_DELAY 之间,
默认为上面赋值的节点超时时间,如果超过上面的范围,那就是去上限或者下限值
if (rejoin_delay > CLUSTER_MAX_REJOIN_DELAY)
rejoin_delay = CLUSTER_MAX_REJOIN_DELAY;
if (rejoin_delay < CLUSTER_MIN_REJOIN_DELAY)
rejoin_delay = CLUSTER_MIN_REJOIN_DELAY;
if (new_state == CLUSTER_OK && 集群正常
nodeIsMaster(myself) && 是主节点
mstime() - among_minority_time < rejoin_delay) 开始分区时间少于 重新加入时间,继续等待
{
return;
}
/* Change the state and log the event. */ 改变状态 并且记录事件
serverLog(LL_WARNING,"Cluster state changed: %s",
new_state == CLUSTER_OK ? "ok" : "fail");
server.cluster->state = new_state; 改变状态
}
}
************************************************************************************************
/* This function is called by clusterHandleSlaveFailover() in order to
* let the slave log why it is not able to failover. Sometimes there are
* not the conditions, but since the failover function is called again and
* again, we can't log the same things continuously.
这个函数由函数clusterHandleSlaveFailover调用,使得从节点记录其无法进行故障切换的原因。
有时没有条件,但由于故障切换功能被反复调用,我们无法连续记录相同的事情
* This function works by logging only if a given set of conditions are
* true:
仅当给定的一组条件为真时,此函数才通过日志记录工作
* 1) The reason for which the failover can't be initiated changed.
* The reasons also include a NONE reason we reset the state to
* when the slave finds that its master is fine (no FAIL flag).
1)无法启动故障转移的原因已更改。原因还包括当从节点发现其主节点正常时,我们将状态重置为无原因
* 2) Also, the log is emitted again if the master is still down and
* the reason for not failing over is still the same, but more than
* CLUSTER_CANT_FAILOVER_RELOG_PERIOD seconds elapsed.
2)此外,如果主节点仍处于关闭状态,且未进行故障转移的原因仍然相同,
但超过了参数CLUSTER_CANT_FAILOVER_RELOG_PERIOD设置的时间,则会再次发出日志
* 3) Finally, the function only logs if the slave is down for more than
* five seconds + NODE_TIMEOUT. This way nothing is logged when a
* failover starts in a reasonable time.
3)最后,该函数仅在从节点 停机超过 5秒+节点超时 的情况下记录。
这样,当故障切换在合理的时间内启动时,不会记录任何内容。
* The function is called with the reason why the slave can't failover
* which is one of the integer macros CLUSTER_CANT_FAILOVER_*.
调用该函数的原因是从节点无法故障切换,这个原因是 整数宏CLUSTER_CANT_FAILOVER_* 中的一个
* The function is guaranteed to be called only if 'myself' is a slave. */
只有当节点myself是从节点时,才保证调用该函数
void clusterLogCantFailover(int reason) {
char *msg;
static time_t lastlog_time = 0;
mstime_t nolog_fail_time = server.cluster_node_timeout + 5000;
/* Don't log if we have the same reason for some time. */
如果我们在一段时间内因为相同的原因,不记录
#define CLUSTER_CANT_FAILOVER_RELOG_PERIOD (60*5) /* seconds. */
if (reason == server.cluster->cant_failover_reason && 原因一样
time(NULL)-lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD) 300秒之内同样原因不记录日志
return;
server.cluster->cant_failover_reason = reason;
/* We also don't emit any log if the master failed no long ago, the
* goal of this function is to log slaves in a stalled condition for
* a long time. */
如果主节点不久前发生故障,我们也不会记录任何日志,此函数的目标是记录长期处于暂停状态的从节点。
if (myself->slaveof && 不为空表示本身是从节点
nodeFailed(myself->slaveof) && 主节点失败
(mstime() - myself->slaveof->fail_time) < nolog_fail_time) return; 失败时间没有超过 设定的不记录日志时间
switch(reason) {
case CLUSTER_CANT_FAILOVER_DATA_AGE:
msg = "Disconnected from master for longer than allowed. "
"Please check the 'cluster-replica-validity-factor' configuration "
"option.";
break;
case CLUSTER_CANT_FAILOVER_WAITING_DELAY:
msg = "Waiting the delay before I can start a new failover.";
break;
case CLUSTER_CANT_FAILOVER_EXPIRED:
msg = "Failover attempt expired.";
break;
case CLUSTER_CANT_FAILOVER_WAITING_VOTES:
msg = "Waiting for votes, but majority still not reached.";
break;
default:
msg = "Unknown reason code.";
break;
}
lastlog_time = time(NULL);
serverLog(LL_WARNING,"Currently unable to failover: %s", msg);
}
************************************************************************************************
/* This function returns the "rank" of this instance, a slave, in the context
* of its master-slaves ring. The rank of the slave is given by the number of
* other slaves for the same master that have a better replication offset
* compared to the local one (better means, greater, so they claim more data).
这个函数返回实例的秩,秩就是一个从节点在主从节点上下文中的排序。
从节点的秩由同一主节点的其他从节点的数量给出,与本地从节点相比,这些从节点具有更好的复制偏移量
(更好意味着更多,因此他们宣告由更多的数据从主节点复制回来)
* A slave with rank 0 is the one with the greatest (most up to date)
* replication offset, and so forth. Note that because how the rank is computed
* multiple slaves may have the same rank, in case they have the same offset.
秩为0的从节点是复制偏移量最大(最新)的从节点,以此类推。
请注意,由于秩的计算方式,如果多个从节点具有相同的偏移量,则它们可能具有相同的秩
* The slave rank is used to add a delay to start an election in order to
* get voted and replace a failing master. Slaves with better replication
* offsets are more likely to win. */
从节点的秩被用于添加延迟时间以开始选举,以便获得投票并替换失败的主节点。
具有更好复制偏移量的从节点会有更多的获胜机会(获取投票的机会)
int clusterGetSlaveRank(void) {
long long myoffset;
int j, rank = 0;
clusterNode *master;
serverAssert(nodeIsSlave(myself)); 确认本身是从节点
master = myself->slaveof;
if (master == NULL) return 0; /* Never called by slaves without master. */ 不会被没有主节点的从节点调用
myoffset = replicationGetSlaveOffset(); 返回从节点拷贝主节点数据的偏移量
for (j = 0; j < master->numslaves; j++)
if (master->slaves[j] != myself && 不是本身节点
!nodeCantFailover(master->slaves[j]) && 不能进行故障切换的从节点
master->slaves[j]->repl_offset > myoffset) rank++; 赋值的数据比我多 排名要靠后1位
return rank;
}
************************************************************************************************
/* Send a PONG packet to every connected node that's not in handshake state
* and for which we have a valid link.
向每个未处于握手状态且具有有效链接的连接节点发送PONG数据包。
* In Redis Cluster pongs are not used just for failure detection, but also
* to carry important configuration information. So broadcasting a pong is
* useful when something changes in the configuration and we want to make
* the cluster aware ASAP (for instance after a slave promotion).
在Redis集群中,PONG数据报不仅用于故障检测,还用于携带重要的配置信息。
因此,当配置发生变化时,广播pong非常有用,我们希望尽快使集群获得配置的修改(例如,在从节点升级为主节点之后)。
* The 'target' argument specifies the receiving instances using the
* defines below:
参数“target”使用下面的定义指定接收实例
* CLUSTER_BROADCAST_ALL -> All known instances. 所有知道的节点
* CLUSTER_BROADCAST_LOCAL_SLAVES -> All slaves in my master-slaves ring. 处于同样主从群中的所有从节点
*/
#define CLUSTER_BROADCAST_ALL 0
#define CLUSTER_BROADCAST_LOCAL_SLAVES 1
void clusterBroadcastPong(int target) {
dictIterator *di;
dictEntry *de;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
if (!node->link) continue; 无连接,继续下一个
if (node == myself || nodeInHandshake(node)) continue; 本身 或者 处于握手状态 继续下一个
if (target == CLUSTER_BROADCAST_LOCAL_SLAVES) { 是主节点的所有从节点
int local_slave =
nodeIsSlave(node) && node->slaveof && 从节点 并且 主节点不为空
(node->slaveof == myself || node->slaveof == myself->slaveof); 主节点是本身 或者 具有同样的主节点
if (!local_slave) continue;
}
clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG); 发送pong报文
}
dictReleaseIterator(di);
}
************************************************************************************************
/* This function sends a FAILOVE_AUTH_REQUEST message to every node in order to
* see if there is the quorum for this slave instance to failover its failing
* master.
此函数将向每个节点发送FAILOVE_AUTH_REQUEST消息,
以查看此从节点是否具有从故障主节点进行故障转移的法定票数
* Note that we send the failover request to everybody, master and slave nodes,
* but only the masters are supposed to reply to our query. */
请注意,我们将故障转移请求发送给所有人,包括主节点和从节点,但只有主节点应该回复我们的查询。
void clusterRequestFailoverAuth(void) {
clusterMsg buf[1];
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t totlen;
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);组装求情信息内容
/* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit
* in the header to communicate the nodes receiving the message that
* they should authorized the failover even if the master is working. */
如果这是手动故障切换,在报文头部设置CLUSTERMSG_FLAG0_Foreck位,以便与接收消息的节点通信,
即使主节点正在工作,它们也应进行故障切换
if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK; 手工故障转移 设置强制切换标志位
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); 除去数据的长度
hdr->totlen = htonl(totlen);
clusterBroadcastMessage(buf,totlen); 广播消息
}
************************************************************************************************
/* This function implements the final part of automatic and manual failovers,
* where the slave grabs its master's hash slots, and propagates the new
* configuration.
此函数实现自动和手动故障切换的最后一部分,其中从节点获取其主节点的哈希槽,并传播新配置
* Note that it's up to the caller to be sure that the node got a new
* configuration epoch already. */
请注意,由调用方确定节点是否已经获得了新的配置
void clusterFailoverReplaceYourMaster(void) {
int j;
clusterNode *oldmaster = myself->slaveof; 原主节点
if (nodeIsMaster(myself) || oldmaster == NULL) return; 本身是主节点 或者 原主节点为空
/* 1) Turn this node into a master. */
1)转变该节点为主节点
clusterSetNodeAsMaster(myself); 设置我们为主节点
replicationUnsetMaster(); 取消复制,将自身设置为主节点
/* 2) Claim all the slots assigned to our master. */
2)宣告原节点的所服务的槽归我服务
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (clusterNodeGetSlotBit(oldmaster,j)) { 属于原主节点的槽
clusterDelSlot(j); 删除槽的所有者
clusterAddSlot(myself,j); 添加到我名下
}
}
/* 3) Update state and save config. */
3)更新状态并且保存配置
clusterUpdateState(); 更新集群状态
clusterSaveConfigOrDie(1); 保存修改的配置
/* 4) Pong all the other nodes so that they can update the state
* accordingly and detect that we switched to master role. */
4)对所有其他节点进行Pong,以便它们可以相应地更新状态,并检测到我们已切换到主节点角色
clusterBroadcastPong(CLUSTER_BROADCAST_ALL); 对所有节点发送PONG报文
/* 5) If there was a manual failover in progress, clear the state. */
5)如果正在进行手动故障切换,清除状态。
resetManualFailover();
}
************************************************************************************************