Redis 6.0.5: the REPLICAOF command and master-replica data synchronization


Synchronizing data with REPLICAOF
According to the description in the command table, REPLICAOF replaces SLAVEOF from Redis 5 onward:
    { "SLAVEOF",
    "host port",
    "Make the server a replica of another instance, or promote it as master. Deprecated starting with Redis 5. Use REPLICAOF instead.",
    9,
    "1.0.0" },

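When REPLICAOF host port is executed on a server, that server becomes a replica: it connects to the master and sends PSYNC (or SYNC). The master then starts the data synchronization; its main logic is as follows.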
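For orientation, the request that reaches the master can be sketched as follows. This is a minimal illustration and not Redis code: format_psync_request is a hypothetical helper, and the inline text form is used only for readability. It assumes the replica sends its cached replication ID and offset, or ? and -1 when it has nothing cached, which the master treats as a forced full resync.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper (not a Redis function): format the PSYNC request a
 * replica sends to its master. replid == NULL means "nothing cached", so
 * "?" and -1 are sent, which asks for a full resynchronization.
 * Returns the number of bytes written, or -1 if the buffer is too small. */
int format_psync_request(char *buf, size_t size,
                         const char *replid, long long offset) {
    assert(buf != NULL && size > 0);
    int n;
    if (replid == NULL)
        n = snprintf(buf, size, "PSYNC ? -1\r\n");
    else
        n = snprintf(buf, size, "PSYNC %s %lld\r\n", replid, offset);
    if (n < 0 || (size_t)n >= size) return -1; /* truncated or failed */
    return n;
}
```

On the master, the two arguments arrive as c->argv[1] (the replication ID) and c->argv[2] (the offset), which is how the functions below read them.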
 
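The master handles the request in the following function:
/* SYNC and PSYNC command implementation. */   // SYNC asks for a full sync; PSYNC tries a partial (incremental) sync first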
void syncCommand(client *c) {
    /* ignore SYNC if already slave or in monitor mode */
    if (c->flags & CLIENT_SLAVE) return;

    /* Refuse SYNC requests if we are a slave but the link with our master
     * is not ok... */
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
        addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n"));
        return;
    }

    /* SYNC can't be issued when the server has pending data to send to
     * the client about already issued commands. We need a fresh reply
     * buffer registering the differences between the BGSAVE and the current
     * dataset, so that we can copy to other slaves if needed. */
    if (clientHasPendingReplies(c)) {
        addReplyError(c,"SYNC and PSYNC are invalid with pending output");
        return;
    }

    serverLog(LL_NOTICE,"Replica %s asks for synchronization",
        replicationGetSlaveName(c));  // the logged name is the replica's IP address when it is known, otherwise its unique client ID

    /* Try a partial resynchronization if this is a PSYNC command.
     * If it fails, we continue with usual full resynchronization, however
     * when this happens masterTryPartialResynchronization() already
     * replied with:
     *
     * +FULLRESYNC <replid> <offset>
     *
     * So the slave knows the new replid and offset to try a PSYNC later
     * if the connection with the master is lost. */
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        if (masterTryPartialResynchronization(c) == C_OK) {
            // C_OK: the partial resync was accepted and served. C_ERR: a full resync is needed.
            server.stat_sync_partial_ok++; // count the successful partial resync
            return; /* No full resync needed, return. */
        } else { // a full resync is needed
            char *master_replid = c->argv[1]->ptr;

            /* Increment stats for failed PSYNCs, but only if the
             * replid is not "?", as this is used by slaves to force a full
             * resync on purpose when they are not able to partially
             * resync. */
            if (master_replid[0] != '?') server.stat_sync_partial_err++;
        }
    } else {
        /* If a slave uses SYNC, we are dealing with an old implementation
         * of the replication protocol (like redis-cli --slave). Flag the client
         * so that we don't expect to receive REPLCONF ACK feedbacks. */
        c->flags |= CLIENT_PRE_PSYNC;
    }

    /* Full resynchronization. */ // increment the full-resync counter
    server.stat_sync_full++;

    /* Setup the slave as one waiting for BGSAVE to start. The following code
     * paths will change the state if we handle the slave differently. */
    c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
    if (server.repl_disable_tcp_nodelay)  // with TCP_NODELAY disabled, Nagle's algorithm batches small writes into larger packets
        connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */
    c->repldbfd = -1;
    c->flags |= CLIENT_SLAVE;
    listAddNodeTail(server.slaves,c);

    /* Create the replication backlog if needed. */
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
        /* When we create the backlog from scratch, we always use a new
         * replication ID and clear the ID2, since there is no valid
         * past history. */
        changeReplicationId(); // new replication ID; per the condition above, only when this is the first replica and no backlog exists yet
        clearReplicationId2(); // clear the secondary replication ID (replid2)
        createReplicationBacklog(); // create the backlog, which buffers the most recently propagated commands
        serverLog(LL_NOTICE,"Replication backlog created, my new "
                            "replication IDs are '%s' and '%s'",
                            server.replid, server.replid2);
    }

    /* CASE 1: BGSAVE is in progress, with disk target. */
    if (server.rdb_child_pid != -1 &&
        server.rdb_child_type == RDB_CHILD_TYPE_DISK)
    {
        /* Ok a background save is in progress. Let's check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save. */
        client *slave;
        listNode *ln;
        listIter li;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break; // found a replica that is already waiting for this BGSAVE to finish
        }
        /* To attach this slave, we check that it has at least all the
         * capabilities of the slave that triggered the current BGSAVE. */
        if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) { // every capability bit of slave must also be set in c
            /* Perfect, the server is already registering differences for
             * another slave. Set the right state, and copy the buffer. */
            copyClientOutputBuffer(c,slave); // copy the commands the other replica has buffered since the fork
            replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
            serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences. */
            serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC");
        }

    /* CASE 2: BGSAVE is in progress, with socket target. */
    } else if (server.rdb_child_pid != -1 &&
               server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
    {
        /* There is an RDB child process but it is writing directly to
         * children sockets. We need to wait for the next BGSAVE
         * in order to synchronize. */
        serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");

    /* CASE 3: There is no BGSAVE in progress. */
    } else {
        if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
            /* Diskless replication RDB child is created inside
             * replicationCron() since we want to delay its start a
             * few seconds to wait for more slaves to arrive. */
            // Replicas that arrive during the delay share a single transfer, which saves time.
            if (server.repl_diskless_sync_delay)
                serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
        } else {
            /* Target is disk (or the slave is not capable of supporting
             * diskless replication) and we don't have a BGSAVE in progress,
             * let's start one. */
            if (!hasActiveChildProcess()) { // no child process of any kind is running
                startBgsaveForReplication(c->slave_capa); // start the BGSAVE
            } else {
                serverLog(LL_NOTICE,
                    "No BGSAVE in progress, but another BG operation is active. "
                    "BGSAVE for replication delayed"); 
            }
        }
    }
    return;
}
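The three cases at the end of syncCommand reduce to a small decision table. The sketch below restates it in isolation, under the assumption that the relevant server state can be passed in as plain flags; every identifier in it is illustrative and none exists in Redis.

```c
#include <assert.h>

/* Illustrative names only; none of these identifiers exist in Redis. */
enum child_kind { CHILD_NONE, CHILD_DISK, CHILD_SOCKET };
enum sync_action {
    ATTACH_TO_CURRENT_BGSAVE, /* case 1: a compatible replica is already waiting */
    WAIT_NEXT_BGSAVE,         /* case 1 without a match, case 2, or a busy child */
    DELAY_DISKLESS_BGSAVE,    /* case 3, diskless: the cron job starts it later  */
    START_DISK_BGSAVE         /* case 3, disk target and no child running        */
};

/* Nonzero if `mine` has every capability bit that is set in `theirs`. */
int capa_covers(int mine, int theirs) {
    return (mine & theirs) == theirs;
}

enum sync_action choose_sync_action(enum child_kind rdb_child,
                                    int has_waiting_replica,
                                    int my_capa, int waiting_capa,
                                    int diskless_enabled, int supports_eof,
                                    int other_child_active) {
    if (rdb_child == CHILD_DISK) {
        if (has_waiting_replica && capa_covers(my_capa, waiting_capa))
            return ATTACH_TO_CURRENT_BGSAVE;
        return WAIT_NEXT_BGSAVE;
    }
    if (rdb_child == CHILD_SOCKET) return WAIT_NEXT_BGSAVE;

    assert(rdb_child == CHILD_NONE); /* only case 3 is left */
    if (diskless_enabled && supports_eof) return DELAY_DISKLESS_BGSAVE;
    return other_child_active ? WAIT_NEXT_BGSAVE : START_DISK_BGSAVE;
}
```

Only the first outcome lets the new replica reuse work that is already in progress; in every other outcome it stays in the waiting-for-BGSAVE-start state until a suitable BGSAVE begins.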
*********************************************************
The logic of the function masterTryPartialResynchronization is as follows.
It decides whether a partial resync is possible; if it is not, a full resync is needed.

/* This function handles the PSYNC command from the point of view of a
 * master receiving a request for partial resynchronization.
 *
 * On success return C_OK, otherwise C_ERR is returned and we proceed
 * with the usual full resync. */
int masterTryPartialResynchronization(client *c) {
    long long psync_offset, psync_len;
    char *master_replid = c->argv[1]->ptr;
    char buf[128];
    int buflen;

    /* Parse the replication offset asked by the slave. Go to full sync
     * on parse error: this should never happen but we try to handle
     * it in a robust way compared to aborting. */
    if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
       C_OK) goto need_full_resync;

    /* Is the replication ID of this master the same advertised by the wannabe
     * slave via PSYNC? If the replication ID changed this master has a
     * different replication history, and there is no way to continue.
     *
     * Note that there are two potentially valid replication IDs: the ID1
     * and the ID2. The ID2 however is only valid up to a specific offset. */
    /* Note: if the requested ID differs from the master's current ID, the
     * replica was not following this master's current history. ID2 is then
     * checked to see whether the two once shared the same master. If they did,
     * the requested offset must not exceed second_replid_offset, the point up to
     * which that shared history is valid; otherwise a full resync from this
     * master is needed. */
    if (strcasecmp(master_replid, server.replid) &&  // differs from the current ID
        (strcasecmp(master_replid, server.replid2) ||  // differs from the previous ID too, or
         psync_offset > server.second_replid_offset))  // is ahead of the shared history
    {
        /* Run id "?" is used by slaves that want to force a full resync. */ // "?" is sent in place of the 40-character ID
        if (master_replid[0] != '?') {  // not a forced full resync
            if (strcasecmp(master_replid, server.replid) && // differs from the current ID
                strcasecmp(master_replid, server.replid2)) // differs from the previous ID
            {  // matches neither ID: a full resync is the only option
                serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
                    "Replication ID mismatch (Replica asked for '%s', my "
                    "replication IDs are '%s' and '%s')",
                    master_replid, server.replid, server.replid2);
            } else { // the ID matches replid2, but the requested offset is past the point this master can serve for that history
                serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
                    "Requested offset for second ID was %lld, but I can reply "
                    "up to %lld", psync_offset, server.second_replid_offset);
            }
        } else { // forced full resync
            serverLog(LL_NOTICE,"Full resync requested by replica %s",
                replicationGetSlaveName(c));
        }
        goto need_full_resync;
    }

    /* We still have the data our slave is asking for? */
    if (!server.repl_backlog ||  // there is no backlog at all
        psync_offset < server.repl_backlog_off || // the request starts before the oldest byte still in the backlog
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) // the request starts past the newest byte in the backlog
    {
        serverLog(LL_NOTICE,
            "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset);
        if (psync_offset > server.master_repl_offset) { // ahead of the master's own replication offset
            serverLog(LL_WARNING,
                "Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
        }
        goto need_full_resync; // the requested data is not available, so a full resync is needed
    }

    /* If we reached this point, we are able to perform a partial resync:
     * 1) Set client state to make it a slave.
     * 2) Inform the client we can continue with +CONTINUE
     * 3) Send the backlog data (from the offset to the end) to the slave. */
    c->flags |= CLIENT_SLAVE;
    c->replstate = SLAVE_STATE_ONLINE;
    c->repl_ack_time = server.unixtime;
    c->repl_put_online_on_ack = 0;
    listAddNodeTail(server.slaves,c); // append to the list of replicas
    /* We can't use the connection buffers since they are used to accumulate
     * new commands at this stage. But we are sure the socket send buffer is
     * empty so this write will never fail actually. */
    if (c->slave_capa & SLAVE_CAPA_PSYNC2) { // the replica supports PSYNC2, so the reply carries the replication ID
        buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid);
    } else {
        buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    }
    if (connWrite(c->conn,buf,buflen) != buflen) {
        freeClientAsync(c); // free the client asynchronously
        return C_OK;
    }
    psync_len = addReplyReplicationBacklog(c,psync_offset); // send the backlog data from psync_offset onward to the replica
    serverLog(LL_NOTICE,
        "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
            replicationGetSlaveName(c),
            psync_len, psync_offset);
    /* Note that we don't need to set the selected DB at server.slaveseldb
     * to -1 to force the master to emit SELECT, since the slave already
     * has this state from the previous connection with the master. */
    refreshGoodSlavesCount();

    /* Fire the replica change modules event. */ // runs the callbacks of any module that subscribed to this event
    moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
                          REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
                          NULL);

    return C_OK; /* The caller can return, no full resync needed. */

need_full_resync:
    /* We need a full resync for some reason... Note that we can't
     * reply to PSYNC right now if a full SYNC is needed. The reply
     * must include the master offset at the time the RDB file we transfer
     * is generated, so we need to delay the reply to that moment. */
    // Note: a full resync sends the RDB snapshot plus the commands that accumulate after it, so the offset at which the RDB is generated must be recorded; the new commands are streamed from there.
    return C_ERR;
}

****************************************************
Starting a full resynchronization

/* Start a BGSAVE for replication goals, which is, selecting the disk or
 * socket target depending on the configuration, and making sure that
 * the script cache is flushed before starting.
 *
 * The mincapa argument is the bitwise AND among all the slaves capabilities
 * of the slaves waiting for this BGSAVE, so represents the slave capabilities
 * all the slaves support. Can be tested via SLAVE_CAPA_* macros.
 *
 * Side effects, other than starting a BGSAVE:
 *
 * 1) Handle the slaves in WAIT_START state, by preparing them for a full
 *    sync if the BGSAVE was successfully started, or sending them an error
 *    and dropping them from the list of slaves.
 *
 * 2) Flush the Lua scripting script cache if the BGSAVE was actually
 *    started.
 *
 * Returns C_OK on success or C_ERR otherwise. */
int startBgsaveForReplication(int mincapa) {
    int retval;
    int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF); 
    // The target is a socket only if diskless sync is enabled and the waiting replicas can receive the diskless (EOF-marked) stream.
    listIter li;
    listNode *ln;

    serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
        socket_target ? "replicas sockets" : "disk");

    rdbSaveInfo rsi, *rsiptr;
    rsiptr = rdbPopulateSaveInfo(&rsi); // fill in the replication info that is saved with the RDB
    /* Only do rdbSave* when rsiptr is not NULL,
     * otherwise slave will miss repl-stream-db. */
    if (rsiptr) {
        if (socket_target)
            retval = rdbSaveToSlavesSockets(rsiptr); // stream the RDB directly to the replica sockets
        else
            retval = rdbSaveBackground(server.rdb_filename,rsiptr); // write the RDB to a file on local disk
    } else {
        serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later.");
        retval = C_ERR;
    }

    /* If we succeeded to start a BGSAVE with disk target, let's remember
     * this fact, so that we can later delete the file if needed. Note
     * that we don't set the flag to 1 if the feature is disabled, otherwise
     * it would never be cleared: the file is not deleted. This way if
     * the user enables it later with CONFIG SET, we are fine. */
    // Note: RDBGeneratedByReplication is set only while rdb_del_sync_files is enabled. With the feature off the file is never deleted, so nothing would ever clear the flag.
    if (retval == C_OK && !socket_target && server.rdb_del_sync_files)
        RDBGeneratedByReplication = 1;

    /* If we failed to BGSAVE, remove the slaves waiting for a full
     * resynchronization from the list of slaves, inform them with
     * an error about what happened, close the connection ASAP. */
    if (retval == C_ERR) {
        serverLog(LL_WARNING,"BGSAVE for replication failed");
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;

            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { // replicas still waiting for the BGSAVE to start
                slave->replstate = REPL_STATE_NONE;
                slave->flags &= ~CLIENT_SLAVE;
                listDelNode(server.slaves,ln);
                addReplyError(slave,
                    "BGSAVE failed, replication can't continue");
                slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
            }
        }
        return retval;
    }

    /* If the target is socket, rdbSaveToSlavesSockets() already setup
     * the slaves for a full resync. Otherwise for disk target do it now.*/
    if (!socket_target) {
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;

            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                    replicationSetupSlaveForFullResync(slave,
                            getPsyncInitialOffset());  
                // puts the replica in the full-resync state and sends it the full-resync information
            }
        }
    }

    /* Flush the script cache, since we need that slave differences are
     * accumulated without requiring slaves to match our cached scripts. */
    if (retval == C_OK) replicationScriptCacheFlush();
    return retval;
}
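The mincapa argument and the socket_target test can be illustrated in isolation. The following is a sketch under stated assumptions and not the Redis code path: CAPA_EOF and CAPA_PSYNC2 are stand-ins for the SLAVE_CAPA_* macros with values chosen here, min_capabilities and use_socket_target are hypothetical helpers, and returning 0 for an empty list is a choice made for this example.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-ins for the SLAVE_CAPA_* macros; the bit values are assumptions. */
#define CAPA_EOF    (1 << 0)
#define CAPA_PSYNC2 (1 << 1)

/* AND together the capabilities of every waiting replica, so only the bits
 * that all of them support survive. An empty list yields 0 (an assumption
 * of this sketch), which selects the disk target. */
int min_capabilities(const int *capas, size_t n) {
    assert(capas != NULL || n == 0);
    if (n == 0) return 0;
    int mincapa = capas[0];
    for (size_t i = 1; i < n; i++) mincapa &= capas[i];
    return mincapa;
}

/* Mirrors the socket_target expression: diskless sync must be enabled and
 * the common capabilities must include the EOF streaming format. */
int use_socket_target(int diskless_enabled, int mincapa) {
    return diskless_enabled && (mincapa & CAPA_EOF) != 0;
}
```

A single waiting replica without the EOF capability is enough to push the whole group back to the disk target, since the socket stream would be unreadable for it.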