TinyKV lab2c完成总结


lab2c

lab2c是增加Raft状态机对快照的支持,因为日志不可能无限增长,所以在增长到一定程度时,我们需要通过压缩成快照的方式来把这个Index之前的所有日志压缩成一个最后的变更状态。

Raft状态机修改

引入日志之后,RaftLog的firstIndex不再是l.entries的首元素的Index了,如果有快照,我们需要返回\(Snapshot.Index+1\)。然后在Raft的消息中增加对MsgSnapshot的支持,就是把applied, committed, stable全部设置成Snapshot.Index。然后如果某个节点的Next在firstIndex之前(就是已经压缩在快照里面了)我们在sendAppend里面直接发送快照。

RawNode修改

加入对RaftLog.PendingSnapshot的判断。实现maybeCompact,即把已经压缩到快照的l.entries的日志截断。在Advance里面要把PendingSnapshot置空。

SaveReadyState修改

实现追加快照函数,先修改RaftState和applyState然后落盘,然后调用RegionSched给集群调度器发送保存快照的指令。

// Apply the peer with given snapshot
func (ps *PeerStorage) ApplySnapshot(snapshot *eraftpb.Snapshot, kvWB *engine_util.WriteBatch, raftWB *engine_util.WriteBatch) (*ApplySnapResult, error) {
	log.Infof("%v begin to apply snapshot", ps.Tag)
	snapData := new(rspb.RaftSnapshotData)
	if err := snapData.Unmarshal(snapshot.Data); err != nil {
		return nil, err
	}

	// Hint: things need to do here including: update peer storage state like raftState and applyState, etc,
	// and send RegionTaskApply task to region worker through ps.regionSched, also remember call ps.clearMeta
	// and ps.clearExtraData to delete stale data
	// Your Code Here (2C).

	var snapRes *ApplySnapResult
	if ps.isInitialized() {
		ClearMeta(ps.Engines, kvWB, raftWB, ps.region.Id, ps.raftState.LastIndex)
		ps.clearExtraData(snapData.Region)
	}

	ps.raftState.LastIndex = snapshot.Metadata.Index
	ps.raftState.LastTerm = snapshot.Metadata.Term
	ps.applyState.TruncatedState.Index = snapshot.Metadata.Index
	ps.applyState.TruncatedState.Term = snapshot.Metadata.Term
	ps.applyState.AppliedIndex = snapshot.Metadata.Index

	ps.snapState.StateType = snap.SnapState_Applying
	kvWB.SetMeta(meta.ApplyStateKey(snapData.Region.Id), ps.applyState)

	ch := make(chan bool, 1)
	ps.regionSched <- &runner.RegionTaskApply{
		RegionId: snapData.Region.Id,
		Notifier: ch,
		SnapMeta: snapshot.Metadata,
		StartKey: snapData.Region.StartKey,
		EndKey:   snapData.Region.EndKey,
	}

	snapRes = &ApplySnapResult{
		PrevRegion: ps.region,
		Region:     snapData.Region,
	}
	meta.WriteRegionState(kvWB, snapData.Region, rspb.PeerState_Normal)
	return snapRes, nil
}

HandleRaftReady修改

我们在SaveRaftReady函数返回了一个ApplySnapResult的结构体,我们在这里就要判断两个Region是不是一样了,如果不是,我们就要更换Region了。

if res != nil && !reflect.DeepEqual(res.PrevRegion, res.Region) {
		d.SetRegion(res.Region)
		metaStore := d.ctx.storeMeta
		metaStore.Lock()
		metaStore.regions[res.Region.Id] = res.Region
		metaStore.regionRanges.Delete(®ionItem{res.PrevRegion})
		metaStore.regionRanges.ReplaceOrInsert(®ionItem{res.Region})
		metaStore.Unlock()
	}

process函数修改

压缩快照的指令属于AdminRequest,我们增加对其中CompactLog指令的支持。

func (d *peerMsgHandler) processAdminEntry(ent pb.Entry, msg *raft_cmdpb.RaftCmdRequest, wb *engine_util.WriteBatch) *engine_util.WriteBatch {
	req := msg.AdminRequest
	switch req.CmdType {
	case raft_cmdpb.AdminCmdType_CompactLog:
		compact := req.CompactLog
		applyState := d.peerStorage.applyState
		if compact.CompactIndex >= applyState.TruncatedState.Index {
			applyState.TruncatedState.Index = compact.CompactIndex
			applyState.TruncatedState.Term = compact.CompactTerm
			wb.SetMeta(meta.ApplyStateKey(d.regionId), applyState)
			wb.WriteToDB(d.ctx.engine.Kv)
			d.ScheduleCompactLog(compact.CompactIndex)
			wb = &engine_util.WriteBatch{}
		}
	}
	return wb
}

完成以上部分,再注意细节实现,我们就可以通过lab2c的测试了。