TinyKV lab2b完成总结


lab2b

集群视图下的日志的复制和状态的写入。
与lab2a相比,lab2b需要阅读大量代码了解整个集群是怎么工作的。首先,我们要对整个集群有个大概的了解(可以看文档)。

node和store看作等价的就行

可以看到,一个node上面可以跑多个Raft实例(叫做Peer/RaftGroup),每个Peer属于不同的Region,每个Region管理不同的key。他们之间的关系如下:
一个store可以有多个Region,每个Region之间的key space不重叠
一个store上可以有多个Peer,每个Peer对应一个Region
一个Region可以跨越多个store,可以由多个Peer组成
在lab2b里面,不会有一个store包含多个Region的情况
我们大概了解一下各种各样的key和state,这里官方已经给出了一个很详细的表了。这里需要注意的是写入的是kv还是raft数据库,以及这些state是每次有更新就得去写入。然后是消息写入流程,消息的写入是通过peer读取RawNode返回的Ready的消息封装成peer的消息然后调用Send方法放进了context中的transport接口,然后通过transport的router保存的集群节点信息发送到对应的节点,节点的HandleMsg函数会根据消息类型,如果是Raft消息就调用RawNode的Step函数。如果是RaftCMD消息,比如说是Get/Put/Delete,则调用proposeRaftCMD函数,这个就是我们需要实现的地方。除此之外,我们还要实现HandleRaftReady函数。

proposeRaftCMD函数

这个函数的功能是把客户端的命令的数据写入Raft状态机。我们先看key是不是当前region(只有msg.Request有key,AdminRequest没有),如果不是则返回RegionErr(lab2b还不需要考虑AdminRequest),否则反序列化成Raft状态机消息,并设置d.proposal(我们要通过d.proposal的callback保存响应返回上层,设计成这样子是可以异步返回响应而不会阻塞等待Raft完成日志复制过程),最后调用RawNode.Propose()。

func (d *peerMsgHandler) proposeRaftCommand(msg *raft_cmdpb.RaftCmdRequest, cb *message.Callback) {
	err := d.preProposeRaftCommand(msg)
	if err != nil {
		cb.Done(ErrResp(err))
		return
	}
	// Your Code Here (2B).

	if msg.AdminRequest == nil {
		key := getReqKey(msg.Requests[0])
		if key != nil {
			if err := util.CheckKeyInRegion(key, d.Region()); err != nil {
				cb.Done(ErrResp(err))
				return
			}
		}

		d.proposals = append(d.proposals, &proposal{
			index: d.nextProposalIndex(),
			term:  d.Term(),
			cb:    cb,
		})
	}

	dat, err := msg.Marshal()
	if err != nil {
		panic(err)
	}
	err = d.RaftGroup.Propose(dat)
	if err != nil {
		panic(err)
	}
}

HandleRaftReady函数

HandleRaftReady是集群对Raft的日志进行处理的函数,包括保存Ready的状态到存储器和设置RaftLog的一些值,还有执行已经提交的日志。首先看保存Ready状态。这里需要处理Ready中的HardState(SoftState不需要保存)和追加日志,注意这些日志当中可能有已经保存到存储器的日志了,我们需要删除掉这一部分,然后设置RaftLog的值

// Append the given entries to the raft log and update ps.raftState also delete log entries that will
// never be committed
func (ps *PeerStorage) Append(entries []eraftpb.Entry, raftWB *engine_util.WriteBatch) error {
	// Your Code Here (2B).
	if len(entries) == 0 {
		return nil
	}
	psLast, _ := ps.LastIndex()
	pos := 0
	for _, e := range entries {
		if e.Index > psLast {
			break
		}
		pos++
	}
	entries = entries[pos:]
	if len(entries) == 0 {
		return nil
	}
	for _, e := range entries {
		raftWB.SetMeta(meta.RaftLogKey(ps.region.Id, e.Index), &e)
	}
	ps.raftState.LastIndex = entries[len(entries)-1].Index
	ps.raftState.LastTerm = entries[len(entries)-1].Term
	return nil
}

然后把HardState和日志写到存储器

// Save memory states to disk.
// Do not modify ready in this function, this is a requirement to advance the ready object properly later.
func (ps *PeerStorage) SaveReadyState(ready *raft.Ready) (*ApplySnapResult, error) {
	// Hint: you may call `Append()` and `ApplySnapshot()` in this function
	// Your Code Here (2B/2C).

	wb := &engine_util.WriteBatch{}
	var res *ApplySnapResult
	var err error
	if err = ps.Append(ready.Entries, wb); err != nil {
		return nil, err
	}
	if !raft.IsEmptyHardState(ready.HardState) {
		ps.raftState.HardState = &ready.HardState
	}
	wb.SetMeta(meta.RaftStateKey(ps.region.Id), ps.raftState)
	wb.WriteToDB(ps.Engines.Raft)

	return res, err
}

最后就是对已经提交的日志的应用,把新状态写入数据库。

if len(rd.CommittedEntries) > 0 {
	wb := &engine_util.WriteBatch{}
	for _, e := range rd.CommittedEntries {
		wb = d.processEntry(e, wb)
	}
	d.peerStorage.applyState.AppliedIndex = rd.CommittedEntries[len(rd.CommittedEntries)-1].Index
	wb.SetMeta(meta.ApplyStateKey(d.regionId), d.peerStorage.applyState)
	wb.WriteToDB(d.peerStorage.Engines.Kv)
}

应用过程就是看Entry反序列化的消息类型是Get/Put然后分别写入WriteBatch,然后我们通过proposal的callback异步返回响应。

func (d *peerMsgHandler) handleProposal(ent pb.Entry, handler func(p *proposal)) {
	for len(d.proposals) > 0 {
		p := d.proposals[0]
		if p.index == ent.Index {
			if p.term != ent.Term {
				NotifyStaleReq(ent.Term, p.cb)
			} else {
				handler(p)
			}
		}
		d.proposals = d.proposals[1:]
	}
}

func (d *peerMsgHandler) processNormalEntry(ent pb.Entry, msg *raft_cmdpb.RaftCmdRequest, wb *engine_util.WriteBatch) *engine_util.WriteBatch {
	req := msg.Requests[0]
	key := getReqKey(req)
	if key != nil {
		if err := util.CheckKeyInRegion(key, d.Region()); err != nil {
			d.handleProposal(ent, func(p *proposal) {
				p.cb.Done(ErrResp(err))
			})
		}
	}

	switch req.CmdType {
	case raft_cmdpb.CmdType_Put:
		wb.SetCF(req.Put.Cf, req.Put.Key, req.Put.Value)
	case raft_cmdpb.CmdType_Delete:
		wb.DeleteCF(req.Delete.Cf, req.Delete.Key)
	}

	d.handleProposal(ent, func(p *proposal) {
		resp := &raft_cmdpb.RaftCmdResponse{
			Header: &raft_cmdpb.RaftResponseHeader{},
		}
		switch req.CmdType {
		case raft_cmdpb.CmdType_Get:
			d.peerStorage.applyState.AppliedIndex = ent.Index

			wb.SetMeta(meta.ApplyStateKey(d.regionId), d.peerStorage.applyState)
			wb.WriteToDB(d.peerStorage.Engines.Kv)
			wb = &engine_util.WriteBatch{}
			val, err := engine_util.GetCF(d.peerStorage.Engines.Kv, req.Get.Cf, req.Get.Key)
			if err != nil {
				val = nil
			}
			resp.Responses = []*raft_cmdpb.Response{{
				CmdType: raft_cmdpb.CmdType_Get, Get: &raft_cmdpb.GetResponse{
					Value: val,
				},
			}}
		case raft_cmdpb.CmdType_Put:
			resp.Responses = []*raft_cmdpb.Response{{
				CmdType: raft_cmdpb.CmdType_Put, Put: &raft_cmdpb.PutResponse{},
			}}
		case raft_cmdpb.CmdType_Delete:
			resp.Responses = []*raft_cmdpb.Response{{
				CmdType: raft_cmdpb.CmdType_Delete, Delete: &raft_cmdpb.DeleteResponse{},
			}}
		}
                case raft_cmdpb.CmdType_Snap:  // 这里是lab2c才需要考虑的,但是不写会过不了测试
			d.peerStorage.applyState.AppliedIndex = ent.Index

			wb.SetMeta(meta.ApplyStateKey(d.regionId), d.peerStorage.applyState)
			wb.WriteToDB(d.ctx.engine.Kv)
			wb = &engine_util.WriteBatch{}
			resp.Responses = []*raft_cmdpb.Response{
				{
					CmdType: raft_cmdpb.CmdType_Snap,
					Snap:    &raft_cmdpb.SnapResponse{Region: d.Region()},
				},
			}
			p.cb.Txn = d.peerStorage.Engines.Kv.NewTransaction(false)
		}
		p.cb.Done(resp)
	})
	return wb
}

注意点:Get和Snap需要立即更新appliedIndex然后写入数据库,这是因为它们不像Put和Delete一样可以通过WriteBatch一样保证这批写入是要么成功要么失败的。同时一旦执行了WriteToDB,这个wb就不能再重用,需要用一个新的。
最后不要忘记调用d.Send(d.ctx.tran, ready.msgs)发送Raft状态机的消息。
完成以上部分,再注意细节,我们就可以通过lab2b的测试。