TinyKV lab2a完成总结


TinyKV lab2aa

TinyKV lab2aa是按照论文实现一个Raft状态机的基本框架,主要是实现raft.go和log.go的函数。Raft的状态有三种:跟随者,候选者,领导者。状态的变更主要通过消息传递完成。为了方便后面的统计,我在Raft结构体增加一些变量

agreeCnt   int // agree resp for this node as a candidate.
rejectCnt  int
voteReqCnt int // the number of the cluster's candidate, calculated by vote msg.

初始化Raft的结构体:

func newRaft(c *Config) *Raft {
	if err := c.validate(); err != nil {
		panic(err.Error())
	}
	// Your Code Here (2A).

	nlog := newLog(c.Storage)

	hardState, confState, err := nlog.storage.InitialState()
	if err != nil {
		panic(err.Error())
	}

	raft := &Raft{
		id: c.ID,

		Term: hardState.Term,
		Vote: hardState.Vote,

		RaftLog: nlog,
		Prs:     make(map[uint64]*Progress),

		votes: make(map[uint64]bool),

		heartbeatTimeout: c.HeartbeatTick,
		electionTimeout:  c.ElectionTick,
	}

	if len(c.peers) == 0 {
		c.peers = confState.Nodes
	}

	lastIndex := raft.RaftLog.LastIndex()
	if len(c.peers) != 0 {
		for _, pr := range c.peers {
			if pr == raft.id {
				raft.Prs[pr] = &Progress{
					lastIndex,
					lastIndex + 1,
				}
			} else {
				raft.Prs[pr] = &Progress{
					0,
					lastIndex + 1,
				}
			}
			raft.votes[pr] = false
		}
	}
	raft.becomeFollower(raft.Term, 0)

	return raft
}

hardState是在数据库存储的上一次的Raft的存储状态信息,ConfState是来自网络的配置信息,主要是提供了集群状态的信息。然后根据论文,我们要把Prs这个map,即当它是领导者时其他机器的日志信息,初始化时考虑最好情况,即Next都是LastIndex,但是我们不知道Match的值,所以初始化为0。其中Prs的结构体中Next表示当前某个节点需要发送的下一条日志,Match表示该节点已经收到的日志编号,这个在lab2ab及以后会用到。

votes是某个节点有没有给这个节点投票。然后初始化状态都是跟随者,但是没有领导者。

becomeFollower是状态的转换函数,是指不管在什么状态下都无条件转换成这个状态。类似的还有becomeCandidate,becomeLeader。

func (r *Raft) resetRandElectionTimeout() {
	r.randomElectionTimeout = r.electionTimeout + rand.Intn(r.electionTimeout)
}

func (r *Raft) reset(newTerm bool) {
	r.resetRandElectionTimeout()
	r.electionElapsed = 0
	r.heartbeatElapsed = 0
	if newTerm {
		r.Vote = 0
		r.Lead = 0
	}
	for k := range r.votes {
		r.votes[k] = false
	}
	r.agreeCnt = 0
	r.rejectCnt = 0
	r.voteReqCnt = 0
}

// becomeFollower transform this peer's state to Follower
func (r *Raft) becomeFollower(term uint64, lead uint64) {
	// Your Code Here (2A).
	r.reset(term != r.Term)
	r.Term = term
	r.State = StateFollower
	r.Lead = lead
	// log.Infof("raft node %d become follower, term is %d, leader is %d\n", r.id, r.Term, r.Lead)
}

// becomeCandidate transform this peer's state to candidate
func (r *Raft) becomeCandidate() {
	// Your Code Here (2A).
	r.reset(true)
	r.Lead = 0
	r.State = StateCandidate

	r.Vote = r.id
	r.votes[r.id] = true
	r.agreeCnt++
	r.voteReqCnt = 1

	r.Term++
	// log.Infof("raft node %d become candidate, term is %d\n", r.id, r.Term)

	if len(r.Prs) <= 1 {
		r.becomeLeader()
	}
}

// becomeLeader transform this peer's state to leader
func (r *Raft) becomeLeader() {
	// Your Code Here (2A).
	// NOTE: Leader should propose a noop entry on its term

	r.reset(false)
	r.State = StateLeader
	r.Lead = r.id

	for _, pr := range r.Prs {
		pr.Next = r.RaftLog.LastIndex() + 1
		pr.Match = 0
	}
	// log.Infof("raft node %d become leader, term is %d\n", r.id, r.Term)

	_ = r.Step(pb.Message{
		MsgType: pb.MessageType_MsgPropose,
		Entries: []*pb.Entry{{
			EntryType: pb.EntryType_EntryNormal,
			Data:      nil,
		}},
	})
}

注意点是:
  1、成为候选者的时候任期必须加一,候选者一定为自己投票
  2、成为领导者之后要发送一条Propose的消息,这个消息是触发向其他节点发送日志的事件。

tick函数

状态机的工作是靠tick函数推进的,tick就相当于是Raft状态机的时钟。tick函数针对各个状态情况不同

func (r *Raft) tickHeartbeat() {
	r.heartbeatElapsed++
	if r.heartbeatElapsed >= r.heartbeatTimeout {
		r.heartbeatElapsed = 0
		_ = r.Step(pb.Message{
			MsgType: pb.MessageType_MsgBeat,
		})
	}
}

func (r *Raft) tickElection() {
	r.electionElapsed++
	if r.electionElapsed >= r.randomElectionTimeout {
		r.electionElapsed = 0
		_ = r.Step(pb.Message{
			MsgType: pb.MessageType_MsgHup,
		})
	}
}

// tick advances the internal logical clock by a single tick.
func (r *Raft) tick() {
	// Your Code Here (2A).
	switch r.State {
	case StateLeader:
		r.tickHeartbeat()
	case StateCandidate:
		r.tickElection()
	case StateFollower:
		r.tickElection()
	}
}

在跟随者和候选者状态下,一旦触发选举超时,节点发送Hup消息,该消息触发选举,节点会转成Candidate。触发新一轮投票,上面的代码设置随机选举超时是尽量减少两个节点同时成为候选者的情况。在领导者状态下,触发的是心跳超时,节点发送Beat消息,触发向其他节点发送心跳包。

消息循环

消息是Raft节点之间交互的重要工具,Raft节点会根据收到的消息类型,修改自己的状态,同步日志(lab2ab)和做出回应。

消息类型:
MsgBeat:心跳超时,触发向其他节点发送心跳包
MsgHup:选举超时,开启下一任选举
MsgPropose:成为领导者,触发日志复制
MsgHeartBeat:向其他节点发出的心跳消息
MsgHeartBeatResp:回应心跳消息
MsgVoteReq:投票请求
MsgVoteResp:投票响应
MsgAppend:日志复制请求
MsgAppResp:日志复制响应

消息循环图:

消息循环1的实现:

func (r *Raft) handleMsgHup(m pb.Message) {
	if r.State == StateLeader {
		// log.Infof("raft node %d is already leader, ignore MsgHup", r.id)
		return
	}
	r.becomeCandidate()

	li := r.RaftLog.LastIndex()
	lterm, _ := r.RaftLog.Term(li)
	r.BroadcastMessage(pb.Message{
		MsgType: pb.MessageType_MsgRequestVote,
		From:    r.id,
		Term:    r.Term,
		Index:   li,
		LogTerm: lterm,
	}, r.setTo)
}

func (r *Raft) handleMsgVote(m pb.Message) {
	r.resetRandElectionTimeout()

	if m.Term > r.Term {
		r.becomeFollower(m.Term, 0)
	}

	r.voteReqCnt++

	voted := r.Vote > 0 && r.Vote != m.From
	updated := r.RaftLog.isUpdated(m.Index, m.LogTerm)

	reject := voted || !updated
	if !reject {
		r.Vote = m.From
		r.Term = m.Term
	}
	r.writeMessage(pb.Message{
		MsgType: pb.MessageType_MsgRequestVoteResponse,
		To:      m.From,
		From:    r.id,
		Term:    r.Term,
		Reject:  reject,
	})
}

func (r *Raft) handleMsgVoteResponse(m pb.Message) {
	if m.Term != r.Term {
		r.becomeFollower(m.Term, m.From)
		return
	}
	if r.State == StateCandidate {
		if m.To != r.id {
			panic(errors.New("m.To != r.id"))
		}
		r.votes[m.From] = true
		if !m.Reject {
			r.agreeCnt++
		} else {
			r.rejectCnt++
		}
		if r.agreeCnt > len(r.votes)/2 {
			r.becomeLeader()
		} else if r.rejectCnt > len(r.votes)/2 {
			r.becomeFollower(m.Term, 0)
		}
	}
}

消息循环2的实现:

func (r *Raft) handleMsgBeat(m pb.Message) {
	if r.State != StateLeader {
		return
	}
	r.BroadcastMessage(pb.Message{
		MsgType: pb.MessageType_MsgHeartbeat,
		From:    r.id,
		Term:    r.Term,
	}, r.setHeartBeatArgs)
}

func (r *Raft) handleMsgHeartBeat(m pb.Message) {
	r.writeMessage(pb.Message{
		MsgType: pb.MessageType_MsgHeartbeatResponse,
		From:    r.id,
		To:      m.From,
		Term:    r.Term,
	})
}

func (r *Raft) handleMsgHeartBeatResponse(m pb.Message) {
	if m.Reject {
		r.becomeFollower(m.Term, m.From)
	} else {
		if r.Prs[m.From].Match < r.RaftLog.LastIndex() {
			r.sendAppend(m.From)
		}
	}
}

消息循环3的实现(暂时不考虑lab2ab的东西):

func (r *Raft) handleAppendEntries(m pb.Message) {
	// Your Code Here (2A).

	r.writeMessage(pb.Message{
		MsgType: pb.MessageType_MsgAppendResponse,
		To:      m.From,
		From:    r.id,
		Term:    r.Term,
	})
}

func (r *Raft) handleMsgAppend(m pb.Message) {

	// log.Infof("raft node %d m.Term is %d, r.Term is %d", r.id, m.Term, r.Term)
	if m.Term < r.Term {
		return
	}
	if m.Term > r.Term {
		r.Term = m.Term
	}
	r.becomeFollower(r.Term, m.From)
	r.handleAppendEntries(m)
}

func (r *Raft) handleMsgAppendResponse(m pb.Message) {
	if m.Term > r.Term {
		r.becomeFollower(m.Term, m.From)
		return
	}
}

实现了上述代码之后,可以基本通过lab2aa的测试,如果不行再面向测试编程即可。

TinyKV lab2ab

lab2ab主要是实现日志复制

日志复制是Raft集群高可用的实现方法,当一条日志被超过一半的机器收到之后,这条日志就被认为成功提交到Raft状态机。同时Raft通过日志复制保证集群的大多数机器的日志都是一致的。 日志复制主要在Append消息和AppendResp消息中完成,在这里,我们需要先对lab2aa实现的handleAppendEntries进行一些修改。首先考虑正常的收发消息流程。领导者按照保存在Prs的Next,发送Index是从Next到最后的日志,同时需要在消息中附加Next-1日志的任期和Index,我们直接把Append传来的消息追加到节点日志序列最后。然后我们需要考虑论文中的几个异常情况。

图中的情况a和b是日志缺失的情况。这时我们通过检查这个跟随者的日志有没有这个Next-1日志。如果没有,说明日志缺失,返回拒绝日志的AppendResp,消息要附加自己的最后一条日志的任期和Index。领导者收到拒绝的消息后更新这个节点的Prs的Next和Match到返回的Index+1和Index然后重发消息(这里需要实现成立即重发);
图中的情况c和d是日志过多的情况。这时跟随者是有Next-1日志的,然后跟随者在追加日志时就要把自己的和领导者日志不一致的地方丢掉,然后再追加日志。
图中的情况e和f是日志冲突的情况,这是跟随者没有Next-1日志,发送拒绝,同时附加和Append消息的Index对应的消息的Term,领导者手到这个拒收的时候找到小于等于该任期的Index最大的消息,然后重发,可以发现,多次调整之后,这个跟随者的状态可以回到日志缺失的状态。同时领导者处理拒绝的流程和a,b状态下的是一致的。

func (r *Raft) sendAppendRaw(to uint64, sendIfEmpty bool) bool {
	pr := r.Prs[to]
	logTerm, err := r.RaftLog.Term(pr.Next - 1)
	if err != nil && err != ErrCompacted {
		return false
	}
	ents := r.RaftLog.getEntries(pr.Next, r.RaftLog.LastIndex()+1)

	if len(ents) == 0 && !sendIfEmpty {
		return false
	}

	var entries []*pb.Entry
	for _, e := range ents {
		entries = append(entries, &pb.Entry{
			EntryType: e.EntryType,
			Term:      e.Term,
			Index:     e.Index,
			Data:      e.Data,
		})
	}

	r.writeMessage(pb.Message{
		MsgType: pb.MessageType_MsgAppend,
		To:      to,
		From:    r.id,
		Term:    r.Term,
		LogTerm: logTerm,
		Index:   r.Prs[to].Next - 1,
		Entries: entries,
		Commit:  r.RaftLog.committed,
	})

	return true
}
func (l *RaftLog) matchTerm(term, index uint64) bool {
	t, err := l.Term(index)
	if err != nil {
		return false
	}
	return t == term
}
func (l *RaftLog) findConflict(entries []*pb.Entry) uint64 {
	for _, ent := range entries {
		if !l.matchTerm(ent.Term, ent.Index) {
			if ent.Index <= l.LastIndex() {
				log.Infof("find conflict at index: %d", ent.Index)
			}
			return ent.Index
		}
	}
	return 0
}
// find the biggest index that is the given term
func (l *RaftLog) findConflictTerm(index, term uint64) uint64 {
	if index > l.LastIndex() {
		return index
	}
	for {
		curTerm, err := l.Term(index)
		if curTerm <= term || err != nil {
			return index
		}
		index--
	}
}
func (r *Raft) handleAppendEntries(m pb.Message) {
	// Your Code Here (2A).

	l := r.RaftLog

	if l.matchTerm(m.LogTerm, m.Index) {
		conflictIndex := l.findConflict(m.Entries)

		lastNewIndex := m.Index + uint64(len(m.Entries))

		// log.Infof("conflictIndex = %d\n", conflictIndex)
		switch {
		case conflictIndex == 0:
		case conflictIndex < l.committed:
			log.Errorf("conflictIndex less then commited: value is %d, %d\n", conflictIndex, l.committed)
		default:
			newIndex := m.Index + 1 // the index should write to follower
			var ents []pb.Entry
			if l.stabled >= conflictIndex {
				l.stabled = conflictIndex - 1
			}
			for _, e := range m.Entries[conflictIndex-newIndex:] {
				ents = append(ents, *e)
			}
			l.entries = append(l.entries[:min(conflictIndex-l.firstIndex(), uint64(len(l.entries)))], ents...)
		}
		if m.Commit > r.RaftLog.committed {
			commitTo := min(m.Commit, lastNewIndex)
			l.commitTo(commitTo)
			// log.Infof("raft node %d, set commit: %d", r.id, commitTo)
		}
		r.writeMessage(pb.Message{
			MsgType: pb.MessageType_MsgAppendResponse,
			To:      m.From,
			From:    r.id,
			Term:    r.Term,
			Index:   lastNewIndex,
			Reject:  false,
		})
	} else {
		hintIndex := min(m.Index, l.LastIndex())
		hintTerm, _ := l.Term(l.findConflictTerm(hintIndex, m.LogTerm))
		r.writeMessage(pb.Message{
			MsgType: pb.MessageType_MsgAppendResponse,
			To:      m.From,
			From:    r.id,
			Term:    r.Term,
			LogTerm: hintTerm,
			Index:   hintIndex,
			Reject:  true,
		})
	}
}
func (r *Raft) handleMsgAppendResponse(m pb.Message) {
	if m.Term > r.Term {
		r.becomeFollower(m.Term, m.From)
		return
	}

	if !m.Reject {
		rlog := r.RaftLog
		pr := r.Prs[m.From]
		if m.Index == pr.Match {
			return
		}

		r.Prs[m.From] = &Progress{
			Next:  m.Index + 1,
			Match: m.Index,
		}

		tmpArray := make([]uint64, len(r.Prs))
		for k, v := range r.Prs {
			tmpArray[k-1] = v.Match
		}
		sort.Slice(tmpArray, func(i, j int) bool {
			return tmpArray[i] > tmpArray[j]
		})

		indexTerm, _ := rlog.Term(m.Index)
		if indexTerm == r.Term && rlog.commitTo(tmpArray[len(r.Prs)/2]) {
			// log.Infof("raft node %d, set commit: %d", r.id, tmpArray[len(r.Prs)/2])
			r.BroadcastMessageGenByFunc(r.sendAppend)
		}
	} else {
		nextIdx := m.Index
		if m.LogTerm >= 0 {
			nextIdx = r.RaftLog.findConflictTerm(nextIdx, m.LogTerm)
		}
		r.Prs[m.From] = &Progress{
			Next:  nextIdx + 1,
			Match: nextIdx,
		}
		r.sendAppend(m.From)
	}
}

lab2ac

lab2ac主要实现的是Raft与上层的接口,上层主要是有网络和调度器,负载均衡器等抽象。我们需要实现的是给上层返回Raft相对上一个状态的变更。这就是Ready这个结构体。但是要注意softState和hardState。

softState包含Lead和RaftState,含义是当前在集群中的身份状态,hardState包含Term,Vote和RaftLog,含义是需要存储到存储器中的集群状态。Ready首先就是把没有落盘的Entries和已经提交但是没有应用的CommittedEntries记录,然后要在RawNode记录当前的hardState和softState方便比较状态有没有变化。特别注意的是hardState是要存储进存储器的,所以在返回Ready的时候我们还不能覆盖prevHardState,而softState就可以马上改变。

func (rn *RawNode) Ready() Ready {
	// Your Code Here (2A).
	r := rn.Raft
	rd := Ready{
		Entries:          r.RaftLog.unstableEntries(),
		CommittedEntries: r.RaftLog.nextEnts(),
	}
	if len(rn.Raft.msgs) != 0 {
		rd.Messages = rn.Raft.msgs
	}
	if !isSoftStateEqual(rn.prevSoftState, rn.Raft.softState()) {
		rd.SoftState = rn.Raft.softState()
		rn.prevSoftState = rd.SoftState
	}
	if !isHardStateEqual(rn.prevHardState, rn.Raft.hardState()) {
		rd.HardState = rn.Raft.hardState()
	}
	if !IsEmptySnap(rn.Raft.RaftLog.pendingSnapshot) {
		rd.Snapshot = *rn.Raft.RaftLog.pendingSnapshot
	}
	return rd
}

hasReady函数就是看有没有需要上层处理的消息,比如说提交了Entries,产生了Entries,产生了消息。代码略。
Advance函数是在上层处理了Ready之后调用的函数,这事,由于消息都被应用于状态机了,所以RaftLog的applied变量是CommittedEntries的最后一个Index,Stable同理,然后我们修改prevHardState。最后清空消息。

func (rn *RawNode) Advance(rd Ready) {
	// Your Code Here (2A).

	if l := len(rd.Entries); l != 0 {
		rn.Raft.RaftLog.stabled = rd.Entries[l-1].Index
	}
	if l := len(rd.CommittedEntries); l != 0 {
		rn.Raft.RaftLog.applied = rd.CommittedEntries[l-1].Index
	}
	if !IsEmptyHardState(rd.HardState) {
		rn.prevHardState = rd.HardState
	}
	rn.Raft.msgs = []pb.Message{}
	rn.Raft.RaftLog.pendingSnapshot = nil
	rn.Raft.RaftLog.maybeCompact()
}