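From the previous post (on Redis Sentinel startup: https://www.cnblogs.com/cquccy/p/15625768.html )
we know that, by default, Sentinel's timer is invoked roughly every 100 milliseconds. So what exactly does the timer do?
Let's take a look.
The function sentinelTimer is as follows: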
void sentinelTimer(void) {
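sentinelCheckTiltCondition(); /* check whether to enter TILT mode */
sentinelHandleDictOfRedisInstances(sentinel.masters); /* handle every master listed in the configuration file, one entry per configured master */
sentinelRunPendingScripts(); /* run pending scripts */
sentinelCollectTerminatedScripts(); /* collect scripts that have finished */
sentinelKillTimedoutScripts(); /* kill scripts that have run past their timeout */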
/* We continuously change the frequency of the Redis "timer interrupt"
* in order to desynchronize every Sentinel from every other.
* This non-determinism avoids that Sentinels started at the same time
* exactly continue to stay synchronized asking to be voted at the
* same time again and again (resulting in nobody likely winning the
* election because of split brain voting). */
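/* In short: each Sentinel keeps re-randomizing its own timer frequency, so Sentinels
* that started together drift apart instead of asking for votes in lockstep, which
* could split the vote and leave no winner. */
server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ; /* the random term staggers the Sentinels, so their vote requests do not all arrive at once */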
}
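To make the effect of the last line of sentinelTimer concrete, here is a minimal sketch of how the randomized `server.hz` maps to a timer period. It assumes `CONFIG_DEFAULT_HZ` is 10 (consistent with the 100 ms default mentioned above) and that the period is `1000 / hz` milliseconds. `jittered_hz` and `timer_period_ms` are hypothetical helper names for illustration, not Redis functions.

```c
#define CONFIG_DEFAULT_HZ 10 /* assumed default: one tick every 100 ms */

/* Hypothetical helper: the same expression as the last line of
 * sentinelTimer(), with the random number passed in as an argument.
 * random_value is expected to be non-negative, like the result of rand(). */
int jittered_hz(int random_value) {
    return CONFIG_DEFAULT_HZ + random_value % CONFIG_DEFAULT_HZ;
}

/* Hypothetical helper: timer period in milliseconds for a given hz. */
int timer_period_ms(int hz) {
    return 1000 / hz;
}
```

Under these assumptions hz stays between 10 and 19, so the period moves between 100 ms and roughly 52 ms from one tick to the next.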
Let's go through the individual functions one by one.
******************************************************************************
/* This function checks if we need to enter the TILT mode
* (an abnormal state in which Sentinel stops acting).
* The TILT mode is entered if we detect that between two invocations of the
* timer interrupt, a negative amount of time, or too much time has passed.
* Note that we expect that more or less just 100 milliseconds will pass
* if everything is fine. However we'll see a negative number or a
* difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
* following conditions happens (a blocked process shows up as an oversized
* difference, a clock change can show up as a negative one):
* 1) The Sentinel process for some time is blocked, for every kind of
* random reason: the load is huge, the computer was frozen for some time
* in I/O or alike, the process was stopped by a signal. Everything.
* 2) The system clock was altered significantly.
* Under both these conditions we'll see everything as timed out and failing
* without good reasons. Instead we enter the TILT mode and wait
* for SENTINEL_TILT_PERIOD to elapse before starting to act again.
* (The apparent failures here are timeouts, not real errors.)
* During TILT time we still collect information, we just do not act. */
void sentinelCheckTiltCondition(void) {
mstime_t now = mstime();
mstime_t delta = now - sentinel.previous_time; /* time elapsed between two timer invocations */
if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) { /* negative, or larger than 2 seconds: #define SENTINEL_TILT_TRIGGER 2000 */
sentinel.tilt = 1; /* enter TILT mode */
sentinel.tilt_start_time = mstime(); /* record when TILT started */
sentinelEvent(LL_WARNING,"+tilt",NULL,"#tilt mode entered"); /* emit the "tilt mode entered" event */
}
sentinel.previous_time = mstime(); /* update the time of the most recent invocation */
}
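The trigger condition can be isolated into a pure function, which makes the two cases easy to check by hand. This is only a sketch: `tilt_triggered` is a hypothetical name, `mstime_t` is assumed to be a signed 64-bit millisecond count, and the 2000 ms threshold is the value quoted in the listing above.

```c
#include <stdbool.h>

typedef long long mstime_t;         /* assumed: milliseconds, signed */
#define SENTINEL_TILT_TRIGGER 2000  /* value quoted in the listing above */

/* Hypothetical helper: true when the gap between two timer calls would
 * put Sentinel into TILT mode. */
bool tilt_triggered(mstime_t previous_time, mstime_t now) {
    mstime_t delta = now - previous_time;
    /* A negative delta suggests the clock went backwards; a delta above
     * the trigger suggests the process was stalled. */
    return delta < 0 || delta > SENTINEL_TILT_TRIGGER;
}
```

A gap of exactly 2000 ms does not trigger TILT, because the comparison is strict.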
******************************************************************************
/* Perform scheduled operations for all the instances in the dictionary.
* Recursively call the function against dictionaries of slaves. */
void sentinelHandleDictOfRedisInstances(dict *instances) {
dictIterator *di;
dictEntry *de;
sentinelRedisInstance *switch_to_promoted = NULL;
/* There are a number of things we need to perform against every master. */
di = dictGetIterator(instances);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
sentinelHandleRedisInstance(ri); /* handle this instance */
if (ri->flags & SRI_MASTER) { /* for a master, recurse into the slaves and Sentinels attached to it */
sentinelHandleDictOfRedisInstances(ri->slaves); /* its slaves */
sentinelHandleDictOfRedisInstances(ri->sentinels); /* the other Sentinels monitoring it */
if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) { /* the failover has reached the "update configuration" state */
switch_to_promoted = ri;
}
}
}
if (switch_to_promoted) /* switch the monitored master over to the promoted slave */
sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
dictReleaseIterator(di);
}
******************************************************************************
Next, let's see how the inner function sentinelHandleRedisInstance does its work.
/* Perform scheduled operations for the specified Redis instance. */
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
/* ========== MONITORING HALF ============ */
/* Every kind of instance */ /* that is: master, slave and sentinel */
sentinelReconnectInstance(ri); /* (re)establish the TCP connections */
sentinelSendPeriodicCommands(ri); /* send the periodic commands */
/* ============== ACTING HALF ============= */
/* We don't proceed with the acting half if we are in TILT mode.
* TILT happens when we find something odd with the time, like a
* sudden change in the clock. */
if (sentinel.tilt) { /* in TILT mode; #define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30), i.e. 30 seconds */
if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
sentinel.tilt = 0; /* 30 seconds have elapsed: leave TILT mode */
sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
}
/* Every kind of instance */ /* check whether the instance is subjectively down; details below */
sentinelCheckSubjectivelyDown(ri);
/* Masters and slaves */ /* placeholder kept for future use */
if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
/* Nothing so far. */
}
/* Only masters */
if (ri->flags & SRI_MASTER) {
sentinelCheckObjectivelyDown(ri); /* check whether the master is objectively down */
if (sentinelStartFailoverIfNeeded(ri)) /* if a failover was just started... */
/* ...force a query to the other Sentinels about the master (their replies are handled in a callback) */
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
sentinelFailoverStateMachine(ri); /* drive the failover state machine */
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS); /* non-forced query to the other Sentinels */
}
}
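The TILT gate in front of the acting half can be sketched in isolation. The struct below is a hypothetical stand-in for the two TILT fields of the global `sentinel` state, and `acting_half_allowed` is an illustrative name; the constants are the ones quoted in the listing.

```c
#include <stdbool.h>

typedef long long mstime_t;  /* assumed: milliseconds */
#define SENTINEL_PING_PERIOD 1000
#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30) /* 30 seconds */

/* Hypothetical stand-in for the TILT fields of the global sentinel state. */
struct tilt_state {
    int tilt;                  /* 1 while in TILT mode */
    mstime_t tilt_start_time;  /* when TILT mode was entered */
};

/* Returns true if the acting half may run at time 'now'. As a side
 * effect it leaves TILT mode once the TILT period has elapsed. */
bool acting_half_allowed(struct tilt_state *s, mstime_t now) {
    if (s->tilt) {
        if (now - s->tilt_start_time < SENTINEL_TILT_PERIOD) return false;
        s->tilt = 0;
    }
    return true;
}
```

Note that the monitoring half (reconnecting and sending periodic commands) runs before this gate, which is why information is still collected during TILT.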
*****************************TCP connection code*************************************************
/* Create the async connections for the instance link if the link
* is disconnected. Note that link->disconnected is true even if just
* one of the two links (commands and pub/sub) is missing. */
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
if (ri->link->disconnected == 0) return; /* already connected: nothing to do */
if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
instanceLink *link = ri->link;
mstime_t now = mstime();
if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return; /* less than one PING period since the last attempt: return */
ri->link->last_reconn_time = now; /* record the time of this reconnection attempt */
/* Commands connection. */
if (link->cc == NULL) { /* the command connection is missing and must be created */
link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR); /* create the async connection */
if (!link->cc->err && server.tls_replication &&
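(instanceLinkNegotiateTLS(link->cc) == C_ERR)) { /* the connection was created and TLS is enabled, but setting up the TLS layer failed */
sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #Failed to initialize TLS"); /* report the TLS initialization failure */
instanceLinkCloseConnection(link,link->cc); /* release the connection */
} else if (link->cc->err) { /* the connection attempt itself failed: report the error */
sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
link->cc->errstr);
instanceLinkCloseConnection(link,link->cc);
} else { /* the connection was created (and TLS, if enabled, was set up) */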
link->pending_commands = 0;
link->cc_conn_time = mstime();
link->cc->data = link;
redisAeAttach(server.el,link->cc); /* attach the connection to the server's event loop */
redisAsyncSetConnectCallback(link->cc,
sentinelLinkEstablishedCallback); /* callback invoked once the connection is established; details later */
redisAsyncSetDisconnectCallback(link->cc,
sentinelDisconnectCallback); /* callback invoked when the connection is closed */
sentinelSendAuthIfNeeded(ri,link->cc); /* send the authentication, if any is configured */
sentinelSetClientName(ri,link->cc,"cmd"); /* name the connection (visible in CLIENT LIST) */
/* Send a PING ASAP when reconnecting. */
sentinelSendPing(ri);
}
}
/* Pub / Sub */ /* the Pub/Sub connection, handled much like the command connection */
if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR); /* try to connect */
if (!link->pc->err && server.tls_replication &&
(instanceLinkNegotiateTLS(link->pc) == C_ERR)) {
sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #Failed to initialize TLS");
} else if (link->pc->err) {
sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
link->pc->errstr);
instanceLinkCloseConnection(link,link->pc);
} else {
int retval;
link->pc_conn_time = mstime();
link->pc->data = link;
redisAeAttach(server.el,link->pc);
redisAsyncSetConnectCallback(link->pc,
sentinelLinkEstablishedCallback);
redisAsyncSetDisconnectCallback(link->pc,
sentinelDisconnectCallback);
sentinelSendAuthIfNeeded(ri,link->pc);
sentinelSetClientName(ri,link->pc,"pubsub"); /* a different name, "pubsub", so the two connections are easy to tell apart */
/* Now we subscribe to the Sentinels "Hello" channel. */
retval = redisAsyncCommand(link->pc,
sentinelReceiveHelloMessages, ri, "%s %s", /* sentinelReceiveHelloMessages handles the hello messages received; details later */
sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
SENTINEL_HELLO_CHANNEL); /* #define SENTINEL_HELLO_CHANNEL "__sentinel__:hello" */
if (retval != C_OK) { /* the subscription failed */
/* If we can't subscribe, the Pub/Sub connection is useless
* and we can simply disconnect it and try again. */
instanceLinkCloseConnection(link,link->pc);
return;
}
}
}
/* Clear the disconnected status only if we have both the connections
* (or just the commands connection if this is a sentinel instance).
* A Sentinel instance only ever has the command connection. */
if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
link->disconnected = 0;
}
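The final condition of sentinelReconnectInstance decides when a link counts as connected. A minimal sketch of that rule follows; `link_view` and `link_is_connected` are hypothetical names, and the struct keeps only the two connection pointers.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, stripped-down view of a link: only whether each
 * connection exists. */
struct link_view {
    const void *cc;  /* command connection, NULL if missing */
    const void *pc;  /* Pub/Sub connection, NULL if missing */
};

/* Mirrors the last condition of sentinelReconnectInstance(): masters and
 * slaves need both connections, a Sentinel only needs the command one. */
bool link_is_connected(const struct link_view *link, bool is_sentinel) {
    return link->cc != NULL && (is_sentinel || link->pc != NULL);
}
```

This is the reason `link->disconnected` stays set for a master or slave when only one of the two connections could be created.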
***************************Sending periodic commands***************************************************
/* Send periodic PING, INFO, and PUBLISH to the Hello channel to
* the specified master or slave instance. */
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
mstime_t now = mstime();
mstime_t info_period, ping_period;
int retval;
/* Return ASAP if we have already a PING or INFO already pending, or
* in the case the instance is not properly connected.
* (Note: the comment and the code do not quite match; the code below
* only checks the disconnected flag.) */
if (ri->link->disconnected) return;
/* For INFO, PING, PUBLISH that are not critical commands to send we
* also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
* want to use a lot of memory just because a link is not working
* properly (note that anyway there is a redundant protection about this,
* that is, the link will be disconnected and reconnected if a long
* timeout condition is detected. */
/* In short: pending commands are capped per link so that a broken link cannot eat memory. */
if (ri->link->pending_commands >=
SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;
/* If this is a slave of a master in O_DOWN condition we start sending
* it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
* period. In this state we want to closely monitor slaves in case they
* are turned into masters by another Sentinel, or by the sysadmin.
* Similarly we monitor the INFO output more often if the slave reports
* to be disconnected from the master, so that we can have a fresh
* disconnection time figure. */
if ((ri->flags & SRI_SLAVE) &&
((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
(ri->master_link_down_time != 0)))
{
info_period = 1000; /* a slave whose master is O_DOWN or being failed over, or whose replication link to the master is down */
} else {
info_period = SENTINEL_INFO_PERIOD; /* the normal period, 10 seconds: #define SENTINEL_INFO_PERIOD 10000 */
}
/* We ping instances every time the last received pong is older than
* the configured 'down-after-milliseconds' time, but every second
* anyway if 'down-after-milliseconds' is greater than 1 second. */
ping_period = ri->down_after_period; /* 30 seconds by default; set with the down-after-milliseconds option */
/* From sentinel.c:
*   ri->down_after_period = master ? master->down_after_period :
*                                    SENTINEL_DEFAULT_DOWN_AFTER;
*   #define SENTINEL_DEFAULT_DOWN_AFTER 30000 */
if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD; /* capped at 1 second: #define SENTINEL_PING_PERIOD 1000 */
/* Send INFO to masters and slaves, not sentinels. */
/* info_refresh is set to 0 to mean that we never received INFO so far, i.e. it still has its initial value. */
if ((ri->flags & SRI_SENTINEL) == 0 &&
(ri->info_refresh == 0 ||
(now - ri->info_refresh) > info_period)) /* or the last INFO reply is older than the INFO period */
{
retval = redisAsyncCommand(ri->link->cc,
sentinelInfoReplyCallback, ri, "%s", /* callback that processes the INFO reply; details later */
sentinelInstanceMapCommand(ri,"INFO"));
if (retval == C_OK) ri->link->pending_commands++; /* sent: one more command awaiting a reply */
}
/* Send PING to all the three kinds of instances. */
if ((now - ri->link->last_pong_time) > ping_period && /* the last PONG is older than the ping period */
(now - ri->link->last_ping_time) > ping_period/2) { /* and the last PING was sent more than half a ping period ago */
/* If a PING never gets its reply, now - last_pong_time stays above
* ping_period, so the first condition alone would send a PING on every
* timer tick. The second condition limits that to one PING per half
* period. */
sentinelSendPing(ri);
}
/* PUBLISH hello messages to all the three kinds of instances. */
if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) { /* every 2 seconds: #define SENTINEL_PUBLISH_PERIOD 2000 */
sentinelSendHello(ri);
}
}
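The PING decision combines a capped period with a rate limit. The sketch below restates it as two pure functions so the numbers can be checked; `effective_ping_period` and `should_send_ping` are hypothetical names, and the constant is the one quoted in the listing.

```c
#include <stdbool.h>

typedef long long mstime_t;  /* assumed: milliseconds */
#define SENTINEL_PING_PERIOD 1000

/* Hypothetical helper: down_after_period capped at SENTINEL_PING_PERIOD. */
mstime_t effective_ping_period(mstime_t down_after_period) {
    return down_after_period > SENTINEL_PING_PERIOD
               ? SENTINEL_PING_PERIOD
               : down_after_period;
}

/* Hypothetical helper: same two conditions as in
 * sentinelSendPeriodicCommands(). */
bool should_send_ping(mstime_t now, mstime_t last_pong_time,
                      mstime_t last_ping_time, mstime_t down_after_period) {
    mstime_t ping_period = effective_ping_period(down_after_period);
    return (now - last_pong_time) > ping_period &&     /* PONG is stale */
           (now - last_ping_time) > ping_period / 2;   /* rate limit */
}
```

With the default down-after period of 30 seconds, an unresponsive instance is therefore pinged at most about once every 500 ms.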
***************************Checking whether an instance is subjectively down***************************************************
/* Is this instance down from our point of view? */ /* that is, as seen by the current Sentinel */
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
mstime_t elapsed = 0;
if (ri->link->act_ping_time) /* send time of the oldest PING still awaiting a PONG */
elapsed = mstime() - ri->link->act_ping_time; /* how long that PING has been pending */
else if (ri->link->disconnected) /* no pending PING, but the link is disconnected */
elapsed = mstime() - ri->link->last_avail_time; /* time since the last valid PING reply */
/* Check if we are in need for a reconnection of one of the
* links, because we are detecting low activity.
*
* 1) Check if the command link seems connected, was connected not less
* than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have a
* pending ping for more than half the timeout. */
/* #define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000, i.e. 15 seconds */
if (ri->link->cc && /* the command connection exists */
(mstime() - ri->link->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD && /* and was created more than 15 seconds ago */
ri->link->act_ping_time != 0 && /* There is a pending ping... */
/* The pending ping is delayed, and we did not receive
* error replies as well. */
/* down_after_period defaults to 30 seconds, so half of it is 15 seconds.
* The exchange is two-way, so both directions are checked: how long our
* PING has gone unanswered, and how long ago the last reply arrived. */
(mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) && /* the unanswered PING was sent more than 15 seconds ago */
(mstime() - ri->link->last_pong_time) > (ri->down_after_period/2)) /* and the last PONG arrived more than 15 seconds ago */
{
instanceLinkCloseConnection(ri->link,ri->link->cc); /* close the connection; a later timer tick will reconnect */
}
/* 2) Check if the pubsub link seems connected, was connected not less
* than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
* activity in the Pub/Sub channel for more than
* SENTINEL_PUBLISH_PERIOD * 3.
*/
/* That is: the Pub/Sub link has been up for more than 15 seconds, yet
* nothing has arrived on it for more than 6 seconds. */
if (ri->link->pc &&
(mstime() - ri->link->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD && /* connected for more than 15 seconds */
(mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3)) /* no activity for more than 6 seconds: #define SENTINEL_PUBLISH_PERIOD 2000 */
{
instanceLinkCloseConnection(ri->link,ri->link->pc); /* close the connection; a later timer tick will reconnect */
}
/* Update the SDOWN flag. We believe the instance is SDOWN if:
*
* 1) It is not replying.
* 2) We believe it is a master, it reports to be a slave for enough time
* to meet the down_after_period, plus enough time to get two times
* INFO report from the instance. */
/* The threshold in 2) has two parts: the usual down-after interval, plus
* time for two INFO reports to confirm the role, which makes a false
* positive less likely. */
if (elapsed > ri->down_after_period || /* no reply for longer than the subjective-down interval */
(ri->flags & SRI_MASTER && /* or: we consider it a master */
ri->role_reported == SRI_SLAVE && /* but it reports itself as a slave */
mstime() - ri->role_reported_time >
(ri->down_after_period+SENTINEL_INFO_PERIOD*2))) /* for long enough; #define SENTINEL_INFO_PERIOD 10000, i.e. 10 seconds */
{
/* Is subjectively down */
if ((ri->flags & SRI_S_DOWN) == 0) { /* not yet flagged as subjectively down */
sentinelEvent(LL_WARNING,"+sdown",ri,"%@"); /* so emit the +sdown event */
ri->s_down_since_time = mstime(); /* record when it went subjectively down */
ri->flags |= SRI_S_DOWN; /* set the flag */
}
} else {
/* Is subjectively up */
if (ri->flags & SRI_S_DOWN) {
sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
}
}
}
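The SDOWN rule itself reduces to a small predicate. The sketch below flattens the inputs into a hypothetical struct (`sdown_inputs`, not a Redis type) so both branches of the rule can be exercised with plain numbers; the INFO period is the value quoted in the listing.

```c
#include <stdbool.h>

typedef long long mstime_t;  /* assumed: milliseconds */
#define SENTINEL_INFO_PERIOD 10000

/* Hypothetical flattened inputs; in Redis these values come from
 * sentinelRedisInstance and its link. */
struct sdown_inputs {
    mstime_t elapsed;            /* age of the unanswered PING, or time since the last valid reply */
    mstime_t down_after_period;  /* configured down-after-milliseconds */
    bool is_master;              /* we consider the instance a master */
    bool reports_slave;          /* but its INFO output says slave */
    mstime_t role_reported_age;  /* now - role_reported_time */
};

/* Same two-part condition as in sentinelCheckSubjectivelyDown(). */
bool is_subjectively_down(const struct sdown_inputs *in) {
    if (in->elapsed > in->down_after_period) return true;
    return in->is_master && in->reports_slave &&
           in->role_reported_age >
               in->down_after_period + SENTINEL_INFO_PERIOD * 2;
}
```

With the defaults, a master that keeps reporting itself as a slave is flagged only after more than 50 seconds (30 + 2 × 10).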
*************************Checking whether an instance is objectively down*****************************************************
/* Is this instance down according to the configured quorum?
*
* Note that ODOWN is a weak quorum, it only means that enough Sentinels
* reported in a given time range that the instance was not reachable.
* However messages can be delayed so there are no strong guarantees about
* N instances agreeing at the same time about the down state. */
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
dictIterator *di;
dictEntry *de;
unsigned int quorum = 0, odown = 0;
if (master->flags & SRI_S_DOWN) { /* only a master we already see as subjectively down can become objectively down */
/* Is down for enough sentinels? */
quorum = 1; /* the current sentinel. */
/* Count all the other sentinels. */
di = dictGetIterator(master->sentinels); /* the Sentinels monitoring the same master */
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
if (ri->flags & SRI_MASTER_DOWN) quorum++; /* this Sentinel also reported the master as down */
}
dictReleaseIterator(di);
if (quorum >= master->quorum) odown = 1; /* enough Sentinels agree: objectively down */
}
/* Set the flag accordingly to the outcome. */
if (odown) {
if ((master->flags & SRI_O_DOWN) == 0) { /* not yet flagged as objectively down */
sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
quorum, master->quorum); /* emit the +odown event */
master->flags |= SRI_O_DOWN; /* set the objectively-down flag */
master->o_down_since_time = mstime(); /* record when it went objectively down */
}
} else { /* not objectively down */
if (master->flags & SRI_O_DOWN) { /* but the flag is still set */
sentinelEvent(LL_WARNING,"-odown",master,"%@"); /* emit the -odown event */
master->flags &= ~SRI_O_DOWN; /* clear the flag */
}
}
}
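The quorum count can be written as a small function over an array of flags. This is an illustrative sketch: `is_objectively_down` is a hypothetical name, and `others_say_down[i]` stands for "the i-th other Sentinel currently has SRI_MASTER_DOWN set".

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical helper mirroring sentinelCheckObjectivelyDown(). */
bool is_objectively_down(bool self_sdown, const bool *others_say_down,
                         size_t n_others, unsigned int required_quorum) {
    if (!self_sdown) return false;  /* we must see it as SDOWN first */
    unsigned int quorum = 1;        /* the current Sentinel */
    for (size_t i = 0; i < n_others; i++)
        if (others_say_down[i]) quorum++;
    return quorum >= required_quorum;
}
```

The current Sentinel always counts as one vote, so with a configured quorum of 1 a single Sentinel can mark the master ODOWN on its own.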
************************Deciding whether a failover should be started******************************************************
/* This function checks if there are the conditions to start the failover,
* that is:
*
* 1) Master must be in ODOWN condition.
* 2) No failover already in progress.
* 3) No failover already attempted recently.
*
* We still don't know if we'll win the election so it is possible that we
* start the failover but that we'll not be able to act.
*
* Return non-zero if a failover was started. */
int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
/* We can't failover if the master is not in O_DOWN state. */
if (!(master->flags & SRI_O_DOWN)) return 0;
/* Failover already in progress? */
if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;
/* Last failover attempt started too little time ago? */
/* failover_timeout defaults to 180 seconds: #define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*3*1000) */
if (mstime() - master->failover_start_time <
master->failover_timeout*2) /* still within twice the failover timeout */
{
if (master->failover_delay_logged != master->failover_start_time) {
time_t clock = (master->failover_start_time +
master->failover_timeout*2) / 1000; /* a new failover may only start after this time */
char ctimebuf[26];
ctime_r(&clock,ctimebuf);
ctimebuf[24] = '\0'; /* Remove newline. */
master->failover_delay_logged = master->failover_start_time;
serverLog(LL_WARNING,
"Next failover delay: I will not start a failover before %s",
ctimebuf); /* log the delay */
}
return 0;
}
sentinelStartFailover(master); /* start the failover */
return 1;
}
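The "too soon" check is simple arithmetic on the last failover start time. A sketch with hypothetical helper names (`next_failover_allowed_at`, `failover_is_throttled`), using the default timeout quoted in the listing:

```c
#include <stdbool.h>

typedef long long mstime_t;  /* assumed: milliseconds */
#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*3*1000)  /* 180 seconds */

/* Hypothetical helper: earliest time (ms) at which a new failover for
 * the same master may be started. */
mstime_t next_failover_allowed_at(mstime_t failover_start_time,
                                  mstime_t failover_timeout) {
    return failover_start_time + failover_timeout * 2;
}

/* Hypothetical helper: same comparison as in
 * sentinelStartFailoverIfNeeded(). */
bool failover_is_throttled(mstime_t now, mstime_t failover_start_time,
                           mstime_t failover_timeout) {
    return now - failover_start_time < failover_timeout * 2;
}
```

With the default timeout, a second attempt for the same master is held back for 360 seconds after the previous attempt started.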
****************************Starting the failover**************************************************
/* Setup the master state to start a failover. */
void sentinelStartFailover(sentinelRedisInstance *master) {
serverAssert(master->flags & SRI_MASTER); /* must be a master */
master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START; /* waiting for the failover to start */
master->flags |= SRI_FAILOVER_IN_PROGRESS; /* mark the failover as in progress */
master->failover_epoch = ++sentinel.current_epoch; /* increment the current epoch */
sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
(unsigned long long) sentinel.current_epoch);
sentinelEvent(LL_WARNING,"+try-failover",master,"%@"); /* emit the +try-failover event */
/* #define SENTINEL_MAX_DESYNC 1000
* The start time gets a random offset, which makes it easier for a single leader to emerge. */
master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
master->failover_state_change_time = mstime(); /* time of the last state change */
}
*****************************Asking the other Sentinels whether the master is down*************************************************
/* If we think the master is down, we start sending
* SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
* in order to get the replies that allow to reach the quorum
* needed to mark the master in ODOWN state and trigger a failover. */
/* #define SENTINEL_ASK_FORCED (1<<0): ask even if a recent reply exists */
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
dictIterator *di;
dictEntry *de;
di = dictGetIterator(master->sentinels); /* iterate over all the other Sentinels */
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
mstime_t elapsed = mstime() - ri->last_master_down_reply_time; /* time since this Sentinel's last is-master-down reply */
char port[32];
int retval;
/* If the master state from other sentinel is too old, we clear it. */
if (elapsed > SENTINEL_ASK_PERIOD*5) { /* too old: more than 5 seconds */
ri->flags &= ~SRI_MASTER_DOWN; /* clear the master-down flag */
sdsfree(ri->leader);
ri->leader = NULL;
}
/* Only ask if master is down to other sentinels if:
*
* 1) We believe it is down, or there is a failover in progress.
* 2) Sentinel is connected.
* 3) We did not receive the info within SENTINEL_ASK_PERIOD ms. */
/* Notes: "down" in 1) means subjectively down from this Sentinel's view;
* 2) means the link to the Sentinel being asked is up;
* #define SENTINEL_ASK_PERIOD 1000, i.e. 1 second. */
if ((master->flags & SRI_S_DOWN) == 0) continue; /* not subjectively down: skip */
if (ri->link->disconnected) continue; /* link is down: skip */
if (!(flags & SENTINEL_ASK_FORCED) && /* not a forced query, and the last reply is less than one ask period old */
mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
continue; /* skip */
/* Ask */
ll2string(port,sizeof(port),master->addr->port); /* the port as a string */
retval = redisAsyncCommand(ri->link->cc,
sentinelReceiveIsMasterDownReply, ri, /* reply callback, shown below */
"%s is-master-down-by-addr %s %s %llu %s",
sentinelInstanceMapCommand(ri,"SENTINEL"),
master->addr->ip, port,
sentinel.current_epoch,
(master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
sentinel.myid : "*");
if (retval == C_OK) ri->link->pending_commands++; /* one more command awaiting a reply */
}
dictReleaseIterator(di);
}
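To see what actually goes over the wire, the request can be rebuilt with `snprintf`. This is a sketch under two assumptions: the `SENTINEL` command has not been renamed (the real code passes it through sentinelInstanceMapCommand), and `failover_in_progress` stands for the check `master->failover_state > SENTINEL_FAILOVER_STATE_NONE`. `format_is_master_down_query` is a hypothetical name.

```c
#include <stdio.h>
#include <stddef.h>

/* Hypothetical helper: formats the query sent to another Sentinel.
 * When no failover is in progress the runid argument is "*", meaning
 * "just tell me whether the master is down"; otherwise our own runid is
 * sent, which also asks the peer for its vote. */
int format_is_master_down_query(char *buf, size_t len,
                                const char *master_ip, int master_port,
                                unsigned long long current_epoch,
                                const char *myid, int failover_in_progress) {
    return snprintf(buf, len,
                    "SENTINEL is-master-down-by-addr %s %d %llu %s",
                    master_ip, master_port, current_epoch,
                    failover_in_progress ? myid : "*");
}
```

The difference between the two forms is what lets the same command serve both for reaching the ODOWN quorum and for collecting votes during the leader election.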
***************************sentinelReceiveIsMasterDownReply***************************************************
/* Receive the SENTINEL is-master-down-by-addr reply, see the
* sentinelAskMasterStateToOtherSentinels() function for more information. */
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = privdata;
instanceLink *link = c->data;
redisReply *r;
if (!reply || !link) return; /* no reply, or the link is gone: return */
link->pending_commands--; /* one fewer command awaiting a reply */
r = reply;
/* Ignore every error or unexpected reply.
* Note that if the command returns an error for any reason we'll
* end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 &&
r->element[0]->type == REDIS_REPLY_INTEGER &&
r->element[1]->type == REDIS_REPLY_STRING &&
r->element[2]->type == REDIS_REPLY_INTEGER)
{
ri->last_master_down_reply_time = mstime();
if (r->element[0]->integer == 1) { /* the other Sentinel also considers the master down */
ri->flags |= SRI_MASTER_DOWN; /* record that on its instance */
} else {
ri->flags &= ~SRI_MASTER_DOWN; /* it does not consider the master down */
}
if (strcmp(r->element[1]->str,"*")) {
/* If the runid in the reply is not "*" the Sentinel actually
* replied with a vote. */
sdsfree(ri->leader);
if ((long long)ri->leader_epoch != r->element[2]->integer) /* the vote is for a different epoch than the one recorded: log it */
serverLog(LL_WARNING,
"%s voted for %s %llu", ri->name,
r->element[1]->str,
(unsigned long long) r->element[2]->integer);
ri->leader = sdsnew(r->element[1]->str); /* the runid it voted for */
ri->leader_epoch = r->element[2]->integer; /* the epoch of that vote */
}
}
}
******************************************************************************
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
serverAssert(ri->flags & SRI_MASTER); /* must be a master */
if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return; /* nothing to do unless a failover is in progress */
switch(ri->failover_state) { /* dispatch on the failover state */
case SENTINEL_FAILOVER_STATE_WAIT_START:
sentinelFailoverWaitStart(ri); /* wait for the failover to start */
break;
case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
sentinelFailoverSelectSlave(ri); /* select the slave to promote */
break;
case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
sentinelFailoverSendSlaveOfNoOne(ri); /* promote the slave to master */
break;
case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
sentinelFailoverWaitPromotion(ri); /* wait for the promotion to take effect */
break;
case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
sentinelFailoverReconfNextSlave(ri); /* reconfigure the remaining slaves and finish the failover */
break;
}
}
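The states handled by the switch form a chain. The sketch below models the successful path only, as an enum plus a transition function. The first two transitions and the special handling of the update-config state are visible in this walkthrough; the remaining transitions, the numeric values and the return to "none" at the end are assumptions of this sketch, and the names are shortened, hypothetical versions of the `SENTINEL_FAILOVER_STATE_*` constants.

```c
/* Failover states in the order the state machine visits them on the
 * successful path. Numeric values are an assumption of this sketch. */
enum failover_state {
    FAILOVER_NONE = 0,
    FAILOVER_WAIT_START,
    FAILOVER_SELECT_SLAVE,
    FAILOVER_SEND_SLAVEOF_NOONE,
    FAILOVER_WAIT_PROMOTION,
    FAILOVER_RECONF_SLAVES,
    FAILOVER_UPDATE_CONFIG
};

/* Hypothetical helper: the state reached when the current step succeeds.
 * Aborts and timeouts are deliberately left out. */
enum failover_state next_state_on_success(enum failover_state s) {
    switch (s) {
    case FAILOVER_WAIT_START:         return FAILOVER_SELECT_SLAVE;
    case FAILOVER_SELECT_SLAVE:       return FAILOVER_SEND_SLAVEOF_NOONE;
    case FAILOVER_SEND_SLAVEOF_NOONE: return FAILOVER_WAIT_PROMOTION;
    case FAILOVER_WAIT_PROMOTION:     return FAILOVER_RECONF_SLAVES;
    case FAILOVER_RECONF_SLAVES:      return FAILOVER_UPDATE_CONFIG;
    case FAILOVER_UPDATE_CONFIG:      return FAILOVER_NONE; /* assumed: reset after the switch to the promoted slave */
    default:                          return FAILOVER_NONE;
    }
}
```

The update-config state has no case in the switch above because it is picked up in sentinelHandleDictOfRedisInstances, after the iteration over the masters has finished.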
******************************************************************************
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
char *leader;
int isleader;
/* Check if we are the leader for the failover epoch. */
leader = sentinelGetLeader(ri, ri->failover_epoch);
isleader = leader && strcasecmp(leader,sentinel.myid) == 0; /* a leader exists and its id is ours: we are the leader */
sdsfree(leader);
/* If I'm not the leader, and it is not a forced failover via
* SENTINEL FAILOVER, then I can't continue with the failover. */
if (!isleader && !(ri->flags & SRI_FORCE_FAILOVER)) {
int election_timeout = SENTINEL_ELECTION_TIMEOUT; /* #define SENTINEL_ELECTION_TIMEOUT 10000 */
/* The election timeout is the MIN between SENTINEL_ELECTION_TIMEOUT
* and the configured failover timeout. */
/* With the defaults that is min(10 seconds, 180 seconds) = 10 seconds. */
if (election_timeout > ri->failover_timeout) /* the configured failover timeout is the smaller one */
election_timeout = ri->failover_timeout;
/* Abort the failover if I'm not the leader after some time. */
if (mstime() - ri->failover_start_time > election_timeout) { /* the election timed out */
sentinelEvent(LL_WARNING,"-failover-abort-not-elected",ri,"%@");
sentinelAbortFailover(ri); /* abort the failover */
}
return;
}
sentinelEvent(LL_WARNING,"+elected-leader",ri,"%@"); /* we were elected leader */
if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION) /* simulated crash right after winning the election (a testing aid) */
sentinelSimFailureCrash();
ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE; /* next state: select the slave to promote */
ri->failover_state_change_time = mstime(); /* time of the state change */
sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@");
}
*****************************Finding the Sentinel leader for an epoch*************************************************
/* Scan all the Sentinels attached to this master to check if there
* is a leader for the specified epoch.
*
* To be a leader for a given epoch, we should have the majority of
* the Sentinels we know (ever seen since the last SENTINEL RESET) that
* reported the same instance as leader for the same epoch. */
char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
dict *counters;
dictIterator *di;
dictEntry *de;
unsigned int voters = 0, voters_quorum;
char *myvote;
char *winner = NULL;
uint64_t leader_epoch;
uint64_t max_votes = 0;
serverAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)); /* the master must be objectively down or in failover */
counters = dictCreate(&leaderVotesDictType,NULL);
voters = dictSize(master->sentinels)+1; /* All the other sentinels and me.*/
/* Count other sentinels votes */
di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch) /* it has voted, and the vote is for the current epoch */
sentinelLeaderIncr(counters,ri->leader); /* add one vote for that leader */
}
dictReleaseIterator(di);
/* Check what's the winner. For the winner to win, it needs two conditions:
* 1) Absolute majority between voters (50% + 1).
* 2) And anyway at least master->quorum votes. */
di = dictGetIterator(counters);
while((de = dictNext(di)) != NULL) {
uint64_t votes = dictGetUnsignedIntegerVal(de);
if (votes > max_votes) { /* keep the candidate with the most votes */
max_votes = votes;
winner = dictGetKey(de);
}
}
dictReleaseIterator(di);
/* Count this Sentinel vote:
* if this Sentinel did not vote yet, either vote for the most
* common voted sentinel, or for itself if no vote exists at all. */
if (winner) /* there is a front-runner: vote for it if we have not voted yet */
myvote = sentinelVoteLeader(master,epoch,winner,&leader_epoch);
else /* otherwise vote for ourselves */
myvote = sentinelVoteLeader(master,epoch,sentinel.myid,&leader_epoch);
if (myvote && leader_epoch == epoch) { /* we have a vote and it is for the requested epoch */
uint64_t votes = sentinelLeaderIncr(counters,myvote); /* add our vote, then compare again */
if (votes > max_votes) {
max_votes = votes;
winner = myvote;
}
}
voters_quorum = voters/2+1; /* strict majority */
/* No winner if the top candidate lacks a majority or has fewer votes than master->quorum. */
if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
winner = NULL;
winner = winner ? sdsnew(winner) : NULL;
sdsfree(myvote);
dictRelease(counters);
return winner;
}
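The final test in sentinelGetLeader combines two thresholds. A minimal sketch of that check, with a hypothetical name (`vote_count_wins`) and plain integers in place of the dictionary of counters:

```c
#include <stdbool.h>

/* Hypothetical helper: true when the top candidate's vote count is
 * enough to win. 'voters' is the number of known Sentinels including
 * ourselves, 'quorum' is the configured master->quorum. */
bool vote_count_wins(unsigned int max_votes, unsigned int voters,
                     unsigned int quorum) {
    unsigned int voters_quorum = voters / 2 + 1;  /* strict majority */
    return max_votes >= voters_quorum && max_votes >= quorum;
}
```

For example, with 5 known Sentinels and a quorum of 2, two votes are not enough because the majority threshold is 3; with a quorum of 4, three votes are a majority but still fall short of the quorum.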
******************************************************************************
/* Vote for the sentinel with 'req_runid' or return the old vote if already
* voted for the specified 'req_epoch' or one greater.
*
* If a vote is not available returns NULL, otherwise return the Sentinel
* runid and populate the leader_epoch with the epoch of the vote. */
char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch) {
if (req_epoch > sentinel.current_epoch) { /* the requested epoch is newer than ours */
sentinel.current_epoch = req_epoch; /* adopt the newer epoch */
sentinelFlushConfig(); /* persist it to the configuration file */
sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
(unsigned long long) sentinel.current_epoch);
}
if (master->leader_epoch < req_epoch && sentinel.current_epoch <= req_epoch)
{ /* our recorded vote for this master is older than the request: replace it with the requested runid */
sdsfree(master->leader);
master->leader = sdsnew(req_runid);
master->leader_epoch = sentinel.current_epoch; /* move to the latest epoch */
sentinelFlushConfig();
sentinelEvent(LL_WARNING,"+vote-for-leader",master,"%s %llu",
master->leader, (unsigned long long) master->leader_epoch);
/* If we did not vote for ourselves, set the master failover start
* time to now, in order to force a delay before we can start a
* failover for the same master. */
/* Note that the code sets it to now plus a random offset. */
if (strcasecmp(master->leader,sentinel.myid))
master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC; /* #define SENTINEL_MAX_DESYNC 1000 */
}
*leader_epoch = master->leader_epoch; /* report the epoch of the vote */
return master->leader ? sdsnew(master->leader) : NULL; /* the runid voted for as leader of this master */
}
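The voting rule is "first request wins per epoch". The sketch below keeps only the epoch bookkeeping, with a hypothetical `vote_state` struct in place of the global state and the master instance; config flushing, events and the failover delay are left out, and the stored runid is a borrowed pointer rather than an sds copy.

```c
#include <stdint.h>
#include <stddef.h>

/* Hypothetical, reduced state: the Sentinel's current epoch plus the
 * vote recorded for one master. */
struct vote_state {
    uint64_t current_epoch;  /* sentinel.current_epoch */
    uint64_t leader_epoch;   /* master->leader_epoch */
    const char *leader;      /* master->leader, NULL if no vote yet */
};

/* Hypothetical helper following sentinelVoteLeader(): returns the runid
 * this Sentinel has voted for after processing the request. */
const char *vote_leader(struct vote_state *st, uint64_t req_epoch,
                        const char *req_runid) {
    if (req_epoch > st->current_epoch)
        st->current_epoch = req_epoch;  /* adopt the newer epoch */
    if (st->leader_epoch < req_epoch && st->current_epoch <= req_epoch) {
        st->leader = req_runid;         /* first request in this epoch wins */
        st->leader_epoch = st->current_epoch;
    }
    return st->leader;
}
```

A second request in the same epoch gets the existing vote back, which is how a Sentinel avoids voting twice in one epoch.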
****************************Failover: selecting the slave**************************************************
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
sentinelRedisInstance *slave = sentinelSelectSlave(ri); /* pick the best slave */
/* We don't handle the timeout in this state as the function aborts
* the failover or go forward in the next state. */
if (slave == NULL) { /* no slave is suitable for promotion */
sentinelEvent(LL_WARNING,"-failover-abort-no-good-slave",ri,"%@");
sentinelAbortFailover(ri);
} else { /* a suitable slave was found */
sentinelEvent(LL_WARNING,"+selected-slave",slave,"%@");
slave->flags |= SRI_PROMOTED; /* mark the slave as the one being promoted */
ri->promoted_slave = slave;
ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE; /* next state: turn the slave into a master */
ri->failover_state_change_time = mstime(); /* time of the state change */
sentinelEvent(LL_NOTICE,"+failover-state-send-slaveof-noone",
slave, "%@");
}
}
****************************Choosing one of the master's slaves to promote************************************
/* Select a suitable slave to promote. The current algorithm only uses
* the following parameters:
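Choose a suitable slave to promote to master. The current algorithm uses only the following criteria:
* 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
1) The slave must not be subjectively down, objectively down, or disconnected.
* 2) Last time the slave replied to ping no more than 5 times the PING period.
2) Its last PING reply must be no older than 5 PING periods.
* 3) info_refresh not older than 3 times the INFO refresh period.
3) Its last INFO refresh must be no older than 3 INFO refresh periods.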
* 4) master_link_down_time no more than:
* (now - master->s_down_since_time) + (master->down_after_period * 10).
* Basically since the master is down from our POV, the slave reports
* to be disconnected no more than 10 times the configured down-after-period.
* This is pretty much black magic but the idea is, the master was not
* available so the slave may be lagging, but not over a certain time.
* Anyway we'll select the best slave according to replication offset.
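4) The time the slave's replication link to the master has been down must be no more than:
* (now - master->s_down_since_time) + (master->down_after_period * 10).
From our point of view (POV = point of view) the master is already down, so the slave may report a broken link, but for no longer than 10 times the configured down-after-period beyond that.
This looks like black magic, but the idea is simple: the master was unavailable, so the slave may lag behind, just not by more than a bounded amount of time.
In any case, the best slave is then chosen by replication offset.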
* 5) Slave priority can't be zero, otherwise the slave is discarded.
5) A slave whose priority is 0 is discarded, that is, it can never become master.
* Among all the slaves matching the above conditions we select the slave
* with, in order of sorting key:
Among all the slaves that pass the checks above, the winner is chosen by sorting on these keys, in order:
* - lower slave_priority. (a smaller value means a higher priority)
* - bigger processed replication offset.
* - lexicographically smaller runid.
*
* Basically if runid is the same, the slave that processed more commands
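* from the master is selected. Note: in the comparator below, the replication offset is compared when the priorities are equal, and runid is only the final tie-breaker.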
*
* The function returns the pointer to the selected slave, otherwise
* NULL if no suitable slave was found.
The function returns a pointer to the selected slave, or NULL when no suitable slave is found.
*/
****************************************************************************************
/* Helper for sentinelSelectSlave(). This is used by qsort() in order to
* sort suitable slaves in a "better first" order, to take the first of
* the list. */
Helper for sentinelSelectSlave(). It is passed to qsort() so that the suitable slaves are sorted best-first and the first element of the array can be taken.
int compareSlavesForPromotion(const void *a, const void *b) {
sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
**sb = (sentinelRedisInstance **)b;
char *sa_runid, *sb_runid;
When the priorities differ, the slave with the higher priority (the smaller slave_priority value) sorts first
if ((*sa)->slave_priority != (*sb)->slave_priority)
return (*sa)->slave_priority - (*sb)->slave_priority;
/* If priority is the same, select the slave with greater replication
* offset (processed more data from the master). */
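With equal priority, prefer the slave that is further along in replication (it has processed more data from the master)
if ((*sa)->slave_repl_offset > (*sb)->slave_repl_offset) {
The slave that replicated more data must sort first, so the larger slave_repl_offset returns -1 here
return -1; /* a < b */
} else if ((*sa)->slave_repl_offset < (*sb)->slave_repl_offset) { The slave that replicated less data sorts later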
return 1; /* a > b */
}
/* If the replication offset is the same select the slave with that has
* the lexicographically smaller runid. Note that we try to handle runid
* == NULL as there are old Redis versions that don't publish runid in
* INFO. A NULL runid is considered bigger than any other runid. */
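If the replication offsets are equal, sort by runid in lexicographic order and prefer the smaller one. A NULL runid has to be handled too,
because old Redis versions do not publish runid in INFO; such a slave is treated as larger than any slave that has a runid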
sa_runid = (*sa)->runid;
sb_runid = (*sb)->runid;
if (sa_runid == NULL && sb_runid == NULL) return 0; Both are NULL, so the runids are treated as equal
else if (sa_runid == NULL) return 1; /* a > b */
else if (sb_runid == NULL) return -1; /* a < b */
return strcasecmp(sa_runid, sb_runid); Case-insensitive lexicographic comparison
}
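The three-level ordering can be tried outside Redis with a simplified record. The sketch below assumes a hypothetical `candidate` struct and a `best_candidate()` helper, neither of which exists in Redis; it reuses the same comparison order (priority, then replication offset, then runid with NULL sorting last) and sorts an array of pointers with `qsort()`, as the function above does.

```c
#include <stdlib.h>
#include <strings.h>

/* Hypothetical, simplified view of a replica; not the Redis struct. */
typedef struct {
    int priority;           /* smaller value means higher priority */
    long long repl_offset;  /* larger value means more data replicated */
    const char *runid;      /* may be NULL for very old servers */
} candidate;

/* Comparator for an array of candidate pointers, best candidate first. */
int compare_candidates(const void *a, const void *b) {
    const candidate *ca = *(const candidate * const *)a;
    const candidate *cb = *(const candidate * const *)b;

    if (ca->priority != cb->priority)
        return ca->priority < cb->priority ? -1 : 1;
    if (ca->repl_offset != cb->repl_offset)
        return ca->repl_offset > cb->repl_offset ? -1 : 1;

    /* A NULL runid sorts after any real runid. */
    if (ca->runid == NULL && cb->runid == NULL) return 0;
    if (ca->runid == NULL) return 1;
    if (cb->runid == NULL) return -1;
    return strcasecmp(ca->runid, cb->runid);
}

/* Sort the pointer array in place and return the best entry, or NULL if empty. */
const candidate *best_candidate(const candidate **list, size_t n) {
    if (n == 0) return NULL;
    qsort(list, n, sizeof(list[0]), compare_candidates);
    return list[0];
}
```

The sketch compares priorities with `<` instead of subtracting them. That is a defensive choice in the sketch to rule out integer overflow for extreme values, not a claim that the original subtraction misbehaves for realistic priorities.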
****************************************************************************************
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
sentinelRedisInstance **instance =
zmalloc(sizeof(instance[0])*dictSize(master->slaves)); Allocate room for one pointer per slave
sentinelRedisInstance *selected = NULL;
int instances = 0;
dictIterator *di;
dictEntry *de;
mstime_t max_master_down_time = 0;
if (master->flags & SRI_S_DOWN) The master is subjectively down
max_master_down_time += mstime() - master->s_down_since_time; Time elapsed since the master was judged subjectively down
max_master_down_time += master->down_after_period * 10; down_after_period defaults to 30 seconds, so 10 periods is 300 seconds
di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) { Iterate over every slave attached to this master
sentinelRedisInstance *slave = dictGetVal(de);
mstime_t info_validity_time;
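if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue; Already subjectively or objectively down, so it cannot become the new master
if (slave->link->disconnected) continue; A disconnected slave cannot be selected either
if (mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5) continue; The last valid PING reply is more than 5 seconds old
if (slave->slave_priority == 0) continue; This slave is not allowed to be promoted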
/* If the master is in SDOWN state we get INFO for slaves every second.
* Otherwise we get it with the usual period so we need to account for
* a larger delay. */
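If the master is in SDOWN (subjectively down) state, slaves are polled with INFO every second.
Otherwise INFO is sent with the usual period, so a larger delay has to be tolerated
if (master->flags & SRI_S_DOWN) The master is subjectively down
info_validity_time = SENTINEL_PING_PERIOD*5; INFO data stays valid for 5 seconds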
else
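info_validity_time = SENTINEL_INFO_PERIOD*3; Otherwise INFO data stays valid for 30 seconds
if (mstime() - slave->info_refresh > info_validity_time) continue; The INFO data is older than the validity window above, so this slave is skipped
if (slave->master_link_down_time > max_master_down_time) continue; The replication link has been down longer than the allowed maximum
instance[instances++] = slave; A slave that survives all the checks above is eligible for promotion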
}
dictReleaseIterator(di);
if (instances) {
qsort(instance,instances,sizeof(sentinelRedisInstance*),
compareSlavesForPromotion); Sort with the C library's qsort(), best candidate first
selected = instance[0]; Take the first element
}
zfree(instance);
return selected;
}
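The eligibility checks are easier to follow with concrete numbers. The sketch below restates them over a hypothetical `replica_view` struct with plain millisecond timestamps. The two period constants are assumptions taken from the "5 seconds" and "30 seconds" annotations above, and all names are invented for illustration.

```c
#include <stdbool.h>

#define PING_PERIOD_MS 1000   /* assumption: 5 periods = 5 seconds */
#define INFO_PERIOD_MS 10000  /* assumption: 3 periods = 30 seconds */

/* Hypothetical, flattened view of the fields the filter looks at. */
typedef struct {
    bool down;                      /* subjectively or objectively down */
    bool disconnected;
    int priority;                   /* 0 means "never promote" */
    long long last_avail_ms;        /* time of the last valid PING reply */
    long long info_refresh_ms;      /* time of the last INFO reply */
    long long master_link_down_ms;  /* how long replication has been broken */
} replica_view;

/* Upper bound for master_link_down_ms, as computed before the loop. */
long long max_master_down_ms(bool master_sdown, long long now_ms,
                             long long s_down_since_ms, long long down_after_ms) {
    long long limit = down_after_ms * 10;
    if (master_sdown) limit += now_ms - s_down_since_ms;
    return limit;
}

/* True when the replica passes every check and may be sorted for promotion. */
bool is_eligible(const replica_view *r, bool master_sdown,
                 long long now_ms, long long max_down_ms) {
    long long info_validity = master_sdown ? PING_PERIOD_MS * 5
                                           : INFO_PERIOD_MS * 3;
    if (r->down || r->disconnected) return false;
    if (now_ms - r->last_avail_ms > PING_PERIOD_MS * 5) return false;
    if (r->priority == 0) return false;
    if (now_ms - r->info_refresh_ms > info_validity) return false;
    if (r->master_link_down_ms > max_down_ms) return false;
    return true;
}
```

With a 30-second down-after-period and a master that has been subjectively down for 20 seconds, the bound works out to 20000 + 300000 = 320000 ms.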
*******************************Sending the command that turns the slave into a master***********************************************
void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
int retval;
/* We can't send the command to the promoted slave if it is now
* disconnected. Retry again and again with this state until the timeout
* is reached, then abort the failover. */
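The command cannot be sent while the promoted slave is disconnected. We keep retrying in this state until the timeout is reached, and then abort the failover
if (ri->promoted_slave->link->disconnected) { The link is down
if (mstime() - ri->failover_state_change_time > ri->failover_timeout) { Timed out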
sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
sentinelAbortFailover(ri);
}
return;
}
/* Send SLAVEOF NO ONE command to turn the slave into a master.
* We actually register a generic callback for this command as we don't
* really care about the reply. We check if it worked indirectly observing
* if INFO returns a different role (master instead of slave). */
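Send SLAVEOF NO ONE to turn the slave into a master. Only a generic callback is registered for this command,
because the reply itself does not matter. Whether the command worked is checked indirectly, by watching
whether INFO starts reporting a different role (master instead of slave)
retval = sentinelSendSlaveOf(ri->promoted_slave,NULL,0); Send the SLAVEOF NO ONE command
if (retval != C_OK) return; Sending failed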
sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
ri->promoted_slave,"%@"); Now waiting for the promotion
ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
ri->failover_state_change_time = mstime();
}
*************************Waiting for the promotion*****************************************************
/* We actually wait for promotion indirectly checking with INFO when the
* slave turns into a master. */
The promotion is awaited indirectly: INFO replies are checked to see when the slave has turned into a master
void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
/* Just handle the timeout. Switching to the next state is handled
* by the function parsing the INFO command of the promoted slave. */
Only the timeout is handled here. The switch to the next state is made by the function that parses the promoted slave's INFO output
if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
sentinelAbortFailover(ri);
}
}
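Taken together, the functions above walk through a small state machine. The sketch below is a hypothetical model of those transitions. The `fo_state` names are shortened inventions, `FO_RECONF_SLAVES` stands for the reconfiguration step described in the next function, and the model assumes that sending SLAVEOF NO ONE succeeds whenever the link is up. Returning `FO_NONE` stands in for aborting the failover.

```c
/* Hypothetical short names for the failover states; not the Redis macros. */
typedef enum {
    FO_NONE,               /* no failover in progress (also used for "aborted") */
    FO_SELECT_SLAVE,
    FO_SEND_SLAVEOF_NOONE,
    FO_WAIT_PROMOTION,
    FO_RECONF_SLAVES,
    FO_UPDATE_CONFIG
} fo_state;

/* Observations made during one timer tick (all hypothetical flags). */
typedef struct {
    int have_candidate;    /* a suitable slave was found */
    int link_up;           /* the promoted slave is connected */
    int info_says_master;  /* INFO now reports the master role */
    int timed_out;         /* failover_timeout elapsed in the current state */
    int all_reconfigured;  /* every reachable slave follows the new master */
} fo_inputs;

/* Return the state the failover is in after this tick. */
fo_state fo_next(fo_state s, const fo_inputs *in) {
    switch (s) {
    case FO_SELECT_SLAVE:
        /* No timeout here: either abort or go forward. */
        return in->have_candidate ? FO_SEND_SLAVEOF_NOONE : FO_NONE;
    case FO_SEND_SLAVEOF_NOONE:
        /* Keep retrying while disconnected, abort once the timeout hits. */
        if (!in->link_up) return in->timed_out ? FO_NONE : s;
        return FO_WAIT_PROMOTION;
    case FO_WAIT_PROMOTION:
        /* The INFO parser advances the state; this step only times out. */
        if (in->info_says_master) return FO_RECONF_SLAVES;
        return in->timed_out ? FO_NONE : s;
    case FO_RECONF_SLAVES:
        /* A timeout forces the end instead of aborting. */
        return (in->all_reconfigured || in->timed_out) ? FO_UPDATE_CONFIG : s;
    default:
        return s;
    }
}
```

The asymmetry is the point of the sketch: before the promotion a timeout aborts the failover, while after it a timeout only forces the failover to finish.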
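*************************Moving the other slaves over to the new master*****************************************************
/* Send SLAVE OF <new master address> to all the remaining slaves that
* still don't appear to have the configuration updated. */
Send SLAVE OF <new master address> to the other slaves, that is, to those that have not switched to the new master's configuration yet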
void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
dictIterator *di;
dictEntry *de;
int in_progress = 0;
di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) Slaves whose reconfiguration is currently in flight
in_progress++;
}
dictReleaseIterator(di);
di = dictGetIterator(master->slaves);
while(in_progress < master->parallel_syncs && Fewer slaves are being reconfigured than parallel_syncs allows, so more can be started
(de = dictNext(di)) != NULL)
{
sentinelRedisInstance *slave = dictGetVal(de);
int retval;
/* Skip the promoted slave, and already configured slaves. */
Skip the promoted slave and the slaves that are already reconfigured
if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
/* If too much time elapsed without the slave moving forward to
* the next state, consider it reconfigured even if it is not.
* Sentinels will detect the slave as misconfigured and fix its
* configuration later. */
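If too much time passes without the slave moving to the next state, treat it as reconfigured even though it is not.
Sentinels will later detect the slave as misconfigured and fix its configuration
if ((slave->flags & SRI_RECONF_SENT) && The SLAVEOF command has been sent
(mstime() - slave->slave_reconf_sent_time) > but the slave has been stuck in this state for too long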
SENTINEL_SLAVE_RECONF_TIMEOUT)
{
sentinelEvent(LL_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
slave->flags &= ~SRI_RECONF_SENT; Clear the "sent" flag
slave->flags |= SRI_RECONF_DONE; Set the "reconfiguration done" flag
}
/* Nothing to do for instances that are disconnected or already
* in RECONF_SENT state. */
Nothing to do for instances that are disconnected or whose reconfiguration is already in flight
if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) continue;
if (slave->link->disconnected) continue;
/* Send SLAVEOF <new master>. */ Make the slave replicate from the new master
retval = sentinelSendSlaveOf(slave,
master->promoted_slave->addr->ip,
master->promoted_slave->addr->port);
if (retval == C_OK) {
slave->flags |= SRI_RECONF_SENT; Mark the command as sent
slave->slave_reconf_sent_time = mstime();
sentinelEvent(LL_NOTICE,"+slave-reconf-sent",slave,"%@");
in_progress++;
}
}
dictReleaseIterator(di);
/* Check if all the slaves are reconfigured and handle timeout. */
Check whether all the slaves are reconfigured, and handle the timeout
sentinelFailoverDetectEnd(master);
}
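The role of parallel_syncs is to cap how many slaves resynchronize at the same moment. The sketch below models only that throttle on a plain array of flag words. The flag names and `reconf_next()` are hypothetical, and the timeout handling and the disconnected check are left out.

```c
#include <stddef.h>

/* Hypothetical flag bits mirroring the four states used above. */
enum { R_PROMOTED = 1, R_SENT = 2, R_INPROG = 4, R_DONE = 8 };

/* Mark pending replicas as R_SENT until the parallel limit is reached.
 * Returns how many replicas were newly marked in this call. */
size_t reconf_next(unsigned *flags, size_t n, size_t parallel_syncs) {
    size_t in_progress = 0, sent = 0;

    /* First pass: count reconfigurations that are already in flight. */
    for (size_t i = 0; i < n; i++)
        if (flags[i] & (R_SENT | R_INPROG)) in_progress++;

    /* Second pass: start new ones while there is room under the limit. */
    for (size_t i = 0; i < n && in_progress < parallel_syncs; i++) {
        if (flags[i] & (R_PROMOTED | R_DONE | R_SENT | R_INPROG)) continue;
        flags[i] |= R_SENT;
        in_progress++;
        sent++;
    }
    return sent;
}
```

Because the function runs on every timer tick, a slot that frees up when one slave finishes is picked up by the next pending slave on a later tick.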
****************************Detecting the end of the failover**************************************************
void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
int not_reconfigured = 0, timeout = 0;
dictIterator *di;
dictEntry *de;
mstime_t elapsed = mstime() - master->failover_state_change_time;
/* We can't consider failover finished if the promoted slave is
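* not reachable. */ While the promoted slave is unreachable, the failover cannot be considered finished
if (master->promoted_slave == NULL || There is no promoted slave, or the promoted slave is subjectively down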
master->promoted_slave->flags & SRI_S_DOWN) return;
/* The failover terminates once all the reachable slaves are properly
* configured. */
The failover ends as soon as every reachable slave is correctly configured
di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
if (slave->flags & SRI_S_DOWN) continue;
not_reconfigured++;
}
dictReleaseIterator(di);
/* Force end of failover on timeout. */ failover_timeout defaults to 180 seconds
if (elapsed > master->failover_timeout) {
not_reconfigured = 0;
timeout = 1;
sentinelEvent(LL_WARNING,"+failover-end-for-timeout",master,"%@");
}
if (not_reconfigured == 0) { Every reachable slave is reconfigured, or the timeout above forced the counter to 0
sentinelEvent(LL_WARNING,"+failover-end",master,"%@");
master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
master->failover_state_change_time = mstime();
}
/* If I'm the leader it is a good idea to send a best effort SLAVEOF
* command to all the slaves still not reconfigured to replicate with
* the new master. */
If we are the elected leader, it is a good idea to send a best-effort SLAVEOF for the new master to every slave that is still not reconfigured
if (timeout) {
dictIterator *di;
dictEntry *de;
di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
int retval;
if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE|SRI_RECONF_SENT)) continue; Promoted, already done, or already sent
if (slave->link->disconnected) continue; Disconnected
retval = sentinelSendSlaveOf(slave,
master->promoted_slave->addr->ip,
master->promoted_slave->addr->port); Send SLAVEOF to the slaves that have not received it yet
if (retval == C_OK) {
sentinelEvent(LL_NOTICE,"+slave-reconf-sent-be",slave,"%@");
slave->flags |= SRI_RECONF_SENT;
}
}
dictReleaseIterator(di);
}
}
******************************************************************************
/* Run pending scripts if we are not already at max number of running
* scripts. */
Start pending scripts as long as the maximum number of running scripts has not been reached
void sentinelRunPendingScripts(void) {
listNode *ln;
listIter li;
mstime_t now = mstime();
/* Find jobs that are not running and run them, from the top to the
* tail of the queue, so we run older jobs first. */
Find the jobs that are not running and run them, walking the queue from head to tail so that older jobs run first
#define SENTINEL_SCRIPT_MAX_RUNNING 16
listRewind(sentinel.scripts_queue,&li);
while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING && Still below the maximum number of concurrently running scripts
(ln = listNext(&li)) != NULL)
{
sentinelScriptJob *sj = ln->value;
pid_t pid;
/* Skip if already running. */
if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
/* Skip if it's a retry, but not enough time has elapsed. */
A retry whose scheduled start time has not arrived yet is skipped (a rescheduled job has its start_time set in the future)
if (sj->start_time && sj->start_time > now) continue;
sj->flags |= SENTINEL_SCRIPT_RUNNING; Mark the job as running
sj->start_time = mstime(); Start time
sj->retry_num++;
pid = fork();
if (pid == -1) {
/* Parent (fork error). fork() failed, so we are still in the parent process.
* We report fork errors as signal 99, in order to unify the
* reporting with other kind of errors. */
fork errors are reported as signal 99, so that they share one reporting format with the other kinds of errors
sentinelEvent(LL_WARNING,"-script-error",NULL,
"%s %d %d", sj->argv[0], 99, 0);
sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
sj->pid = 0;
} else if (pid == 0) {
/* Child */ The child process
execve(sj->argv[0],sj->argv,environ); Execute the script
/* If we are here an error occurred. */ execve() only returns on failure
_exit(2); /* Don't retry execution. */ Exit code 2 is not 1, so the job will not be retried
} else {
sentinel.running_scripts++; One more running script
sj->pid = pid; Process ID of the running script
sentinelEvent(LL_DEBUG,"+script-child",NULL,"%ld",(long)pid);
}
}
}
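The fork-then-execve pattern, including the `_exit(2)` fallback, can be tried in isolation. The sketch below uses a hypothetical `run_and_wait()` helper. Unlike Sentinel, which returns immediately and collects children later with `wait3(..., WNOHANG, ...)`, this sketch blocks in `waitpid()` to keep the example short.

```c
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

extern char **environ;

/* Run argv[0] in a child process and return its exit code.
 * Returns -1 if fork/waitpid failed or the child was killed by a signal. */
int run_and_wait(char *const argv[]) {
    pid_t pid = fork();
    if (pid == -1) return -1;          /* fork failed, still in the parent */

    if (pid == 0) {
        /* Child: replace the process image with the script. */
        execve(argv[0], argv, environ);
        /* Only reached when execve() itself failed. */
        _exit(2);
    }

    /* Parent: wait for this specific child (blocking, for brevity). */
    int status;
    if (waitpid(pid, &status, 0) == -1) return -1;
    return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```

Calling it with a path that does not exist yields exit code 2, which is the case the `_exit(2)` line in the function above is written for.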
******************************************************************************
/* Check for scripts that terminated, and remove them from the queue if the
* script terminated successfully. If instead the script was terminated by
* a signal, or returned exit code "1", it is scheduled to run again if
* the max number of retries did not already elapsed. */
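Check for terminated scripts and remove the ones that finished successfully from the queue. If a script was instead terminated by a signal or exited with code 1,
and the maximum number of retries has not been reached, it is scheduled to run again.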
void sentinelCollectTerminatedScripts(void) {
int statloc;
pid_t pid;
while ((pid = wait3(&statloc,WNOHANG,NULL)) > 0) {
int exitcode = WEXITSTATUS(statloc); Exit status code of the child process
int bysignal = 0;
listNode *ln;
sentinelScriptJob *sj;
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc); Terminated by a signal, so fetch the signal number
sentinelEvent(LL_DEBUG,"-script-child",NULL,"%ld %d %d",
(long)pid, exitcode, bysignal);
ln = sentinelGetScriptListNodeByPid(pid); Look up the queue node of the job that owns this pid
if (ln == NULL) {
serverLog(LL_WARNING,"wait3() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid);
continue;
}
sj = ln->value;
/* If the script was terminated by a signal or returns an
* exit code of "1" (that means: please retry), we reschedule it
* if the max number of retries is not already reached. */
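If the script did not end normally (killed by a signal, or exit code 1, which means "please retry"), it is retried as long as the maximum number of retries has not been reached
if ((bysignal || exitcode == 1) &&
sj->retry_num != SENTINEL_SCRIPT_MAX_RETRY) Killed by a signal or exited with 1, and the retry limit has not been reached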
{
sj->flags &= ~SENTINEL_SCRIPT_RUNNING; Clear the running flag
sj->pid = 0; Reset the process ID
sj->start_time = mstime() +
sentinelScriptRetryDelay(sj->retry_num); Earliest time at which the next attempt may start
} else {
/* Otherwise let's remove the script, but log the event if the
* execution did not terminated in the best of the ways. */
The script is finished, so it is removed from the queue; if it did not finish successfully, the event is logged first
if (bysignal || exitcode != 0) {
sentinelEvent(LL_WARNING,"-script-error",NULL,
"%s %d %d", sj->argv[0], bysignal, exitcode);
}
listDelNode(sentinel.scripts_queue,ln); Remove the job from the queue
sentinelReleaseScriptJob(sj); Free the resources held by the job
}
sentinel.running_scripts--; One fewer running script
}
}
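The retry decision can be isolated into a few small functions. The sketch below is an illustration under stated assumptions: `sentinelScriptRetryDelay()` is not shown in this section, so the doubling backoff with a 30-second base and the retry limit of 10 are guesses, and every name in the block is hypothetical. It also checks `WIFEXITED()` before reading the exit code, which the function above does not do.

```c
#include <stdbool.h>
#include <sys/wait.h>

#define MAX_RETRY 10          /* assumption, stands in for SENTINEL_SCRIPT_MAX_RETRY */
#define BASE_DELAY_MS 30000   /* assumption, the real base delay is not shown here */

/* Hypothetical pair of values matching the exitcode/bysignal locals above. */
typedef struct {
    int exitcode;  /* exit code, meaningful when the child exited normally */
    int signal;    /* terminating signal number, 0 if none */
} child_result;

/* Decode a wait status into exit code and terminating signal. */
child_result decode_status(int status) {
    child_result r = {0, 0};
    if (WIFEXITED(status)) r.exitcode = WEXITSTATUS(status);
    if (WIFSIGNALED(status)) r.signal = WTERMSIG(status);
    return r;
}

/* Retry when killed by a signal or when the script asked for it with exit 1. */
bool should_retry(child_result r, int retry_num) {
    return (r.signal != 0 || r.exitcode == 1) && retry_num != MAX_RETRY;
}

/* Assumed backoff: the base delay, doubled for every retry after the first. */
long long retry_delay_ms(int retry_num) {
    long long delay = BASE_DELAY_MS;
    while (retry_num-- > 1) delay *= 2;
    return delay;
}
```

Under these assumed constants the first retry waits 30 seconds and the third waits 120 seconds. Exit code 2, which the child uses when `execve()` fails, never triggers a retry.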
******************************************************************************
/* Kill scripts in timeout, they'll be collected by the
* sentinelCollectTerminatedScripts() function. */
Kill the scripts that have timed out; they are then collected by sentinelCollectTerminatedScripts()
void sentinelKillTimedoutScripts(void) {
listNode *ln;
listIter li;
mstime_t now = mstime();
listRewind(sentinel.scripts_queue,&li);
while ((ln = listNext(&li)) != NULL) {
sentinelScriptJob *sj = ln->value;
if (sj->flags & SENTINEL_SCRIPT_RUNNING &&
(now - sj->start_time) > SENTINEL_SCRIPT_MAX_RUNTIME) The script has run for too long
{
sentinelEvent(LL_WARNING,"-script-timeout",NULL,"%s %ld",
sj->argv[0], (long)sj->pid);
kill(sj->pid,SIGKILL); Send SIGKILL to terminate it
}
}
}
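The kill function and the collector work as a pair: the kill only sends the signal, and the wait status later shows that the child died from it. The sketch below demonstrates that hand-off with a hypothetical `kill_and_reap()` helper that spawns a sleeping child, kills it, and reads the terminating signal back.

```c
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Spawn a child that would sleep for a minute, kill it, and reap it.
 * Returns the terminating signal, 0 if the child exited normally,
 * or -1 if fork/kill/waitpid failed. */
int kill_and_reap(void) {
    pid_t pid = fork();
    if (pid == -1) return -1;

    if (pid == 0) {
        /* Child: stands in for a script that runs for too long. */
        sleep(60);
        _exit(0);
    }

    /* Parent: the "timeout" fires immediately in this sketch. */
    if (kill(pid, SIGKILL) == -1) return -1;

    int status;
    if (waitpid(pid, &status, 0) == -1) return -1;
    return WIFSIGNALED(status) ? WTERMSIG(status) : 0;
}
```

Since the collector treats any termination by signal as retryable, a script killed for timing out is rescheduled like any other signal-terminated script, as long as its retry limit has not been reached.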
******************************************************************************